Spark Streaming’s schema evolution handling is more about detecting and adapting to changes than truly "handling" them magically.

Let’s see it in action. Imagine we have a Spark Streaming job reading JSON data from Kafka. Initially, our schema is simple:

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val initialSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("timestamp", LongType, nullable = true)
))

Our streaming job looks like this:

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input_topic")
  .load()

import org.apache.spark.sql.functions.{col, current_timestamp, from_json}

val jsonStream = kafkaStream.selectExpr("CAST(value AS STRING) AS value")

val structuredStream = jsonStream
  .select(from_json(col("value"), initialSchema).as("data")) // parse each message against the declared schema
  .select("data.*") // flatten the struct into top-level columns
  .withColumn("processing_time", current_timestamp())

structuredStream.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()

Now, suppose the producer starts sending records with a new field, "user_id":

{"id": "123", "timestamp": 1678886400, "user_id": "abc"}

What happens by default may surprise you: Spark's JSON parser is not strict about extra fields. When you supply an explicit schema, a field absent from initialSchema (like user_id) is silently dropped rather than triggering an error. Genuine failures, such as an AnalysisException during query planning or a runtime parse error under FAILFAST mode, come from type conflicts or malformed records, not from newly added fields.

The key to handling schema drift is to tell Spark how to react to these discrepancies. The StructType you hand to the parser (from_json for Kafka payloads, or the schema argument of spark.read.json / spark.readStream.json for file sources) is where you define your expected schema. When Spark encounters data that doesn't conform, your configuration dictates the outcome.

Here’s the mental model: Spark doesn’t guess the new schema. It compares the incoming data’s schema against the schema you’ve declared. The core problem Spark Streaming solves here is preventing your job from crashing when the data format changes unexpectedly.

The primary mechanism is the StructType you pass to the JSON parser (spark.read.json in batch, or from_json on a Kafka stream). When you provide a schema, Spark uses it to parse the incoming data. If a field is missing in the incoming data but present in your StructType, Spark will treat it as null (if nullable = true). If a field is present in the incoming data but not in your StructType, Spark will drop that field by default when using a provided schema.
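The null-or-drop behavior is easy to verify with a quick batch run, since from_json has the same semantics in batch and streaming. A minimal sketch, assuming a local SparkSession (the sample records are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("drift-demo").getOrCreate()
import spark.implicits._

val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("timestamp", LongType, nullable = true)
))

val rows = Seq(
  """{"id": "1"}""",                                  // missing "timestamp" -> parsed as null
  """{"id": "2", "timestamp": 2, "user_id": "abc"}""" // extra "user_id" -> silently dropped
).toDF("value")

rows.select(from_json($"value", schema).as("data")).select("data.*").show()
```

The second record parses cleanly, but user_id never appears in the result: no error, just quietly missing data, which is exactly why drift detection matters.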

To truly adapt, you need to re-evaluate your declared schema. This often involves a strategy:

  1. Manual Schema Update: You detect the failure, inspect the new data, update your initialSchema definition in your Spark application code, and redeploy. This is the most robust but least automated approach.

  2. Schema Inference (with caution): For simpler cases, you could remove the explicit StructType and let Spark infer the schema. However, this is generally not recommended for production streaming jobs. Schema inference can be slow, might make incorrect assumptions (e.g., inferring Long when it should be Double), and can lead to unexpected schema changes over time if the data source isn’t stable. If you must use inference, it’s best to have a mechanism to capture the inferred schema and then hardcode it for subsequent runs.

  3. External Schema Registry: For more sophisticated handling, you integrate with an external schema registry (like Confluent Schema Registry for Avro/Protobuf, or even a simple S3 bucket storing JSON schema files). Your Spark job would query this registry to fetch the latest schema before processing data. This is the most flexible and scalable approach.
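Strategy 2 becomes much safer if you run inference once, persist what Spark discovered, and reload it as a fixed schema on later runs. A minimal sketch using Spark's schema JSON round-trip (StructType.json and DataType.fromJson are standard Spark APIs; the storage path is up to you):

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.types.{DataType, StructType}

// After a one-off inference run: persist the schema Spark discovered
def saveSchema(schema: StructType, path: String): Unit =
  Files.write(Paths.get(path), schema.json.getBytes("UTF-8"))

// On subsequent runs: load the frozen schema instead of re-inferring
def loadSchema(path: String): StructType =
  DataType.fromJson(new String(Files.readAllBytes(Paths.get(path)), "UTF-8"))
    .asInstanceOf[StructType]
```

The same serialized form works equally well as the payload for the schema-registry approach in option 3.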

Let’s illustrate with the external schema registry idea. Imagine you have a SchemaService that fetches the current schema.

// Assume this service fetches the latest schema from an external source
object SchemaService {
  def getCurrentSchema(): StructType = {
    // In a real scenario, this would query a registry, S3, etc.
    // For demo, let's say it now knows about "user_id"
    StructType(Seq(
      StructField("id", StringType, nullable = true),
      StructField("timestamp", LongType, nullable = true),
      StructField("user_id", StringType, nullable = true) // New field
    ))
  }
}
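One way to make that stub concrete, sketched under assumptions: serve the schema as a Spark-schema JSON document over HTTP and decode it with DataType.fromJson. The endpoint URL is hypothetical, and scala.io.Source is used for brevity rather than production robustness:

```scala
import org.apache.spark.sql.types.{DataType, StructType}

object HttpSchemaService {
  // Hypothetical endpoint returning a schema document produced by StructType.json
  private val schemaUrl = "https://example.com/schemas/input_topic/latest"

  def getCurrentSchema(): StructType = {
    val body = scala.io.Source.fromURL(schemaUrl).mkString
    DataType.fromJson(body).asInstanceOf[StructType]
  }
}
```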

// ... later in your streaming job ...

val currentSchema = SchemaService.getCurrentSchema()

val jsonStream = kafkaStream.selectExpr("CAST(value AS STRING) AS value")

val structuredStream = jsonStream
  .select(from_json(col("value"), currentSchema).as("data")) // parse with the fetched schema
  .select("data.*")
  .withColumn("processing_time", current_timestamp())

// ... rest of your job ...

When a new field like user_id arrives, the SchemaService would return the updated schema. Spark, using this new schema, would correctly parse and include the user_id field. One caveat: from_json binds the schema when the query starts, so picking up a new schema means restarting the streaming query (or re-fetching the schema inside a foreachBatch sink). If a field were removed from the producer, and your SchemaService returned a schema without that field, Spark would simply drop that column from the DataFrame.
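To decide whether a restart is warranted, a small drift check between the running schema and the registry's latest can help. A hypothetical helper that compares top-level field names only (type changes are ignored here):

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical helper: which top-level fields were added or removed?
def schemaDrift(running: StructType, latest: StructType): (Set[String], Set[String]) = {
  val runningNames = running.fieldNames.toSet
  val latestNames  = latest.fieldNames.toSet
  (latestNames.diff(runningNames), runningNames.diff(latestNames)) // (added, removed)
}
```

If either set is non-empty, the job can log the drift, alert, or trigger a controlled restart with the new schema.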

The one thing most people don’t realize is that Spark’s JSON parser takes a mode option, both in batch spark.read.json and via the options map of from_json. The default is PERMISSIVE, which routes malformed records into a corrupt-record column (named by columnNameOfCorruptRecord, typically _corrupt_record; when you supply an explicit schema, the column must be declared in it to be populated). However, for schema drift (fields added or removed), this doesn’t help. The other modes in the JSON data source are DROPMALFORMED (silently drops bad records) and FAILFAST (throws an exception). With an explicit schema, PERMISSIVE primarily governs malformed JSON strings themselves, not mismatches in field presence. The mode option is often confused with schema evolution when it really addresses schema corruption.
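In the Kafka pipeline above, mode can be passed to from_json through its options map. A sketch that fails fast on genuinely malformed payloads while still using the fetched schema ("mode" is the standard JSON data-source option name):

```scala
import org.apache.spark.sql.functions.{col, from_json}

// FAILFAST: throw on malformed JSON instead of silently yielding null rows
val strictlyParsed = jsonStream.select(
  from_json(col("value"), currentSchema, Map("mode" -> "FAILFAST")).as("data")
).select("data.*")
```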

The next challenge you’ll likely face is how to handle schema type changes, like an int becoming a string.

Want structured learning?

Take the full Spark-streaming course →