Spark Streaming’s batch interval and Kafka consumer’s poll interval are distinct but interconnected settings that dramatically impact your application’s latency and throughput.
Let’s see this in action. Imagine you’re processing a high-volume Kafka topic with Spark Streaming. Your goal is to get data into a data warehouse with minimal delay.
Here’s a simplified Spark Streaming job:
```scala
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

kafkaStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Process RDD: parse JSON, enrich, write to data warehouse
    processData(rdd)
  }
}

streamingContext.start()
streamingContext.awaitTermination()
```
In this setup, the batch interval is the duration you pass to the StreamingContext constructor. It controls how often Spark creates a new RDD from your Kafka data. For example, new StreamingContext(sparkConf, Seconds(5)) means Spark will cut a new batch every 5 seconds.
The kafkaParams map contains your Kafka consumer configuration. Crucially, this is where session.timeout.ms, heartbeat.interval.ms, and max.poll.interval.ms live. The max.poll.interval.ms setting is the Kafka consumer's poll timeout: the maximum time allowed between successive calls to poll(). If the consumer goes longer than that without polling, the broker considers it dead and evicts it from the consumer group.
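A minimal sketch of such a kafkaParams map, with placeholder broker address, group id, and illustrative timeout values (tune these for your own cluster and workload):

```scala
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical consumer configuration; endpoints and timeouts are placeholders.
val kafkaParams = Map[String, Object](
  "bootstrap.servers"     -> "kafka-broker:9092",
  "key.deserializer"      -> classOf[StringDeserializer],
  "value.deserializer"    -> classOf[StringDeserializer],
  "group.id"              -> "warehouse-loader",
  "auto.offset.reset"     -> "latest",
  "session.timeout.ms"    -> "30000",  // broker declares the consumer dead after this much silence
  "heartbeat.interval.ms" -> "10000",  // background heartbeats, kept well below the session timeout
  "max.poll.interval.ms"  -> "60000"   // maximum allowed gap between poll() calls
)
```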
Tuning these two values is a balancing act. A short batch interval means Spark is constantly trying to create small RDDs, which can lead to high overhead and potentially not enough data to make processing efficient. A long batch interval means higher latency, as data waits longer to be batched. Similarly, a short Kafka poll interval might cause the consumer to time out if Spark takes too long to process a batch, leading to rebalances. A long poll interval, however, might mean your Spark application is holding onto Kafka partitions for too long, impacting other consumers or failing to detect dead Spark executors.
The core problem Spark Streaming solves here is processing an unbounded stream of data as a series of discrete, bounded batches. Spark’s createDirectStream (or its Structured Streaming equivalent) is smart enough to figure out the offsets to fetch from Kafka for each batch. It queries Kafka for the latest offsets, determines the range of offsets for the current batch interval, and then fetches that data.
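You can inspect the offset range Spark computed for each batch by casting the RDD to HasOffsetRanges, part of the spark-streaming-kafka-0-10 connector. A sketch, assuming the kafkaStream from the job above:

```scala
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

kafkaStream.foreachRDD { rdd =>
  // Each batch RDD carries the exact Kafka offset range it was built from.
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"topic=${r.topic} partition=${r.partition} offsets=[${r.fromOffset}, ${r.untilOffset})")
  }
}
```

Logging these ranges is also a common way to confirm that each batch interval is pulling roughly the volume of data you expect.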
The batch interval directly dictates the maximum latency you’ll see. If your batch interval is 5 seconds, your data will be at most 5 seconds old when it starts processing. However, if processing a batch takes longer than 5 seconds, Spark will fall behind, and your latency will increase.
The Kafka poll interval, specifically max.poll.interval.ms, is the safety net for the Kafka consumer. Under the hood, the direct stream fetches records by calling consumer.poll(). If the gap between successive poll() calls exceeds max.poll.interval.ms (the default is 5 minutes, but it is often tuned much lower), the Kafka broker assumes the consumer has died and triggers a rebalance. This is a critical point: if your Spark batch processing time consistently exceeds max.poll.interval.ms, you'll get constant Kafka rebalances, which are very disruptive.
Here’s how they interact:
- Spark's StreamingContext (or SparkSession for Structured Streaming) requests a new batch based on batchDuration.
- The Kafka connector (the kafka-clients library) within the Spark executor makes a poll() call to Kafka to fetch records for the new batch.
- If the gap between poll() calls exceeds max.poll.interval.ms, the Kafka broker initiates a rebalance. This can happen while Spark is still processing a batch, because no new poll() is issued until processing finishes.
- Spark then processes the RDD. If this processing takes longer than batchDuration, the next batch will be delayed.
To tune them effectively:
- Estimate your batch processing time: Profile your RDD processing logic. Let's say it takes 2 seconds on average.
- Set batchDuration: To minimize latency, set batchDuration slightly higher than your average processing time. If processing is 2s, a batchDuration of 5s is a good start. This gives Spark some buffer.
- Set max.poll.interval.ms: This should be significantly longer than your maximum expected batch processing time, but not excessively long. If your average batch processing is 2s and occasionally spikes to 10s, leaving max.poll.interval.ms at 300000 (the 5-minute default) might hide problems; a value like 60000 (1 minute) is often a good starting point, tolerating processing delays without immediately triggering a rebalance. If your processing reliably takes 10s, setting it to 30000 keeps you safe, while 5000 guarantees rebalances.
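Putting the two knobs side by side, a sketch using the illustrative numbers above (the app name is a placeholder):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("warehouse-loader")

// Batch interval: a bit above the ~2s average processing time.
val streamingContext = new StreamingContext(sparkConf, Seconds(5))

// Poll interval: set in the kafkaParams consumer config, well above the
// worst-case batch processing time:
//   "max.poll.interval.ms" -> "60000"
```

The two values live in different places (Spark constructor vs. Kafka consumer config), which is exactly why they drift out of sync so easily.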
Common Pitfalls & Tuning:
- max.poll.interval.ms too low: If your Spark job is busy with other work (GC pauses, other tasks on the executor), the consumer may go too long between poll() calls. If that gap exceeds max.poll.interval.ms, you get a rebalance. Fix: Increase max.poll.interval.ms in kafkaParams to something like 120000 (2 minutes) to give the consumer more breathing room.
- batchDuration too short relative to processing time: If Spark tries to create a batch every 2 seconds but processing takes 5 seconds, Spark will fall behind. Fix: Increase batchDuration in the StreamingContext to 10s.
- session.timeout.ms and heartbeat.interval.ms: These govern consumer group coordination. If heartbeat.interval.ms is set close to, or above, session.timeout.ms, the broker can miss heartbeats and declare a consumer dead prematurely. Fix: Keep heartbeat.interval.ms well below session.timeout.ms, typically no more than one third (e.g., heartbeat.interval.ms=10000, session.timeout.ms=30000).
- Executor overload: If executors are overloaded with other tasks or struggling with garbage collection, poll() calls will be delayed. Fix: Increase executor memory, reduce parallelism for other tasks, or tune JVM GC settings.
- Network latency/throughput: A slow network between Spark executors and Kafka brokers increases poll() duration. Fix: Ensure robust network connectivity and consider co-locating Spark and Kafka if possible.
- Large batches: If a single batch contains an enormous amount of data, processing it might exceed max.poll.interval.ms. Fix: Limit the ingestion rate with spark.streaming.kafka.maxRatePerPartition, which caps the number of records fetched per partition per second. This keeps batches small enough that the consumer polls regularly instead of stalling on one huge batch; a value like 1000 is a reasonable starting point.
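The rate limit from the last pitfall is a SparkConf setting, not a Kafka consumer property. A sketch (the 1000 records/partition/second value and app name are illustrative):

```scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("warehouse-loader")
  // Cap ingestion at 1000 records per partition per second, so a backlog
  // never produces a batch too large to finish within max.poll.interval.ms.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
```

With this cap in place, a topic backlog drains gradually over many batches instead of arriving as one giant batch after a restart.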
The next error you’ll hit after fixing these is likely related to Kafka partition assignment issues during rebalances, or perhaps downstream system bottlenecks if your Spark processing is now too fast.