Skip to content

Commit e7341ec

Browse files
committedApr 1, 2016
Port to AsyncIO
1 parent 23aca90 commit e7341ec

File tree

5 files changed

+60
-30
lines changed

5 files changed

+60
-30
lines changed
 

‎aerospike_py/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '0.1.0'
1+
__version__ = '0.2.0'

‎aerospike_py/client.py

+19-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import socket
1+
import asyncio
22
import hashlib
33

4-
from aerospike_py.connection import SocketConnection
4+
from aerospike_py.connection import AsyncConnection
55
from aerospike_py.info import request_info_keys
66
from aerospike_py.result_code import ASMSGProtocolException
77
from aerospike_py.message import ASIOException
@@ -16,11 +16,11 @@ def hash_key(set='', key=''):
1616

1717

1818
class AerospikeClient:
19-
def __init__(self, sck):
20-
self.sck = sck
19+
def __init__(self, conn):
20+
self.conn = conn
2121

2222
def info(self, keys):
23-
return request_info_keys(self.sck, keys)[1]
23+
return request_info_keys(self.conn, keys)[1]
2424

2525
def _process_bucket(self, asmsg_ops):
2626
buckets = {}
@@ -29,10 +29,11 @@ def _process_bucket(self, asmsg_ops):
2929

3030
return buckets
3131

32+
@asyncio.coroutine
3233
def _submit_message(self, envelope, retry_count=3):
3334
while retry_count:
3435
try:
35-
outer, asmsg_hdr, asmsg_fields, asmsg_ops = aerospike_py.message.submit_message(self.sck, envelope)
36+
outer, asmsg_hdr, asmsg_fields, asmsg_ops = yield from aerospike_py.message.submit_message(self.conn, envelope)
3637
return self._process_bucket(asmsg_ops)
3738
except ASMSGProtocolException as e:
3839
if e.result_code not in (14,):
@@ -45,10 +46,11 @@ def _submit_message(self, envelope, retry_count=3):
4546

4647
return buckets
4748

49+
@asyncio.coroutine
4850
def _submit_batch(self, envelope, retry_count=3):
4951
while retry_count:
5052
try:
51-
messages = aerospike_py.message.submit_multi_message(self.sck, envelope)
53+
messages = yield from aerospike_py.message.submit_multi_message(self.conn, envelope)
5254
return [self._process_bucket(x[3]) for x in messages]
5355
except ASMSGProtocolException as e:
5456
if e.result_code not in (14,):
@@ -61,6 +63,7 @@ def _submit_batch(self, envelope, retry_count=3):
6163

6264
return buckets
6365

66+
@asyncio.coroutine
6467
def get(self, namespace, set='', key='', bins=[], record_ttl=0, retry_count=3):
6568
flags = aerospike_py.message.AS_INFO1_READ
6669
if not bins:
@@ -77,6 +80,7 @@ def get(self, namespace, set='', key='', bins=[], record_ttl=0, retry_count=3):
7780

7881
return self._submit_message(envelope, retry_count)
7982

83+
@asyncio.coroutine
8084
def mget(self, namespace, groups=[], bins={}, record_ttl=0, retry_count=3):
8185
flags = aerospike_py.message.AS_INFO1_READ # | aerospike_py.message.AS_INFO1_BATCH
8286
if not bins:
@@ -98,6 +102,7 @@ def mget(self, namespace, groups=[], bins={}, record_ttl=0, retry_count=3):
98102

99103
return self._submit_batch(envelope, retry_count)
100104

