Accumulators and broadcast variables are two fundamental Spark mechanisms for sharing state efficiently between the driver and its executors, and both come up constantly in Spark Streaming jobs.

Let’s see them in action. Imagine we have a Spark Streaming job that needs to count how many times specific "bad" IP addresses appear in an incoming stream of web server logs. We also want to enrich each log entry with a lookup table of geographical data for each IP.

First, the broadcast variable. We have a large CSV file mapping IP addresses to country names. Shipping this table inside every task's closure would be wasteful. Instead, we broadcast it once.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StreamingExample").getOrCreate()
spark.sparkContext.setLogLevel("WARN") // Reduce verbosity

// Load the IP-to-country mapping from a file
val ipToCountryData = spark.read.option("header", "true").csv("path/to/ip_country.csv")

// Broadcast the collected rows — collect() brings them to the driver first,
// so the table must fit in driver memory
val ipToCountryBroadcast = spark.sparkContext.broadcast(ipToCountryData.collect())

// Now, in our streaming job, each executor will have a read-only copy of this data

Next, the accumulator. We want to count how many times we encounter IP addresses from a predefined "blacklist."

import org.apache.spark.util.LongAccumulator

// Create a long accumulator, initialized to 0
val badIpCount = spark.sparkContext.longAccumulator("BadIpCounter")

// Define our list of bad IPs
val badIpList = Set("192.168.1.100", "10.0.0.5", "172.16.0.1")

// Assume we have a DStream of log lines (e.g., from Kafka)
// val lines = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

// For demonstration, let's use a simple RDD
import org.apache.spark.streaming._ // brings in StreamingContext and Seconds

val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val rddQueue = scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[String]]()
val stream = ssc.queueStream(rddQueue)

stream.foreachRDD { rdd =>
  rdd.foreach { line => // this closure runs on the executors
    val parts = line.split(" ")
    if (parts.nonEmpty) {
      val ipAddress = parts(0)

      // Check if the IP is in our bad list
      if (badIpList.contains(ipAddress)) {
        badIpCount.add(1) // Increment the accumulator
      }

      // Use the broadcast variable for enrichment. This is a linear scan
      // over the rows; for a large table, broadcast a Map keyed by IP instead.
      val country = ipToCountryBroadcast.value
        .find(_.getString(0) == ipAddress)
        .map(_.getString(1))
        .getOrElse("Unknown")
      println(s"IP: $ipAddress, Country: $country") // prints in the executor logs, not on the driver
    }
  }
  println(s"Current bad IP count: ${badIpCount.value}") // Access accumulator value from driver
}

// Add some dummy data to the queue for testing
rddQueue.enqueue(spark.sparkContext.parallelize(Seq(
  "192.168.1.100 user1 GET /page1",
  "10.0.0.1 user2 GET /page2",
  "192.168.1.100 user3 POST /submit",
  "172.16.0.1 user4 GET /image.jpg"
)))

ssc.start()
ssc.awaitTermination()

The problem these solve is efficient data movement. Without broadcasting, the IP-to-country mapping would be serialized into the closure and re-sent for every task that uses it. With broadcasting, Spark ships a single copy to each executor, which caches it locally; this dramatically reduces network traffic, especially for large lookup tables. Accumulators are the flip side: a write-only channel for executors to send aggregated updates back to the driver. Each task accumulates locally, and its partial result is merged into the driver's total when the task completes, so there is no ad-hoc executor-to-driver chatter.

The broadcast() method takes a value and returns a Broadcast object. You access the actual data using .value. On the executor side, Spark ensures that this data is deserialized and cached efficiently. For accumulators, sparkContext.longAccumulator("name") creates a named accumulator of type Long. You increment it using .add(value) on the executors. The driver can then read the final aggregated value using .value at any time, though it’s typically accessed after a batch has been processed.
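To make that API surface concrete, here is a minimal, self-contained sketch that runs in local mode (the data, names, and `local[2]` master are illustrative, not part of the streaming job above):

```scala
import org.apache.spark.sql.SparkSession

// local[2]: run in the driver JVM with two threads — enough to exercise
// the broadcast/accumulator round-trip without a cluster
val spark = SparkSession.builder.appName("Sketch").master("local[2]").getOrCreate()
val sc = spark.sparkContext

// broadcast() returns a Broadcast[T]; tasks read it with .value
val lookup = sc.broadcast(Map("10.0.0.5" -> "DE", "172.16.0.1" -> "US"))

// A named accumulator appears under this name in the Spark UI
val hits = sc.longAccumulator("LookupHits")

val countries = sc.parallelize(Seq("10.0.0.5", "8.8.8.8", "172.16.0.1"))
  .map { ip =>
    if (lookup.value.contains(ip)) hits.add(1) // executor-side, write-only
    lookup.value.getOrElse(ip, "Unknown")
  }
  .collect() // the action runs the tasks and merges their accumulator updates

println(countries.mkString(", ")) // DE, Unknown, US
println(hits.value)               // 2 — read on the driver after the action
```

Note that `.value` on an accumulator is only meaningful on the driver; on an executor it would reflect just that task's local partial sum.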

The trick with broadcast variables is that collect() brings the data to the driver before broadcasting. If your dataset is too large to fit in the driver’s memory, you’ll get an OutOfMemoryError there. In that case, don’t broadcast manually at all: keep the lookup table as a DataFrame and join against it (Spark chooses a broadcast or shuffle join as appropriate), or query an external store from the executors. For accumulators, .add() is fire-and-forget on the executor: updates are shipped back when a task completes, so the value you see on the driver via .value reflects all tasks that have finished up to that point. Inside actions such as foreach, Spark applies each task’s update exactly once, even across retries; inside transformations such as map, a recomputed task can apply its update again, so counts there are best treated as approximate.
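When the lookup table genuinely cannot fit on the driver, one option is to skip manual broadcasting and express the enrichment as a join, letting Spark decide how to distribute the small side. A sketch under that assumption (the column names `ip`, `path`, and `country` are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder.appName("JoinSketch").master("local[2]").getOrCreate()
import spark.implicits._

// Hypothetical data; in practice both sides would be read from storage
val logs   = Seq(("10.0.0.5", "/a"), ("8.8.8.8", "/b")).toDF("ip", "path")
val lookup = Seq(("10.0.0.5", "DE")).toDF("ip", "country")

// broadcast() here is only a hint: Spark ships the small side to each
// executor and avoids shuffling the large side. Drop the hint and Spark
// falls back to a shuffle join when the table is too big to broadcast.
val enriched = logs.join(broadcast(lookup), Seq("ip"), "left_outer")
enriched.show()
```

The left outer join keeps log lines with no match; their `country` column is simply null, the equivalent of the "Unknown" fallback above.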

The most surprising thing is how Spark manages the lifecycle of broadcast variables. An executor doesn’t have the data pushed to it up front; Spark’s block manager fetches it lazily, the first time a task on that executor reads .value, pulling chunks from the driver or from peer executors that already hold them. If an executor is restarted, it simply re-fetches the variable the same way, which makes broadcasts resilient to executor failures.
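You can also manage that lifecycle yourself. A short sketch of the two explicit controls, `unpersist()` and `destroy()` (the session setup is just scaffolding):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Lifecycle").master("local[2]").getOrCreate()

val bc = spark.sparkContext.broadcast(Map("10.0.0.5" -> "DE"))
// ... run jobs that read bc.value in their tasks ...

bc.unpersist() // drop the cached copies on the executors; the data is
               // re-fetched from the driver if a later task reads bc.value
bc.destroy()   // release all state, driver included; any further use of
               // bc throws a SparkException
```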

The next thing you’ll run into is how to handle mutable state within broadcast variables when you need to update them, which isn’t directly supported and requires more advanced patterns.
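As a preview, the usual workaround is to re-broadcast: keep a driver-side holder whose value each batch swaps for a freshly broadcast copy. A sketch of the idea — the `RefreshableLookup` helper and its names are hypothetical, not a Spark API:

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Refresh").master("local[2]").getOrCreate()

// Hypothetical holder: the driver swaps in a new broadcast between batches
object RefreshableLookup {
  @volatile private var current: Broadcast[Map[String, String]] = _

  def refresh(spark: SparkSession, load: () => Map[String, String]): Unit = {
    val old = current
    current = spark.sparkContext.broadcast(load())
    if (old != null) old.unpersist() // evict the stale copies from the executors
  }

  def get: Broadcast[Map[String, String]] = current
}

// foreachRDD bodies run on the driver, so a batch can refresh before it
// processes data. Bind the current broadcast to a local val and use that
// inside the closure, so every task in the batch sees the same version.
RefreshableLookup.refresh(spark, () => Map("10.0.0.5" -> "DE")) // e.g. re-read the CSV
val lookupBc = RefreshableLookup.get
val hit = spark.sparkContext
  .parallelize(Seq("10.0.0.5"))
  .map(ip => lookupBc.value.getOrElse(ip, "Unknown"))
  .first()
println(hit) // DE
```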

Want structured learning?

Take the full Spark-streaming course →