 import logging
 import threading
 import time
-from collections import defaultdict
 from typing import TYPE_CHECKING, Any, Literal, Mapping, Optional, Union
 
 import httpx
 import orjson
+from confluent_kafka import TopicPartition
 
 from quixstreams.kafka import ConnectionConfig, Consumer
+from quixstreams.kafka.exceptions import KafkaConsumerException
 from quixstreams.models import HeadersMapping, Topic
 from quixstreams.platforms.quix.env import QUIX_ENVIRONMENT
 
@@ -119,19 +120,17 @@ def __init__(
             self._version_data, maxsize=cache_size
         )
 
-        self._consumer = Consumer(
+        self._config_consumer = Consumer(
             broker_address=broker_address,
             consumer_group=consumer_group,
             auto_offset_reset="earliest",
             auto_commit_enable=False,
             extra_config=consumer_extra_config,
         )
 
-        self._start()
+        self._start_consumer_thread()
 
-        self._fields_by_type: dict[int, dict[str, dict[str, BaseField]]] = defaultdict(
-            dict
-        )
+        self._fields_by_type: dict[int, dict[str, dict[str, BaseField]]] = {}
 
     def json_field(
         self,
@@ -203,7 +202,7 @@ def _fetch_version_content(self, version: ConfigurationVersion) -> Optional[byte
             version.failed()
             return None
 
-    def _start(self) -> None:
+    def _start_consumer_thread(self) -> None:
         """
         Start the enrichment process in a background thread and wait for initialization to complete.
         """
@@ -214,26 +213,31 @@ def _start(self) -> None:
 
     def _consumer_thread(self) -> None:
         """
-        Background thread for consuming configuration events from Kafka and updating internal state.
+        Background thread for consuming configuration events from Kafka
+        and updating internal state.
         """
-        assigned = False
-
-        def on_assign(consumer: Consumer, partitions: list[tuple[str, int]]) -> None:
-            """
-            Callback for partition assignment.
-            """
-            nonlocal assigned
-            assigned = True
-
         try:
-            self._consumer.subscribe(topics=[self._topic.name], on_assign=on_assign)
+            # Assign all available partitions of the config updates topic
+            # bypassing the consumer group protocol.
+            tps = [
+                TopicPartition(topic=self._topic.name, partition=i)
+                for i in range(self._topic.broker_config.num_partitions or 0)
+            ]
+            self._config_consumer.assign(tps)
 
             while True:
-                message = self._consumer.poll(timeout=self._consumer_poll_timeout)
+                # Check if the consumer processed all the available config messages
+                # and set the "started" event.
+                if not self._started.is_set() and self._configs_ready():
+                    self._started.set()
+
+                message = self._config_consumer.poll(
+                    timeout=self._consumer_poll_timeout
+                )
                 if message is None:
-                    if assigned and not self._started.is_set():
-                        self._started.set()
                     continue
+                elif message.error():
+                    raise KafkaConsumerException(error=message.error())
 
                 value = message.value()
                 if value is None:
@@ -267,7 +271,9 @@ def _process_config_event(self, event: Event) -> None:
 
         :raises: RuntimeError: If the event type is unknown.
         """
-        logger.info(f"Processing event: {event['event']} for ID: {event['id']}")
+        logger.info(
+            f'Processing update for configuration ID "{event["id"]}" ({event["event"]})'
+        )
         if event["event"] in {"created", "updated"}:
             if event["id"] not in self._configurations:
                 logger.debug(f"Creating new configuration for ID: {event['id']}")
@@ -329,28 +335,35 @@ def _find_version(
 
         :returns: The valid configuration version, or None if not found.
         """
-        logger.debug(
-            f"Fetching data for type: {type}, on: {on}, timestamp: {timestamp}"
-        )
+
         configuration = self._configurations.get(self._config_id(type, on))
-        if not configuration:
+        if configuration is None:
             logger.debug(
-                f"No configuration found for type: {type}, on: {on}. Trying wildcard."
+                "No configuration found for type: %s, on: %s. Trying wildcard.",
+                type,
+                on,
             )
             configuration = self._configurations.get(self._config_id(type, "*"))
-            if not configuration:
-                logger.debug(f"No configuration found for type: {type}, on: *")
+            if configuration is None:
+                logger.debug("No configuration found for type: %s, on: *", type)
                 return None
 
         version = configuration.find_valid_version(timestamp)
         if version is None:
             logger.debug(
-                f"No valid version found for type: {type}, on: {on}, timestamp: {timestamp}"
+                "No valid configuration version found for type: %s, on: %s, timestamp: %s",
+                type,
+                on,
+                timestamp,
             )
             return None
 
         logger.debug(
-            f"Found valid version '{version.version}' for type: {type}, on: {on}, timestamp: {timestamp}"
+            "Found valid configuration version '%s' for type: %s, on: %s, timestamp: %s",
+            version.version,
+            type,
+            on,
+            timestamp,
         )
         return version
 
@@ -416,18 +429,42 @@ def join(
         start = time.time()
         logger.debug(f"Joining message with key: {on}, timestamp: {timestamp}")
 
-        fields_by_type = self._fields_by_type.get(id(fields))
+        fields_ids = id(fields)
+        fields_by_type = self._fields_by_type.get(fields_ids)
+
         if fields_by_type is None:
-            fields_by_type = defaultdict(dict)
+            fields_by_type = {}
             for key, field in fields.items():
-                fields_by_type[field.type][key] = field
-            self._fields_by_type[id(fields)] = fields_by_type
+                fields_by_type.setdefault(field.type, {})[key] = field
+            self._fields_by_type[fields_ids] = fields_by_type
+
+        for type_, fields in fields_by_type.items():
+            version = self._find_version(type_, on, timestamp)
 
-        for type, fields in fields_by_type.items():
-            version = self._find_version(type, on, timestamp)
             if version is not None and version.retry_at < start:
                 self._version_data_cached.remove(version, fields)
 
             value.update(self._version_data_cached(version, fields))
 
         logger.debug("Join took %.2f ms", (time.time() - start) * 1000)
+
+    def _configs_ready(self) -> bool:
+        """
+        Return True if the configs have been loaded from the topic up to the
+        available HWM. If there are outstanding messages in the config topic
+        or the assignment is not available yet, return False.
+        """
+
+        if not self._config_consumer.assignment():
+            return False
+
+        positions = self._config_consumer.position(self._config_consumer.assignment())
+        for position in positions:
+            # Check if the consumer reached the end of the configuration topic
+            # for each assigned partition.
+            _, hwm = self._config_consumer.get_watermark_offsets(
+                partition=position, cached=True
+            )
+            if hwm < 0 or (hwm > 0 and position.offset < hwm):
+                return False
+        return True
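
For reference, the readiness check introduced in _configs_ready() follows a common confluent_kafka pattern: assign every partition of the topic directly, then poll until the consumer's position reaches each partition's high watermark. Below is a minimal standalone sketch of that pattern; the broker address, topic name, and the caught_up() helper are illustrative placeholders, not part of this commit.

# Minimal sketch (not part of the change above): manually assign all partitions
# of a topic and poll until the consumer has caught up to the high watermark.
# "localhost:9092" and "configs" are placeholder values.
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "configs-reader",
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest",
    }
)

topic = "configs"
metadata = consumer.list_topics(topic, timeout=10)
partitions = [TopicPartition(topic, p) for p in metadata.topics[topic].partitions]
consumer.assign(partitions)


def caught_up() -> bool:
    # Ready only when every assigned partition's position has reached its
    # high watermark; unknown watermarks or positions count as "not ready".
    for tp in consumer.position(consumer.assignment()):
        _, hwm = consumer.get_watermark_offsets(tp, cached=True)
        if hwm < 0 or (hwm > 0 and tp.offset < hwm):
            return False
    return True


while not caught_up():
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        raise RuntimeError(msg.error())
    # ... process the config message here ...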