Structured Streaming is the modern, preferred API for stream processing in Spark, replacing the older, lower-level DStream API.

Let’s see Structured Streaming in action. Imagine you have a stream of incoming text data on a Kafka topic, and you want to count the occurrences of each word.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()
import spark.implicits._

// Read from Kafka
val kafkaStreamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input_topic")
  .load()

// Cast the Kafka value bytes to a string and split into words
val wordsDF = kafkaStreamDF.selectExpr("CAST(value AS STRING)")
  .as[String]
  .flatMap(line => line.split(" "))
  .toDF("word")

// Count words
val wordCountsDF = wordsDF.groupBy("word").count()

// Write to console (for demonstration)
val query = wordCountsDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

This code snippet does a few key things:

  1. Reads from Kafka: spark.readStream.format("kafka")... establishes a connection to your Kafka cluster and subscribes to the input_topic. This is a streaming DataFrame.
  2. Parses and Transforms: selectExpr("CAST(value AS STRING)").flatMap(...).toDF("word") takes the raw byte value from Kafka, casts it to a string, splits each line into words, and creates a new DataFrame with a single column named "word".
  3. Aggregates: groupBy("word").count() performs a streaming aggregation, maintaining a running count of each unique word encountered so far.
  4. Outputs: writeStream.outputMode("complete").format("console").start() defines how the results are presented. outputMode("complete") means the entire aggregated table will be written out at each trigger interval. format("console") simply prints the results to your terminal.

The core problem Structured Streaming solves is treating a stream of data as an unbounded, continuously appending table. Unlike DStreams, which were a collection of RDDs, Structured Streaming operates on DataFrames and Datasets. This unified API allows you to use the same high-level operations (like select, filter, groupBy, agg) for both batch and streaming data.
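
To illustrate the unified API, here is a hedged sketch of the same word count as a batch job: only the read/write entry points change, while the transformations are identical. The input path is a hypothetical placeholder; any text source works.

```scala
// Batch version of the same word count. The path below is an assumed
// example location, not one from the streaming job above.
val batchWordsDF = spark.read
  .text("/data/lines")        // hypothetical input path
  .as[String]
  .flatMap(line => line.split(" "))
  .toDF("word")

// Identical aggregation logic to the streaming query.
val batchCountsDF = batchWordsDF.groupBy("word").count()
batchCountsDF.show()
```

The only differences are `read` vs `readStream` and `show()` vs `writeStream...start()`; the `flatMap` and `groupBy` lines are unchanged.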

Internally, Spark Structured Streaming uses a micro-batch processing model by default: it periodically queries the source for new data and processes whatever has arrived as a small batch, managing state for aggregations and joins automatically. The trigger (not set explicitly in the example above) defaults to starting each micro-batch as soon as the previous one finishes.

The exact levers you control are primarily in the writeStream configuration. outputMode is crucial:

  • append: Only new rows added to the result table since the last trigger are written out. This is suitable for queries where you only care about new results, like a streaming select or filter.
  • complete: The entire updated result table is written out at each trigger. This is necessary for aggregations like groupBy().count().
  • update: Only rows that were updated in the result table since the last trigger are written out. This is a more efficient alternative to complete for aggregations, as it only writes out changed rows.
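
For the word-count query above, switching to update mode is a one-line change (console sink kept for demonstration):

```scala
// Same aggregation, but only the rows whose counts changed in the
// last micro-batch are printed, rather than the entire result table.
val updateQuery = wordCountsDF.writeStream
  .outputMode("update")
  .format("console")
  .start()
```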

The trigger configuration controls the timing. Trigger.ProcessingTime("5 seconds") tells Spark to start a micro-batch every 5 seconds. Trigger.Once() processes all available data in one batch and then stops (newer Spark versions prefer Trigger.AvailableNow(), which does the same across multiple batches). Trigger.Continuous("1 second") switches to the experimental continuous processing mode, where the argument is the checkpoint interval rather than a latency target; it can achieve millisecond-scale end-to-end latency but supports only map-like operations (no aggregations) and a limited set of sources and sinks.
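
As a sketch, attaching a processing-time trigger to the word-count query looks like this:

```scala
import org.apache.spark.sql.streaming.Trigger

// Start a micro-batch every 5 seconds instead of as soon as the
// previous batch completes.
val timedQuery = wordCountsDF.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
```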

What many people don’t realize is that Structured Streaming’s fault tolerance and state management are built around the concept of a write-ahead log (WAL). When you enable checkpointing (which happens implicitly when you specify a checkpointLocation), Spark writes the state of your streaming query and the offsets read from the source to a durable location. If the Spark application fails, it can restart from the last committed offset and restore its state from the checkpoint. With a replayable source like Kafka and an idempotent sink, this yields end-to-end exactly-once processing; otherwise the guarantee degrades to at-least-once.
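
Making the example query recoverable is a matter of adding one option; the directory below is a hypothetical placeholder for a durable location such as HDFS or S3:

```scala
// On restart, Spark recovers committed offsets and aggregation state
// from this directory. The path is an assumed example.
val recoverableQuery = wordCountsDF.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/wordcount")
  .start()
```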

The next concept you’ll likely encounter is managing more complex stateful operations, such as arbitrary stateful transformations with mapGroupsWithState or flatMapGroupsWithState.
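
As a taste of what that looks like, here is a minimal, illustrative sketch that reimplements the running word count with explicit per-key state via mapGroupsWithState (the update function and output shape are assumptions for illustration, not the only way to structure it):

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Keep a running Long count per word in managed state, instead of
// relying on groupBy().count(). Queries over this Dataset must use
// outputMode("update").
val statefulCounts = wordsDF.as[String]
  .groupByKey(identity)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout) {
    (word: String, newWords: Iterator[String], state: GroupState[Long]) =>
      val updated = state.getOption.getOrElse(0L) + newWords.size
      state.update(updated)   // persist the new count for this key
      (word, updated)
  }
```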

Want structured learning?

Take the full Spark-streaming course →