11import { Kafka } from 'kafkajs' ;
22import { env } from '../config/env.js' ;
33import { clickhouse , connectClickHouse } from '../lib/clickhouse.js' ;
4+ import crypto from 'node:crypto' ;
45
56const kafka = new Kafka ( {
67 clientId : 'logstream-worker' ,
@@ -10,44 +11,60 @@ const kafka = new Kafka({
// Single consumer in the 'log-processing-group-v3' group.
// NOTE(review): the group id is versioned — bumping the suffix starts a
// fresh consumer group with no committed offsets; confirm that is intended.
const consumer = kafka.consumer({ groupId: 'log-processing-group-v3' });
1112
1213const run = async ( ) => {
13- // connect to infrastructure
14+
1415 await connectClickHouse ( ) ;
1516 await consumer . connect ( ) ;
1617 console . log ( 'Worker connected to Kafka' ) ;
1718
1819 await consumer . subscribe ( { topic : 'app-logs' , fromBeginning : true } ) ;
1920
20- // process messages
21+ // swtiched to batch processing from single message processing for efficiency
2122 await consumer . run ( {
22- eachMessage : async ( { topic, partition, message } ) => {
23- const logString = message . value ?. toString ( ) ;
23+ eachBatchAutoResolve : true ,
24+ eachBatch : async ( { batch, resolveOffset, heartbeat } ) => {
25+ const messages = batch . messages ;
2426
25- if ( ! logString ) return ;
27+ if ( messages . length === 0 ) return ;
2628
29+ console . log ( `Processing batch of ${ messages . length } messages...` ) ;
2730
28- try {
29- const logData = JSON . parse ( logString ) ;
31+ // Transform Kafka Messages to ClickHouse Rows
32+ const rows = messages . map ( ( message ) => {
33+ const logString = message . value ?. toString ( ) ;
34+ if ( ! logString ) return null ;
35+
36+ try {
37+ const logData = JSON . parse ( logString ) ;
38+ return {
39+ id : logData . id || crypto . randomUUID ( ) ,
40+ service : logData . service ,
41+ level : logData . level ,
42+ message : logData . message ,
43+ meta : JSON . stringify ( logData . meta || { } ) ,
44+ timestamp : new Date ( logData . timestamp ) . getTime ( ) ,
45+ } ;
46+ } catch ( err ) {
47+ return null ;
48+ }
49+ } ) . filter ( Boolean ) ;
3050
31- // insert into clickHouse
51+ if ( rows . length === 0 ) return ;
52+
53+ try {
54+ // bulk Insert into ClickHouse
3255 await clickhouse . insert ( {
3356 table : 'app_logs' ,
34- values : [
35- {
36- id : logData . id || crypto . randomUUID ( ) , // Fallback ID
37- service : logData . service ,
38- level : logData . level ,
39- message : logData . message ,
40- meta : JSON . stringify ( logData . meta || { } ) ,
41- timestamp : new Date ( logData . timestamp ) . getTime ( ) ,
42- } ,
43- ] ,
57+ values : rows ,
4458 format : 'JSONEachRow' ,
4559 } ) ;
4660
47- console . log ( `Inserted log from ${ logData . service } ` ) ;
61+ // mark success
62+ await heartbeat ( ) ;
63+
64+ console . log ( `committed ${ rows . length } logs to ClickHouse` ) ;
4865
4966 } catch ( err ) {
50- console . error ( 'Error processing message :' , err ) ;
67+ console . error ( 'Batch Insert Failed :' , err ) ;
5168 }
5269 } ,
5370 } ) ;
0 commit comments