Spark Streaming jobs crash unpredictably on Spot Instances.

Spot Instance Interruptions

The core issue is that Spot Instances can be reclaimed by the cloud provider with very little notice (typically 2 minutes). Spark Streaming, designed for continuous processing, doesn’t gracefully handle these sudden, unannounced terminations of worker nodes, leading to job failures.

Common Causes and Fixes

  1. Instance Termination Without Checkpointing:

    • Diagnosis: Check your Spark Streaming application logs for TaskKilled or ExecutorLost exceptions, especially those referencing spot instance interruptions. Look for a lack of successful checkpointing events in the logs.
    • Fix: Enable checkpointing for your Spark Streaming application. This saves the state of your DStream (or DataFrame/Dataset) periodically to a fault-tolerant storage system (like HDFS, S3, or Azure Blob Storage). For DStreams, also recreate the context with StreamingContext.getOrCreate(checkpointDir, createContextFn) on restart, so the saved state is actually picked up.
      streamingContext.checkpoint("/path/to/checkpoint/dir")
      
      Or for Structured Streaming:
      df.writeStream.format("parquet").option("path", "/path/to/output").option("checkpointLocation", "/path/to/checkpoint/dir").start()
      
    • Why it works: Checkpointing allows Spark to resume from the last saved state upon restart, rather than reprocessing everything from scratch or losing all progress when a worker is lost.
  2. Insufficient Graceful Shutdown Time:

    • Diagnosis: Observe the time between a spot interruption notice and the actual instance termination. If your Spark executors don’t have enough time to finish their current micro-batch or write their final state, you’ll see data loss or incomplete processing.
    • Fix: Let Spark react to the interruption notice instead of dying mid-task. On Spark 3.1+, enable executor decommissioning so a soon-to-be-reclaimed executor stops accepting new tasks and migrates its cached and shuffle blocks to surviving nodes. (The old spark.cleaner.ttl setting was removed in Spark 2.0 and never handled interruptions.) For other providers, consult their specific instance interruption handling mechanisms.
      # Spark 3.1+ decommissioning — set at submit time, e.g. via spark-submit --conf
      spark.decommission.enabled=true
      spark.storage.decommission.enabled=true
      spark.storage.decommission.rddBlocks.enabled=true
      spark.storage.decommission.shuffleBlocks.enabled=true
      
      More critically, ensure your application logic can handle partial micro-batches or implement idempotent writes.
    • Why it works: By allowing Spark executors a bit more time (or ensuring they can signal completion) before termination, you increase the chances of completing a micro-batch or safely shutting down.
  3. Lack of Driver Resilience:

    • Diagnosis: If the driver node itself is a Spot Instance and gets terminated, the entire application fails. Check logs for driver-level exceptions or the application disappearing entirely from the Spark UI.
    • Fix: Do NOT run your Spark Streaming driver on a Spot Instance. Use On-Demand instances for the driver, or better yet, run the driver on a service like EMR, Databricks, or Kubernetes where driver management is handled.
    • Why it works: The driver is the brain of the operation. Losing it means the entire job is lost. Keeping it on a stable instance type ensures continuity.
  4. Executor Memory Over-allocation on Smaller Instances:

    • Diagnosis: Spot fleets often mix in smaller instance types with less RAM than your usual On-Demand nodes. If your executors are configured to use too much memory (spark.executor.memory and spark.executor.memoryOverhead), they might be killed by the OS for exceeding available memory, which can be mistaken for a spot interruption. Check dmesg on the terminated instance for OOM killer messages.
    • Fix: Reduce spark.executor.memory and spark.executor.memoryOverhead to fit within the limits of your chosen Spot Instance type. Monitor actual memory usage.
      # Set at submit time — executor memory cannot be changed on a running application.
      # Adjust both values to fit the chosen Spot Instance type.
      spark-submit --conf spark.executor.memory=4g --conf spark.executor.memoryOverhead=1g ...
      
    • Why it works: Prevents the operating system from killing Spark processes due to out-of-memory conditions, ensuring the executor remains alive until a genuine spot interruption occurs.
  5. Stateful Operations Without Proper State Management:

    • Diagnosis: If your streaming job performs stateful operations (e.g., updateStateByKey, mapWithState, or aggregations over time windows) and doesn’t checkpoint its state, losing an executor holding part of that state will lead to incorrect results or crashes.
    • Fix: Ensure state is checkpointed. For Structured Streaming, the checkpoint directory handles state. For DStreams, you must explicitly configure checkpoint() and potentially use StateSpec for mapWithState/updateStateByKey to manage state TTL.
      // For DStreams with state
      val stateSpec = StateSpec.function(updateFunction _).timeout(Minutes(60)) // expire idle keys after 1 hour
      val mappedStateDStream = originalDStream.mapWithState(stateSpec)
      mappedStateDStream.checkpoint(Seconds(60)) // Checkpoint state every minute
      
    • Why it works: State is critical for many streaming operations. Checkpointing ensures that even if an executor holding a piece of state is lost, that state can be reconstructed or is available from a previous checkpoint.
  6. Underestimating the Interruption Rate:

    • Diagnosis: You might be using Spot Instances for too long a duration without sufficient capacity rebalancing, or your interruption rate is simply too high for your workload’s tolerance. Monitor the interruption rate of your Spot Instance fleet.
    • Fix: Implement a strategy to dynamically scale your Spark cluster. Use tools like Spot Fleet, EC2 Auto Scaling Groups, or Kubernetes Cluster Autoscaler configured with Spot Instances, and set up a mechanism to gracefully shut down Spark executors when an interruption is imminent. Consider using a mix of On-Demand and Spot instances.
      # Example: submitting a Spot Fleet request with the AWS CLI
      aws ec2 request-spot-fleet --spot-fleet-request-config file://config.json
      
      In config.json, you’d specify instance types, target capacity, allocation strategy, and interruption behavior.
    • Why it works: By having a dynamic fleet that can be replenished, and by gracefully shutting down executors when an interruption is signaled, you minimize the impact of individual instance losses.
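The graceful-shutdown fix above hinges on detecting the two-minute notice. On AWS, the notice appears on the instance metadata service at http://169.254.169.254/latest/meta-data/spot/instance-action (HTTP 404 until a reclaim is scheduled). A minimal, dependency-free sketch of parsing that payload and computing the time remaining — the polling loop and HTTP call are deliberately omitted:

```scala
import java.time.{Duration, Instant}

object SpotInterruption {
  // Once a reclaim is scheduled, the metadata endpoint returns a small
  // JSON payload such as: {"action": "terminate", "time": "2024-05-01T12:02:00Z"}
  // Regex extraction keeps this sketch free of JSON-library dependencies.
  private val TimeField = "\"time\"\\s*:\\s*\"([^\"]+)\"".r

  /** Seconds until termination, or None if no notice is present. */
  def secondsRemaining(payload: String, now: Instant): Option[Long] =
    TimeField.findFirstMatchIn(payload)
      .map(m => Duration.between(now, Instant.parse(m.group(1))).getSeconds)
}
```

A node-local agent can poll this every few seconds and trigger executor decommissioning or a blacklist of the doomed host.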
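For the driver-resilience fix above, on YARN in cluster mode the driver runs inside the ApplicationMaster, so YARN node labels can pin it to On-Demand capacity while executors stay on Spot. A sketch, assuming a YARN cluster (2.6+) where an ON_DEMAND label (name illustrative) has been attached to the On-Demand nodes:

```shell
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.yarn.am.nodeLabelExpression=ON_DEMAND \
  your-streaming-app.jar
```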
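The memory-sizing fix above can be sanity-checked with plain arithmetic: when spark.executor.memoryOverhead is unset, Spark defaults it to max(384 MiB, 10% of executor memory). A small helper (the 1 GiB headroom figure is an illustrative allowance for the OS and node daemons, not a Spark constant):

```scala
object ExecutorSizing {
  // Spark's default when spark.executor.memoryOverhead is not set:
  // max(384 MiB, 0.10 * executor memory).
  def defaultOverheadMiB(executorMemoryMiB: Long): Long =
    math.max(384L, (executorMemoryMiB * 0.10).toLong)

  /** Does one executor (heap + overhead) fit on the instance,
    * leaving headroomMiB for the OS and other daemons? */
  def fits(executorMemoryMiB: Long, instanceRamMiB: Long,
           headroomMiB: Long = 1024L): Boolean =
    executorMemoryMiB + defaultOverheadMiB(executorMemoryMiB) + headroomMiB <= instanceRamMiB
}
```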
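For the fleet strategy above, the interruption-related knobs live in the request config passed to `aws ec2 request-spot-fleet --spot-fleet-request-config`. A sketch — the IAM role ARN, AMI ids, and instance types are placeholders; listing several instance types lets the capacityOptimized strategy pick the pools least likely to be interrupted:

```json
{
  "IamFleetRole": "arn:aws:iam::123456789012:role/spot-fleet-role",
  "TargetCapacity": 5,
  "AllocationStrategy": "capacityOptimized",
  "InstanceInterruptionBehavior": "terminate",
  "LaunchSpecifications": [
    { "InstanceType": "r5.xlarge", "ImageId": "ami-placeholder" },
    { "InstanceType": "r5a.xlarge", "ImageId": "ami-placeholder" }
  ]
}
```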

The next error you’ll likely encounter is OutOfMemoryError if your spark.executor.memoryOverhead is too low for the data being processed, or if you haven’t correctly configured checkpointing and encounter an executor loss that corrupts state.


Spark Streaming on Spot Instances: Cut Costs Safely

The most surprising truth about running Spark Streaming on Spot Instances is that it’s not just about cost savings; it fundamentally forces you to build more resilient data pipelines.

Let’s see it in action. Imagine a simple Spark Streaming job that reads from Kafka, performs a word count on each micro-batch, and writes the results to a Kinesis stream.

Scenario:

  • Source: Kafka topic my-input-topic
  • Transformation: Word count per micro-batch
  • Sink: Kinesis stream my-output-stream
  • Cluster: EMR cluster with Spot Instances for core nodes (executors). Driver is on an On-Demand instance.

Code Snippet (Scala - Structured Streaming):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._

val spark = SparkSession.builder
  .appName("SpotStreamingWordCount")
  .getOrCreate()

import spark.implicits._ // needed for .as[String] and the Dataset operations below

// Configure Spark to use Kafka
val kafkaBrokers = "your_kafka_brokers:9092"
val inputTopic = "my-input-topic"
val outputStream = "my-output-stream"
val checkpointDir = "s3://your-bucket/spark-checkpoints/wordcount" // Crucial for resilience!

// Read from Kafka
val rawStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", inputTopic)
  .load()

// Process the stream: basic word count
val words = rawStream.selectExpr("CAST(value AS STRING)")
  .as[String]
  .flatMap(_.split(" "))
  .toDF("word")

val wordCounts = words.groupBy("word").count()

// Write to Kinesis (or another sink)
val query = wordCounts.writeStream
  .format("kinesis") // Assuming a Kinesis sink connector is on the classpath (or a custom one)
  .outputMode("update") // Required: the default append mode rejects aggregations without a watermark
  .option("streamName", outputStream)
  .option("checkpointLocation", checkpointDir) // Essential for fault tolerance
  .trigger(Trigger.ProcessingTime("10 seconds")) // Process micro-batches every 10 seconds
  .start()

query.awaitTermination()

When this job runs, Spark reads data in 10-second intervals. If a Spot Instance running an executor gets a termination notice, here’s what happens internally:

  1. Interruption Signal: The cloud provider posts a notice (e.g., via the EC2 instance metadata service) roughly two minutes before reclaiming the instance.
  2. Spark’s Reaction: With decommissioning enabled (or on a managed platform such as EMR that handles this for you), Spark detects the notice.
  3. Graceful Shutdown Attempt: Executors try to finish their current task or micro-batch. They might flush buffers, write partial results, or signal completion.
  4. Checkpointing: If checkpointing is enabled, Spark attempts to save the current state of the streaming query (offsets, stateful aggregations) to s3://your-bucket/spark-checkpoints/wordcount.
  5. Instance Termination: The instance is terminated.
  6. Driver Re-scheduling: The Spark driver, running on a stable instance, detects the lost executor.
  7. Resumption: If checkpointing was successful, the driver can restart the affected tasks or even the entire query from the last saved state. It will then request new executors (which will be new Spot Instances).
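The driver-side recovery in the last two steps can be sketched as a supervisor loop that restarts the query after a failure, up to a cap. Pure Scala sketch; in a real job the restarted function would rebuild the query from the checkpoint directory, and the elided back-off would sleep between attempts:

```scala
object RestartSupervisor {
  /** Run `start` until it returns normally, restarting after failures up to
    * maxRestarts times; returns the number of attempts made. Once the cap is
    * exceeded, the failure propagates to the caller. */
  def runWithRestarts(maxRestarts: Int)(start: () => Unit): Int = {
    var attempts = 0
    var done = false
    while (!done) {
      attempts += 1
      try { start(); done = true }
      catch {
        case _: Exception if attempts <= maxRestarts =>
          // Executor or query failure: back off briefly (elided), then
          // resume from the last checkpoint on the next iteration.
          ()
      }
    }
    attempts
  }
}
```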

The Mental Model:

  • Spot Instances are Ephemeral Workers: Think of them as capable but temporary laborers. They’re cheap, but they can leave without warning.
  • Checkpointing is the Safety Net: This is your insurance policy. It records the progress and state of your job so that if a worker disappears, you can pick up where you left off without losing data or corrupting results.
  • Driver is the Manager: The driver orchestrates everything. It needs to be stable. It tracks which tasks are running, which have failed, and where to resume from.
  • Micro-batches are Units of Work: Spark Streaming breaks continuous data into small, manageable chunks. The goal is for each micro-batch to be processed and its state saved before an instance is reclaimed.
  • Idempotency is Key: For sinks, ensure that writing the same data multiple times has no adverse effect. This is crucial if an executor fails mid-write and the driver has to retry.
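The idempotency point can be made concrete: if the sink records which batch ids it has already committed, a replayed micro-batch becomes a no-op. A pure-Scala sketch in which an in-memory map stands in for a transactional or key-value store:

```scala
import scala.collection.mutable

/** Sketch of an idempotent sink: a replayed micro-batch (same batchId) is
  * skipped, so driver-side retries cannot double-count. The commit marker is
  * written last, after the data, mirroring a write-then-commit protocol. */
class IdempotentSink {
  private val committed = mutable.Set.empty[Long]
  private val counts = mutable.Map.empty[String, Long].withDefaultValue(0L)

  /** Returns true if the batch was applied, false if it was a duplicate. */
  def writeBatch(batchId: Long, wordCounts: Map[String, Long]): Boolean = {
    if (committed.contains(batchId)) return false // already written: no-op
    wordCounts.foreach { case (w, c) => counts(w) = counts(w) + c }
    committed += batchId                          // commit marker last
    true
  }

  def total(word: String): Long = counts(word)
}
```

Structured Streaming's foreachBatch hands you exactly this (batchId, batchDF) pair, which is the natural place to apply the check.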

The critical configuration levers are:

  • checkpointLocation: Absolutely non-negotiable. This must point to a reliable, distributed file system (S3, HDFS, ADLS).
  • Trigger.ProcessingTime / Trigger.Once / Trigger.Continuous: Controls how often Spark processes data. Shorter intervals mean less work to replay per interruption but more scheduling overhead.
  • spark.executor.instances / spark.dynamicAllocation.enabled: How your cluster scales. Dynamic allocation is often better with Spot Instances as it can scale down quickly when needed.
  • spark.streaming.stopGracefullyOnShutdown: Ensures the DStream-based engine finishes in-flight batches when the application is signaled to stop (Structured Streaming instead relies on its checkpoint for a safe restart).

The one thing most people don’t realize is that the checkpointLocation for Structured Streaming is not just for recovering from failures; it’s also how Spark tracks the offsets it has successfully processed from your source (like Kafka). Without it, Spark would re-read data, leading to duplicate processing, or it would simply fail when an executor holding offset information disappears. The checkpoint directory contains not only state but also the metadata about the query’s progress.

If you get your checkpointing and driver stability right, you can leverage Spot Instances effectively, turning unpredictable failures into manageable restarts and drastically cutting your operational costs.

The next hurdle you’ll face is optimizing the balance between micro-batch latency and the risk of interruption, potentially exploring continuous processing modes or more advanced fault-tolerance patterns.

Want structured learning?

Take the full Spark Streaming course →