diff --git a/README.md b/README.md index 2115d06..b406200 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,30 @@ Soon, we will be introducing deep correlation and insights with producer and con ![kafka-otel-signoz](assets/kafka-otel-signoz.png) -**Note:** -1) This guide is a reference for a production-grade kafka monitoring and instrumentation, this repo is intended to familiarize you with the involved complexities. -2) All the tools used are open source and are licensed under Apache and MIT license. +### Quick Start using docker +#### start kafka +```bash +# from docker directory, cd docker +docker compose -f kafka.yaml up +``` +#### collector sending data to signoz +```bash +# from docker directory, cd docker +docker compose -f collector.yaml up +``` + +#### metrics collector +```bash +# from collector directory, cd collector +./otelcol-contrib --config collector-contrib-config.yaml +``` + +#### start producer and consumers +```bash +# from docker directory, cd docker +docker compose -f producer-consumer.yaml up +``` + --- ### Steps to follow (ready-to-use guide) We will follow the following steps: @@ -139,7 +160,7 @@ The last section contains some Troubleshooting tips!! -Dotel.instrumentation.kafka.producer-propagation.enabled=true \ -Dotel.instrumentation.kafka.experimental-span-attributes=true \ -Dotel.instrumentation.kafka.metric-reporter.enabled=true \ - -jar ${PWD}/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar + -jar ${PWD}/kafka-app-otel/kafka-consumer/target/kafka-consumer.jar ``` --- ### Step 4: SigNoz setup @@ -166,10 +187,11 @@ e.g. if you're running the binary on your host machine, place it the root of the ```bash ./otelcol-contrib --config ${PWD}/collector/collector-contrib-config.yaml ``` ---- -### Troubleshooting - --- ### Contributing If you have some suggestions or want to improve the experience of the repo feel free to create an issue or open a pull request. + +**Note:** +1) This guide is a reference for a production-grade kafka monitoring and instrumentation, this repo is intended to familiarize you with the involved complexities. +2) All the tools used are open source and are licensed under Apache and MIT license. \ No newline at end of file diff --git a/collector/collector-contrib-config.yaml b/collector/collector-contrib-config.yaml index b618fe1..42a65ae 100644 --- a/collector/collector-contrib-config.yaml +++ b/collector/collector-contrib-config.yaml @@ -1,4 +1,12 @@ receivers: + otlp: + protocols: + grpc: + # OTeL receiver endpoint (grpc) + endpoint: 127.0.0.1:4317 + http: + # OTeL receiver endpoint (http) + endpoint: 127.0.0.1:4318 kafkametrics: brokers: - localhost:9092 @@ -37,7 +45,7 @@ receivers: exporters: otlp: - endpoint: localhost:4317 + endpoint: localhost:51375 tls: insecure: true debug: @@ -46,5 +54,11 @@ exporters: service: pipelines: metrics: - receivers: [kafkametrics, jmx/1, jmx/2, jmx/3] + receivers: [kafkametrics, jmx/1, jmx/2, jmx/3, otlp] + exporters: [otlp] + traces: + receivers: [otlp] + exporters: [otlp, debug] + logs: + receivers: [otlp] exporters: [otlp] diff --git a/docker/consumer/Dockerfile b/docker/consumer/Dockerfile index 633bfde..2a70885 100644 --- a/docker/consumer/Dockerfile +++ b/docker/consumer/Dockerfile @@ -1,5 +1,5 @@ # Use an OpenJDK base image -FROM openjdk:22-jdk-slim +FROM openjdk:22-slim-bullseye # Create a directory for the application RUN mkdir -p /opt diff --git a/docker/consumer/kafka-consumer.jar b/docker/consumer/kafka-consumer.jar index 181f9f9..9054d08 100644 Binary files a/docker/consumer/kafka-consumer.jar and b/docker/consumer/kafka-consumer.jar differ diff --git a/docker/producer-consumer.yaml b/docker/producer-consumer.yaml index a159680..d040da8 100644 --- a/docker/producer-consumer.yaml +++ b/docker/producer-consumer.yaml @@ -11,9 +11,9 @@ services: OTEL_TRACES_EXPORTER: "otlp" OTEL_METRICS_EXPORTER: "otlp" OTEL_LOGS_EXPORTER: "otlp" - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" kafka-producer2: image: shivanshu1333/kafka-producer:latest @@ -27,9 +27,9 @@ services: OTEL_TRACES_EXPORTER: "otlp" OTEL_METRICS_EXPORTER: "otlp" OTEL_LOGS_EXPORTER: "otlp" - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" kafka-producer3: image: shivanshu1333/kafka-producer:latest @@ -42,9 +42,9 @@ services: OTEL_TRACES_EXPORTER: "otlp" OTEL_METRICS_EXPORTER: "otlp" OTEL_LOGS_EXPORTER: "otlp" - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" kafka-producer4: image: shivanshu1333/kafka-producer:latest @@ -57,9 +57,9 @@ services: OTEL_TRACES_EXPORTER: "otlp" OTEL_METRICS_EXPORTER: "otlp" OTEL_LOGS_EXPORTER: "otlp" - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" kafka-producer5: image: shivanshu1333/kafka-producer:latest @@ -72,9 +72,9 @@ services: OTEL_TRACES_EXPORTER: "otlp" OTEL_METRICS_EXPORTER: "otlp" OTEL_LOGS_EXPORTER: "otlp" - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" kafka-producer6: image: shivanshu1333/kafka-producer:latest @@ -87,96 +87,96 @@ services: OTEL_TRACES_EXPORTER: "otlp" OTEL_METRICS_EXPORTER: "otlp" OTEL_LOGS_EXPORTER: "otlp" - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" -# kafka-consumer1: -# image: shivanshu1333/kafka-consumer:latest -# container_name: kafka-consumer1 -# environment: -# BOOTSTRAP_SERVERS: "broker1:9092,broker2:19093,broker3:19094" -# CONSUMER_GROUP: "cg1" -# TOPIC: "topic1" -# OTEL_SERVICE_NAME: "consumer-svc" -# OTEL_TRACES_EXPORTER: "otlp" -# OTEL_METRICS_EXPORTER: "none" -# OTEL_LOGS_EXPORTER: "otlp" -# OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" -# OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" -# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" -# -# kafka-consumer2: -# image: shivanshu1333/kafka-consumer:latest -# container_name: kafka-consumer2 -# environment: -# BOOTSTRAP_SERVERS: "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094" -# CONSUMER_GROUP: "cg1" -# TOPIC: "topic1" -# OTEL_SERVICE_NAME: "consumer-svc1" -# OTEL_TRACES_EXPORTER: "otlp" -# OTEL_METRICS_EXPORTER: "otlp" -# OTEL_LOGS_EXPORTER: "otlp" -# OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" -# OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" -# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" -## -# kafka-consumer3: -# image: shivanshu1333/kafka-consumer:latest -# container_name: kafka-consumer3 -# environment: -# BOOTSTRAP_SERVERS: "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094" -# CONSUMER_GROUP: "cg2" -# TOPIC: "topic2" -# OTEL_SERVICE_NAME: "consumer-svc2" -# OTEL_TRACES_EXPORTER: "otlp" -# OTEL_METRICS_EXPORTER: "otlp" -# OTEL_LOGS_EXPORTER: "otlp" -# OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" -# OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" -# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" -## -# kafka-consumer4: -# image: shivanshu1333/kafka-consumer:latest -# container_name: kafka-consumer4 -# environment: -# BOOTSTRAP_SERVERS: "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094" -# CONSUMER_GROUP: "cg2" -# TOPIC: "topic2" -# OTEL_SERVICE_NAME: "consumer-svc3" -# OTEL_TRACES_EXPORTER: "otlp" -# OTEL_METRICS_EXPORTER: "otlp" -# OTEL_LOGS_EXPORTER: "otlp" -# OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" -# OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" -# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" + kafka-consumer1: + image: shivanshu1333/kafka-consumer:latest + container_name: consumer1 + environment: + BOOTSTRAP_SERVERS: "broker1:19092,broker2:19093,broker3:19094" + CONSUMER_GROUP: "cg1" + TOPIC: "topic1" + OTEL_SERVICE_NAME: "consumer-svc" + OTEL_TRACES_EXPORTER: "none" + OTEL_METRICS_EXPORTER: "none" + OTEL_LOGS_EXPORTER: "none" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" + + kafka-consumer2: + image: shivanshu1333/kafka-consumer:latest + container_name: consumer2 + environment: + BOOTSTRAP_SERVERS: "broker1:19092,broker2:19093,broker3:19094" + CONSUMER_GROUP: "cg1" + TOPIC: "topic1" + OTEL_SERVICE_NAME: "consumer-svc1" + OTEL_TRACES_EXPORTER: "otlp" + OTEL_METRICS_EXPORTER: "otlp" + OTEL_LOGS_EXPORTER: "otlp" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" # -# kafka-consumer5: -# image: shivanshu1333/kafka-consumer:latest -# container_name: kafka-consumer5 -# environment: -# BOOTSTRAP_SERVERS: "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094" -# CONSUMER_GROUP: "cg3" -# TOPIC: "topic3" -# OTEL_SERVICE_NAME: "consumer-svc3" -# OTEL_TRACES_EXPORTER: "otlp" -# OTEL_METRICS_EXPORTER: "otlp" -# OTEL_LOGS_EXPORTER: "otlp" -# OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" -# OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" -# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" + kafka-consumer3: + image: shivanshu1333/kafka-consumer:latest + container_name: consumer3 + environment: + BOOTSTRAP_SERVERS: "broker1:19092,broker2:19093,broker3:19094" + CONSUMER_GROUP: "cg2" + TOPIC: "topic2" + OTEL_SERVICE_NAME: "consumer-svc2" + OTEL_TRACES_EXPORTER: "otlp" + OTEL_METRICS_EXPORTER: "otlp" + OTEL_LOGS_EXPORTER: "otlp" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" # -# kafka-consumer6: -# image: shivanshu1333/kafka-consumer:latest -# container_name: kafka-consumer6 -# environment: -# BOOTSTRAP_SERVERS: "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094" -# CONSUMER_GROUP: "cg3" -# TOPIC: "topic3" -# OTEL_SERVICE_NAME: "consumer-svc3" -# OTEL_TRACES_EXPORTER: "otlp" -# OTEL_METRICS_EXPORTER: "otlp" -# OTEL_LOGS_EXPORTER: "otlp" -# OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://otel-collector:4318/v1/logs" -# OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://otel-collector:4318/v1/metrics" -# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://otel-collector:4318/v1/traces" \ No newline at end of file + kafka-consumer4: + image: shivanshu1333/kafka-consumer:latest + container_name: consumer4 + environment: + BOOTSTRAP_SERVERS: "broker1:19092,broker2:19093,broker3:19094" + CONSUMER_GROUP: "cg2" + TOPIC: "topic2" + OTEL_SERVICE_NAME: "consumer-svc3" + OTEL_TRACES_EXPORTER: "otlp" + OTEL_METRICS_EXPORTER: "otlp" + OTEL_LOGS_EXPORTER: "otlp" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" + + kafka-consumer5: + image: shivanshu1333/kafka-consumer:latest + container_name: consumer5 + environment: + BOOTSTRAP_SERVERS: "broker1:19092,broker2:19093,broker3:19094" + CONSUMER_GROUP: "cg3" + TOPIC: "topic3" + OTEL_SERVICE_NAME: "consumer-svc3" + OTEL_TRACES_EXPORTER: "otlp" + OTEL_METRICS_EXPORTER: "otlp" + OTEL_LOGS_EXPORTER: "otlp" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" + + kafka-consumer6: + image: shivanshu1333/kafka-consumer:latest + container_name: consumer6 + environment: + BOOTSTRAP_SERVERS: "broker1:19092,broker2:19093,broker3:19094" + CONSUMER_GROUP: "cg3" + TOPIC: "topic3" + OTEL_SERVICE_NAME: "consumer-svc3" + OTEL_TRACES_EXPORTER: "otlp" + OTEL_METRICS_EXPORTER: "otlp" + OTEL_LOGS_EXPORTER: "otlp" + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: "http://host.docker.internal:4318/v1/logs" + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "http://host.docker.internal:4318/v1/metrics" + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://host.docker.internal:4318/v1/traces" diff --git a/docker/readme.md b/docker/readme.md index fad7938..51454b7 100644 --- a/docker/readme.md +++ b/docker/readme.md @@ -15,13 +15,13 @@ cd collector nohup ./otelcol-contrib --config collector-contrib-config.yaml > /dev/null 2>&1 & ``` -## start producer +## start producer and consumers ```bash cd docker nohup sudo docker compose -f producer-consumer.yaml up > /dev/null 2>&1 & ``` -## start consumer +## (optional) start producer or consumer via script, this doesn't use the docker image ```bash cd scripts nohup ./consumer1.sh > /dev/null 2>&1 & diff --git a/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseConsumer.java b/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseConsumer.java index 690cc80..b2c629d 100644 --- a/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseConsumer.java +++ b/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseConsumer.java @@ -22,7 +22,7 @@ public class BaseConsumer { private static final String DEFAULT_TOPIC = "topic1"; private static final String DEFAULT_CONSUMER_GROUP = "my-consumer-group"; - private static final Logger log = LogManager.getLogger(BaseConsumer.class); + public static final Logger log = LogManager.getLogger(BaseConsumer.class); protected String bootstrapServers; protected String consumerGroup; @@ -32,25 +32,53 @@ public class BaseConsumer { protected AtomicBoolean running = new AtomicBoolean(true); public void run(CountDownLatch latch) { - log.info("Subscribe to topic [{}]", this.topic); + log.info("Subscribing to topic [{}]", this.topic); this.consumer.subscribe(List.of(this.topic)); + try { - log.info("Polling ..."); + log.info("Polling for records..."); while (this.running.get()) { - ConsumerRecords records = this.consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - log.info("Received message key = [{}], value = [{}], offset = [{}]", record.key(), record.value(), record.offset()); + try { + ConsumerRecords records = this.consumer.poll(Duration.ofMillis(10000)); + + for (ConsumerRecord record : records) { + log.info("Received message key = [{}], value = [{}], offset = [{}]", record.key(), record.value(), record.offset()); + + try { + this.consumer.commitSync(); + log.info("Successfully committed offset for record key = [{}]", record.key()); + } catch (CommitFailedException cfe) { + log.error("CommitFailedException while committing offset for record key = [{}].", record.key(), cfe); + } + } + } catch (WakeupException we) { + if (running.get()) { + log.warn("Received WakeupException while polling, but consumer is not shutting down.", we); + throw we; + } + log.info("Consumer shutdown initiated, WakeupException caught and handled."); + } catch (Exception e) { + log.error("Unexpected error during polling. Consumer will continue polling.", e); } } - } catch (WakeupException we) { - // Ignore exception if closing - if (running.get()) throw we; + } catch (Exception e) { + log.error("Error in Kafka consumer run method, shutting down consumer.", e); } finally { - this.consumer.close(); - latch.countDown(); + try { + log.info("Closing Kafka consumer..."); + this.consumer.unsubscribe(); + this.consumer.close(); + log.info("Kafka consumer closed."); + } catch (Exception e) { + log.error("Error occurred while closing Kafka consumer.", e); + } finally { + latch.countDown(); + log.info("CountDownLatch decremented, consumer run method exiting."); + } } } + public void loadConfiguration(Map map) { this.bootstrapServers = map.getOrDefault(BOOTSTRAP_SERVERS_ENV_VAR, DEFAULT_BOOTSTRAP_SERVERS); this.consumerGroup = map.getOrDefault(CONSUMER_GROUP_ENV_VAR, DEFAULT_CONSUMER_GROUP); @@ -64,6 +92,12 @@ public Properties loadKafkaConsumerProperties() { props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(CommonClientConfigs.GROUP_ID_CONFIG, this.consumerGroup); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); + props.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); + props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); + props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); + return props; } diff --git a/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseProducer.java b/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseProducer.java index aae6fd7..c2917a7 100644 --- a/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseProducer.java +++ b/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseProducer.java @@ -5,6 +5,9 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,38 +39,87 @@ public abstract class BaseProducer { public void run() { try { + createKafkaProducer(loadKafkaProducerProperties()); + int messageCount = 0; - while (true) { // Infinite loop to send messages periodically + while (messageCount < numMessages) { String message = "my-value-" + messageCount++; ProducerRecord record = new ProducerRecord<>(this.topic, this.producerKey, message); - this.producer.send(record); - log.info("Message [{}] sent to topic [{}]", message, this.topic); + + try { + this.producer.send(record, (metadata, exception) -> { + if (exception == null) { + log.info("Message [{}] sent to topic [{}] partition [{}] with offset [{}]", + message, metadata.topic(), metadata.partition(), metadata.offset()); + } else { + log.error("Error sending message [{}] to topic [{}]", message, this.topic, exception); + } + }); + } catch (SerializationException | TimeoutException e) { + log.error("Error serializing or sending the record", e); + } catch (ProducerFencedException e) { + log.error("Producer encountered an unrecoverable error", e); + break; + } + Thread.sleep(this.delay); } } catch (InterruptedException e) { log.error("Producer was interrupted", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error("Unexpected error occurred", e); } finally { - this.producer.close(); + if (this.producer != null) { + try { + this.producer.close(); + } catch (Exception e) { + log.error("Error closing the producer", e); + } + } } } public void loadConfiguration(Map map) { - this.bootstrapServers = map.getOrDefault(BOOTSTRAP_SERVERS_ENV_VAR, DEFAULT_BOOTSTRAP_SERVERS); - this.topic = map.getOrDefault(TOPIC_ENV_VAR, DEFAULT_TOPIC); - this.producerKey = map.getOrDefault(PARTITION_KEY_ENV_VAR, PARTITION_KEY); - this.numMessages = Integer.parseInt(map.getOrDefault(NUM_MESSAGES_ENV_VAR, DEFAULT_NUM_MESSAGES)); - this.delay = Long.parseLong(map.getOrDefault(DELAY_ENV_VAR, DEFAULT_DELAY)); + try { + this.bootstrapServers = map.getOrDefault(BOOTSTRAP_SERVERS_ENV_VAR, DEFAULT_BOOTSTRAP_SERVERS); + this.topic = map.getOrDefault(TOPIC_ENV_VAR, DEFAULT_TOPIC); + this.producerKey = map.getOrDefault(PARTITION_KEY_ENV_VAR, PARTITION_KEY); + this.numMessages = Integer.parseInt(map.getOrDefault(NUM_MESSAGES_ENV_VAR, DEFAULT_NUM_MESSAGES)); + this.delay = Long.parseLong(map.getOrDefault(DELAY_ENV_VAR, DEFAULT_DELAY)); + log.info("Configuration loaded: bootstrapServers={}, topic={}, producerKey={}, numMessages={}, delay={}", + bootstrapServers, topic, producerKey, numMessages, delay); + } catch (NumberFormatException e) { + log.error("Invalid number format in configuration", e); + throw new IllegalArgumentException("Configuration error: Invalid number format", e); + } catch (Exception e) { + log.error("Error loading configuration", e); + throw new RuntimeException("Configuration error", e); + } } public Properties loadKafkaProducerProperties() { Properties props = new Properties(); - props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + try { + props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + log.info("Kafka Producer properties loaded successfully."); + } catch (Exception e) { + log.error("Error loading Kafka producer properties", e); + throw new RuntimeException("Error in Kafka Producer properties", e); + } return props; } public void createKafkaProducer(Properties props) { - this.producer = new KafkaProducer<>(props); + try { + this.producer = new KafkaProducer<>(props); + log.info("Kafka Producer created successfully."); + } catch (Exception e) { + log.error("Error creating Kafka Producer", e); + throw new RuntimeException("Failed to create Kafka Producer", e); + } } + } diff --git a/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseConsumer.class b/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseConsumer.class index 7bb55c5..e60a399 100644 Binary files a/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseConsumer.class and b/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseConsumer.class differ diff --git a/kafka-app-otel/common/target/common-1.0-SNAPSHOT.jar b/kafka-app-otel/common/target/common-1.0-SNAPSHOT.jar index 6f33ea8..8150d5b 100644 Binary files a/kafka-app-otel/common/target/common-1.0-SNAPSHOT.jar and b/kafka-app-otel/common/target/common-1.0-SNAPSHOT.jar differ diff --git a/kafka-app-otel/kafka-consumer/src/main/java/io/shivanshuraj1333/kafka/otel/Consumer.java b/kafka-app-otel/kafka-consumer/src/main/java/io/shivanshuraj1333/kafka/otel/Consumer.java index d56f315..b366c6b 100644 --- a/kafka-app-otel/kafka-consumer/src/main/java/io/shivanshuraj1333/kafka/otel/Consumer.java +++ b/kafka-app-otel/kafka-consumer/src/main/java/io/shivanshuraj1333/kafka/otel/Consumer.java @@ -7,19 +7,55 @@ public class Consumer extends BaseConsumer { - public static void main(String[] args) throws IOException, InterruptedException { + public static void main(String[] args) { Consumer consumer = new Consumer(); - consumer.loadConfiguration(System.getenv()); - Properties props = consumer.loadKafkaConsumerProperties(); - consumer.createKafkaConsumer(props); - CountDownLatch latch = new CountDownLatch(1); - Thread consumerThread = new Thread(() -> consumer.run(latch)); - consumerThread.start(); - System.in.read(); - consumer.running.set(false); - latch.await(10000, TimeUnit.MILLISECONDS); + try { + consumer.loadConfiguration(System.getenv()); + + Properties props = consumer.loadKafkaConsumerProperties(); + consumer.createKafkaConsumer(props); + + Thread consumerThread = new Thread(() -> { + try { + consumer.run(latch); + } catch (Exception e) { + log.error("Error occurred while running Kafka consumer: ", e); + } + }); + consumerThread.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("Shutdown hook triggered, stopping the consumer..."); + consumer.running.set(false); + + try { + if (!latch.await(10000, TimeUnit.MILLISECONDS)) { + log.warn("Consumer did not shut down gracefully within the timeout."); + } else { + log.info("Consumer shut down gracefully."); + } + } catch (InterruptedException e) { + log.error("Interrupted while waiting for consumer to shut down.", e); + Thread.currentThread().interrupt(); + } + })); + + log.info("Application is running. Press Ctrl+C to exit."); + latch.await(); + } catch (Exception e) { + log.error("Unexpected error in main method: ", e); + } finally { + try { + latch.countDown(); + } catch (Exception e) { + log.error("Error during final latch countdown: ", e); + } + log.info("Application has exited."); + } } } + + diff --git a/kafka-app-otel/kafka-consumer/target/classes/io/shivanshuraj1333/kafka/otel/Consumer.class b/kafka-app-otel/kafka-consumer/target/classes/io/shivanshuraj1333/kafka/otel/Consumer.class index d32aede..6b570cd 100644 Binary files a/kafka-app-otel/kafka-consumer/target/classes/io/shivanshuraj1333/kafka/otel/Consumer.class and b/kafka-app-otel/kafka-consumer/target/classes/io/shivanshuraj1333/kafka/otel/Consumer.class differ diff --git a/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar b/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar index 181f9f9..9054d08 100644 Binary files a/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar and b/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar differ diff --git a/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT.jar b/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT.jar index c571d23..0c5199c 100644 Binary files a/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT.jar and b/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT.jar differ diff --git a/kafka-app-otel/kafka-producer/src/main/java/io/shivanshuraj1333/kafka/otel/Producer.java b/kafka-app-otel/kafka-producer/src/main/java/io/shivanshuraj1333/kafka/otel/Producer.java index 48d1025..ae0cbfe 100644 --- a/kafka-app-otel/kafka-producer/src/main/java/io/shivanshuraj1333/kafka/otel/Producer.java +++ b/kafka-app-otel/kafka-producer/src/main/java/io/shivanshuraj1333/kafka/otel/Producer.java @@ -1,26 +1,59 @@ package io.shivanshuraj1333.kafka.otel; +import java.util.Map; import java.util.Properties; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class Producer extends BaseProducer { - public static void main(String[] args) throws InterruptedException { + private static final Logger log = LogManager.getLogger(Producer.class); + + public static void main(String[] args) { Producer producer = new Producer(); - producer.loadConfiguration(System.getenv()); - Properties props = producer.loadKafkaProducerProperties(); - producer.createKafkaProducer(props); + try { + Map configMap = loadEnvironmentVariables(); + producer.loadConfiguration(configMap); + + Properties props = producer.loadKafkaProducerProperties(); + producer.createKafkaProducer(props); + + Thread producerThread = new Thread(producer::run, "Kafka-Producer-Thread"); + producerThread.start(); + log.info("Kafka Producer started successfully."); + + addShutdownHook(producer, producerThread); - Thread producerThread = new Thread(producer::run); - producerThread.start(); + } catch (Exception e) { + log.error("Failed to initialize and start the Kafka Producer", e); + } + } + + private static Map loadEnvironmentVariables() { + return System.getenv(); + } - // Let the producer run indefinitely or for a fixed time before stopping. + private static void addShutdownHook(Producer producer, Thread producerThread) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("Shutdown initiated. Attempting to stop Kafka Producer..."); try { producerThread.interrupt(); producerThread.join(); + log.info("Kafka Producer stopped successfully."); } catch (InterruptedException e) { - // Handle exception + log.warn("Shutdown interrupted while waiting for producer thread to stop.", e); + Thread.currentThread().interrupt(); // Restore interrupt status + } catch (Exception e) { + log.error("Error during Kafka Producer shutdown", e); + } finally { + if (producer.producer != null) { + try { + producer.producer.close(); + } catch (Exception e) { + log.error("Error closing the Kafka Producer", e); + } + } } - })); + }, "Kafka-Producer-Shutdown-Hook")); } } diff --git a/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT-jar-with-dependencies.jar b/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT-jar-with-dependencies.jar index 5edcd83..e2ddbe0 100644 Binary files a/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT-jar-with-dependencies.jar and b/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT-jar-with-dependencies.jar differ diff --git a/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT.jar b/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT.jar index 0157b0d..f7c2dda 100644 Binary files a/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT.jar and b/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT.jar differ