Asyncio Producer and Consumer implementation #1989

Draft
wants to merge 1 commit into base: master

This draft rewrites examples/asyncio_example.py, replacing the FastAPI-based example that accompanied the blog post "Integrating Kafka With Python Asyncio Web Applications" with a standalone transactional producer and consumer built on the confluent_kafka.aio module.
Changes from all commits
examples/asyncio_example.py: 286 changes (143 additions, 143 deletions), file mode 100755 → 100644
@@ -1,6 +1,5 @@
 #!/usr/bin/env python
 
-# flake8: noqa
 #
 # Copyright 2019 Confluent Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,168 +14,169 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
-# Companion code to the blog post "Integrating Kafka With Python
-# Asyncio Web Applications"
-# https://www.confluent.io/blog/kafka-python-asyncio-integration/
-#
-# Example Siege [https://github.com/JoeDog/siege] test:
-# $ siege -c 400 -r 200 'http://localhost:8000/items1 POST {"name":"testuser"}'

 import asyncio
-import confluent_kafka
-from confluent_kafka import KafkaException
-from fastapi import FastAPI, HTTPException
-from pydantic import BaseModel
-from time import time
-from threading import Thread
-import uvicorn


-class AIOProducer:
-    def __init__(self, configs, loop=None):
-        self._loop = loop or asyncio.get_event_loop()
-        self._producer = confluent_kafka.Producer(configs)
-        self._cancelled = False
-        self._poll_thread = Thread(target=self._poll_loop)
-        self._poll_thread.start()
-
-    def _poll_loop(self):
-        while not self._cancelled:
-            self._producer.poll(0.1)
-
-    def close(self):
-        self._cancelled = True
-        self._poll_thread.join()
-
-    def produce(self, topic, value):
-        """
-        An awaitable produce method.
-        """
-        result = self._loop.create_future()
-
-        def ack(err, msg):
-            if err:
-                self._loop.call_soon_threadsafe(result.set_exception, KafkaException(err))
-            else:
-                self._loop.call_soon_threadsafe(result.set_result, msg)
-        self._producer.produce(topic, value, on_delivery=ack)
-        return result
-
-    def produce2(self, topic, value, on_delivery):
-        """
-        A produce method in which delivery notifications are made available
-        via both the returned future and on_delivery callback (if specified).
-        """
-        result = self._loop.create_future()
-
-        def ack(err, msg):
-            if err:
-                self._loop.call_soon_threadsafe(
-                    result.set_exception, KafkaException(err))
-            else:
-                self._loop.call_soon_threadsafe(
-                    result.set_result, msg)
-            if on_delivery:
-                self._loop.call_soon_threadsafe(
-                    on_delivery, err, msg)
-        self._producer.produce(topic, value, on_delivery=ack)
-        return result
-
-
-class Producer:
-    def __init__(self, configs):
-        self._producer = confluent_kafka.Producer(configs)
-        self._cancelled = False
-        self._poll_thread = Thread(target=self._poll_loop)
-        self._poll_thread.start()
+import sys
+from confluent_kafka.aio import AIOProducer
+from confluent_kafka.aio import AIOConsumer
+import random
+import logging
+import signal

-    def _poll_loop(self):
-        while not self._cancelled:
-            self._producer.poll(0.1)
+logging.basicConfig()
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+running = True

-    def close(self):
-        self._cancelled = True
-        self._poll_thread.join()
-
-    def produce(self, topic, value, on_delivery=None):
-        self._producer.produce(topic, value, on_delivery=on_delivery)
+async def error_cb(err):
+    logger.error(f'Kafka error: {err}')


config = {"bootstrap.servers": "localhost:9092"}
async def throttle_cb(event):
logger.warning(f'Kafka throttle event: {event}')

app = FastAPI()

-class Item(BaseModel):
-    name: str
+async def stats_cb(stats_json_str):
+    logger.info(f'Kafka stats: {stats_json_str}')

-aio_producer = None
-producer = None
+def configure_common(conf):
+    bootstrap_servers = sys.argv[1]
+    conf.update({
+        'bootstrap.servers': bootstrap_servers,
+        'logger': logger,
+        'debug': 'conf',
+        'error_cb': error_cb,
+        'throttle_cb': throttle_cb,
+        'stats_cb': stats_cb,
+        'statistics.interval.ms': 5000,
+    })
+    return conf


@app.on_event("startup")
async def startup_event():
global producer, aio_producer
aio_producer = AIOProducer(config)
producer = Producer(config)
async def run_producer():
topic = sys.argv[2]
producer = AIOProducer(configure_common(
{
'transactional.id': 'producer1'
}), max_workers=5)


@app.on_event("shutdown")
def shutdown_event():
aio_producer.close()
producer.close()


@app.post("/items1")
async def create_item1(item: Item):
await producer.init_transactions()
# TODO: handle exceptions with transactional API
transaction_active = False
try:
result = await aio_producer.produce("items", item.name)
return {"timestamp": result.timestamp()}
except KafkaException as ex:
raise HTTPException(status_code=500, detail=ex.args[0].str())

-cnt = 0
-
-
-def ack(err, msg):
-    global cnt
-    cnt = cnt + 1
+        while running:
+            await producer.begin_transaction()
+            transaction_active = True
+
+            produce_futures = [asyncio.create_task(
+                producer.produce(topic=topic,
+                                 key=f'testkey{i}',
+                                 value=f'testvalue{i}'))
+                for i in range(100)]
+            results = await asyncio.gather(*produce_futures)
+
+            for msg in results:
+                logger.info(
+                    'Produced to: {} [{}] @ {}'.format(msg.topic(),
+                                                       msg.partition(),
+                                                       msg.offset()))
+
+            await producer.commit_transaction()
+            transaction_active = False
+            await asyncio.sleep(1)
+    except Exception as e:
+        logger.error(e)
+    finally:
+        if transaction_active:
+            await producer.abort_transaction()
+        await producer.stop()
+        logger.info('Closed producer')


+async def run_consumer():
+    topic = sys.argv[2]
+    group_id = f'{topic}_{random.randint(1, 1000)}'
+    consumer = AIOConsumer(configure_common(
+        {
+            'group.id': group_id,
+            'auto.offset.reset': 'latest',
+            'enable.auto.commit': 'false',
+            'enable.auto.offset.store': 'false',
+            'partition.assignment.strategy': 'cooperative-sticky',
+        }))
+
+    async def on_assign(consumer, partitions):
+        # Calling incremental_assign is necessary to pause the assigned partitions;
+        # otherwise it will be done by the consumer after the callback terminates.
+        await consumer.incremental_assign(partitions)
+        await consumer.pause(partitions)
+        logger.debug(f'on_assign {partitions}')
+        # Resume the partitions, as this is just a pause example
+        await consumer.resume(partitions)
+
+    async def on_revoke(consumer, partitions):
+        logger.debug(f'before on_revoke {partitions}')
+        try:
+            await consumer.commit()
+        except Exception as e:
+            logger.info(f'Error during commit: {e}')
+        logger.debug(f'after on_revoke {partitions}')
+
+    async def on_lost(consumer, partitions):
+        logger.debug(f'on_lost {partitions}')

@app.post("/items2")
async def create_item2(item: Item):
try:
aio_producer.produce2("items", item.name, on_delivery=ack)
return {"timestamp": time()}
except KafkaException as ex:
raise HTTPException(status_code=500, detail=ex.args[0].str())

+    try:
+        await consumer.subscribe([topic],
+                                 on_assign=on_assign,
+                                 on_revoke=on_revoke,
+                                 # Remember to set an on_lost callback
+                                 # if you're committing on revocation,
+                                 # as lost partitions cannot be committed.
+                                 on_lost=on_lost)
+        i = 0
+        while running:
+            message = await consumer.poll(1.0)
+            if message is None:
+                continue
+
+            if i % 100 == 0:
+                position = await consumer.position(await consumer.assignment())
+                logger.info(f'Current position: {position}')
+                await consumer.commit()
+                logger.info('Stored offsets were committed')
+
+            err = message.error()
+            if err:
+                logger.error(f'Error: {err}')
+            else:
+                logger.info(f'Consumed: {message.value()}')
+                await consumer.store_offsets(message=message)
+            i += 1
+    finally:
+        await consumer.unsubscribe()
+        await consumer.close()
+        logger.info('Closed consumer')

@app.post("/items3")
async def create_item3(item: Item):
try:
producer.produce("items", item.name, on_delivery=ack)
return {"timestamp": time()}
except KafkaException as ex:
raise HTTPException(status_code=500, detail=ex.args[0].str())

def signal_handler(*_):
global running
logger.info('Signal received, shutting down...')
running = False

@app.post("/items4")
async def create_item4(item: Item):
try:
producer.produce("items", item.name)
return {"timestamp": time()}
except KafkaException as ex:
raise HTTPException(status_code=500, detail=ex.args[0].str())

async def main():
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

@app.post("/items5")
async def create_item5(item: Item):
return {"timestamp": time()}
producer_task = asyncio.create_task(run_producer())
consumer_task = asyncio.create_task(run_consumer())
await asyncio.gather(producer_task, consumer_task)

try:
asyncio.run(main())
except asyncio.exceptions.CancelledError as e:
logger.warning(f'Asyncio task was cancelled: {e}')

if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=8000)
logger.info('End of example')
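
For reference, a minimal sketch of the awaitable producer API exercised by this diff. It assumes the confluent_kafka.aio module introduced in this PR; the broker address and topic name are placeholders (the full example takes them from sys.argv[1] and sys.argv[2]):

import asyncio
from confluent_kafka.aio import AIOProducer

async def produce_one():
    # Placeholder broker address; not part of the PR.
    producer = AIOProducer({'bootstrap.servers': 'localhost:9092'})
    try:
        # produce() is awaitable and resolves to the delivered message,
        # as in run_producer() above.
        msg = await producer.produce(topic='test_topic', key='k0', value='v0')
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
    finally:
        await producer.stop()

# The full example is run as:
#   python examples/asyncio_example.py <bootstrap_servers> <topic>
asyncio.run(produce_one())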
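A stripped-down consumer sketch under the same assumptions. The group id, broker, and topic are placeholders, and it assumes the rebalance callbacks are optional keyword arguments to subscribe(), as in the synchronous Consumer API; the full example above additionally demonstrates rebalance callbacks, manual offset storage, and commits:

import asyncio
from confluent_kafka.aio import AIOConsumer

async def consume_some():
    consumer = AIOConsumer({
        'bootstrap.servers': 'localhost:9092',  # placeholder
        'group.id': 'example_group',            # placeholder
        'auto.offset.reset': 'latest',
    })
    try:
        await consumer.subscribe(['test_topic'])
        for _ in range(10):
            # poll() is awaitable and returns None on timeout.
            message = await consumer.poll(1.0)
            if message is None or message.error():
                continue
            print(f'Consumed: {message.value()}')
    finally:
        await consumer.close()

asyncio.run(consume_some())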