Spark Streaming jobs can hang or fail when data skew causes some tasks to process vastly more data than others.

Common Causes and Fixes

  1. Uneven Distribution of Keys: The most frequent culprit is an uneven distribution of keys in your groupByKey, reduceByKey, or join operations. If one key has millions of records while others have few, the task processing that key becomes a bottleneck.

    • Diagnosis: Use the Spark UI. Navigate to the "Stages" tab, find the stage with the longest-running tasks, and examine the "Tasks" list. Look for tasks with significantly higher "Shuffle Read Size" or "Duration."
    • Fix: Repartition your RDD/DataFrame before the skewed operation. A common strategy is to repartition to a number of partitions that is a multiple of your cluster’s core count (e.g., rdd.repartition(spark.sparkContext.defaultParallelism * 3) or df.repartition(spark.sparkContext.defaultParallelism * 3)). This redistributes the data more evenly across the available executors, preventing a single task from being overwhelmed.
    • Why it works: Increasing the number of partitions means more tasks are created. Even if some keys are still more frequent, the workload is spread across more workers, reducing the chance of any single task failing or taking too long.
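The effect of spreading rows over more partitions can be sketched without a cluster. The pure-Python simulation below (hypothetical key counts, no Spark required) contrasts routing rows by key hash, as a shuffle does, with the round-robin placement that repartition(n) without a column argument performs:

```python
from collections import Counter

# Simulated record keys: one hot key plus a long tail of 1000 rare keys.
keys = ["user_0"] * 9000 + [f"user_{i}" for i in range(1, 1001)]

def max_task_load_by_key_hash(keys, n):
    """Largest partition when rows are routed by hash(key) % n,
    the way a shuffle for groupByKey/reduceByKey routes them."""
    return max(Counter(hash(k) % n for k in keys).values())

def max_task_load_round_robin(keys, n):
    """Largest partition under round-robin placement, which is what
    repartition(n) without a column argument does."""
    return max(Counter(i % n for i in range(len(keys))).values())

# Hash routing pins all 9000 hot-key rows to one task; round-robin
# placement spreads the same 10000 rows almost perfectly evenly.
print(max_task_load_by_key_hash(keys, 12))
print(max_task_load_round_robin(keys, 12))
```

Note that a later groupByKey/reduceByKey still routes each key to a single reducer, which is why salting (cause 3 below) is the stronger remedy when one key truly dominates.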
  2. Inadequate Initial Partitioning: If your input data source already has skewed partitions (e.g., a poorly sharded Kafka topic), Spark might inherit this skew.

    • Diagnosis: In the Spark UI, open the first stage on the "Stages" tab and look for uneven "Input Size / Records" values across its tasks.
    • Fix: Repartition immediately after reading the data. df.repartition(spark.sparkContext.defaultParallelism * 3).createOrReplaceTempView("data_view") or rdd.repartition(spark.sparkContext.defaultParallelism * 3).
    • Why it works: This forces Spark to re-read and re-shuffle the data into a more balanced set of partitions from the very beginning of your streaming job.
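The arithmetic behind this fix is easy to sketch in plain Python; the per-partition record counts below are made up for illustration:

```python
# Records per input partition, as inherited from a poorly sharded topic.
source_partitions = [9000, 300, 400, 300]

def rebalanced_sizes(partition_sizes, n):
    """Sizes after spreading the total evenly over n partitions,
    the effect of calling repartition(n) immediately after the read."""
    total = sum(partition_sizes)
    base, extra = divmod(total, n)
    return [base + 1 if i < extra else base for i in range(n)]

print(max(source_partitions))                        # one task carries 9000 rows
print(max(rebalanced_sizes(source_partitions, 12)))  # ~834 rows per task
```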
  3. Skewed Join Keys: When joining two DataFrames, if the join key has a highly uneven distribution in either DataFrame, it will cause skew.

    • Diagnosis: Examine the stages related to the join operation in the Spark UI. If one side of the join is significantly larger or has much higher shuffle read/write, that’s your indicator.
    • Fix: Use "salting." This involves creating a new "salt" column in both DataFrames. For the skewed DataFrame, you’ll add a salt value that is randomly distributed across a range (e.g., df_skewed.withColumn("salt", (rand() * 10).cast("int"))). For the other DataFrame, you’ll duplicate rows for each possible salt value (df_other.withColumn("salt", explode(array([lit(i) for i in range(10)])))). Then, join on both the original key and the salt column: df_skewed.join(df_other, ["key", "salt"]).
    • Why it works: Salting breaks down the large, skewed key into many smaller, unique composite keys (key_salt). Each of these composite keys is likely to be much smaller and more evenly distributed, allowing Spark to distribute the join workload across more tasks.
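The mechanics of salting can be checked in miniature with plain Python. The snippet below uses hypothetical keys and a salt range of 10 (mirroring the rand() * 10 example above): the hot key splits into roughly 10 composite keys, while replicating the other side per salt value keeps the join lossless:

```python
import random
from collections import Counter

random.seed(7)
SALT_RANGE = 10

# Skewed side: one hot join key (5000 rows) plus 500 rare keys.
skewed = [("hot", i) for i in range(5000)] + [("k%d" % i, i) for i in range(500)]

# Other side: one row per key, dimension-table style.
other = {"k%d" % i: "V%d" % i for i in range(500)}
other["hot"] = "H"

# Salt the skewed side: append a random salt to each key.
salted_left = [((k, random.randrange(SALT_RANGE)), v) for k, v in skewed]

# Replicate the other side once per possible salt value
# (the explode(array(...)) step in the text).
salted_right = {(k, s): v for k, v in other.items() for s in range(SALT_RANGE)}

# Group sizes per composite key: the hot key's 5000 rows now split ~10 ways.
sizes = Counter(k for k, _ in salted_left)
print(max(sizes.values()))

# The join still matches every left-side row, via the composite key.
joined = [(k, v, salted_right[k]) for k, v in salted_left]
print(len(joined))
```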
  4. Incorrect Shuffle Partitions Configuration: The spark.sql.shuffle.partitions configuration might be too low, leading to large partitions after shuffles, which can exacerbate skew.

    • Diagnosis: Observe the number of partitions in stages after a shuffle operation in the Spark UI. If it’s consistently low and tasks are large, this is a likely cause.
    • Fix: Increase spark.sql.shuffle.partitions. A good starting point is spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism * 3). For streaming jobs, set it before any query starts, e.g. when creating the SparkSession: SparkSession.builder.config("spark.sql.shuffle.partitions", "200").getOrCreate(). Note that for stateful streaming queries the value is captured in the checkpoint, so changing it later requires starting from a fresh checkpoint.
    • Why it works: This setting determines the number of partitions Spark uses for shuffle outputs (like the result of groupByKey or join). Increasing it ensures that the data produced by a shuffle operation is broken into more, smaller partitions, making subsequent operations more balanced.
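A quick back-of-envelope helper shows why this setting matters. The 50 GB shuffle figure below is illustrative; a commonly cited rule of thumb is to target roughly 100-200 MB of shuffle read per task:

```python
def avg_partition_mb(shuffle_bytes, num_partitions):
    """Average shuffle-read size per task, assuming an even spread."""
    return shuffle_bytes / num_partitions / (1024 ** 2)

stage_shuffle = 50 * 1024 ** 3                # a 50 GB shuffle stage
print(avg_partition_mb(stage_shuffle, 200))   # default 200 -> 256 MB per task
print(avg_partition_mb(stage_shuffle, 800))   # raised to 800 -> 64 MB per task
```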
  5. Data Type Mismatches or Implicit Conversions: While less common for pure skew, subtle data type issues can sometimes lead to unexpected grouping or joining behavior that appears as skew.

    • Diagnosis: Double-check the schema of your DataFrames, especially for columns used in joins or aggregations. Ensure consistent types (e.g., both LongType or both StringType).
    • Fix: Explicitly cast columns to the correct, consistent data type before performing operations: df.withColumn("id", col("id").cast("long")).
    • Why it works: Guarantees that Spark compares and groups identical data types, preventing subtle differences from causing data to be misclassified or unexpectedly aggregated.
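This failure mode is easy to reproduce outside Spark: plain Python grouping shows the same effect when one upstream source emits ids as strings while another emits integers:

```python
from collections import Counter

# The same logical id arrives as both int and string from two sources.
ids = [1, "1", 2, "2", 2]

mixed = Counter(ids)                       # 1 and "1" form separate groups
cast_first = Counter(int(x) for x in ids)  # explicit cast reunites them

print(mixed)
print(cast_first)
```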
  6. Corrupted or Malformed Records: A few bad records with unusual or extremely long string values in a key column can sometimes cause a single task to struggle.

    • Diagnosis: Look for tasks with abnormally high memory usage or garbage collection pauses in the Spark UI. If a specific task is consuming excessive CPU or memory for no clear reason related to data volume, bad records are a possibility.
    • Fix: Implement robust error handling during data ingestion, and filter out or quarantine malformed records. For example, when parsing Kafka message values with from_json, records that fail to parse come back as null, so you can drop them with df.filter(col("key").isNotNull()) or route them to a quarantine sink, applying more sophisticated parsing where needed.
    • Why it works: Removes the problematic data points that might be causing a single task to choke on parsing or processing.
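The quarantine pattern can be sketched in plain Python (hypothetical records; in a real job the parse step would be from_json and the filter a DataFrame operation):

```python
import json

raw = ['{"key": "a", "v": 1}', 'not json', '{"v": 2}', '{"key": "b", "v": 3}']

def parse(rec):
    """Return a dict for well-formed records that carry a key, else None --
    analogous to from_json producing null for malformed input."""
    try:
        d = json.loads(rec)
    except json.JSONDecodeError:
        return None
    return d if isinstance(d, dict) and "key" in d else None

parsed = [parse(r) for r in raw]
good = [d for d in parsed if d is not None]                  # process normally
quarantined = [r for r, d in zip(raw, parsed) if d is None]  # inspect later

print(len(good), quarantined)
```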

After fixing data skew by repartitioning and salting, the next error you’ll likely encounter is OutOfMemoryError on executor nodes if your overall data volume is still too large for the allocated memory.

Want structured learning?

Take the full Spark Streaming course →