The most surprising thing about session windows in Spark Structured Streaming is that they don’t use a fixed time interval to define a "session." Instead, sessions are defined by periods of activity separated by gaps of inactivity.

Let’s see this in action. Imagine we have a stream of user click events, each with a userId and a timestamp. We want to aggregate these events into per-user sessions, where a session ends once the user has been inactive for 30 minutes.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("SessionWindowsExample").getOrCreate()

// Sample data: userId, eventTime
val data = Seq(
  ("user1", 1678881600000L), // March 15, 2023 12:00:00 PM UTC
  ("user1", 1678881660000L), // March 15, 2023 12:01:00 PM UTC
  ("user2", 1678881700000L), // March 15, 2023 12:01:40 PM UTC
  ("user1", 1678883400000L), // March 15, 2023 12:30:00 PM UTC (still within user1's first session)
  ("user1", 1678885200000L), // March 15, 2023 01:00:00 PM UTC (user1's first session ends, new one starts)
  ("user2", 1678887000000L), // March 15, 2023 01:30:00 PM UTC (user2's first session ends, new one starts)
  ("user1", 1678885300000L)  // March 15, 2023 01:01:40 PM UTC (within user1's second session)
)

// Create a streaming DataFrame. writeStream requires a streaming source, so we
// feed the sample data through a MemoryStream (Spark's in-memory test source).
import spark.implicits._
import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val sqlContext: org.apache.spark.sql.SQLContext = spark.sqlContext
val eventStream = MemoryStream[(String, Long)]
eventStream.addData(data)
val streamDF = eventStream.toDF().toDF("userId", "eventTime")

// Convert epoch milliseconds to a proper TimestampType column.
// (from_unixtime would produce a string, which windowing functions can't use.)
val dfWithTimestamp = streamDF.withColumn("eventTimestamp", ($"eventTime" / 1000).cast("timestamp"))

// Define the session gap: a session closes after 30 minutes of inactivity
val sessionGapDuration = "30 minutes"

// Group by userId and apply a session window (Spark 3.2+), then count events per session
val sessionAggregations = dfWithTimestamp
  .withWatermark("eventTimestamp", "1 hour") // Watermark to bound state and handle late data
  .groupBy(
    session_window($"eventTimestamp", sessionGapDuration),
    $"userId"
  )
  .agg(count("*") as "eventCount", max("eventTimestamp") as "lastEventTime")
  .select(
    $"userId",
    $"eventCount",
    $"lastEventTime",
    $"session_window.start" as "sessionStartTime",
    $"session_window.end" as "sessionEndTime"
  )

// For demonstration, we'll use a small trigger interval and the console sink.
// In a real streaming job, you'd write to Kafka, files, or another sink.
val query = sessionAggregations
  .writeStream
  .outputMode("append") // session windows support append mode; update mode is not supported
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .option("truncate", "false")
  .start()

// In append mode, a session is emitted only once the watermark passes its end,
// so feed one later event to advance the watermark and finalize the sessions above.
eventStream.addData(("user1", 1678893000000L)) // March 15, 2023 03:10:00 PM UTC

query.awaitTermination(30000) // Run for 30 seconds for the demo
spark.stop()

In this example, we’re using groupBy(session_window($"eventTimestamp", sessionGapDuration), $"userId"). The session_window function (available since Spark 3.2) takes a single gap duration and produces dynamically sized windows. Spark internally tracks state for each userId. When a new event arrives for a user, Spark checks whether it falls within that user’s active session, i.e. within 30 minutes of the previous event. If it does, the event extends the session; if it arrives 30 minutes or more after the last event, a new session starts. In the output, session_window.start is the timestamp of the session’s first event, session_window.end is the last event’s timestamp plus the gap, and max("eventTimestamp") captures when the last event actually occurred.
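Since session_window also works on static DataFrames, one way to sanity-check these boundaries (a sketch, assuming the spark session, implicits, and data sequence from the example above) is to run the same aggregation in batch mode and inspect it:

// Batch-mode sanity check: session_window behaves identically on a static
// DataFrame, so the computed session boundaries can be inspected directly.
val batchSessions = data.toDF("userId", "eventTime")
  .withColumn("eventTimestamp", ($"eventTime" / 1000).cast("timestamp"))
  .groupBy(session_window($"eventTimestamp", "30 minutes"), $"userId")
  .agg(count("*") as "eventCount")
  .select($"userId", $"session_window.start", $"session_window.end", $"eventCount")

// Expect four sessions in total: two for user1 and two for user2.
batchSessions.orderBy($"userId", $"start").show(truncate = false)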

The mental model here is stateful processing. Structured Streaming maintains each user’s session state across micro-batches: the groupBy key (userId) combined with the session window function tells Spark to keep separate session state per user. The watermark is crucial for managing that state and handling late data. Without it, Spark would have to keep session data indefinitely, leading to unbounded state. The watermark, set here to "1 hour," is computed as the maximum event time Spark has seen so far minus one hour (not the processing time). Any data older than the watermark is considered late and may be dropped, which allows Spark to finalize and clean up state for sessions that can no longer be extended.
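The per-key logic can be sketched in plain Scala, without Spark (a mental model only, not Spark's actual implementation): consecutive timestamps are folded into the current session while the gap stays under the threshold, and a new session starts otherwise.

// Split one key's event timestamps into sessions separated by inactivity gaps.
def sessionize(timestamps: Seq[Long], gapMs: Long): Seq[Seq[Long]] =
  timestamps.sorted.foldLeft(Vector.empty[Vector[Long]]) { (sessions, t) =>
    sessions.lastOption match {
      case Some(current) if t - current.last < gapMs =>
        sessions.init :+ (current :+ t) // within the gap: extend the current session
      case _ =>
        sessions :+ Vector(t)           // gap met or exceeded (or first event): new session
    }
  }

// Example: gaps of 1 min, 29 min, 30 min, and 100 s with a 30-minute threshold
// sessionize(Seq(0L, 60000L, 1800000L, 3600000L, 3700000L), 1800000L)
// → two sessions: the first three timestamps and the last two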

A common mistake is to reach for the window function with a slide interval, e.g. window($"eventTimestamp", "30 minutes", "5 minutes"). That defines a fixed sliding window: 30-minute windows starting every 5 minutes, in which a single event falls into several overlapping windows. Sliding windows have nothing to do with sessionization. session_window, by contrast, takes only a gap duration; there is no slide interval, because session boundaries are determined entirely by the inactivity gap between events, and each event belongs to exactly one session per grouping key.
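The difference is easiest to see side by side (a sketch; df stands for any DataFrame with eventTimestamp and userId columns):

// Fixed sliding window: 30-minute windows starting every 5 minutes.
// One event can fall into up to six overlapping windows.
df.groupBy(window($"eventTimestamp", "30 minutes", "5 minutes"), $"userId")

// Session window (Spark 3.2+): only a gap duration, no slide interval.
// Each event belongs to exactly one session per grouping key.
df.groupBy(session_window($"eventTimestamp", "30 minutes"), $"userId")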

What many users overlook is how the watermark interacts with session windows. A watermark is essential for Spark to expire old session state. With a 30-minute session gap and a 1-hour watermark, Spark retains state only for sessions that could still be extended by on-time data. Once the watermark (maximum observed event time minus one hour) passes a session’s end, no valid event can extend it anymore, so in append mode the session’s result is emitted and its state discarded; events that arrive behind the watermark are dropped rather than reopening old sessions. This is what prevents unbounded state growth.
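As a rough model of the expiry rule (an illustration, not Spark's actual API), a session can be finalized once the watermark passes its last event plus the gap:

// When can a session's state be dropped? Once the watermark (max observed
// event time minus the watermark delay) reaches lastEvent + gap, no on-time
// event can extend the session, so it can be emitted and its state discarded.
def sessionFinalized(lastEventMs: Long, gapMs: Long,
                     maxEventTimeMs: Long, delayMs: Long): Boolean = {
  val watermarkMs = maxEventTimeMs - delayMs
  watermarkMs >= lastEventMs + gapMs
}

// With a 30-minute gap and a 1-hour delay, a session whose last event was at
// 12:30 is finalized only after an event at or past 14:00 has been observed.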

The next concept you’ll likely encounter is handling multiple session windows for the same user when there are long gaps between activity.

Want structured learning?

Take the full Spark Streaming course →