105+
@asyncio.coroutine
101106
def put(self, namespace, set='', key='', bins={}, create_only=False, bin_create_only=False, record_ttl=0, retry_count=3):
102107
flags = aerospike_py.message.AS_INFO2_WRITE
103108
if create_only:
@@ -118,6 +123,7 @@ def put(self, namespace, set='', key='', bins={}, create_only=False, bin_create_
118123

119124
return self._submit_message(envelope, retry_count)
120125

126+
@asyncio.coroutine
121127
def delete(self, namespace, set='', key='', record_ttl=0, retry_count=3):
122128
envelope = aerospike_py.message.pack_asmsg(0, aerospike_py.message.AS_INFO2_WRITE | aerospike_py.message.AS_INFO2_DELETE, 0, 0, record_ttl, 0,
123129
[
@@ -129,6 +135,7 @@ def delete(self, namespace, set='', key='', record_ttl=0, retry_count=3):
129135

130136
return self._submit_message(envelope, retry_count)
131137

138+
@asyncio.coroutine
132139
def incr(self, namespace, set='', key='', bin='', incr_by=0, record_ttl=0, retry_count=3):
133140
flags = aerospike_py.message.AS_INFO2_WRITE
134141

@@ -143,6 +150,7 @@ def incr(self, namespace, set='', key='', bin='', incr_by=0, record_ttl=0, retry
143150

144151
return self._submit_message(envelope, retry_count)
145152

153+
@asyncio.coroutine
146154
def _append_op(self, namespace, set='', key='', bin='', append_blob='', op=aerospike_py.message.AS_MSG_OP_APPEND, record_ttl=0, retry_count=3):
147155
flags = aerospike_py.message.AS_INFO2_WRITE
148156

@@ -158,16 +166,19 @@ def _append_op(self, namespace, set='', key='', bin='', append_blob='', op=aeros
158166

159167
return self._submit_message(envelope, retry_count)
160168

169+
@asyncio.coroutine
161170
def append(self, namespace, set='', key='', bin='', append_blob='', record_ttl=0):
162171
return self._append_op(namespace, set, key, bin, append_blob, aerospike_py.message.AS_MSG_OP_APPEND, record_ttl)
163172

173+
@asyncio.coroutine
164174
def prepend(self, namespace, set='', key='', bin='', append_blob='', record_ttl=0):
165175
return self._append_op(namespace, set, key, bin, append_blob, aerospike_py.message.AS_MSG_OP_PREPEND, record_ttl)
166176

177+
@asyncio.coroutine
167178
def touch(self, namespace, set, key, bin='', record_ttl=0):
168179
return self._append_op(namespace, set, key, bin, None, aerospike_py.message.AS_MSG_OP_TOUCH, record_ttl)
169180

170181

171182
def connect(host: str, port: int) -> AerospikeClient:
172-
return AerospikeClient(SocketConnection(socket.create_connection((host, port))))
183+
return AerospikeClient(AsyncConnection(host, port))
173184

‎aerospike_py/connection.py

+22-13
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
import asyncio
2+
from logging import getLogger
3+
4+
5+
LOGGER = getLogger(__name__)
6+
7+
18
class Connection:
29
"""Connection classes simply provide an interface specification for abstracting I/O.
310
They can be used with Twisted, Eventlet, AsyncIO, etc. without problem.
@@ -9,19 +16,21 @@ def write(self, buf):
916
pass
1017

1118

12-
class SocketConnection(Connection):
13-
"""SocketConnections are Connection instances which assume a functional Sockets API."""
14-
def __init__(self, fd):
15-
self._fd = fd
19+
class AsyncConnection(Connection):
20+
"""A Connection subclass which uses AsyncIO."""
21+
def __init__(self, host: str, port: int):
22+
conn = asyncio.open_connection(host, port)
23+
loop = asyncio.get_event_loop()
24+
try:
25+
(self.reader, self.writer) = loop.run_until_complete(conn)
26+
except OSError:
27+
LOGGER.exception("Can't connect to Aerospike")
28+
self.reader = self.writer = None
1629

17-
def read(self, length):
18-
buf = bytearray(length)
19-
view = memoryview(buf)
20-
while length > 0:
21-
nbytes = self._fd.recv_into(view, length)
22-
view = view[nbytes:]
23-
length -= nbytes
24-
return buf
30+
def read(self, length: int):
31+
data = yield from self.reader.readexactly(length)
32+
return data
2533

2634
def write(self, buf):
27-
return self._fd.send(buf)
35+
self.writer.write(buf)
36+
yield from self.writer.drain()

‎aerospike_py/info.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
1+
import asyncio
2+
13
from aerospike_py.connection import Connection
24
from aerospike_py.message import pack_message, unpack_message, AerospikeOuterHeader
35

46

7+
@asyncio.coroutine
58
def request_info_keys(conn: Connection, commands: list) -> (AerospikeOuterHeader, dict):
69
payload = pack_message('\n'.join(commands).encode('UTF-8'), 1)
710
conn.write(payload)
811

9-
hdr_payload = conn.read(8)
12+
hdr_payload = yield from conn.read(8)
1013
header, _ = unpack_message(hdr_payload)
1114

12-
header, payload = unpack_message(hdr_payload + conn.read(header.sz))
15+
message = hdr_payload
16+
message += yield from conn.read(header.sz)
17+
header, payload = unpack_message(message)
1318
lines = payload.decode('UTF-8')
1419

1520
infokeys = {}

‎aerospike_py/message.py

+11-6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from collections import namedtuple
23
import struct
34

@@ -223,18 +224,20 @@ def unpack_asmsg(data: bytes) -> (AerospikeASMSGHeader, list, list):
223224
return asmsg_hdr, fields, ops, data[pos:]
224225

225226

227+
@asyncio.coroutine
226228
def submit_message(conn: Connection, data: bytes) -> (AerospikeOuterHeader, AerospikeASMSGHeader, list, list):
227229
ohdr = AerospikeOuterHeader(2, 3, len(data))
228230
buf = pack_outer_header(ohdr) + data
229-
conn.write(buf)
231+
yield from conn.write(buf)
230232

231-
hdr_payload = conn.read(8)
233+
hdr_payload = yield from conn.read(8)
232234
if not hdr_payload:
233235
raise ASIOException('read')
234236

235237
header, _ = unpack_message(hdr_payload)
236238

237-
data = hdr_payload + conn.read(header.sz)
239+
data = hdr_payload
240+
data += yield from conn.read(header.sz)
238241

239242
header, payload = unpack_message(data)
240243
asmsg_header, asmsg_fields, asmsg_ops, _ = unpack_asmsg(payload)
@@ -245,22 +248,24 @@ def submit_message(conn: Connection, data: bytes) -> (AerospikeOuterHeader, Aero
245248
return header, asmsg_header, asmsg_fields, asmsg_ops
246249

247250

251+
@asyncio.coroutine
248252
def submit_multi_message(conn: Connection, data: bytes) -> list:
249253
ohdr = AerospikeOuterHeader(2, 3, len(data))
250254
buf = pack_outer_header(ohdr) + data
251-
conn.write(buf)
255+
yield from conn.write(buf)
252256

253257
not_last = True
254258
messages = []
255259

256260
while not_last:
257-
hdr_payload = conn.read(8)
261+
hdr_payload = yield from conn.read(8)
258262
header, _ = unpack_message(hdr_payload)
259263

260264
if not hdr_payload:
261265
raise ASIOException('read')
262266

263-
data = hdr_payload + conn.read(header.sz)
267+
data = hdr_payload
268+
data += yield from conn.read(header.sz)
264269

265270
header, payload = unpack_message(data)
266271
while payload:

0 commit comments

Comments
 (0)
Please sign in to comment.