diff --git a/clients/cloud/java-cloud-events/.gitignore b/clients/cloud/java-cloud-events/.gitignore new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/clients/cloud/java-cloud-events/.gitignore @@ -0,0 +1 @@ + diff --git a/clients/cloud/java-cloud-events/README.md b/clients/cloud/java-cloud-events/README.md new file mode 100644 index 000000000..64eb10280 --- /dev/null +++ b/clients/cloud/java-cloud-events/README.md @@ -0,0 +1,57 @@ +## Writing Cloud Events to Confluent Cloud + +[Cloud Events](https://github.com/cloudevents/spec) is a well-known standard for attaching Meta-Data to business relevant events. + +Cloud Events Metadata includes the following attributes amongst others: +* logical producer (event source), +* the subject (the person or thing) that an event pertains to, +* an event identifier, +* the type of an event, +* the time when the event occurred. + +For the full set of attributes see the [specification](https://github.com/cloudevents/spec) + +In this example, use the online order domain as an example for sending Cloud Events data to +Confluent Cloud and registering the schema with Confluent Schema Registry. + +A Cloud Event in the order domain could be an event for the creation of a new order. +This cloud event could be serialized as follows in JSON: + +``` +{ + "id": "614282a3-7818-48bf-8a74-85332e5579c7", + "source": "/v1/orders", + "specVersion": "1.0", + "type": "io.confluent.samples.orders.created", + "datacontenttype": "application/json", + "dataschema": null, + "subject": "60f9a967-077a-43ff-be2c-0c14c09bcb3a", + "timestamp": 1653990696381, + "data": { + "productID": "21c2d736-56b4-4ddf-9dbf-5ebc3c79e126", + "customerID": "68e5bde6-c5d5-488c-8469-8c9853d94589", + "timestamp": 1653990696380 + } +} +``` + +#### Prerequisites + +The following items are required to run this demo: + +* access to a Confluent Cloud cluster +* a Confluent Cloud API Key for Kafka +* a Confluent Cloud API Key for Schema Registry +* a recent version of Java and Javac +* a recent version of Apache Maven +* Access to Maven Central for downloading the dependencies + +#### Running the Demo + +* Create a topic named `order-cloud-events` either via the confluent CLI or via the Confluent Cloud UI. + A single partition is sufficient. +* Copy `src/main/resources/application.properties.template` to `src/main/resources/application.properties`, + and fill in the bootstrap servers url, the schema registry url, your API keys and secrets for Kafka as well as for schema registry. +* Compile the code: `mvn compile` +* run the SampleProducer application: `./run-sample-producer.sh` +* Go to the Confluent Cloud UI of your cluster, and inspect the messages produced to the topic, as well as the associated schema. diff --git a/clients/cloud/java-cloud-events/pom.xml b/clients/cloud/java-cloud-events/pom.xml new file mode 100644 index 000000000..d2f02aaa0 --- /dev/null +++ b/clients/cloud/java-cloud-events/pom.xml @@ -0,0 +1,141 @@ + + + 4.0.0 + + io.confluent.examples + kafka-clients-cloud-events-example + jar + 7.1.1 + + + Confluent, Inc. + http://confluent.io + + http://confluent.io + + Example for writing business events in the Cloud Events Specifification to Kafka and registering the event schema with Confluent Schema Registry. + + + + 3.1.0 + 7.0.1 + 8 + 1.7.6 + UTF-8 + 2.2.4 + http://localhost:8081 + + 1.7.6 + + + + + Apache License 2.0 + http://www.apache.org/licenses/LICENSE-2.0.html + repo + + + + + + confluent + Confluent + https://packages.confluent.io/maven/ + + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + slf4j-simple + ${slf4j.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + io.confluent + kafka-json-schema-serializer + ${io.confluent.schema-registry.version} + + + org.slf4j + slf4j-api + ${slf4j-api.version} + + + com.fasterxml.jackson.core + jackson-databind + 2.11.2 + + + com.google.code.gson + gson + ${gson.version} + + + org.projectlombok + lombok + RELEASE + compile + + + org.projectlombok + lombok + RELEASE + compile + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + true + + 17 + 17 + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + org.codehaus.mojo + exec-maven-plugin + 1.2.1 + + + + java + + + + + + + diff --git a/clients/cloud/java-cloud-events/run-sample-producer.sh b/clients/cloud/java-cloud-events/run-sample-producer.sh new file mode 100755 index 000000000..cee7dbead --- /dev/null +++ b/clients/cloud/java-cloud-events/run-sample-producer.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +mvn exec:java -Dexec.mainClass="io.confluent.samples.cloud.cloudevents.SampleProducer" diff --git a/clients/cloud/java-cloud-events/src/main/java/io/confluent/samples/cloud/cloudevents/SampleProducer.java b/clients/cloud/java-cloud-events/src/main/java/io/confluent/samples/cloud/cloudevents/SampleProducer.java new file mode 100644 index 000000000..44050271b --- /dev/null +++ b/clients/cloud/java-cloud-events/src/main/java/io/confluent/samples/cloud/cloudevents/SampleProducer.java @@ -0,0 +1,77 @@ +package io.confluent.samples.cloud.cloudevents; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.io.InputStream; +import java.net.URI; +import java.util.Properties; +import java.util.UUID; + +@Data +@Builder +@AllArgsConstructor +class Order { + UUID productID; + UUID customerID; + long timestamp; +} + +@Data +@AllArgsConstructor +@Builder +class OrderCloudEvent { + private String id; + private URI source; + private String specVersion; + private String type; + private String datacontenttype; + private URI dataschema; + private String subject; + private long timestamp; + private Order data; +} + +@Slf4j +public class SampleProducer { + + @SneakyThrows + private static Properties producerProperties() { + Properties prop = new Properties(); + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + InputStream stream = loader.getResourceAsStream("producer.properties"); + prop.load(stream); + return prop; + } + + public static void main(String [] args) { + produceNonGenericCloudEvent(); + } + + @SneakyThrows + private static void produceNonGenericCloudEvent() { + KafkaProducer kafkaProducer = new KafkaProducer<>(producerProperties()); + Order order = Order.builder().productID(UUID.randomUUID()).customerID(UUID.randomUUID()).timestamp(System.currentTimeMillis()).build(); + OrderCloudEvent orderCloudEvent = OrderCloudEvent.builder() + .data(order) + .id(UUID.randomUUID().toString()) + .specVersion("1.0") + .subject(UUID.randomUUID().toString()) + .type("io.confluent.samples.orders.created") + .datacontenttype("application/json") + .timestamp(System.currentTimeMillis()) + .source(new URI("/v1/orders")) + .build(); + log.info(new ObjectMapper().writeValueAsString(orderCloudEvent)); + var result = kafkaProducer.send( + new ProducerRecord<>("order-cloud-events", orderCloudEvent.getId(), orderCloudEvent) + ); + System.err.println(result.get().toString()); + } +} diff --git a/clients/cloud/java-cloud-events/src/main/resources/.gitignore b/clients/cloud/java-cloud-events/src/main/resources/.gitignore new file mode 100644 index 000000000..85f39cc46 --- /dev/null +++ b/clients/cloud/java-cloud-events/src/main/resources/.gitignore @@ -0,0 +1 @@ +producer.properties diff --git a/clients/cloud/java-cloud-events/src/main/resources/producer.properties.template b/clients/cloud/java-cloud-events/src/main/resources/producer.properties.template new file mode 100644 index 000000000..86233e4fc --- /dev/null +++ b/clients/cloud/java-cloud-events/src/main/resources/producer.properties.template @@ -0,0 +1,11 @@ +bootstrap.servers=<...>.confluent.cloud:9092 +security.protocol=SASL_SSL +sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='' password=''; +sasl.mechanism=PLAIN +acks=all +value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer +key.serializer=org.apache.kafka.common.serialization.StringSerializer +schema.registry.url=https://<...>.confluent.cloud +basic.auth.credentials.source=USER_INFO +basic.auth.user.info=: +json.oneof.for.nullables=false