Watermarks in Structured Streaming don’t just track late data; they actively prune old data from the system to prevent infinite state growth.

Imagine this: a streaming job processing website click events. We want to count how many clicks happen within a 5-minute window for each user.

import org.apache.spark.sql.functions._
import spark.implicits._

val clicks = spark.readStream
  .format("json") // assuming JSON click files; substitute your actual source format
  .schema(clickSchema)
  .option("maxFilesPerTrigger", 1)
  .load("/data/clicks") // hypothetical input directory
  .as[ClickEvent]

val fiveMinuteWindows = clicks
  .withWatermark("timestamp", "10 minutes") // Watermark on the event's timestamp
  .groupBy(window($"timestamp", "5 minutes"), $"userId")
  .count()

val query = fiveMinuteWindows.writeStream
  .format("memory")
  .queryName("clickCounts")
  .outputMode("update")
  .start()

// Simulate some data arriving
// Assume ClickEvent has 'timestamp' (Timestamp) and 'userId' (String)
// ... (code to insert data into the streaming source) ...

// After some time, query the results
spark.sql("SELECT * FROM clickCounts").show(false)

This Scala snippet defines a streaming query that counts clicks. The withWatermark("timestamp", "10 minutes") call is the key. Spark tracks the maximum event timestamp it has seen so far and sets the watermark to that maximum minus 10 minutes; any event whose timestamp falls behind the watermark is considered too late for the purpose of windowing. The window($"timestamp", "5 minutes") call defines the tumbling aggregation window.
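The watermark itself is just simple bookkeeping: the maximum event time observed so far, minus the configured delay. Here is a minimal plain-Scala sketch of that idea (the `WatermarkTracker` class is a hypothetical illustration, not Spark's internal API):

```scala
import java.time.{Duration, Instant}

// Hypothetical sketch of watermark bookkeeping; not Spark's actual implementation.
class WatermarkTracker(delay: Duration) {
  private var maxEventTime: Instant = Instant.EPOCH

  // Advance the tracked maximum event time as events arrive.
  def observe(eventTime: Instant): Unit =
    if (eventTime.isAfter(maxEventTime)) maxEventTime = eventTime

  // Watermark = maximum event time seen so far minus the allowed delay.
  def watermark: Instant = maxEventTime.minus(delay)

  // An event is "too late" if its timestamp falls behind the watermark.
  def isTooLate(eventTime: Instant): Boolean = eventTime.isBefore(watermark)
}

val tracker = new WatermarkTracker(Duration.ofMinutes(10))
tracker.observe(Instant.parse("2024-01-01T12:30:00Z"))
tracker.watermark // 2024-01-01T12:20:00Z — events before this may be dropped
tracker.isTooLate(Instant.parse("2024-01-01T12:15:00Z")) // true
tracker.isTooLate(Instant.parse("2024-01-01T12:25:00Z")) // false
```

Note that the watermark only ever moves forward as larger event timestamps arrive; wall-clock time plays no role.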

The system works by maintaining state for each aggregation. For our click count example, the state would include the counts for each (window, userId) combination. Without watermarks, this state would grow indefinitely as new windows are constantly opened.

The watermark mechanism lets Spark discard state for windows that can no longer receive in-time data. As events arrive, Spark advances the watermark to the maximum event timestamp seen so far minus the 10-minute delay. Once the watermark passes a window's end time, no event that clears the watermark can still fall into that window, so the window's result is finalized and its state becomes eligible for pruning. This pruning is what keeps state bounded on an unbounded stream.
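Conceptually, the state store maps each (window, userId) pair to a running count, and cleanup drops every entry whose window has ended at or before the watermark. A toy simulation of that cleanup step (the names and representation here are illustrative, not Spark internals):

```scala
// Toy model of windowed state cleanup; times are minutes since some origin.
case class Window(startMin: Long, endMin: Long)

var state = Map(
  (Window(0, 5), "alice")  -> 3L,
  (Window(5, 10), "alice") -> 1L,
  (Window(10, 15), "bob")  -> 4L
)

// Watermark at minute 12: every window that ended at or before minute 12
// is finalized, so its entries can be discarded.
val watermarkMin = 12L
state = state.filter { case ((w, _), _) => w.endMin > watermarkMin }

// Only the (10, 15) window survives, since it ends after the watermark.
```

Without this step, the map would accumulate one entry per (window, userId) pair forever.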

The 10 minutes in withWatermark("timestamp", "10 minutes") is the allowed lateness. Events arriving up to 10 minutes behind the maximum event time seen so far are still counted toward their intended window; events that fall behind the watermark may be dropped. The window duration (5 minutes in window($"timestamp", "5 minutes")) and the watermark delay (10 minutes) are independent knobs: the window size controls how results are grouped, while the delay controls how long Spark waits for stragglers before finalizing a window. A longer delay tolerates later data at the cost of holding state longer.
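The interaction between window assignment and state lifetime comes down to simple arithmetic, sketched here with tumbling windows (all values in minutes; these helper functions are illustrative, not Spark APIs):

```scala
// Tumbling-window assignment: an event's window start is its timestamp
// rounded down to a multiple of the window size.
def windowStart(eventMin: Long, sizeMin: Long): Long =
  (eventMin / sizeMin) * sizeMin

// A window's state is eligible for cleanup once the watermark
// (max event time minus the delay) has passed the window's end.
def canDrop(windowEndMin: Long, maxEventMin: Long, delayMin: Long): Boolean =
  maxEventMin - delayMin >= windowEndMin

windowStart(127, 5)   // 125: an event at minute 127 lands in [125, 130)
canDrop(130, 135, 10) // false: watermark is only at minute 125
canDrop(130, 141, 10) // true: watermark at 131 has passed the window end
```

So with a 5-minute window and a 10-minute delay, the window covering minutes 125 to 130 is kept open until the maximum observed event time reaches minute 140, then finalized and dropped.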

What most people don’t realize is that the watermark isn’t just about allowing late data; it’s the primary mechanism for limiting state size. Without a watermark, Spark would have to keep state for every single historical window, leading to memory exhaustion. The watermark defines the boundary of "how old is too old," enabling Spark to clean up.

The next concept you’ll grapple with is how to handle data that is too late – data that exceeds the watermark delay and gets dropped, and how to potentially recover or account for that lost data.

Want structured learning?

Take the full Spark Streaming course →