Spark Structured Streaming’s Kinesis source can be a bit of a black box, but understanding its configuration and checkpointing is key to building robust, stateful streaming applications.

Let’s see it in action. Imagine you’ve got a Kinesis stream named my-spark-stream and you want to read from it with the DataFrame API. One caveat up front: the kinesis format isn’t bundled with open-source Apache Spark; it’s provided by an external connector (for example, the spark-sql-kinesis connector or the Databricks runtime), so option names can vary slightly between connectors. Here’s how the setup typically looks:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder
  .appName("KinesisSourceExample")
  .master("local[*]") // For local testing, replace with your cluster manager
  .getOrCreate()

val kinesisDataFrame = spark.readStream
  .format("kinesis")
  .option("streamName", "my-spark-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") // Replace with your Kinesis endpoint
  .option("initialPosition", "LATEST") // Or "TRIM_HORIZON" or a specific sequence number
  .option("awsAccessKeyId", "YOUR_ACCESS_KEY") // Consider using IAM roles for production
  .option("awsSecretAccessKey", "YOUR_SECRET_KEY") // Consider using IAM roles for production
  .load()

val processedDataFrame = kinesisDataFrame
  .selectExpr("CAST(data AS STRING) as message") // Kinesis data is usually binary
  .filter(length(col("message")) > 0)

val query = processedDataFrame.writeStream
  .outputMode("append")
  .format("console") // Or your preferred sink
  .option("checkpointLocation", "/tmp/spark-checkpoints/kinesis-example") // Crucial for stateful operations
  .start()

query.awaitTermination()

This snippet demonstrates the core components: the kinesis format, essential options like streamName and endpointUrl, and how to specify where to start reading with initialPosition. The checkpointLocation is where Spark will store metadata about your stream’s progress.

The problem Spark Streaming with Kinesis solves is enabling real-time data processing from AWS Kinesis Data Streams within the Spark ecosystem. Kinesis is a managed streaming service, and Spark provides a powerful, distributed engine for transforming and analyzing that data as it arrives. By integrating them, you get scalable, fault-tolerant stream processing without managing the underlying infrastructure for both components.

Internally, the Kinesis source in Spark works by polling Kinesis shards for new records. It maintains a sequence number for each shard, indicating the last record it has successfully processed. When a Spark Streaming job restarts, it needs to know where to pick up. This is where checkpoints come in. The checkpointLocation directory on a distributed file system (like HDFS, S3, or GCS) stores metadata, including the sequence numbers for each shard. This allows Spark to resume processing from exactly where it left off, ensuring exactly-once or at-least-once processing semantics depending on the sink and the rest of your pipeline.
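As a mental model, the per-shard progress that gets checkpointed can be sketched in plain Scala. This is an illustrative simplification, not the connector’s actual classes:

```scala
// Illustrative model (not the connector's real classes) of how a streaming
// source tracks per-shard progress as a serializable "offset".
case class ShardProgress(shardId: String, lastSequenceNumber: String)

case class KinesisSourceOffset(shards: Map[String, String]) {
  // Record newer progress for one shard; the latest call wins.
  def advance(p: ShardProgress): KinesisSourceOffset =
    copy(shards = shards.updated(p.shardId, p.lastSequenceNumber))
}

// Conceptually: after a micro-batch completes, Spark writes the new offset
// under checkpointLocation/offsets; on restart it reads the latest one back.
val beforeBatch = KinesisSourceOffset(Map.empty)
val afterBatch = beforeBatch
  .advance(ShardProgress("shardId-000000000000", "49590338271490256608559692538361571095921575989136588898"))
```

Because the offset is just a map of shard IDs to sequence numbers, it is cheap to write atomically once per batch, which is what makes deterministic resumption possible.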

The initialPosition option is critical for controlling where Spark starts reading when it encounters an empty checkpoint or when you explicitly want to reset. LATEST means it starts from the most recent data available in the stream. TRIM_HORIZON means it starts from the oldest data. You can also provide a specific Kinesis sequence number to start from. If a checkpoint exists, initialPosition is ignored, and Spark resumes from the saved checkpoint.
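For example, to replay a stream from the beginning, you would point the query at a fresh checkpoint directory and switch the option. This is a sketch; the exact option name and accepted values vary between Kinesis connectors (some use startingPosition or a JSON value), so check your connector’s docs:

```scala
// Fresh checkpoint + TRIM_HORIZON => start from the oldest retained record.
// Option names follow the earlier snippet and may differ per connector.
val replayAll = spark.readStream
  .format("kinesis")
  .option("streamName", "my-spark-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("initialPosition", "TRIM_HORIZON") // ignored once a checkpoint exists
  .load()
```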

One aspect that often trips people up is how Kinesis shard iterators and Spark’s checkpointing interact. When Spark reads from Kinesis, it uses shard iterators, each tied to a specific sequence number. Spark’s checkpointing mechanism saves the last successfully committed sequence number for each shard. Upon restart, Spark retrieves these saved sequence numbers and requests new iterators from Kinesis starting just after them. Provided your checkpoint storage is reliable, no data is skipped; however, records from the most recent batch that was processed but not yet committed can be read again after a failure, which is why end-to-end exactly-once also requires an idempotent or transactional sink. If your checkpoint location becomes unavailable or corrupted, Spark falls back to initialPosition and may reprocess data or miss it.
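The restart decision can be sketched as a small pure function. This is an illustrative model, not an AWS SDK API; real connectors go through Kinesis’s GetShardIterator call, using the AFTER_SEQUENCE_NUMBER iterator type when saved progress exists:

```scala
// Hypothetical stand-ins for Kinesis shard iterator types (not an AWS SDK API).
sealed trait IteratorRequest
case class AfterSequenceNumber(seq: String) extends IteratorRequest
case object Latest extends IteratorRequest
case object TrimHorizon extends IteratorRequest

// Choose where to start reading a shard on (re)start: saved checkpoint
// progress wins; otherwise fall back to the configured initialPosition.
def iteratorFor(
    checkpointed: Map[String, String], // shardId -> last committed sequence number
    shardId: String,
    initialPosition: String
): IteratorRequest =
  checkpointed.get(shardId) match {
    case Some(seq)                                 => AfterSequenceNumber(seq)
    case None if initialPosition == "TRIM_HORIZON" => TrimHorizon
    case None                                      => Latest
  }

val saved = Map("shardId-000" -> "49590")
val resumed = iteratorFor(saved, "shardId-000", "LATEST") // resumes after "49590"
val fresh   = iteratorFor(saved, "shardId-001", "LATEST") // new shard: uses initialPosition
```

Note the second call: shards created by a resharding event have no saved progress, so they start from initialPosition even when other shards resume from the checkpoint.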

The most surprising thing about checkpointLocation is that it’s not just for Kinesis; it’s fundamental to all stateful operations in Spark Structured Streaming. If you’re using .mapGroupsWithState(), .flatMapGroupsWithState(), or doing aggregations over time windows, Spark requires a checkpoint location. It uses this location not only to store Kinesis shard progress but also to persist the state of your streaming application itself. Without it, any stateful computation would be lost upon job restart.
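For instance, a windowed count over the same Kinesis messages is stateful, so the same checkpointLocation requirement applies. This sketch reuses kinesisDataFrame from above and assumes the connector exposes an approximateArrivalTimestamp column as event time (check your connector’s schema):

```scala
import org.apache.spark.sql.functions._

// Stateful windowed aggregation: Spark persists window state under
// checkpointLocation/state, so the query won't run without one.
val countsPerMinute = kinesisDataFrame
  .selectExpr("CAST(data AS STRING) AS message", "approximateArrivalTimestamp AS ts")
  .withWatermark("ts", "2 minutes")              // bounds how long state is kept
  .groupBy(window(col("ts"), "1 minute"))
  .count()

countsPerMinute.writeStream
  .outputMode("update")
  .format("console")
  .option("checkpointLocation", "/tmp/spark-checkpoints/kinesis-windowed")
  .start()
```

The watermark matters here: without it, the window state would grow without bound in the checkpoint’s state store.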

The next hurdle you’ll likely face is dealing with data schema inference and evolution from Kinesis, especially if your data format isn’t strictly JSON or Avro.
