This module provides an alternative Kafka integration for legacy Spark Streaming DStreams, with prefetching consumers, pooled reuse, and admin-based offset management. It is more extensible and configurable than the original integration.
Requires Spark 4 and Java 21+.
Compared to the official Spark Streaming Kafka integration, this rewrite adds:
- Prefetching consumers that poll on a dedicated virtual thread and feed a bounded queue, smoothing fetch/processing, enabling async I/O during batch processing, and applying natural backpressure.
- A cached consumer pool with size and TTL controls to avoid reconnects and preserve prefetch state for hot partitions.
- Driver-side offset discovery and commits via Kafka Admin APIs rather than the Consumer API.
- Optional offset range splitting to reduce skew across Spark tasks and automatically adjust the number of partitions per batch.
- A more resilient driver loop, with retry hooks planned to avoid crashes under transient failures.
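As a rough illustration of the prefetching pattern (this is not the module's internal API; all names below are hypothetical), a dedicated virtual thread fills a bounded queue, and a full queue blocks the poller, which is exactly the natural backpressure described above:

```scala
import java.util.concurrent.ArrayBlockingQueue

// Sketch of the prefetch pattern: a virtual thread stands in for the Kafka
// poll loop and feeds a bounded queue; put() blocks when the queue is full,
// which throttles the poller (natural backpressure).
object PrefetchSketch {
  def run(total: Int, capacity: Int): Long = {
    val queue = new ArrayBlockingQueue[Long](capacity)

    // Dedicated virtual thread (Java 21+) "polling" a simulated source.
    val poller = Thread.ofVirtual().start(() => {
      var offset = 0L
      while (offset < total) {
        queue.put(offset) // blocks once `capacity` records are buffered
        offset += 1
      }
    })

    // The processing side drains the queue at its own pace.
    var sum = 0L
    for (_ <- 0 until total) sum += queue.take()
    poller.join()
    sum
  }

  def main(args: Array[String]): Unit =
    println(s"sum=${run(100, 8)}") // sum of the 100 simulated offsets
}
```

Because the poller blocks on `put` rather than dropping or buffering unboundedly, a slow batch cannot cause the prefetch buffer to grow past its capacity.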
Example usage:

```scala
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.consumer.pool.ConsumerPoolStrategies
import org.apache.spark.streaming.kafka.offset.OffsetStoreStrategies

val sparkConf = new SparkConf().setAppName("KafkaDStreamExample")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// Driver-side Admin client params: used for offset discovery and commits.
val adminClientParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
  AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG -> 5000
)

// Executor-side consumer params: records are fetched and deserialized here.
val executorKafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName
)

val stream = KafkaDStream.builder[String, String](ssc)
  .withAdminClientParams(adminClientParams)
  .withSubscription(SubscriptionStrategies.Subscribe(Set("topic-a")))
  .withOffsetStore(OffsetStoreStrategies.NativeKafka(groupId = "stream"))
  .withConsumerPool(ConsumerPoolStrategies.CachedConsumer(executorKafkaParams))
  .build()

stream.foreachRDD { (rdd, time: Time) =>
  // Process records...
  stream.commitAsync(time)
}
```

Spark configs specific to this module:
Consumer:
- `spark.streaming.kafka.consumer.poll.ms` (default: `spark.network.timeout` * 1000): Consumer poll timeout in milliseconds.
- `spark.streaming.kafka.consumer.prefetch.queueCapacity` (default: 3 * `max.poll.records`): Prefetch queue capacity in records; larger values smooth bursts but increase memory use per executor.
- `spark.streaming.kafka.consumer.cache.maxSize` (default: 32): Maximum number of cached consumers per executor.
- `spark.streaming.kafka.consumer.cache.expireAfterAccessSec` (default: 300): Cache TTL in seconds.
Rate limits:
- `spark.streaming.kafka.maxRatePerPartition` (default: +inf): Upper bound per partition per batch; set it to cap input during spikes.
- `spark.streaming.kafka.minRatePerPartition` (default: 0): Lower bound per partition per batch.
Offsets:
- `spark.streaming.kafka.offsetRange.maxSizeRatio` (default: +inf): Split skewed offset ranges when a partition's range exceeds this ratio of the average range size (must be > 1).
- `spark.streaming.kafka.offsets.maxRetries` (default: 3): Retry count for offset discovery; increase it if your cluster sees transient Admin API failures.
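To make the splitting rule concrete, here is a standalone sketch (our own names, not the module's API) that cuts any range larger than `maxSizeRatio` times the average range size into near-equal chunks:

```scala
// Hypothetical helper (not this module's API) illustrating offset-range
// splitting: a range larger than maxSizeRatio * average size is cut into
// near-equal chunks, evening out work across Spark tasks.
object RangeSplitSketch {
  final case class OffsetRange(from: Long, until: Long) {
    def size: Long = until - from
  }

  def split(ranges: Seq[OffsetRange], maxSizeRatio: Double): Seq[OffsetRange] = {
    require(maxSizeRatio > 1, "maxSizeRatio must be > 1")
    val avg = ranges.map(_.size).sum.toDouble / ranges.size
    val threshold = maxSizeRatio * avg
    ranges.flatMap { r =>
      if (r.size <= threshold) Seq(r)
      else {
        // Fewest chunks such that each chunk fits under the threshold.
        val parts = math.ceil(r.size / threshold).toInt
        val step  = math.ceil(r.size.toDouble / parts).toLong
        (r.from until r.until by step)
          .map(s => OffsetRange(s, math.min(s + step, r.until)))
      }
    }
  }
}
```

For example, with ranges of 10 and 100 records and `maxSizeRatio = 1.5`, the average is 55, the threshold is 82.5, and only the 100-record range is split, into two halves. Splitting raises the number of Spark tasks for that batch, which is how the module adjusts partition counts automatically.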
Standard Spark backpressure settings also apply: `spark.streaming.backpressure.enabled` and `spark.streaming.backpressure.initialRate`.
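Putting the knobs together, these configs are set on the `SparkConf` before creating the `StreamingContext`. The values below are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

// Illustrative values only; tune for your workload.
val sparkConf = new SparkConf()
  .setAppName("KafkaDStreamExample")
  .set("spark.streaming.kafka.consumer.poll.ms", "512")
  .set("spark.streaming.kafka.consumer.cache.maxSize", "64")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  .set("spark.streaming.kafka.offsetRange.maxSizeRatio", "1.5")
  .set("spark.streaming.backpressure.enabled", "true")
```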
Planned:
- Rack-aware reads: allow clients to read partitions from replicas in the same rack by leveraging Kafka rack awareness and the replica selection feature for reading from followers.