Trigger.AvailableNow() in Spark Structured Streaming is designed to process all data available at the moment the query starts and then shut down, making it ideal for running a streaming pipeline as an on-demand, batch-like job.
Let’s see it in action. Imagine you have a Kafka topic my-topic and you want to process messages as they arrive, but only in bursts when you explicitly tell Spark to.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("AvailableNowExample")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val kafkaBrokers = "localhost:9092"
val inputTopic = "my-topic"

val rawStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", inputTopic)
  .option("startingOffsets", "earliest") // Start from the beginning of the topic on the first run
  .load()

val processedStream = rawStream
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .map { case (key, value) => s"Processed: $key -> $value" }

// This is the key part: Trigger.AvailableNow()
val query = processedStream.writeStream
  .trigger(Trigger.AvailableNow())
  .format("console") // Output to console for demonstration
  .option("checkpointLocation", "/tmp/available-now-checkpoint") // Lets later runs resume from committed offsets
  .start()

query.awaitTermination() // Returns once all currently available data has been processed
When you run this, Spark will:
- Connect to Kafka and check for data.
- If data is present, it will process all currently available data in one go. This might involve multiple micro-batches internally if the volume of data is large, but it all executes within a single Trigger.AvailableNow() invocation.
- Once all available data is processed, the streaming query terminates automatically.
This differs fundamentally from Trigger.ProcessingTime(interval), which keeps the query running and checks for new data at the specified interval. Trigger.AvailableNow() is a one-shot deal.
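To make the contrast concrete, here is a sketch of the two trigger modes side by side. It assumes the processedStream value and checkpoint paths from the earlier example; the paths and interval are illustrative, not prescriptive.

```scala
import org.apache.spark.sql.streaming.Trigger

// Continuous micro-batching: the query stays up and attempts a new
// micro-batch every 30 seconds, indefinitely.
val continuousQuery = processedStream.writeStream
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .format("console")
  .option("checkpointLocation", "/tmp/ckpt-processing-time")
  .start()
// continuousQuery.awaitTermination() would block until stopped externally.

// One-shot: drain everything available right now, then stop on its own.
val oneShotQuery = processedStream.writeStream
  .trigger(Trigger.AvailableNow())
  .format("console")
  .option("checkpointLocation", "/tmp/ckpt-available-now")
  .start()
oneShotQuery.awaitTermination() // Returns once the backlog is drained
```

The only structural difference is the trigger; the source, transformation, and sink logic are identical, which is what makes switching a pipeline between continuous and on-demand operation cheap.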
The core problem Trigger.availableNow() solves is enabling a streaming pipeline to act like a scheduled batch job. You can have a streaming source (like Kafka, Kinesis, or files in a directory), but you only want to trigger a processing run when you decide, perhaps on a cron schedule or manually. When triggered, it should process everything that has arrived since the last successful run and then stop, allowing the next scheduled trigger to pick up from where it left off.
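For instance, the driver program above could be packaged as a jar and launched from cron, so each scheduled run drains the backlog and exits. The paths, class name, and schedule below are hypothetical placeholders, not part of the original example:

```
# Hypothetical crontab entry: drain the Kafka backlog at the top of every hour.
# spark-submit exits when the AvailableNow query terminates, so each run is a
# self-contained batch; the shared checkpoint directory carries offsets
# between runs.
0 * * * * /opt/spark/bin/spark-submit \
  --class com.example.AvailableNowExample \
  /opt/jobs/available-now-example.jar \
  >> /var/log/available-now-example.log 2>&1
```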
Internally, Spark keeps track of the offsets (for Kafka) or file statuses (for file sources) that have been successfully processed. When a query starts with Trigger.AvailableNow(), Spark snapshots the offsets/statuses currently present in the source, processes all data up to those points, and then commits the new offsets/statuses to the checkpoint. The "available now" part refers to the data that exists in the source at the moment the trigger is activated and Spark begins its processing cycle; it does not continue polling for data that arrives during that execution.
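Concretely, the committed progress lives in the query's checkpoint directory. For a Kafka source, each completed micro-batch writes an offsets file whose contents look roughly like the following; the exact layout is an internal detail that varies by Spark version, so treat this as illustrative only:

```
/tmp/available-now-checkpoint/offsets/0:
v1
{"batchWatermarkMs":0,"batchTimestampMs":1700000000000}
{"my-topic":{"0":1250,"1":987}}
```

The last line records, per topic and partition, the next offset to read, which is how a later AvailableNow run knows where to resume.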
The startingOffsets and failOnDataLoss options become particularly important here. For Trigger.AvailableNow(), startingOffsets (earliest or latest) only applies to the initial run; Spark's checkpointing mechanism advances the offsets for all subsequent runs. If a run fails, Spark restarts from the last committed offsets on the next trigger. failOnDataLoss controls what happens when committed offsets are no longer available in Kafka, for example because messages were aged out by the topic's retention policy between two scheduled runs; setting it to false lets the query skip the missing range instead of failing, which is often the right trade-off for infrequently scheduled jobs if you can tolerate the potential data loss.
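A short sketch of a Kafka source configured with these options for repeated AvailableNow runs; the broker, topic, and the decision to set failOnDataLoss to false are assumptions for illustration:

```scala
// Kafka source that tolerates offsets lost to retention between runs.
val tolerantStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest") // First run only; the checkpoint wins thereafter
  .option("failOnDataLoss", "false")     // Skip aged-out offsets instead of failing the run
  .load()
```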
Trigger.AvailableNow() is specifically designed for sources Spark can reliably track progress on, such as Kafka and file-based sources. It relies on Spark’s checkpointing mechanism to store the progress made, so it knows where to resume the next time AvailableNow() is triggered. This makes it a powerful tool for scenarios where you want the efficiency of streaming ingestion combined with the controlled execution of batch processing.
The next logical step after mastering Trigger.AvailableNow() is understanding how to manage checkpointing robustly, especially when dealing with distributed file systems and ensuring exactly-once processing guarantees in the face of failures.