Tansu is a drop-in replacement for Apache Kafka with PostgreSQL, S3 or memory storage engines, without the cost of broker-replicated storage for durability. It is licensed under the GNU AGPL and written in 100% safe 🦺 async 🚀 Rust 🦀.
Features:
- Apache Kafka API compatible
- Available with PostgreSQL, S3 or memory storage engines
- Broker-side validation of messages with JSON Schema, Apache Avro or Protocol Buffers
- Topics validated by JSON Schema, Apache Avro or Protocol Buffers are written as Apache Parquet files for easy consumption into your data lake
For data durability:
- S3 is designed to exceed 99.999999999% (11 nines) durability
- PostgreSQL with continuous archiving, streaming transaction log files to an archive
- The memory storage engine is designed for ephemeral non-production environments
Tansu is a single statically linked binary containing the following:
- broker: an Apache Kafka API compatible broker and schema registry
- topic: a CLI to create or delete topics
- cat: a CLI to consume or produce Avro, JSON or Protobuf messages to a topic
- proxy: an Apache Kafka compatible proxy
The broker subcommand is the default if no other command is supplied.
Usage: tansu [OPTIONS]
       tansu <COMMAND>
Commands:
broker Apache Kafka compatible broker with Avro, JSON, Protobuf schema validation [default if no command supplied]
topic Create or delete topics managed by the broker
cat Easily consume or produce Avro, JSON or Protobuf messages to a topic
proxy Apache Kafka compatible proxy
help Print this message or the help of the given subcommand(s)
Options:
--kafka-cluster-id <KAFKA_CLUSTER_ID>
All members of the same cluster should use the same id [env: CLUSTER_ID=] [default: tansu_cluster]
--kafka-listener-url <KAFKA_LISTENER_URL>
The broker will listen on this address [env: LISTENER_URL=] [default: tcp://[::]:9092]
--kafka-advertised-listener-url <KAFKA_ADVERTISED_LISTENER_URL>
This location is advertised to clients in metadata [env: ADVERTISED_LISTENER_URL=] [default: tcp://localhost:9092]
--storage-engine <STORAGE_ENGINE>
Storage engine examples are: postgres://postgres:postgres@localhost, memory://tansu/ or s3://tansu/ [env: STORAGE_ENGINE=] [default: memory://tansu/]
--schema-registry <SCHEMA_REGISTRY>
Schema registry examples are: file://./etc/schema or s3://tansu/, containing: topic.json, topic.proto or topic.avsc [env: SCHEMA_REGISTRY=]
--data-lake <DATA_LAKE>
Apache Parquet files are written to this location, examples are: file://./lake or s3://lake/ [env: DATA_LAKE=]
--prometheus-listener-url <PROMETHEUS_LISTENER_URL>
Broker metrics can be scraped by Prometheus from this URL [env: PROMETHEUS_LISTENER_URL=] [default: tcp://[::]:9100]
-h, --help
Print help
-V, --version
Print version
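Given the default --prometheus-listener-url of tcp://[::]:9100, broker metrics can be scraped with a Prometheus configuration along these lines (the job name and scrape interval here are illustrative, not part of the distribution):

scrape_configs:
  - job_name: tansu
    scrape_interval: 15s
    static_configs:
      - targets: ["localhost:9100"]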
A broker can be started by simply running tansu, as all options have defaults. Tansu picks up any existing environment variables, loading any found in .env. An example.env is provided as part of the distribution and can be copied into .env for local modification. Sample schemas used in the examples can be found in etc/schema.
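As an illustration only (the values below are simply the documented defaults and option examples, not necessarily the contents of example.env), a minimal .env might look like:

# storage engine: memory://, postgres:// or s3://
STORAGE_ENGINE="memory://tansu/"
# the address advertised to clients in metadata
ADVERTISED_LISTENER_URL="tcp://localhost:9092"
# optional: schema registry and Apache Parquet output
SCHEMA_REGISTRY="file://./etc/schema"
DATA_LAKE="file://./lake"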
If an Apache Avro, Protobuf or JSON schema has been assigned to a topic, the broker will reject any messages that are invalid. Schema backed topics are written as Apache Parquet when the --data-lake option is provided.
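For example, reusing the option examples from the help above, a broker validating against the bundled sample schemas and writing Parquet locally could be started with something like:

tansu --schema-registry file://./etc/schema --data-lake file://./lake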
The tansu topic command has the following subcommands:
Create or delete topics managed by the broker
Usage: tansu topic <COMMAND>
Commands:
create Create a topic
delete Delete an existing topic
help Print this message or the help of the given subcommand(s)
Options:
-h, --help Print help
To create a topic use:
tansu topic create taxi
The tansu cat command has the following subcommands:
tansu cat --help
Easily consume or produce Avro, JSON or Protobuf messages to a topic
Usage: tansu cat <COMMAND>
Commands:
produce Produce Avro/JSON/Protobuf messages to a topic
consume Consume Avro/JSON/Protobuf messages from a topic
help Print this message or the help of the given subcommand(s)
Options:
-h, --help Print help
The produce subcommand reads JSON formatted messages, encoding them into Apache Avro, Protobuf or JSON depending on the schema used by the topic.
For example, the taxi topic is backed by taxi.proto. Using trips.json, which contains a JSON array of objects, tansu cat produce encodes each message as Protobuf and sends it to the broker:
tansu cat produce taxi etc/data/trips.json
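For illustration, trips.json is a JSON array along these lines (field names and values are inferred from the Parquet query results shown below; the actual file in etc/data may differ):

[
  {"vendor_id": 1, "trip_id": 1000371, "trip_distance": 1.8, "fare_amount": 15.32, "store_and_fwd": 0},
  {"vendor_id": 2, "trip_id": 1000372, "trip_distance": 2.5, "fare_amount": 22.15, "store_and_fwd": 0}
]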
Using duckdb we can read the Apache Parquet files created by the broker:
duckdb :memory: "SELECT * FROM 'data/taxi/*/*.parquet'"
This results in the following output:
|-----------+---------+---------------+-------------+---------------|
| vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd |
| int64 | int64 | float | double | int32 |
|-----------+---------+---------------+-------------+---------------|
| 1 | 1000371 | 1.8 | 15.32 | 0 |
| 2 | 1000372 | 2.5 | 22.15 | 0 |
| 2 | 1000373 | 0.9 | 9.01 | 0 |
| 1 | 1000374 | 8.4 | 42.13 | 1 |
|-----------+---------+---------------+-------------+---------------|
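Based on the column types in that output, the taxi.proto schema in etc/schema presumably resembles the following sketch (the message name and field numbers are assumptions):

syntax = "proto3";

message Trip {
  int64 vendor_id = 1;
  int64 trip_id = 2;
  float trip_distance = 3;
  double fare_amount = 4;
  int32 store_and_fwd = 5;
}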
The following will configure an S3 storage engine using the "tansu" bucket (the full context is in compose.yaml and example.env):
Copy example.env into .env so that you have a local working copy:
cp example.env .env
Edit .env so that STORAGE_ENGINE is defined as:
STORAGE_ENGINE="s3://tansu/"
For first-time startup, you'll need to create a bucket, an access key and a secret in minio. Just bring minio up, without tansu:
docker compose up -d minio
Create a minio alias named local, representing http://localhost:9000 with the default credentials of minioadmin:
docker compose exec minio \
/usr/bin/mc \
alias \
set \
local \
http://localhost:9000 \
minioadmin \
minioadmin
Create a tansu bucket in minio using the local alias:
docker compose exec minio \
/usr/bin/mc mb local/tansu
Once this is done, you can start tansu with:
docker compose up -d tansu
Using the regular Apache Kafka CLI you can create topics, produce and consume messages with Tansu:
kafka-topics \
--bootstrap-server localhost:9092 \
--partitions=3 \
--replication-factor=1 \
--create --topic test
Describe the test topic:
kafka-topics \
--bootstrap-server localhost:9092 \
--describe \
--topic test
Note that node 111 is the leader and in-sync replica (ISR) for every topic partition: this node is the broker handling your request, and every Tansu broker reports itself as node 111.
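The describe output will look something like this (the topic id and exact spacing will vary):

Topic: test  TopicId: ...  PartitionCount: 3  ReplicationFactor: 1  Configs:
  Topic: test  Partition: 0  Leader: 111  Replicas: 111  Isr: 111
  Topic: test  Partition: 1  Leader: 111  Replicas: 111  Isr: 111
  Topic: test  Partition: 2  Leader: 111  Replicas: 111  Isr: 111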
Producer:
echo "hello world" | kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic test
Group consumer using test-consumer-group:
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--group test-consumer-group \
--topic test \
--from-beginning \
--property print.timestamp=true \
--property print.key=true \
--property print.offset=true \
--property print.partition=true \
--property print.headers=true \
--property print.value=true
Describe the test-consumer-group consumer group:
kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group test-consumer-group \
--describe
To switch between the minio and PostgreSQL examples, first shut down Tansu:
docker compose down tansu
Switch to the PostgreSQL storage engine by updating .env:
# minio storage engine
# STORAGE_ENGINE="s3://tansu/"
# PostgreSQL storage engine -- NB: @db and NOT @localhost :)
STORAGE_ENGINE="postgres://postgres:postgres@db"
Start PostgreSQL:
docker compose up -d db
Bring Tansu back up:
docker compose up -d tansu
As before, using the regular Apache Kafka CLI you can create topics, produce and consume messages with Tansu:
kafka-topics \
--bootstrap-server localhost:9092 \
--partitions=3 \
--replication-factor=1 \
--create --topic test
Producer:
echo "hello world" | kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic test
Consumer:
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--group test-consumer-group \
--topic test \
--from-beginning \
--property print.timestamp=true \
--property print.key=true \
--property print.offset=true \
--property print.partition=true \
--property print.headers=true \
--property print.value=true
Or using librdkafka to produce:
echo "Lorem ipsum dolor..." | \
./examples/rdkafka_example -P \
-t test -p 1 \
-b localhost:9092 \
-z gzip
Consumer:
./examples/rdkafka_example \
-C \
-t test -p 1 \
-b localhost:9092
Please raise an issue if you encounter a problem.
Tansu is licensed under the GNU AGPL.