Skip to content

Latest commit

 

History

History
176 lines (118 loc) · 5.81 KB

README.md

File metadata and controls

176 lines (118 loc) · 5.81 KB

#Purpose

A minimal approach (either one function or the combination of two functions) to support streaming SSE (Server-Sent-Events) on any Kafka topic.

The defaults can be tweaked in code and / or configuration.

#Dependency

[com.opengrail/kafka-sse-clj "0.1.0-SNAPSHOT"]

#Usage

(:require [com.opengrail/kafka-sse-clj :as ks])

#Default-based approach (single function)

ks/kafka->sse-ch [topic-name]

The function returns a core.async channel that maps from the data on a Kafka channel to the HTML5 EventSource format

By default the function will consume the events

  • from the point of connection
  • unfiltered
  • and emit SSE comments every 5 seconds to maintain the connection

You can use that channel to stream data out with any web server that supports core.async channels.

I have provided an example using Aleph and Compojure at the end of the README and in the repo.

#Non-defaults

You can provide additional arguments to alter the default behaviour.

ks/kafka->sse-ch [topic-name offset event-filter-regex keep-alive?]
ks/kafka->sse-ch [topic-name offset event-filter-regex]  ; default keep-alive? = true
ks/kafka->sse-ch [topic-name offset]                     ; default regex = .*
ks/kafka->sse-ch [topic-name]                            ; default offset = LATEST

#Message format

The default output of the channel complies to the HTML5 EventSource specification and has these semantics:

  • id (item offset in kafka topic)
  • event (the message key as a string)
  • data (the message value as a string)
id: 556
event: customer
data: {"id" 745 "message" "Hello SSE"}

id: 557
event: product-campaign
data: {"id" 964 "message" "Hello SSE"}

The data in the examples is JSON but it could be any string.

Of course, to use these defaults, messages placed on the Kafka topic must conform to these semantics.

#Filtering

Example (in hands of dev)

The channel is unfiltered by default but supports a request parameter filter[event] to filter responses by matching on event name:

http://server-name/sse?filter[event]=customer

Regular expressions are also supported in a comma separated list:

http://server-name/sse?filter[event]=customer,product.*

#Composition approach (two functions)

##Hand assembly

kafka-consumer->sse-ch [consumer transducer]
kafka-consumer->sse-ch [consumer transducer keep-alive?]

This function takes a Kafka consumer and transducer to assemble how the data placed on the channel by the consumer is processed.

In the second form you can pass a boolean to indicate whether a keep-alive channel is created. If true, the output of the transducer channel and the keep-alive channel are merged into a single channel. The merged channel is returned. This form is used for the default approach.

###Kafka Consumer

sse-consumer [topic offset]
sse-consumer [topic offset brokers]
sse-consumer [topic offset brokers options]
sse-consumer [topic offset brokers options marshallers]

The second function is to enable the construction of a Kafka consumer for SSE purposes.

I make SSE a clear distinction because Kafka has mature support for managing topic reading via a range of sophisticated options.

We do not need that level of sophistication for SSE - we assume clients want to read from one of two places on the Kafka topic

  • EITHER the latest position or the head of the stream (the default)
  • OR a specific offset to enable consumption of any missing records during a network outage

In the latter case, EventSource clients send the Last-Event-Id header.

Providing the Kafka offset as the event id in the output record keeps it simple.

This consumer enables you to provide or as much or as little config as you need.

###core.async transducer

(kafka-consumer->sse-ch consumer (comp (filter #(name-matches? event-filter (.key %)))
                                           (map consumer-record->sse)))

This is the code used to create the default transducer.

Anything goes with your transducer: you can use one of the existing helper functions (name-matches? and consumer-record->sse) or provide your own.

#Operations

The table shows the supported environment variables and defaults.

Environment Variable Meaning Default
SSE_PROXY_KAFKA_BROKER_URL List of brokers localhost:9092
SSE_PROXY_POLL_TIMEOUT_MILLIS Max time in ms for each poll on Kafka 100
SSE_PROXY_BUFFER_SIZE Size the async.core channel buffer 512
SSE_PROXY_KEEP_ALIVE_MILLIS Interval in ms between emitting SSE comments 5000

#Testing

Working on docker images...

#Example (using Aleph and Compojure)

(def ^:private TOPIC (config/env-or-default :sse-proxy-topic "simple-proxy-topic"))

(defn sse-handler-using-defaults
  "Stream SSE data from the Kafka topic"
  [request]
  (let [topic-name (get (:params request) "topic" TOPIC)
        offset (get (:headers request) "last-event-id" config/CONSUME_LATEST)
        event-filter-regex (get (:params request) "filter[event]" ".*")
        ch (ks/kafka->sse-ch topic-name offset event-filter-regex)]
    {:status  200
     :headers {"Content-Type"  "text/event-stream;charset=UTF-8"
               "Cache-Control" "no-cache"}
     :body    (s/->source ch)}))

(def handler
  (params/wrap-params
    (compojure/routes
      (GET "/kafka-sse" [] sse-handler-using-defaults)
      (route/not-found "No such page."))))

#Other Kafka libraries for Clojure

Kafka consumers are generally complex and there are several Clojure libraries to provide access to all of those functions in idiomatic Clojure and from which I have stolen ideas and code.

Take a look at Franzy