Spark Streaming can write ACID transactions to Iceberg tables, but getting it right involves more than just pointing a DataFrame at a table.

Let’s see it in action. Imagine we have a Spark Streaming job that processes clickstream data and wants to append it to an Iceberg table named clicks_iceberg in the default database.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("SparkStreamingToIceberg")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.iceberg_catalog.type", "hadoop")
  .config("spark.sql.catalog.iceberg_catalog.warehouse", "/path/to/your/iceberg/warehouse")
  .getOrCreate()

// Schema of the clickstream records. With a real source (e.g., Kafka carrying JSON)
// you'd use this to parse the payload; the rate-source demo builds the columns directly.
val clickSchema = StructType(Seq(
  StructField("timestamp", TimestampType, true),
  StructField("user_id", StringType, true),
  StructField("page", StringType, true)
))

// Create a streaming DataFrame from a source (e.g., Kafka)
// For demonstration, we'll use a rate source, which already supplies a
// `timestamp` column. Replace this with your actual source.
import org.apache.spark.sql.functions._

val streamingDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()
  .withColumn("user_id", md5(rand().cast(StringType)))
  .withColumn("page", element_at(
    array(lit("index.html"), lit("about.html"), lit("contact.html")),
    (rand() * 3).cast(IntegerType) + 1))
  .select("timestamp", "user_id", "page")

// Fully qualified identifier of the target Iceberg table (catalog.database.table)
val icebergTablePath = "iceberg_catalog.default.clicks_iceberg"

// Ensure the Iceberg table exists. If not, create it.
// This is a one-time setup for the table schema.
spark.sql(s"CREATE TABLE IF NOT EXISTS $icebergTablePath (timestamp TIMESTAMP, user_id STRING, page STRING) USING iceberg")

// Write the streaming DataFrame to the Iceberg table
val query = streamingDF.writeStream
  .format("iceberg")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("10 seconds")) // Each 10-second micro-batch becomes one Iceberg commit
  .option("checkpointLocation", "/path/to/your/streaming/checkpoint/dir") // Essential for fault tolerance
  .toTable(icebergTablePath)

query.awaitTermination()

This setup enables Spark Structured Streaming to write to an Iceberg table through the iceberg sink. The checkpointLocation is crucial for recovering from failures: combined with Iceberg's atomic commits, it ensures that no data is lost or duplicated. The trigger interval controls how frequently micro-batches are processed, and each micro-batch is committed to the Iceberg table as a single atomic snapshot.
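Because every micro-batch lands as a consistent snapshot, the table is immediately queryable with an ordinary batch read while the stream is running. A quick sanity check (from a separate session with the same catalog configured) might look like:

```scala
// Assumes a SparkSession `spark` configured with the same iceberg_catalog
// as above. Readers always see the latest committed snapshot, never a
// partially written micro-batch.
spark.sql(
  "SELECT page, count(*) AS views FROM iceberg_catalog.default.clicks_iceberg GROUP BY page"
).show()
```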

The core problem Iceberg solves here is providing transactional guarantees for streaming writes. Traditional file-based sinks (like Parquet or ORC directly to HDFS/S3) struggle with concurrent writes and updates, leading to data corruption or inconsistencies. Iceberg’s metadata layer tracks all committed snapshots of the table, allowing for atomic commits of new data files. When Spark writes a micro-batch, it creates new data files and then atomically updates the table’s metadata pointer to include these new files in the latest snapshot. This ensures that either all the data from a micro-batch is successfully committed, or none of it is.
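You can watch this happening: Iceberg exposes metadata tables alongside every table, and each committed micro-batch appears as one row in the snapshots table. A quick way to inspect them, assuming the `spark` session configured earlier:

```scala
// Streaming appends show up with operation = "append"; committed_at tells
// you when each micro-batch became visible to readers.
spark.sql(
  "SELECT snapshot_id, committed_at, operation FROM iceberg_catalog.default.clicks_iceberg.snapshots ORDER BY committed_at"
).show(truncate = false)
```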

The spark.sql.catalog.iceberg_catalog configuration tells Spark how to find and interact with Iceberg tables. The hadoop type is common for warehouses stored on HDFS or cloud object storage. The warehouse path is the root directory where Iceberg will manage its tables and metadata.
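For comparison, switching to a Hive-backed catalog only changes the catalog-level settings. A sketch, assuming a Hive Metastore at the hypothetical address thrift://metastore-host:9083:

```scala
import org.apache.spark.sql.SparkSession

val sparkHive = SparkSession.builder()
  .appName("SparkStreamingToIcebergHive")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.iceberg_catalog.type", "hive")
  // Hypothetical metastore URI; replace with your own. With a Hive catalog,
  // table locations come from the metastore rather than a warehouse path.
  .config("spark.sql.catalog.iceberg_catalog.uri", "thrift://metastore-host:9083")
  .getOrCreate()
```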

The checkpointLocation is not just for Iceberg; it’s a fundamental requirement for Spark Structured Streaming. It stores the progress of your streaming query, including offsets from your source (like Kafka) and metadata about completed micro-batches. Without it, Spark wouldn’t know where to resume from after a failure.

The most surprising thing about setting up Iceberg for streaming is how many seemingly minor configurations have a profound impact on transactional integrity and performance. For instance, the trigger interval doesn't just control how often micro-batches run; because each micro-batch ends in exactly one Iceberg commit, it directly determines the latency with which your data becomes queryable and the size of the individual commits. A shorter interval means lower latency but potentially smaller, less optimized files and more frequent metadata operations. A longer interval means higher latency but larger files and fewer, larger commits.
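If you favor short intervals, the resulting small files can be compacted out-of-band. Iceberg ships a rewrite_data_files maintenance procedure for this (it requires the Iceberg SQL extensions enabled in the session, as configured above); a minimal sketch:

```scala
// Compact small data files produced by frequent streaming commits.
// Run periodically from a separate maintenance job, not inside the stream;
// the rewrite itself is committed as a new snapshot, so concurrent readers
// and the streaming writer are unaffected.
spark.sql(
  "CALL iceberg_catalog.system.rewrite_data_files(table => 'default.clicks_iceberg')"
).show()
```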

Once you have this basic streaming sink set up, you’ll likely want to explore how to manage table evolution, such as adding or dropping columns, without interrupting your streaming job.
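As a preview, schema changes in Iceberg are metadata-only operations; note that a running writeStream captures the table schema when it starts, so in practice you restart the writer after a change. A sketch using a hypothetical referrer column:

```scala
// Adding and dropping a column are metadata-only operations in Iceberg;
// existing data files are untouched and older snapshots remain readable,
// because Iceberg tracks columns by ID rather than by position.
// `referrer` is a hypothetical column used for illustration.
spark.sql("ALTER TABLE iceberg_catalog.default.clicks_iceberg ADD COLUMN referrer STRING")
spark.sql("ALTER TABLE iceberg_catalog.default.clicks_iceberg DROP COLUMN referrer")
```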
