Commit ca786bb

Asyncio Producer and Consumer implementation
with the same API as the sync ones
1 parent 62b1dd8 commit ca786bb
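
The new AIOProducer and AIOConsumer are meant to mirror the synchronous Producer and Consumer, with blocking calls becoming awaitables. A minimal sketch of the API as it appears in the example below; the broker address, topic, and group id are placeholders, and it assumes the non-transactional produce path and a callback-less subscribe behave like their sync counterparts:

import asyncio
from confluent_kafka.aio import AIOProducer, AIOConsumer


async def main():
    # Placeholder broker address; same config keys as the sync clients
    producer = AIOProducer({'bootstrap.servers': 'localhost:9092'})
    # produce() is awaited and resolves to the delivered message
    msg = await producer.produce(topic='test_topic', key='k1', value='v1')
    print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
    await producer.stop()

    consumer = AIOConsumer({'bootstrap.servers': 'localhost:9092',
                            'group.id': 'sketch_group',
                            'auto.offset.reset': 'earliest'})
    await consumer.subscribe(['test_topic'])
    message = await consumer.poll(1.0)  # None on timeout, as in the sync API
    if message is not None and not message.error():
        print(f'Consumed: {message.value()}')
    await consumer.close()


asyncio.run(main())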

File tree

5 files changed: +459 −150 lines changed


examples/asyncio_example.py

File mode changed: 100755 → 100644
Lines changed: 150 additions & 150 deletions
@@ -1,6 +1,5 @@
 #!/usr/bin/env python
-
-# flake8: noqa
+#
 # Copyright 2019 Confluent Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,168 +14,169 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-
-# Companion code to the blog post "Integrating Kafka With Python
-# Asyncio Web Applications"
-# https://www.confluent.io/blog/kafka-python-asyncio-integration/
-#
-# Example Siege [https://github.com/JoeDog/siege] test:
-# $ siege -c 400 -r 200 'http://localhost:8000/items1 POST {"name":"testuser"}'
-
-
 import asyncio
-import confluent_kafka
-from confluent_kafka import KafkaException
-from fastapi import FastAPI, HTTPException
-from pydantic import BaseModel
-from time import time
-from threading import Thread
-import uvicorn
-
-
-class AIOProducer:
-    def __init__(self, configs, loop=None):
-        self._loop = loop or asyncio.get_event_loop()
-        self._producer = confluent_kafka.Producer(configs)
-        self._cancelled = False
-        self._poll_thread = Thread(target=self._poll_loop)
-        self._poll_thread.start()
-
-    def _poll_loop(self):
-        while not self._cancelled:
-            self._producer.poll(0.1)
-
-    def close(self):
-        self._cancelled = True
-        self._poll_thread.join()
-
-    def produce(self, topic, value):
-        """
-        An awaitable produce method.
-        """
-        result = self._loop.create_future()
-
-        def ack(err, msg):
-            if err:
-                self._loop.call_soon_threadsafe(result.set_exception, KafkaException(err))
-            else:
-                self._loop.call_soon_threadsafe(result.set_result, msg)
-        self._producer.produce(topic, value, on_delivery=ack)
-        return result
-
-    def produce2(self, topic, value, on_delivery):
-        """
-        A produce method in which delivery notifications are made available
-        via both the returned future and on_delivery callback (if specified).
-        """
-        result = self._loop.create_future()
-
-        def ack(err, msg):
-            if err:
-                self._loop.call_soon_threadsafe(
-                    result.set_exception, KafkaException(err))
-            else:
-                self._loop.call_soon_threadsafe(
-                    result.set_result, msg)
-            if on_delivery:
-                self._loop.call_soon_threadsafe(
-                    on_delivery, err, msg)
-        self._producer.produce(topic, value, on_delivery=ack)
-        return result
-
-
-class Producer:
-    def __init__(self, configs):
-        self._producer = confluent_kafka.Producer(configs)
-        self._cancelled = False
-        self._poll_thread = Thread(target=self._poll_loop)
-        self._poll_thread.start()
+import sys
+from confluent_kafka.aio import AIOProducer
+from confluent_kafka.aio import AIOConsumer
+import random
+import logging
+import signal

-    def _poll_loop(self):
-        while not self._cancelled:
-            self._producer.poll(0.1)
+logging.basicConfig()
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+running = True

-    def close(self):
-        self._cancelled = True
-        self._poll_thread.join()

-    def produce(self, topic, value, on_delivery=None):
-        self._producer.produce(topic, value, on_delivery=on_delivery)
+async def error_cb(err):
+    logger.error(f'Kafka error: {err}')


-config = {"bootstrap.servers": "localhost:9092"}
+async def throttle_cb(event):
+    logger.warning(f'Kafka throttle event: {event}')

-app = FastAPI()

+async def stats_cb(stats_json_str):
+    logger.info(f'Kafka stats: {stats_json_str}')

-class Item(BaseModel):
-    name: str

+def configure_common(conf):
+    bootstrap_servers = sys.argv[1]
+    conf.update({
+        'bootstrap.servers': bootstrap_servers,
+        'logger': logger,
+        'debug': 'conf',
+        'error_cb': error_cb,
+        'throttle_cb': throttle_cb,
+        'stats_cb': stats_cb,
+        'statistics.interval.ms': 5000,
+    })

-aio_producer = None
-producer = None
+    return conf


-@app.on_event("startup")
-async def startup_event():
-    global producer, aio_producer
-    aio_producer = AIOProducer(config)
-    producer = Producer(config)
+async def run_producer():
+    topic = sys.argv[2]
+    producer = AIOProducer(configure_common(
+        {
+            'transactional.id': 'producer1'
+        }), max_workers=5)

-
-@app.on_event("shutdown")
-def shutdown_event():
-    aio_producer.close()
-    producer.close()
-
-
-@app.post("/items1")
-async def create_item1(item: Item):
+    await producer.init_transactions()
+    # TODO: handle exceptions with transactional API
+    transaction_active = False
     try:
-        result = await aio_producer.produce("items", item.name)
-        return {"timestamp": result.timestamp()}
-    except KafkaException as ex:
-        raise HTTPException(status_code=500, detail=ex.args[0].str())
-
-cnt = 0
+        while running:
+            await producer.begin_transaction()
+            transaction_active = True
+
+            produce_futures = [asyncio.create_task(
+                producer.produce(topic=topic,
+                                 key=f'testkey{i}',
+                                 value=f'testvalue{i}'))
+                for i in range(100)]
+            results = await asyncio.gather(*produce_futures)
+
+            for msg in results:
+                logger.info(
+                    'Produced to: {} [{}] @ {}'.format(msg.topic(),
+                                                       msg.partition(),
+                                                       msg.offset()))
+
+            await producer.commit_transaction()
+            transaction_active = False
+            await asyncio.sleep(1)
+    except Exception as e:
+        logger.error(e)
+    finally:
+        if transaction_active:
+            await producer.abort_transaction()
+        await producer.stop()
+        logger.info('Closed producer')
+
+
+async def run_consumer():
+    topic = sys.argv[2]
+    group_id = f'{topic}_{random.randint(1, 1000)}'
+    consumer = AIOConsumer(configure_common(
+        {
+            'group.id': group_id,
+            'auto.offset.reset': 'latest',
+            'enable.auto.commit': 'false',
+            'enable.auto.offset.store': 'false',
+            'partition.assignment.strategy': 'cooperative-sticky',
+        }))
+
+    async def on_assign(consumer, partitions):
+        # Calling incremental_assign is necessary to pause the assigned
+        # partitions here; otherwise it'll be done by the consumer after
+        # the callback terminates.
+        await consumer.incremental_assign(partitions)
+        await consumer.pause(partitions)
+        logger.debug(f'on_assign {partitions}')
+        # Resume the partitions, as this is just a pause example
+        await consumer.resume(partitions)
+
+    async def on_revoke(consumer, partitions):
+        logger.debug(f'before on_revoke {partitions}')
+        try:
+            await consumer.commit()
+        except Exception as e:
+            logger.info(f'Error during commit: {e}')
+        logger.debug(f'after on_revoke {partitions}')
+
+    async def on_lost(consumer, partitions):
+        logger.debug(f'on_lost {partitions}')

-
-def ack(err, msg):
-    global cnt
-    cnt = cnt + 1
-
-
-@app.post("/items2")
-async def create_item2(item: Item):
-    try:
-        aio_producer.produce2("items", item.name, on_delivery=ack)
-        return {"timestamp": time()}
-    except KafkaException as ex:
-        raise HTTPException(status_code=500, detail=ex.args[0].str())
-
-
-@app.post("/items3")
-async def create_item3(item: Item):
     try:
-        producer.produce("items", item.name, on_delivery=ack)
-        return {"timestamp": time()}
-    except KafkaException as ex:
-        raise HTTPException(status_code=500, detail=ex.args[0].str())
-
-
-@app.post("/items4")
-async def create_item4(item: Item):
-    try:
-        producer.produce("items", item.name)
-        return {"timestamp": time()}
-    except KafkaException as ex:
-        raise HTTPException(status_code=500, detail=ex.args[0].str())
-
-
-@app.post("/items5")
-async def create_item5(item: Item):
-    return {"timestamp": time()}
-
-
-if __name__ == '__main__':
-    uvicorn.run(app, host='127.0.0.1', port=8000)
+        await consumer.subscribe([topic],
+                                 on_assign=on_assign,
+                                 on_revoke=on_revoke,
+                                 # Remember to set an on_lost callback
+                                 # if you're committing on revocation,
+                                 # as lost partitions cannot be committed
+                                 on_lost=on_lost)
+        i = 0
+        while running:
+            message = await consumer.poll(1.0)
+            if message is None:
+                continue
+
+            if i % 100 == 0:
+                position = await consumer.position(await consumer.assignment())
+                logger.info(f'Current position: {position}')
+                await consumer.commit()
+                logger.info('Stored offsets were committed')
+
+            err = message.error()
+            if err:
+                logger.error(f'Error: {err}')
+            else:
+                logger.info(f'Consumed: {message.value()}')
+                await consumer.store_offsets(message=message)
+            i += 1
+    finally:
+        await consumer.unsubscribe()
+        await consumer.close()
+        logger.info('Closed consumer')
+
+
+def signal_handler(*_):
+    global running
+    logger.info('Signal received, shutting down...')
+    running = False
+
+
+async def main():
+    signal.signal(signal.SIGINT, signal_handler)
+    signal.signal(signal.SIGTERM, signal_handler)
+
+    producer_task = asyncio.create_task(run_producer())
+    consumer_task = asyncio.create_task(run_consumer())
+    await asyncio.gather(producer_task, consumer_task)
+
+try:
+    asyncio.run(main())
+except asyncio.exceptions.CancelledError as e:
+    logger.warning(f'Asyncio task was cancelled: {e}')
+    pass
+logger.info('End of example')
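
To run the example: it reads the bootstrap servers from sys.argv[1] and the topic from sys.argv[2], then runs the producer and consumer loops concurrently until SIGINT or SIGTERM flips the running flag. A typical invocation (host and topic names are placeholders):

$ python examples/asyncio_example.py localhost:9092 test_topic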
