From ea0df5363e1161712092ad586b3d86bb59df5097 Mon Sep 17 00:00:00 2001 From: Ramakrishna Pattnaik Date: Wed, 8 Feb 2023 11:34:01 +0530 Subject: [PATCH] feat: add example for java client --- code-examples/kafka-java-maven/.gitignore | 2 + code-examples/kafka-java-maven/pom.xml | 65 +++++++++++++++++++ .../java/org/example/ConsumerExample.java | 47 ++++++++++++++ .../main/java/org/example/KafkaConfig.java | 30 +++++++++ .../java/org/example/ProducerExample.java | 23 +++++++ 5 files changed, 167 insertions(+) create mode 100644 code-examples/kafka-java-maven/.gitignore create mode 100644 code-examples/kafka-java-maven/pom.xml create mode 100644 code-examples/kafka-java-maven/src/main/java/org/example/ConsumerExample.java create mode 100644 code-examples/kafka-java-maven/src/main/java/org/example/KafkaConfig.java create mode 100644 code-examples/kafka-java-maven/src/main/java/org/example/ProducerExample.java diff --git a/code-examples/kafka-java-maven/.gitignore b/code-examples/kafka-java-maven/.gitignore new file mode 100644 index 000000000..e673575a7 --- /dev/null +++ b/code-examples/kafka-java-maven/.gitignore @@ -0,0 +1,2 @@ +.idea/ +target/ \ No newline at end of file diff --git a/code-examples/kafka-java-maven/pom.xml b/code-examples/kafka-java-maven/pom.xml new file mode 100644 index 000000000..7e738fe19 --- /dev/null +++ b/code-examples/kafka-java-maven/pom.xml @@ -0,0 +1,65 @@ + + + 4.0.0 + + org.example + kafka-java-maven + 1.0-SNAPSHOT + + + 11 + 11 + UTF-8 + + + + + + org.apache.kafka + kafka-clients + 3.2.1 + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + 2.8.5 + + + + + + producer + + + + org.codehaus.mojo + exec-maven-plugin + 3.0.0 + + org.example.ProducerExample + + + + + + + consumer + + + + org.codehaus.mojo + exec-maven-plugin + 3.0.0 + + org.example.ConsumerExample + + + + + + + + \ No newline at end of file diff --git a/code-examples/kafka-java-maven/src/main/java/org/example/ConsumerExample.java b/code-examples/kafka-java-maven/src/main/java/org/example/ConsumerExample.java new file mode 100644 index 000000000..89cd0df0e --- /dev/null +++ b/code-examples/kafka-java-maven/src/main/java/org/example/ConsumerExample.java @@ -0,0 +1,47 @@ +package org.example; + +import java.util.Arrays; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import java.time.Duration; + +public class ConsumerExample { + + public static void main(String[] args) { + + var properties= KafkaConfig.properties(); + int MAX_MESSAGES_CONSUMED = 1; + int messagesCount = 0; + + if(args.length > 0) { + MAX_MESSAGES_CONSUMED = Integer.parseInt(args[0]); + } + + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test_group_2"); + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KafkaConsumer consumer = new KafkaConsumer(properties); + + //Subscribing + consumer.subscribe(Arrays.asList("prices")); + + //polling + while(true){ + if(messagesCount >= MAX_MESSAGES_CONSUMED) { + break; + } + ConsumerRecords records=consumer.poll(Duration.ofMillis(100)); + for(ConsumerRecord record: records) { + System.out.println("Key: "+ record.key() + ", Value:" +record.value()); + System.out.println("Partition:" + record.partition()+",Offset:"+record.offset()); + messagesCount ++; + } + } + } +} diff --git a/code-examples/kafka-java-maven/src/main/java/org/example/KafkaConfig.java b/code-examples/kafka-java-maven/src/main/java/org/example/KafkaConfig.java new file mode 100644 index 000000000..fc261e4e5 --- /dev/null +++ b/code-examples/kafka-java-maven/src/main/java/org/example/KafkaConfig.java @@ -0,0 +1,30 @@ +package org.example; + +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Properties; + +public class KafkaConfig { + + static Properties properties() { + + String kafkaHost = System.getenv("KAFKA_HOST"); + String rhoasClientID = System.getenv("RHOAS_SERVICE_ACCOUNT_CLIENT_ID"); + String rhoasClientSecret = System.getenv("RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET"); + String rhoasOauthTokenUrl = System.getenv("RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL"); + + var properties= new Properties(); + + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost); + properties.setProperty("security.protocol", "SASL_SSL"); + properties.setProperty("sasl.mechanism", "OAUTHBEARER"); + + properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=\"" + rhoasClientID + "\" clientSecret=\"" + rhoasClientSecret + "\" oauth.token.endpoint.uri=\"" + rhoasOauthTokenUrl + "\";"); + + properties.setProperty("sasl.login.callback.handler.class", "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"); + properties.setProperty("sasl.oauthbearer.token.endpoint.url", rhoasOauthTokenUrl); + properties.setProperty("sasl.oauthbearer.scope.claim.name", "api.iam.service_accounts"); + + return properties; + } +} diff --git a/code-examples/kafka-java-maven/src/main/java/org/example/ProducerExample.java b/code-examples/kafka-java-maven/src/main/java/org/example/ProducerExample.java new file mode 100644 index 000000000..013a0da87 --- /dev/null +++ b/code-examples/kafka-java-maven/src/main/java/org/example/ProducerExample.java @@ -0,0 +1,23 @@ +package org.example; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.StringSerializer; + +public class ProducerExample { + + public static void main(String[] args) { + + //Creating producer properties + var properties= KafkaConfig.properties(); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + KafkaProducer producer= new KafkaProducer(properties); + + producer.send(new ProducerRecord<>("prices", "Test Message")); + producer.flush(); + producer.close(); + } +} \ No newline at end of file