Spark Structured Streaming’s stream-to-stream joins are a powerful way to combine events from two real-time data sources, but they introduce a subtle complexity that often trips people up: managing state across arbitrary time windows.
Imagine you have two streams: orders (order ID, customer ID, order timestamp) and customers (customer ID, customer name, update timestamp). You want to enrich the orders stream with customer names. A direct join seems straightforward, but how do you ensure you’re matching orders with the current customer information at the time the order was placed, especially if customer details change over time?
Here’s a quick demo:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("StreamStreamJoinDemo")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// Simulate orders stream
val ordersDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load()
  .selectExpr("timestamp as order_ts", "value as order_id")
  .withColumn("customer_id", (rand() * 10).cast("integer")) // Assign a random customer ID (0-9)

// Simulate customer updates stream
// Note: rowsPerSecond must be a whole number; the rate source rejects fractional values
val customersDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load()
  .selectExpr("timestamp as update_ts", "value as customer_id")
  .withColumn("customer_name", concat(lit("Customer_"), $"customer_id"))
  .select($"customer_id", $"customer_name", $"update_ts")

// Join the streams
val joinedDF = ordersDF.as("o")
  .join(
    customersDF.as("c"),
    expr("o.customer_id = c.customer_id AND o.order_ts >= c.update_ts"), // Crucial: the order must occur at or after the customer update
    "inner"
  )
  .select("o.order_id", "o.order_ts", "c.customer_id", "c.customer_name")

// Output to console for demonstration
val query = joinedDF.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime("5 seconds")) // Process in 5-second micro-batches
  .start()

query.awaitTermination()
In this example, the expr("o.order_ts >= c.update_ts") clause is critical. It tells Spark to consider only customer updates that happened at or before the time the order was placed. Note that an inner join emits one row for every qualifying pair, so an order will join with every earlier update for its customer, not just the most recent one. This is the core of stream-to-stream join logic: defining the temporal relationship between events from different streams.
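A one-sided bound like this also keeps customer state around forever, since any future order could still match an old update. In practice you usually bound the range on both sides. A sketch, assuming a one-hour validity window is acceptable for the use case (the window length is an illustrative choice, not part of the demo above):

```scala
// Bounded variant of the join condition: an order only matches customer
// updates from the preceding hour. The upper bound lets Spark eventually
// prove that an old customer row can never match again, so (together with
// watermarks) its state can be dropped.
val boundedJoinCondition = expr(
  """o.customer_id = c.customer_id
     AND o.order_ts >= c.update_ts
     AND o.order_ts <= c.update_ts + interval 1 hour"""
)
```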
The problem Spark Structured Streaming solves here is performing joins over continuously arriving, potentially unbounded data. Traditional batch joins operate on static datasets; for streaming, Spark must buffer state for both streams, track their progress via watermarks, and decide when it is safe to emit results based on the join condition.
Internally, when you perform a stream-to-stream join, Spark creates a separate state store for each input stream, buffering the rows it has processed. When a new micro-batch arrives on one stream, Spark probes the other stream's state for rows that satisfy the join condition. For an inner join, a plain equality condition is technically allowed, but without a temporal constraint on the event-time columns (like o.order_ts >= c.update_ts) Spark has no way to decide when buffered rows can be discarded.
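Conceptually, you can picture the two state stores as per-stream buffers that each arriving row probes and appends to. A toy, non-Spark model of that bookkeeping (all names here are illustrative, not Spark APIs):

```scala
// Toy model of stream-stream join state: each side buffers its rows, and an
// arriving row is matched against the opposite side's buffer using the join
// predicate. Real Spark state stores are keyed and checkpointed; this only
// models the matching logic.
case class Order(orderId: Long, customerId: Int, orderTs: Long)
case class CustomerUpdate(customerId: Int, name: String, updateTs: Long)

object JoinStateModel {
  val orderState = scala.collection.mutable.ArrayBuffer[Order]()
  val customerState = scala.collection.mutable.ArrayBuffer[CustomerUpdate]()

  // An order matches an update with the same id that happened at or before it
  def matches(o: Order, c: CustomerUpdate): Boolean =
    o.customerId == c.customerId && o.orderTs >= c.updateTs

  def onOrder(o: Order): Seq[(Order, CustomerUpdate)] = {
    orderState += o // buffer for potential matches with late customer updates
    customerState.filter(c => matches(o, c)).map(c => (o, c)).toSeq
  }

  def onCustomerUpdate(c: CustomerUpdate): Seq[(Order, CustomerUpdate)] = {
    customerState += c // buffer for potential matches with future orders
    orderState.filter(o => matches(o, c)).map(o => (o, c)).toSeq
  }
}
```

Without a rule for trimming these buffers, both grow forever, which is exactly the problem watermarks solve.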
Watermarking is essential for managing state and preventing unbounded memory growth. You define a watermark on each stream to declare the maximum expected delay of late data, for example ordersDF.withWatermark("order_ts", "10 seconds"). An order whose order_ts lags more than 10 seconds behind the maximum event time Spark has observed on that stream is considered late and may be dropped; the same applies to the customer stream. Combined with a time-range condition in the join, the watermarks let Spark determine when a buffered row can no longer match anything from the other side, so its state can eventually be discarded.
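Putting the pieces together, here is a sketch of the demo join with watermarks on both inputs and a bounded time-range condition. The 10-second and 20-second delays and the one-hour window are illustrative choices, not requirements:

```scala
// Watermarked variant of the demo join: both inputs declare how late their
// data may be, and the range condition bounds how long state must be kept.
val ordersWithWatermark = ordersDF
  .withWatermark("order_ts", "10 seconds") // orders may arrive up to 10s late

val customersWithWatermark = customersDF
  .withWatermark("update_ts", "20 seconds") // updates may arrive up to 20s late

val cleanableJoin = ordersWithWatermark.as("o")
  .join(
    customersWithWatermark.as("c"),
    expr("""o.customer_id = c.customer_id
            AND o.order_ts BETWEEN c.update_ts AND c.update_ts + interval 1 hour"""),
    "inner"
  )
```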
The most surprising thing about stream-to-stream joins is how event time behaves on the joined output. A joined row does not get a single new event time; it carries both event-time columns (order_ts and update_ts here). When a query involves multiple watermarked streams, Spark's global watermark defaults to the minimum of the per-stream watermarks, the conservative choice for correctness; it can be switched to the maximum via the spark.sql.streaming.multipleWatermarkPolicy setting, which cleans state faster at the risk of dropping more late data. Knowing which column drives the watermark matters for any further event-time operations downstream of the join.
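The global-watermark policy is configurable per session; a config fragment, with the caveat that "max" can drop rows the default "min" policy would still accept:

```scala
// Default policy is "min": the global watermark trails the slowest stream,
// which never drops data that any single stream's watermark would accept.
// "max" advances the watermark with the fastest stream, cleaning state
// sooner but treating more rows as late.
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")
```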
If you don’t specify watermarks on your input streams, Spark keeps all buffered rows indefinitely, and state grows until the job fails with an OutOfMemoryError. The join condition, especially temporal predicates like o.order_ts >= c.update_ts, dictates how records are matched; without a temporal condition you may get incorrect results or, more commonly, state that can never be pruned. For cleanup to happen, the condition must let Spark prove that no further matches are possible for older rows. A one-sided bound like >= only permits cleanup on one side of the join, which is why a bounded range over the event-time columns is usually what you want.
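To see whether state is actually being pruned, you can inspect the query's progress metrics through the StreamingQuery API; a sketch (verify the exact metric names against your Spark version):

```scala
// Inspect state-store size from the most recent micro-batch. If numRowsTotal
// grows without bound across batches, the watermarks or join condition are
// not allowing state cleanup.
val progress = query.lastProgress
if (progress != null) {
  progress.stateOperators.foreach { op =>
    println(s"state rows total = ${op.numRowsTotal}, updated = ${op.numRowsUpdated}")
  }
}
```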