Spark Structured Streaming can read from and write to Kafka topics directly, with no intermediate storage system in between. The only prerequisite is having the Kafka connector artifact on your application's classpath.
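One practical note before the code: the Kafka source and sink ship in a separate artifact from Spark core. A typical sbt dependency looks like this (the version is illustrative; match it and the Scala suffix to your Spark build):

```scala
// build.sbt — version shown is illustrative; align it with your Spark installation
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0"
```

Alternatively, the same artifact can be supplied at submit time via `spark-submit --packages`.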

Here’s a Kafka Structured Streaming job in action:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder
  .appName("KafkaStructuredStreamingExample")
  .master("local[*]") // Use local mode for demonstration
  .getOrCreate()

import spark.implicits._

// Define Kafka broker addresses and topics
val kafkaBrokers = "localhost:9092"
val inputTopic = "input_topic"
val outputTopic = "output_topic"

// Read from Kafka
val kafkaStreamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", inputTopic)
  .load()

// Example transformation for the message value.
// In a real scenario you would use a JSON library such as Jackson or Circe;
// here we simply uppercase the string. The function is defined before it is
// used so the code also works when pasted into the Spark shell.
def parseJsonValue(jsonString: String): String = {
  s"Processed: ${jsonString.toUpperCase()}"
}

// Process the stream (example: parse the value and transform it)
val processedStreamDF = kafkaStreamDF
  .selectExpr("CAST(value AS STRING)") // Kafka value is binary by default
  .as[String]
  .map(parseJsonValue)
  .toDF("value") // The Kafka sink requires a column named "value"

// Write to Kafka
val query = processedStreamDF.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("topic", outputTopic)
  .option("checkpointLocation", "/tmp/kafka-stream-checkpoint") // Required for fault tolerance
  .trigger(Trigger.ProcessingTime("10 seconds")) // Process data in micro-batches
  .start()

query.awaitTermination()

This code demonstrates a basic end-to-end pipeline. It reads messages from input_topic, applies a simple transformation (converting the message value to uppercase), and writes the transformed messages to output_topic. The Trigger.ProcessingTime("10 seconds") ensures that Spark processes data in 10-second intervals, creating micro-batches.

The core problem Structured Streaming with Kafka solves is enabling near real-time data processing and reaction. Traditional batch processing often introduces significant latency, making it unsuitable for use cases like fraud detection, real-time analytics dashboards, or immediate alerting. By integrating directly with Kafka, Spark leverages Kafka’s distributed commit log to provide fault-tolerant, high-throughput streaming. Spark tracks the offsets it has consumed in its own checkpoint location rather than through Kafka’s consumer-group commits; this gives at-least-once delivery by default, and exactly-once semantics when the downstream sink is idempotent or transactional.

Internally, Spark treats the Kafka stream as an unbounded table. When you readStream from Kafka, Spark doesn’t load all historical data by default. Instead, it continuously polls Kafka for new messages beyond the offsets it has already processed. Each new batch of messages is conceptually appended to this "unbounded table," and your defined transformations are applied to it. The writeStream operation then pushes the results to the output sink (another Kafka topic in this case) and records the offsets of the processed data in the query’s checkpoint, so the stream can resume from the correct position after a restart or failure.
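You can control where a brand-new query (one without an existing checkpoint) begins reading via the Kafka source’s startingOffsets option. A small sketch, reusing the broker and topic names from the example above:

```scala
// Sketch: a fresh query starts from "latest" by default;
// "earliest" replays the topic from the beginning instead.
val fromBeginningDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", inputTopic)
  .option("startingOffsets", "earliest") // or "latest", or a JSON map of per-partition offsets
  .load()
```

Once a checkpoint exists, the checkpointed offsets take precedence over this option.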

The key levers you control are the Kafka connection parameters (kafka.bootstrap.servers, subscribe, topic), the processing logic within your DataFrame operations, and the Trigger mechanism. The Trigger is crucial for defining how often your streaming query should run and process new data. Options include ProcessingTime, AvailableNow (which supersedes the older Once trigger in recent Spark versions), and Continuous (though continuous processing has specific requirements and limitations).
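Side by side, the trigger variants look like this — a sketch only, with the checkpoint path being illustrative:

```scala
import org.apache.spark.sql.streaming.Trigger

// Micro-batch every 10 seconds, as in the example above
val microBatchWriter = processedStreamDF.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("topic", outputTopic)
  .option("checkpointLocation", "/tmp/ckpt-demo") // illustrative path
  .trigger(Trigger.ProcessingTime("10 seconds"))

// Drain everything currently available, then stop
// (AvailableNow replaces Trigger.Once in newer Spark versions)
val runOnceWriter = microBatchWriter.trigger(Trigger.AvailableNow())

// Experimental low-latency continuous mode with a 1-second checkpoint interval;
// only a restricted set of operations, sources, and sinks is supported
val continuousWriter = microBatchWriter.trigger(Trigger.Continuous("1 second"))
```

Calling .trigger again on the same writer simply overrides the previous setting, which is why the variants can be derived from one base configuration here.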

What most people don’t realize is that Structured Streaming’s Kafka integration doesn’t just read the raw message value. It also provides metadata about each Kafka record, accessible via the selectExpr or select methods. You can access fields like key, topic, partition, offset, and timestamp directly from the DataFrame. This allows for much richer stream processing logic, such as routing messages based on their topic or key, or filtering based on Kafka timestamps. For instance, selectExpr("CAST(key AS STRING)", "value") would give you both the key and value of each Kafka message.
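As a sketch, the metadata columns can be pulled out alongside the payload; all of the column names below are provided by the Kafka source itself:

```scala
// Each Kafka record arrives with these columns in addition to key and value
val enrichedDF = kafkaStreamDF.selectExpr(
  "CAST(key AS STRING)",
  "CAST(value AS STRING)",
  "topic",     // source topic name
  "partition", // Kafka partition the record came from
  "offset",    // offset within that partition
  "timestamp"  // Kafka record timestamp
)

// Example: keep only recent records, filtering on Kafka's own timestamp
val recentDF = enrichedDF.where("timestamp > current_timestamp() - interval 10 minutes")
```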

The next concept you’ll likely explore is handling schema evolution in Kafka messages and managing stateful operations within your Spark Structured Streaming jobs.
