From 82f299308564dfe5ad481cfb5f70bba4252c4140 Mon Sep 17 00:00:00 2001 From: Benedikt Linse Date: Tue, 31 May 2022 07:34:39 +0200 Subject: [PATCH 1/3] first draft for a sample producer for writing data in the CloudEvents format --- clients/cloud/java-cloud-events/pom.xml | 141 ++++++++++++++++++ .../src/main/java/SampleProducer.java | 101 +++++++++++++ .../src/main/resources/.gitignore | 1 + .../resources/producer.properties.template | 11 ++ 4 files changed, 254 insertions(+) create mode 100644 clients/cloud/java-cloud-events/pom.xml create mode 100644 clients/cloud/java-cloud-events/src/main/java/SampleProducer.java create mode 100644 clients/cloud/java-cloud-events/src/main/resources/.gitignore create mode 100644 clients/cloud/java-cloud-events/src/main/resources/producer.properties.template diff --git a/clients/cloud/java-cloud-events/pom.xml b/clients/cloud/java-cloud-events/pom.xml new file mode 100644 index 000000000..d2f02aaa0 --- /dev/null +++ b/clients/cloud/java-cloud-events/pom.xml @@ -0,0 +1,141 @@ + + + 4.0.0 + + io.confluent.examples + kafka-clients-cloud-events-example + jar + 7.1.1 + + + Confluent, Inc. + http://confluent.io + + http://confluent.io + + Example for writing business events in the Cloud Events Specification to Kafka and registering the event schema with Confluent Schema Registry. 
+ + + + 3.1.0 + 7.0.1 + 8 + 1.7.6 + UTF-8 + 2.2.4 + http://localhost:8081 + + 1.7.6 + + + + + Apache License 2.0 + http://www.apache.org/licenses/LICENSE-2.0.html + repo + + + + + + confluent + Confluent + https://packages.confluent.io/maven/ + + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + slf4j-simple + ${slf4j.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + io.confluent + kafka-json-schema-serializer + ${io.confluent.schema-registry.version} + + + org.slf4j + slf4j-api + ${slf4j-api.version} + + + com.fasterxml.jackson.core + jackson-databind + 2.11.2 + + + com.google.code.gson + gson + ${gson.version} + + + org.projectlombok + lombok + RELEASE + compile + + + org.projectlombok + lombok + RELEASE + compile + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + true + + 17 + 17 + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + org.codehaus.mojo + exec-maven-plugin + 1.2.1 + + + + java + + + + + + + diff --git a/clients/cloud/java-cloud-events/src/main/java/SampleProducer.java b/clients/cloud/java-cloud-events/src/main/java/SampleProducer.java new file mode 100644 index 000000000..42aa318d5 --- /dev/null +++ b/clients/cloud/java-cloud-events/src/main/java/SampleProducer.java @@ -0,0 +1,101 @@ +import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.SneakyThrows; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Properties; +import java.util.UUID; + +@Data +@Builder +@AllArgsConstructor +class CloudEvent { + private String id; + private URI source; + private String specVersion; + private 
String type; + private String datacontenttype; + private URI dataschema; + private String subject; + private long timestamp; + private T data; +} + +@Data +@Builder +@AllArgsConstructor +class Order { + UUID productID; + UUID customerID; + long timestamp; +} + +@Data +@AllArgsConstructor +@Builder +class OrderCloudEvent { + private String id; + private URI source; + private String specVersion; + private String type; + private String datacontenttype; + private URI dataschema; + private String subject; + private long timestamp; + private Order data; +} + +public class SampleProducer { + + @SneakyThrows + private static Properties producerProperties() throws IOException { + Properties prop = new Properties(); + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + InputStream stream = loader.getResourceAsStream("producer.properties"); + prop.load(stream); + return prop; + } + + public static void main(String [] args) { + // produceGenericCloudEvent(); + produceNonGenericCloudEvent(); + } + + @SneakyThrows + private static void produceGenericCloudEvent() { + KafkaProducer> kafkaProducer = new KafkaProducer<>(producerProperties()); + Order order = Order.builder().productID(UUID.randomUUID()).customerID(UUID.randomUUID()).timestamp(System.currentTimeMillis()).build(); + CloudEvent orderEvent = CloudEvent.builder() + .data(order) + .id(UUID.randomUUID().toString()) + .timestamp(System.currentTimeMillis()) + .source(new URI("https://my.producer.application.org")) + .specVersion("1.0.1") + .build(); + var result = kafkaProducer.send(new ProducerRecord<>("order-topic", orderEvent.getId(), orderEvent)); + System.err.println(result.get().toString()); + } + + @SneakyThrows + private static void produceNonGenericCloudEvent() { + KafkaProducer kafkaProducer = new KafkaProducer<>(producerProperties()); + Order order = Order.builder().productID(UUID.randomUUID()).customerID(UUID.randomUUID()).timestamp(System.currentTimeMillis()).build(); + OrderCloudEvent 
orderCloudEvent = OrderCloudEvent.builder() + .data(order) + .id(UUID.randomUUID().toString()) + .specVersion("1.0.1") + .source(new URI("https://my.producer.application.org")) + .build(); + var result = kafkaProducer.send( + new ProducerRecord<>("order-cloud-events", orderCloudEvent.getId(), orderCloudEvent) + ); + System.err.println(result.get().toString()); + } +} diff --git a/clients/cloud/java-cloud-events/src/main/resources/.gitignore b/clients/cloud/java-cloud-events/src/main/resources/.gitignore new file mode 100644 index 000000000..85f39cc46 --- /dev/null +++ b/clients/cloud/java-cloud-events/src/main/resources/.gitignore @@ -0,0 +1 @@ +producer.properties diff --git a/clients/cloud/java-cloud-events/src/main/resources/producer.properties.template b/clients/cloud/java-cloud-events/src/main/resources/producer.properties.template new file mode 100644 index 000000000..86233e4fc --- /dev/null +++ b/clients/cloud/java-cloud-events/src/main/resources/producer.properties.template @@ -0,0 +1,11 @@ +bootstrap.servers=<...>.confluent.cloud:9092 +security.protocol=SASL_SSL +sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='' password=''; +sasl.mechanism=PLAIN +acks=all +value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer +key.serializer=org.apache.kafka.common.serialization.StringSerializer +schema.registry.url=https://<...>.confluent.cloud +basic.auth.credentials.source=USER_INFO +basic.auth.user.info=: +json.oneof.for.nullables=false From 93fe1b8b8077592f30e5e75dbddb6ba8c82916f0 Mon Sep 17 00:00:00 2001 From: Benedikt Linse Date: Tue, 31 May 2022 15:08:39 +0200 Subject: [PATCH 2/3] adding README and a script for running the app --- clients/cloud/java-cloud-events/.gitignore | 1 + clients/cloud/java-cloud-events/README.md | 57 +++++++++++++++++++ .../java-cloud-events/run-sample-producer.sh | 3 + .../cloud/cloudevents}/SampleProducer.java | 22 +++++-- 4 files changed, 79 insertions(+), 4 
deletions(-) create mode 100644 clients/cloud/java-cloud-events/.gitignore create mode 100644 clients/cloud/java-cloud-events/README.md create mode 100755 clients/cloud/java-cloud-events/run-sample-producer.sh rename clients/cloud/java-cloud-events/src/main/java/{ => io/confluent/samples/cloud/cloudevents}/SampleProducer.java (78%) diff --git a/clients/cloud/java-cloud-events/.gitignore b/clients/cloud/java-cloud-events/.gitignore new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/clients/cloud/java-cloud-events/.gitignore @@ -0,0 +1 @@ + diff --git a/clients/cloud/java-cloud-events/README.md b/clients/cloud/java-cloud-events/README.md new file mode 100644 index 000000000..64eb10280 --- /dev/null +++ b/clients/cloud/java-cloud-events/README.md @@ -0,0 +1,57 @@ +## Writing Cloud Events to Confluent Cloud + +[Cloud Events](https://github.com/cloudevents/spec) is a well-known standard for attaching metadata to business-relevant events. + +Cloud Events metadata includes the following attributes amongst others: +* logical producer (event source), +* the subject (the person or thing) that an event pertains to, +* an event identifier, +* the type of an event, +* the time when the event occurred. + +For the full set of attributes see the [specification](https://github.com/cloudevents/spec). + +In this example, we use the online order domain as an example for sending Cloud Events data to +Confluent Cloud and registering the schema with Confluent Schema Registry. + +A Cloud Event in the order domain could be an event for the creation of a new order. 
+This cloud event could be serialized as follows in JSON: + +``` +{ + "id": "614282a3-7818-48bf-8a74-85332e5579c7", + "source": "/v1/orders", + "specVersion": "1.0", + "type": "io.confluent.samples.orders.created", + "datacontenttype": "application/json", + "dataschema": null, + "subject": "60f9a967-077a-43ff-be2c-0c14c09bcb3a", + "timestamp": 1653990696381, + "data": { + "productID": "21c2d736-56b4-4ddf-9dbf-5ebc3c79e126", + "customerID": "68e5bde6-c5d5-488c-8469-8c9853d94589", + "timestamp": 1653990696380 + } +} +``` + +#### Prerequisites + +The following items are required to run this demo: + +* access to a Confluent Cloud cluster +* a Confluent Cloud API Key for Kafka +* a Confluent Cloud API Key for Schema Registry +* a recent version of Java and Javac +* a recent version of Apache Maven +* Access to Maven Central for downloading the dependencies + +#### Running the Demo + +* Create a topic named `order-cloud-events` either via the confluent CLI or via the Confluent Cloud UI. + A single partition is sufficient. +* Copy `src/main/resources/producer.properties.template` to `src/main/resources/producer.properties`, + and fill in the bootstrap servers url, the schema registry url, your API keys and secrets for Kafka as well as for schema registry. +* Compile the code: `mvn compile` +* Run the SampleProducer application: `./run-sample-producer.sh` +* Go to the Confluent Cloud UI of your cluster, and inspect the messages produced to the topic, as well as the associated schema. 
diff --git a/clients/cloud/java-cloud-events/run-sample-producer.sh b/clients/cloud/java-cloud-events/run-sample-producer.sh new file mode 100755 index 000000000..cee7dbead --- /dev/null +++ b/clients/cloud/java-cloud-events/run-sample-producer.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +mvn exec:java -Dexec.mainClass="io.confluent.samples.cloud.cloudevents.SampleProducer" diff --git a/clients/cloud/java-cloud-events/src/main/java/SampleProducer.java b/clients/cloud/java-cloud-events/src/main/java/io/confluent/samples/cloud/cloudevents/SampleProducer.java similarity index 78% rename from clients/cloud/java-cloud-events/src/main/java/SampleProducer.java rename to clients/cloud/java-cloud-events/src/main/java/io/confluent/samples/cloud/cloudevents/SampleProducer.java index 42aa318d5..082eb9a5e 100644 --- a/clients/cloud/java-cloud-events/src/main/java/SampleProducer.java +++ b/clients/cloud/java-cloud-events/src/main/java/io/confluent/samples/cloud/cloudevents/SampleProducer.java @@ -1,11 +1,14 @@ +package io.confluent.samples.cloud.cloudevents; + +import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; import java.io.IOException; import java.io.InputStream; @@ -52,6 +55,7 @@ class OrderCloudEvent { private Order data; } +@Slf4j public class SampleProducer { @SneakyThrows @@ -76,9 +80,14 @@ private static void produceGenericCloudEvent() { .data(order) .id(UUID.randomUUID().toString()) .timestamp(System.currentTimeMillis()) - .source(new URI("https://my.producer.application.org")) + .subject(UUID.randomUUID().toString()) + .type("io.confluent.samples.orders.created") + 
.datacontenttype("application/json") + .source(new URI("/v1/orders")) + .timestamp(System.currentTimeMillis()) .specVersion("1.0.1") .build(); + log.info(new ObjectMapper().writeValueAsString(orderEvent)); var result = kafkaProducer.send(new ProducerRecord<>("order-topic", orderEvent.getId(), orderEvent)); System.err.println(result.get().toString()); } @@ -90,9 +99,14 @@ private static void produceNonGenericCloudEvent() { OrderCloudEvent orderCloudEvent = OrderCloudEvent.builder() .data(order) .id(UUID.randomUUID().toString()) - .specVersion("1.0.1") - .source(new URI("https://my.producer.application.org")) + .specVersion("1.0") + .subject(UUID.randomUUID().toString()) + .type("io.confluent.samples.orders.created") + .datacontenttype("application/json") + .timestamp(System.currentTimeMillis()) + .source(new URI("/v1/orders")) .build(); + log.info(new ObjectMapper().writeValueAsString(orderCloudEvent)); var result = kafkaProducer.send( new ProducerRecord<>("order-cloud-events", orderCloudEvent.getId(), orderCloudEvent) ); From 3f0aaf56f180911f77bae25c2a7ad49989b99199 Mon Sep 17 00:00:00 2001 From: Benedikt Linse Date: Tue, 31 May 2022 15:09:23 +0200 Subject: [PATCH 3/3] cleanup --- .../cloud/cloudevents/SampleProducer.java | 40 +------------------ 1 file changed, 1 insertion(+), 39 deletions(-) diff --git a/clients/cloud/java-cloud-events/src/main/java/io/confluent/samples/cloud/cloudevents/SampleProducer.java b/clients/cloud/java-cloud-events/src/main/java/io/confluent/samples/cloud/cloudevents/SampleProducer.java index 082eb9a5e..44050271b 100644 --- a/clients/cloud/java-cloud-events/src/main/java/io/confluent/samples/cloud/cloudevents/SampleProducer.java +++ b/clients/cloud/java-cloud-events/src/main/java/io/confluent/samples/cloud/cloudevents/SampleProducer.java @@ -1,7 +1,6 @@ package io.confluent.samples.cloud.cloudevents; import com.fasterxml.jackson.databind.ObjectMapper; -import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer; import 
lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -10,27 +9,11 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.util.Properties; import java.util.UUID; -@Data -@Builder -@AllArgsConstructor -class CloudEvent { - private String id; - private URI source; - private String specVersion; - private String type; - private String datacontenttype; - private URI dataschema; - private String subject; - private long timestamp; - private T data; -} - @Data @Builder @AllArgsConstructor @@ -59,7 +42,7 @@ class OrderCloudEvent { public class SampleProducer { @SneakyThrows - private static Properties producerProperties() throws IOException { + private static Properties producerProperties() { Properties prop = new Properties(); ClassLoader loader = Thread.currentThread().getContextClassLoader(); InputStream stream = loader.getResourceAsStream("producer.properties"); @@ -68,30 +51,9 @@ private static Properties producerProperties() throws IOException { } public static void main(String [] args) { - // produceGenericCloudEvent(); produceNonGenericCloudEvent(); } - @SneakyThrows - private static void produceGenericCloudEvent() { - KafkaProducer> kafkaProducer = new KafkaProducer<>(producerProperties()); - Order order = Order.builder().productID(UUID.randomUUID()).customerID(UUID.randomUUID()).timestamp(System.currentTimeMillis()).build(); - CloudEvent orderEvent = CloudEvent.builder() - .data(order) - .id(UUID.randomUUID().toString()) - .timestamp(System.currentTimeMillis()) - .subject(UUID.randomUUID().toString()) - .type("io.confluent.samples.orders.created") - .datacontenttype("application/json") - .source(new URI("/v1/orders")) - .timestamp(System.currentTimeMillis()) - .specVersion("1.0.1") - .build(); - log.info(new ObjectMapper().writeValueAsString(orderEvent)); - var result = kafkaProducer.send(new 
ProducerRecord<>("order-topic", orderEvent.getId(), orderEvent)); - System.err.println(result.get().toString()); - } - @SneakyThrows private static void produceNonGenericCloudEvent() { KafkaProducer kafkaProducer = new KafkaProducer<>(producerProperties());