
rdkafka

The goal of rdkafka is to provide an R wrapper for librdkafka, the Apache Kafka C/C++ client library.

Installation

You can install the development version of rdkafka from GitHub like so:

install.packages("devtools")
devtools::install_github("AbrJA/rdkafka")

Note: rdkafka has only been tested on Linux so far.

Example

Prerequisites

Start the Kafka broker with the docker compose command:

(sudo) docker compose up -d

Note: Make sure you are in the directory containing the docker-compose.yml file.

Create the example topic Topic1 with the following command, then run it again with --topic Topic2 to create the second topic:

(sudo) docker compose exec broker \
  kafka-topics --create \
    --topic Topic1 \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1
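The command above names only Topic1, while the R example further down also uses Topic2. A minimal sketch of a helper that repeats the same call for any topic name is given here. The function name create_topic and the DRY_RUN variable are hypothetical and not part of this repository; the service name (broker), listener address, and flags are copied from the command above. Prefix docker with sudo if your setup needs it.

```shell
# Hypothetical helper: wraps the kafka-topics call shown above.
# With DRY_RUN=1 it prints the command instead of running it.
create_topic() {
  topic="${1:?usage: create_topic NAME}"
  # Build the same argument list as the command above, for the given topic.
  set -- docker compose exec broker kafka-topics --create \
    --topic "$topic" \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Usage (needs the broker started with docker compose up -d):
#   create_topic Topic1
#   create_topic Topic2
#   DRY_RUN=1 create_topic Topic2   # print the command only
```

Running the helper once per topic gives the two topics the consumer subscribes to in the example.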

Use a KafkaProducer object to send messages and a KafkaConsumer object to receive them:

library(rdkafka)

# Connect to the local broker; "earliest" makes a new group read each topic from the start.
producer <- KafkaProducer$new(brokers = "localhost:9092")
consumer <- KafkaConsumer$new(
  brokers = "localhost:9092",
  group_id = "readme",
  extra_options = list("auto.offset.reset" = "earliest")
)

# Send five keyed messages to each topic.
counter <- seq_len(5L)
producer$produce(
  topic = "Topic1",
  keys = sprintf("Key %s", counter),
  payloads = sprintf("Message %s", counter)
) |> print()
#> [1] 5
producer$produce(
  topic = "Topic2",
  keys = sprintf("Id %s", counter),
  payloads = sprintf("Body %s", counter)
) |> print()
#> [1] 5
# Subscribe to both topics and confirm the subscription.
consumer$subscribe(topics = c("Topic1", "Topic2"))
#> [1] 0
consumer$get_topics()
#> [1] "Topic1" "Topic2"

# Poll until the consumer returns a non-empty result.
results <- list()
while (identical(results, list())) {
  results <- consumer$consume(num_results = 10, timeout_ms = 1000)
}
#> Timeout was reached with no new messages
#> Timeout was reached with no new messages
data.table::rbindlist(results)
#>      topic   key   payload
#>  1: Topic1 Key 1 Message 1
#>  2: Topic1 Key 2 Message 2
#>  3: Topic1 Key 3 Message 3
#>  4: Topic1 Key 4 Message 4
#>  5: Topic1 Key 5 Message 5
#>  6: Topic2  Id 1    Body 1
#>  7: Topic2  Id 2    Body 2
#>  8: Topic2  Id 3    Body 3
#>  9: Topic2  Id 4    Body 4
#> 10: Topic2  Id 5    Body 5

Configuration

librdkafka exposes many configuration properties; the full list of supported properties is in its CONFIGURATION document. To set a specific property, provide a conf object to either KafkaProducer or KafkaConsumer. The example above sets "auto.offset.reset" through the extra_options argument of KafkaConsumer.

License

GPL-3.0 (see LICENSE.md). The repository also contains a separate LICENSE file of unrecognized type.