diff --git a/src/piotr_yuxuan/slava.clj b/src/piotr_yuxuan/slava.clj index 61b8bee..b0ce200 100644 --- a/src/piotr_yuxuan/slava.clj +++ b/src/piotr_yuxuan/slava.clj @@ -1,51 +1,90 @@ (ns piotr-yuxuan.slava "FIXME add cljdoc" - (:require [piotr-yuxuan.slava.encode :refer [encode]] + (:require [piotr-yuxuan.slava.config :as config] [piotr-yuxuan.slava.decode :refer [decode]] - [piotr-yuxuan.slava.config :as config]) - (:import (io.confluent.kafka.schemaregistry.avro AvroSchema) + [piotr-yuxuan.slava.encode :refer [encode]]) + (:import (clojure.lang Atom Obj) + (io.confluent.kafka.schemaregistry.avro AvroSchema) (io.confluent.kafka.schemaregistry.client SchemaRegistryClient) (io.confluent.kafka.serializers KafkaAvroSerializer KafkaAvroSerializerConfig KafkaAvroDeserializer) (io.confluent.kafka.serializers.subject.strategy SubjectNameStrategy) + (java.nio ByteBuffer) (java.util Map) (org.apache.avro Schema) + (org.apache.avro.generic GenericContainer) (org.apache.kafka.common.serialization Serializer Deserializer Serdes Serde) - (clojure.lang Atom) - (org.apache.avro.generic GenericContainer))) + (java.util.concurrent ConcurrentHashMap))) -(defn subject-name +(defn ^String subject-name "FIXME add cljdoc" [{:keys [key? ^SubjectNameStrategy subject-name-strategy]} ^String topic] (.subjectName subject-name-strategy topic key? nil)) -(defn resolve-subject-name +(defn ^String resolve-subject-name "FIXME add cljdoc" [config ^String topic m] - (if (contains? (meta m) :piotr-yuxuan.slava/subject-name) - (get (meta m) :piotr-yuxuan.slava/subject-name) + (if (contains? (meta m) ::subject-name) + (get (meta m) ::subject-name) (subject-name config topic))) +(defn cached-schema! + "FIXME add cljdoc" + [^SchemaRegistryClient inner-client schema-id] + (.rawSchema ^AvroSchema (.getSchemaById inner-client schema-id))) + +(defn default-subject-name->id + [subject-name->id inner-client subject-name] + (if (contains? @subject-name->id subject-name) + (get @subject-name->id subject-name) + (let [retrieved-id (.getId (.getLatestSchemaMetadata inner-client subject-name))] + (swap! subject-name->id assoc subject-name retrieved-id) + retrieved-id))) + +(defn subject-name->id + [config inner-client subject-name] + (let [subject-name->id (get-in config [:subject-name->id :ref]) + through (get-in config [:subject-name->id :through] default-subject-name->id)] + (through subject-name->id subject-name) + (get @subject-name->id subject-name + (let [retrieved-id (.getId (.getLatestSchemaMetadata inner-client subject-name))] + (swap! subject-name->id assoc subject-name retrieved-id) + retrieved-id)))) + (defn schema-id! "FIXME add cljdoc" - ;; FIXME on every serialization. Should be cached. - [^SchemaRegistryClient inner-client ^String subject-name] - (.getId (.getLatestSchemaMetadata inner-client subject-name))) + [config topic ^Map m] + (let [subject-name->id (get-in config [:subject-name->id :through] subject-name->id)] + (subject-name->id (resolve-subject-name config topic m)))) (defn resolve-schema-id "FIXME add cljdoc" - [inner-client ^Map m ^String subject-name] - (if (contains? (meta m) :piotr-yuxuan.slava/schema-id) - (get (meta m) :piotr-yuxuan.slava/schema-id) - (schema-id! inner-client subject-name))) + [config topic ^Map m] + (if (contains? (meta m) ::schema-id) + (get (meta m) ::schema-id) + (schema-id! config topic m))) -(defn resolve-schema +(defn ^Schema resolve-schema "FIXME add cljdoc" - ^Schema [^SchemaRegistryClient inner-client ^Map m schema-id] - (cond (contains? (meta m) :piotr-yuxuan.slava/writer-schema) (get (meta m) :piotr-yuxuan.slava/writer-schema) - (contains? (meta m) :piotr-yuxuan.slava/reader-schema) (get (meta m) :piotr-yuxuan.slava/reader-schema) - :else (.rawSchema ^AvroSchema (.getSchemaById inner-client schema-id)))) + [config ^SchemaRegistryClient inner-client topic ^Map m] + (cond (contains? (meta m) ::schema) (get (meta m) ::schema) ; User-defined, takes precedence. + (contains? (meta m) ::writer-schema) (get (meta m) ::writer-schema) + (contains? (meta m) ::reader-schema) (get (meta m) ::reader-schema) + :else (cached-schema! inner-client (resolve-schema-id config topic m)))) + +(defn subject-name->id + [inner-client value] + (let [found (get value :subject-name->id)] + (if (= :default found) + (let [subject-name->id (atom {})] + {:ref subject-name->id + :through (fn stub-through [subject-name] + (get @subject-name->id subject-name + (let [retrieved-id (.getId (.getLatestSchemaMetadata inner-client subject-name))] + (swap! subject-name->id assoc subject-name retrieved-id) + retrieved-id)))}) + found))) -(defn subject-name-strategy +(defn ^SubjectNameStrategy subject-name-strategy "FIXME add cljdoc" [inner-config key?] (let [inner-config-obj (KafkaAvroSerializerConfig. inner-config)] @@ -55,11 +94,12 @@ (defn configure! "FIXME add cljdoc" - [{:keys [config inner]} value key?] + [{:keys [config inner inner-client]} value key?] (let [inner-config (->> value (remove (comp config/slava-key? key)) (into {}))] (reset! config (assoc value + :subject-name->id (subject-name->id inner-client value) :key? key? :subject-name-strategy (subject-name-strategy inner-config key?))) ;; Reflection warning: either a KafkaAvroSerializer or a KafkaAvroDeserializer. @@ -71,13 +111,20 @@ Serializer (configure [this value key?] (configure! this value key?)) (serialize [_ topic m] - (->> (resolve-subject-name @config topic m) - (resolve-schema-id inner-client m) - (resolve-schema inner-client m) + (->> (resolve-schema config inner-client topic m) (encode @config m) (.serialize inner topic))) (close [_] (.close inner))) +(def int-size + "In the JVM, an int always uses 4 bytes." + 4) + +(defn schema-id + "Extract the schema id as known in the schema registry." + [data] + (.getInt (ByteBuffer/wrap data 0 int-size))) + (defrecord ClojureDeserializer [^Atom config ^KafkaAvroDeserializer inner ^SchemaRegistryClient inner-client] @@ -86,15 +133,15 @@ (deserialize [_ topic data] (let [^GenericContainer generic-container (.deserialize inner topic data) reader-schema (.getSchema generic-container) - m (decode @config generic-container reader-schema) - subject-name (resolve-subject-name @config topic m)] + m (decode @config generic-container reader-schema)] (vary-meta m assoc - :piotr-yuxuan.slava/reader-schema reader-schema - :piotr-yuxuan.slava/subject-name subject-name - :piotr-yuxuan.slava/schema-id (resolve-schema-id inner-client m subject-name)))) + ::reader-schema reader-schema + ::subject-name (subject-name @config topic) + ::schema-id (schema-id data)))) (close [_] (.close inner))) (defn ^ClojureSerializer serializer + "FIXME add cljdoc" ([inner-client] (ClojureSerializer. (atom nil) (KafkaAvroSerializer. inner-client) inner-client)) ([inner-client config key?] @@ -102,6 +149,7 @@ (.configure config key?)))) (defn ^ClojureDeserializer deserializer + "FIXME add cljdoc" ([inner-client] (ClojureDeserializer. (atom nil) (KafkaAvroDeserializer. inner-client) inner-client)) ([inner-client config key?] @@ -109,6 +157,7 @@ (.configure config key?)))) (defn ^Serde clojure-serde + "FIXME add cljdoc" ([inner-client] (Serdes/serdeFrom (serializer inner-client) (deserializer inner-client))) diff --git a/src/piotr_yuxuan/slava/config.clj b/src/piotr_yuxuan/slava/config.clj index 57ad566..14a4a0c 100644 --- a/src/piotr_yuxuan/slava/config.clj +++ b/src/piotr_yuxuan/slava/config.clj @@ -90,7 +90,8 @@ (merge {:record-key-fn (constantly nil) :clojure-types clojure-types - :generic-concrete-types generic-concrete-types} + :generic-concrete-types generic-concrete-types + :subject-name->id :default} avro-decoders avro-encoders)) diff --git a/test/piotr_yuxuan/slava_test.clj b/test/piotr_yuxuan/slava_test.clj index a1fe90a..343da28 100644 --- a/test/piotr_yuxuan/slava_test.clj +++ b/test/piotr_yuxuan/slava_test.clj @@ -62,24 +62,6 @@ (.name "field") .type .intType .noDefault ^GenericData$Record .endRecord)) -(deftest schema-id!-test - (let [previous-version-id 2 - previous-schema-id 2 - version-id 3 - schema-id 3 - inner-client (doto (MockSchemaRegistryClient.) - (.register "subject-name" - (AvroSchema. previous-schema) - previous-version-id - previous-schema-id) - (.register "subject-name" - (AvroSchema. schema) - version-id - schema-id))] - (testing "get latest schema version" - (is (= (slava/schema-id! inner-client "subject-name") - schema-id))))) - (def ^Integer version-id (rand-int 100)) @@ -131,10 +113,11 @@ (deftest resolve-schema-test (is (= schema (slava/resolve-schema + config/default (doto (MockSchemaRegistryClient.) (.register "subject-name" (AvroSchema. schema) version-id schema-id)) - {} - schema-id))) + topic + {}))) (is (= writer-schema (slava/resolve-schema (MockSchemaRegistryClient.) (with-meta {} {:piotr-yuxuan.slava/writer-schema writer-schema