Spark Structured Streaming’s exactly-once guarantees are achieved not by preventing duplicates from arriving, but by reliably identifying and discarding them after the fact.

Let’s watch this in action. Imagine we have a Kafka topic user_events and we want to process these events, ensuring each user action is counted precisely once, even if Kafka delivers a message multiple times.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("KafkaExactlyOnceDeduplication")
  .master("local[*]") // For local testing
  .getOrCreate()

// Configure Spark to use Kafka
spark.sparkContext.setLogLevel("WARN") // Reduce verbosity

val kafkaBootstrapServers = "localhost:9092" // Replace with your Kafka broker(s)
val inputTopic = "user_events"

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("subscribe", inputTopic)
  .option("startingOffsets", "earliest") // Start from the beginning of the topic
  .load()

// We need a unique identifier for each message to perform deduplication.
// Kafka's message key is a good candidate, but if it's not unique,
// we'd need to rely on a combination of fields or a generated ID.
// For this example, let's assume the Kafka key is sufficient or we'll extract it.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Define a schema for your event data (example) — this must exist before it
// is referenced in the parsing step below.
val eventSchema = StructType(Seq(
  StructField("eventId", StringType, true),
  StructField("userId", StringType, true),
  StructField("timestamp", TimestampType, true)
))

// Assuming Kafka messages are JSON strings with an 'eventId' field.
// If your Kafka message value is not JSON, adjust this parsing.
val parsedDf = df.selectExpr("CAST(key AS STRING) as kafka_key", "CAST(value AS STRING) as json_value")
  .withColumn("event_data", from_json(col("json_value"), eventSchema))
  .select("kafka_key", "event_data.*") // Flatten the event_data struct

// The core of deduplication: a stateful operation (`dropDuplicates`) on a
// streaming DataFrame. The watermark tells Spark how long to wait for late
// data before discarding state; the deduplication columns define what counts
// as a duplicate. Note: for the watermark to actually clean up dedup state,
// the event-time column must be included in the deduplication columns
// (Spark 3.5+ also offers dropDuplicatesWithinWatermark for this).
val deduplicatedDf = parsedDf
  .withWatermark("timestamp", "10 seconds")  // Tolerate data up to 10 seconds late
  .dropDuplicates("eventId", "timestamp")    // Deduplicate on eventId within the watermark

// Now, let's process the deduplicated stream (e.g., count events per user)
val resultDf = deduplicatedDf
  .groupBy("userId")
  .agg(count("*").as("event_count"))

// Write the results to the console for demonstration
val query = resultDf.writeStream
  .outputMode("update") // Use 'update' or 'complete' for aggregations
  .trigger(Trigger.ProcessingTime("5 seconds")) // Process micro-batches every 5 seconds
  .format("console")
  .option("truncate", "false")
  // In production, also set .option("checkpointLocation", "<path>") so that
  // offsets and deduplication state survive restarts
  .start()

query.awaitTermination()

The problem Spark Streaming solves here is ensuring that if Kafka sends the same message twice (e.g., due to a producer retry or a broker issue), your downstream processing logic only sees it once. Spark achieves this by maintaining state across micro-batches.

The Mental Model: State and Watermarking

At its heart, Spark Streaming’s deduplication relies on state management. When you use dropDuplicates on a streaming DataFrame, Spark doesn’t just look at the current micro-batch. It maintains a state of all unique identifiers it has seen so far. For each new micro-batch, it checks the incoming unique identifiers against this maintained state. If an identifier has already been seen, the record is dropped.
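The mechanics can be sketched in a few lines of plain Scala — a toy model of the state store, not Spark’s actual implementation (the record shape and IDs are invented for illustration):

```scala
import scala.collection.mutable

// Toy model: a set of already-seen IDs that survives across micro-batches.
val seen = mutable.Set[String]()

// Each micro-batch is a sequence of (eventId, payload) records; a duplicate
// is any record whose ID is already in state.
def processBatch(batch: Seq[(String, String)]): Seq[(String, String)] =
  batch.filter { case (id, _) =>
    if (seen.contains(id)) false   // already seen: drop
    else { seen += id; true }      // first occurrence: keep and remember
  }

processBatch(Seq("e1" -> "click", "e2" -> "view"))   // keeps both
processBatch(Seq("e2" -> "view", "e3" -> "click"))   // e2 is a redelivery; only e3 survives
```

The key point is that `seen` persists across calls, just as Spark’s state store persists across micro-batches.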

The crucial companion to state is the watermark. Without a watermark, Spark would have to keep state forever, leading to unbounded memory usage. The watermark (.withWatermark("timestamp", "10 seconds")) tells Spark: "Any event whose timestamp is more than 10 seconds behind the maximum event timestamp seen so far can be considered late, and the state associated with timestamps below that threshold can be safely discarded." For most applications, once an event is more than a certain duration behind the stream’s frontier, it is unlikely to be relevant to a stateful operation that has already moved past that time.
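Concretely, the watermark trails the maximum event timestamp seen so far by the configured delay (illustrative arithmetic with invented timestamps, not Spark API):

```scala
// Watermark arithmetic: watermark = max event time seen - configured delay.
val delayMs = 10000L                     // "10 seconds", in milliseconds
val maxEventTimeSeen = 1700000025000L    // hypothetical latest event timestamp
val watermark = maxEventTimeSeen - delayMs

// State for any identifier whose event timestamp falls below `watermark`
// is eligible for cleanup.
```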

The Levers You Control

  1. Unique Identifier: The most critical lever is choosing the correct field(s) for deduplication. This must be a field that uniquely identifies an event from its source. If your Kafka key isn’t unique, you’ll need to extract or construct a unique ID from your message payload (like eventId in the example).
  2. Watermark Delay: The 10 seconds in .withWatermark("timestamp", "10 seconds") is your primary tuning knob for late data tolerance.
    • Too short: You might drop legitimate late-arriving messages if your data ingestion or processing has inherent delays.
    • Too long: You increase the memory footprint of Spark’s state store because it holds onto identifiers for longer.
    • The choice depends on your data’s characteristics and expected latencies.
  3. Trigger Interval: The .trigger(Trigger.ProcessingTime("5 seconds")) dictates how often Spark processes new data. A shorter interval means lower latency but potentially more overhead. A longer interval increases latency but can be more efficient for batching.
  4. Output Mode: For aggregations like groupBy().agg(), you’ll typically use update or complete. update only outputs rows where the aggregate value has changed, while complete outputs all rows every time. append is used for non-aggregating queries where new rows are always added.
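On the first lever: when no single field is unique, a deterministic hash over several fields can serve as the dedup identifier. Below is a hypothetical sketch in plain Scala (the field names are invented); inside the Spark job itself the equivalent column would typically be built with functions like sha2 and concat_ws:

```scala
import java.security.MessageDigest

// Build a deterministic composite ID from several event fields.
// Identical inputs always produce the same ID, so redeliveries collide
// and get dropped by dropDuplicates.
def compositeId(userId: String, action: String, timestampMs: Long): String = {
  val raw = s"$userId|$action|$timestampMs"
  MessageDigest.getInstance("SHA-256")
    .digest(raw.getBytes("UTF-8"))
    .map("%02x".format(_)).mkString
}

compositeId("u1", "click", 1700000000000L)  // same inputs => same ID, every time
```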

The one thing most people don’t fully appreciate is how the watermark interacts with state expiration. As Spark processes micro-batches, it advances the watermark based on the maximum event timestamp it has seen, minus the configured delay. Any state associated with keys whose event timestamps fall below max_event_time_seen - watermark_delay is eligible for cleanup. This means the deduplication guarantee only holds within the watermark window: a duplicate arriving with an event timestamp older than the watermark is no longer covered, because its state has already been discarded (Spark will typically filter such rows out as too late, but either way the guarantee for that identifier is gone). The watermark must therefore be set to accommodate the maximum expected arrival latency of your data, not just the processing latency.
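A toy model of this lifecycle makes the interaction concrete (plain Scala, Scala 2.13; simplified relative to Spark’s real state store, with invented IDs and timestamps):

```scala
import scala.collection.mutable

val delayMs = 10000L
val seenIds = mutable.Map[String, Long]()  // eventId -> event timestamp (ms)
var maxEventTimeSeen = 0L

sealed trait Outcome
case object Kept extends Outcome
case object DroppedDuplicate extends Outcome
case object DroppedLate extends Outcome

def observe(id: String, eventTime: Long): Outcome = {
  maxEventTimeSeen = math.max(maxEventTimeSeen, eventTime)
  val watermark = maxEventTimeSeen - delayMs
  seenIds.filterInPlace { case (_, t) => t >= watermark }  // evict expired state
  if (eventTime < watermark) DroppedLate                   // below the watermark
  else if (seenIds.contains(id)) DroppedDuplicate          // covered by state
  else { seenIds(id) = eventTime; Kept }                   // first occurrence
}

observe("e1", 1000L)   // Kept
observe("e1", 1000L)   // DroppedDuplicate: state still present
observe("e2", 20000L)  // Kept; watermark advances to 10000, evicting e1's state
observe("e1", 1000L)   // DroppedLate: below the watermark, never reaches dedup
```

The last call shows why the delay must cover the worst-case arrival lag: once the watermark has passed an identifier, duplicates (and genuinely late first arrivals) of that identifier are simply dropped as late data.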

The next concept you’ll likely encounter is handling failures during stateful operations, which leads into Spark’s checkpointing mechanism for fault tolerance.

Want structured learning?

Take the full Spark Streaming course →