The most surprising thing about using Spark Streaming with Kafka and Avro is how much of the "streaming" part is actually just micro-batching, and how little Avro has to do with the actual streaming process itself.
Let’s see it in action. Imagine you have a Kafka topic, user_events, producing Avro records. Your Spark Streaming application needs to read these, deserialize them using a Schema Registry, and then do something with them – maybe write them to a data lake.
Here’s a simplified Spark Streaming job using Scala:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object KafkaAvroStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KafkaAvroStream")
      .master("local[*]") // For local testing; use your cluster manager in production
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN") // Reduce verbosity

    val kafkaBrokers = "localhost:9092" // Replace with your Kafka broker list
    val kafkaTopic = "user_events"
    val schemaRegistryUrl = "http://localhost:8081" // Replace with your Schema Registry URL

    // Read from Kafka. The source always exposes 'key' and 'value' as raw binary;
    // it never deserializes Avro for you, and it ignores Kafka consumer
    // deserializer properties (key.deserializer / value.deserializer).
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokers)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "earliest") // Or "latest"
      .option("failOnDataLoss", "false")     // Important for production
      .load()

    // Records written with Confluent's KafkaAvroSerializer use the "wire format":
    // one magic byte plus a 4-byte schema ID, followed by the Avro payload.
    // Open-source Spark's from_avro expects a bare Avro payload and the writer
    // schema as a JSON string, so we strip the 5-byte header and supply the
    // schema ourselves. (Registry-aware libraries such as ABRiS can instead
    // resolve the embedded schema ID against schemaRegistryUrl automatically.)
    val avroSchemaJson =
      """{"type":"record","name":"UserEvent","fields":[
        |  {"name":"userId","type":"string"},
        |  {"name":"eventTime","type":"long"},
        |  {"name":"eventType","type":"string"}
        |]}""".stripMargin // Example schema; substitute your topic's real schema

    val avroSchemaDF = df
      .select(expr("substring(value, 6, length(value) - 5)").as("avro_value"))
      .withColumn("parsed_avro", from_avro(col("avro_value"), avroSchemaJson))
      .select("parsed_avro.*") // Flatten the parsed Avro struct into columns

    // 'avroSchemaDF' now has named columns from the Avro schema:
    // userId, eventTime, eventType. You can process this DataFrame as usual.
    val query = avroSchemaDF.writeStream
      .outputMode("append")
      .format("console") // For demonstration; write to your data lake in practice
      .trigger(Trigger.ProcessingTime("5 seconds")) // Micro-batch every 5 seconds
      .start()

    query.awaitTermination()
    spark.stop()
  }
}
The core idea is that Spark’s readStream from Kafka provides a DataFrame with key and value columns (among others). The value is raw binary; to make it useful, you must deserialize it explicitly. This is where Avro and the Schema Registry come in. Open-source spark-avro’s from_avro parses that binary value into a structured column, but it needs the writer schema handed to it as a JSON string. Registry-aware libraries (such as ABRiS, or the Databricks variant of from_avro) go further: they read the schema ID embedded in each record, fetch the matching schema from the Schema Registry, and then parse the binary data into a structured Spark DataFrame automatically.
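That “schema ID embedded in the Avro itself” refers to Confluent’s wire format: one magic byte (always 0) followed by a big-endian 4-byte schema ID, then the Avro body. A minimal sketch of pulling that ID out of a raw Kafka value (the object and method names here are mine, not a library API):

```scala
import java.nio.ByteBuffer

object WireFormat {
  // Confluent wire format: byte 0 is a magic byte (always 0),
  // bytes 1-4 are the schema ID as a big-endian Int, the rest is Avro.
  def schemaIdOf(record: Array[Byte]): Int = {
    require(record.length >= 5 && record(0) == 0, "not a Confluent wire-format record")
    ByteBuffer.wrap(record, 1, 4).getInt
  }

  // The bare Avro payload that a plain Avro decoder (or from_avro) can parse.
  def avroPayloadOf(record: Array[Byte]): Array[Byte] =
    record.drop(5)
}
```

This is exactly why the Spark job above strips the first five bytes of value before calling from_avro: a plain Avro decoder would otherwise choke on the header.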
The "streaming" aspect is managed by Spark’s Structured Streaming engine. It continuously polls Kafka for new data, processes it in small batches (micro-batches), and outputs the results. The Trigger.ProcessingTime("5 seconds") dictates how often these micro-batches are formed and processed.
The system solves the problem of handling evolving data schemas without breaking your data pipelines. Avro provides a compact, efficient binary format, and the Schema Registry acts as a central, versioned repository for these schemas. When Spark reads an Avro record, it can use the embedded schema ID (or query the registry) to get the exact schema version used for writing, ensuring correct deserialization even if the schema has changed since the data was produced.
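To make “query the registry” concrete, here is a hedged sketch against the Schema Registry’s REST API, whose GET /schemas/ids/{id} endpoint returns the schema registered under a given ID. The helper names are mine, and JSON parsing and error handling are elided to a raw string:

```scala
import scala.io.Source

object RegistryClient {
  // Build the REST endpoint for a schema ID, per the Schema Registry API:
  // GET {base}/schemas/ids/{id} -> {"schema": "<escaped Avro schema JSON>"}
  def schemaUrl(registryBase: String, schemaId: Int): String =
    s"${registryBase.stripSuffix("/")}/schemas/ids/$schemaId"

  // Fetch the raw JSON response. A real client caches this per ID,
  // as Confluent's CachedSchemaRegistryClient does.
  def fetchSchemaJson(registryBase: String, schemaId: Int): String =
    Source.fromURL(schemaUrl(registryBase, schemaId)).mkString
}
```

Because schema IDs are immutable in the registry, caching the response per ID is safe and keeps the hot path of deserialization free of network calls.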
The levers you control are primarily in the Spark configuration for the Kafka source:
- kafka.bootstrap.servers: Your Kafka cluster address.
- subscribe: The Kafka topic(s) to consume from.
- startingOffsets (and, for batch queries, endingOffsets): How much historical data to process on startup.
- failOnDataLoss: Crucial. Set to false to prevent the job from stopping if Kafka detects data loss (e.g., due to retention policies).
- schema.registry.url: The address of your Schema Registry. Note that this is consumed by registry-aware deserialization libraries, not by Spark's Kafka source itself.
- Deserializer settings, if you use a registry-aware library. Spark's Kafka source always returns raw bytes and ignores Kafka consumer deserializer properties.
And in the Structured Streaming part:
- Trigger: Defines the processing frequency (e.g., Trigger.ProcessingTime, Trigger.Once, Trigger.Continuous).
- OutputMode: How to write results (append, update, complete).
One thing most people don’t realize is that the from_avro function in Spark SQL is what bridges the binary Kafka value to a structured DataFrame. It’s not magic; it’s a library call that performs Avro deserialization against a writer schema — one you supply as a JSON string in open-source Spark, or one resolved via the Schema Registry URL in registry-aware variants. The magic is that Spark makes this look like a simple DataFrame transformation.
The next concept you’ll run into is handling schema evolution gracefully, specifically how Spark’s from_avro function behaves with different schema compatibility settings (backward, forward, full, none) defined in your Schema Registry.