Spark Streaming’s output partitioning is often the bottleneck for sink writes, but understanding how it works lets you unlock massive throughput.
Let’s see it in action. Imagine we have a streaming DataFrame streamDF and we want to write its output to a Kafka topic named output-topic.
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

val streamDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10000)
  .load()
  .withColumn("id", ($"value" % 100).cast("string")) // simulate a key for partitioning
  .select("id", "value")
val query = streamDF
  // The Kafka sink expects a string or binary column named "value"
  // (and, optionally, "key"), so map our columns into that shape.
  .select($"id".as("key"), $"value".cast("string").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/spark-checkpoint")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
Here, rowsPerSecond dictates the input rate. The critical part for output partitioning is how Spark groups these incoming records before writing them to the sink. Spark writes each micro-batch with one task per partition of the micro-batch DataFrame, so the write parallelism is whatever partitioning the data happens to arrive with. At 10,000 rows per second and a 5-second trigger, each micro-batch holds roughly 50,000 records; if they land in too few partitions for your sink to absorb comfortably, you'll hit a bottleneck.
The problem Spark Streaming aims to solve with output partitioning is efficiently writing micro-batches to external systems. Each micro-batch is a DataFrame. When writing this DataFrame, Spark needs to decide how many "tasks" it will use to write that data. This is directly tied to the DataFrame’s partitioning. If the micro-batch DataFrame has, say, 10 partitions, Spark will launch 10 writing tasks. If it has only 1 partition, it launches 1 task. The challenge is that the number of partitions in the input micro-batch might not align with the optimal writing parallelism of the target sink.
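One way to observe this relationship directly is to route the stream through foreachBatch, which hands you each micro-batch as a plain DataFrame whose partition count you can log. A debugging sketch (the checkpoint path is illustrative):

```scala
import org.apache.spark.sql.DataFrame

// Debugging sketch: the partition count of each micro-batch DataFrame
// is exactly the number of tasks Spark would launch to write it.
val debugQuery = streamDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    println(s"batch $batchId: ${batchDF.rdd.getNumPartitions} output partitions")
    // the real sink write would go here
  }
  .option("checkpointLocation", "/tmp/spark-checkpoint-debug") // illustrative path
  .start()
```

Watching these numbers across a few triggers tells you whether a repartition or coalesce before the write is worth its cost.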
The primary lever you control for output partitioning is a repartition() or coalesce() call before writeStream. repartition(N) shuffles the data to produce exactly N partitions, while coalesce(N) only reduces the partition count, merging existing partitions without a full shuffle.
For instance, if you know your Kafka cluster can handle writes from 16 parallel tasks efficiently, you’d do this:
val query = streamDF
  .repartition(16) // shuffle each micro-batch into exactly 16 partitions
  .select($"id".as("key"), $"value".cast("string").as("value")) // Kafka sink needs a string/binary "value"
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/spark-checkpoint")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
This ensures that each micro-batch DataFrame is shuffled into 16 partitions, and Spark will launch 16 write tasks to Kafka, achieving a higher write throughput if the Kafka sink can keep up. The repartition operation introduces a shuffle, which has a cost, but it allows you to control the number of output tasks precisely.
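The opposite adjustment works too: if the upstream stage produces many small partitions and the sink prefers fewer, larger writes, coalesce narrows the partition count without paying for a full shuffle. A sketch, assuming the incoming micro-batches carry more than four partitions:

```scala
val coalescedQuery = streamDF
  .coalesce(4) // merge existing partitions; no shuffle
  .select($"id".as("key"), $"value".cast("string").as("value")) // Kafka sink needs a string/binary "value"
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/spark-checkpoint-coalesce") // use a separate path per query
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
```

Because coalesce only merges partitions, it cannot increase parallelism; asking for more partitions than the DataFrame already has silently leaves the count unchanged.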
A common mistake is to think that setting spark.sql.shuffle.partitions directly controls the output partitioning of streaming writes. That setting controls the number of partitions produced by shuffles within a micro-batch (e.g., for aggregations). The sink parallelism is determined by the partitioning of the DataFrame at the time writeStream is called, so the setting matters only indirectly: a query whose last shuffle is an aggregation will carry that many partitions into the write, while operations like repartition influence the write parallelism directly.
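To make the distinction concrete, here is a sketch of the one case where the setting does leak into the write: a query that ends in an aggregation inherits the shuffle partition count, unless an explicit repartition afterwards overrides it.

```scala
spark.conf.set("spark.sql.shuffle.partitions", "32")

// The groupBy introduces a shuffle, so `counts` carries 32 partitions
// and would be written to the sink with 32 tasks...
val counts = streamDF.groupBy($"id").count()

// ...unless an explicit repartition overrides the inherited count.
val sized = counts.repartition(16)
```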
The actual number of output partitions Spark attempts to write for a given micro-batch is determined by the number of partitions of the DataFrame just before the writeStream operation. If you don't explicitly repartition or coalesce, Spark uses whatever partitioning it inherited from the previous stage. For the rate source, this defaults to the session's default parallelism and can be tuned with the source's numPartitions option; for other sources, it depends on how the data was read. This is why explicitly calling repartition(N) or coalesce(N) is so useful for optimizing sink writes.
When writing to a key-value store like Kafka, you often want records with the same key to land on the same topic partition. The Kafka sink reads an optional key column from each row and hands it to the underlying Kafka producer; the producer's default partitioner then hashes the key to choose the topic partition, so records sharing a key are co-located. If no key column is present, the producer spreads records across topic partitions itself. Note that this is independent of Spark's write parallelism: repartition(N) controls how many producer tasks Spark runs, while the key (or its absence) determines which Kafka partition each record lands on.
This mechanism means that skew matters when you repartition by a key column, e.g. repartition(16, $"id"): a highly skewed key distribution will leave some of those 16 write tasks processing far more data than others, producing uneven load on your sink. (A plain repartition(16) distributes rows round-robin and stays balanced.) For skewed keys, common strategies include salting the repartition key, or restructuring the pipeline with groupByKey and mapGroupsWithState to manage state before writing.
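Salting can be sketched as follows; the salt width of 4 is an arbitrary assumption you would tune against the observed skew:

```scala
import org.apache.spark.sql.functions.rand

// Split each hot key across up to 4 of the 16 partitions by
// repartitioning on (key, salt), then drop the helper column.
val salted = streamDF
  .withColumn("salt", (rand() * 4).cast("int"))
  .repartition(16, $"id", $"salt")
  .drop("salt")
```

The trade-off is that records sharing a key are no longer guaranteed to be processed by the same task, so salting suits stateless writes better than keyed aggregations.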
The next thing you’ll likely encounter is understanding how to handle exactly-once semantics with stateful operations before writing to your sink.