Spark Structured Streaming query progress metrics let you observe the health of your streaming queries and detect issues before they impact your users.
Let’s look at a streaming query processing JSON logs from Kafka and writing them to Delta Lake.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("StreamingMetricsDemo")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val kafkaStreamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .load()

// The Kafka value column is binary; cast it to a string for processing.
val jsonDF = kafkaStreamDF.selectExpr("CAST(value AS STRING)")

val query = jsonDF.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/checkpoints")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start("/tmp/delta/logs")

// Call query.awaitTermination() once your monitoring is set up;
// it blocks the current thread until the query stops.
This code reads raw JSON strings from a Kafka topic named "logs" and writes them to a Delta Lake table located at /tmp/delta/logs. (The Kafka source has a fixed schema of key, value, topic, partition, and so on; there is no schema inference here, so we simply cast the binary value column to a string.) The Trigger.ProcessingTime("10 seconds") means Spark will attempt to start a new micro-batch every 10 seconds. The checkpointLocation is crucial for fault tolerance: it records the query's offsets and state so a restarted query resumes exactly where it left off.
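Note that this pipeline only casts the Kafka payload to a string. To work with individual JSON fields you would parse the payload against an explicit schema, since a Kafka source carries no schema to infer. A minimal sketch, using hypothetical timestamp/level/message fields and a small batch DataFrame as a stand-in for the stream:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object JsonParsingSketch {
  // Hypothetical schema for the log payload; adjust to your actual records.
  val logSchema: StructType = StructType(Seq(
    StructField("timestamp", StringType),
    StructField("level", StringType),
    StructField("message", StringType)
  ))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JsonParsingSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the casted Kafka value column.
    val raw = Seq("""{"timestamp":"2024-01-01T00:00:00Z","level":"INFO","message":"ok"}""")
      .toDF("value")

    // Parse each JSON string into a struct, then flatten it into columns.
    val parsed = raw
      .select(from_json($"value", logSchema).as("log"))
      .select("log.*")

    parsed.show(truncate = false)
    spark.stop()
  }
}
```

In the streaming job, the same select chain would be applied to jsonDF before writeStream.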
The query object, returned by start(), is a StreamingQuery instance. This object is your primary interface for monitoring and controlling the streaming query. You can access its metrics through the lastProgress and recentProgress methods.
lastProgress: This returns a StreamingQueryProgress object describing the most recently completed micro-batch (or null before the first batch finishes). It includes the batch ID, the trigger timestamp, a durationMs map that breaks down where the time went (for example triggerExecution, getBatch, and addBatch), and the number of records processed.
recentProgress: This returns the StreamingQueryProgress objects for the last few completed micro-batches (the retention count is controlled by spark.sql.streaming.numRecentProgressUpdates, 100 by default). This is useful for observing trends and identifying whether processing times are increasing over time.
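That kind of trend check can be sketched with a small helper (hypothetical; it operates on the per-batch durations you would pull out of recentProgress, and the 25% slowdown threshold is an arbitrary assumption):

```scala
object TrendCheck {
  // Returns true if batch durations are rising across the window,
  // comparing the average of the older half against the newer half.
  def isSlowingDown(durationsMs: Seq[Long]): Boolean = {
    if (durationsMs.size < 4) false
    else {
      val (older, newer) = durationsMs.splitAt(durationsMs.size / 2)
      val avg = (xs: Seq[Long]) => xs.sum.toDouble / xs.size
      avg(newer) > avg(older) * 1.25 // flag a sustained 25% slowdown
    }
  }
}
```

For example, TrendCheck.isSlowingDown(Seq(800L, 820L, 1100L, 1300L)) flags a slowdown, while a flat series does not.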
Let’s inspect the lastProgress of our running query. (Run this loop before calling awaitTermination(), which blocks the current thread.)
while (query.isActive) {
  Thread.sleep(5000) // Check every 5 seconds
  val progress = query.lastProgress
  if (progress != null) {
    println(s"Batch ID: ${progress.batchId}")
    println(s"Trigger Timestamp: ${progress.timestamp}")
    println(s"Trigger Execution: ${progress.durationMs.get("triggerExecution")} ms")
    println(s"Input Rows Per Second: ${progress.inputRowsPerSecond}")
    println(s"Processed Rows Per Second: ${progress.processedRowsPerSecond}")
    println(s"Number of Input Rows: ${progress.numInputRows}")
    println(s"Sink Output Rows: ${progress.sink.numOutputRows}") // -1 if the sink doesn't report it
    println("---")
  }
}
This loop fetches and prints the lastProgress of the query while it remains active. You’ll see metrics like the triggerExecution entry in durationMs (how long the micro-batch took from trigger start to finish) and inputRowsPerSecond (the rate at which data is arriving from the source).
These metrics are invaluable for understanding your streaming job’s performance. For example, a consistently increasing triggerExecution duration can indicate that your job is falling behind. If processedRowsPerSecond stays below inputRowsPerSecond while the addBatch duration dominates the batch time, your downstream sink is likely the bottleneck.
The StreamingQueryProgress object also contains a sources array (one SourceProgress per source) and a single sink entry, each with their own metrics. For our Kafka source, you’ll see the start, end, and latest offsets consumed per batch; if the query uses watermarks, event-time progress appears in the eventTime map on the progress object. For the Delta Lake sink, numOutputRows reports how many rows were written.
The Spark application itself also exposes metrics that can be scraped by a metrics collection system like Prometheus, through Spark’s pluggable metrics sinks. For instance, to enable the built-in Prometheus servlet (available since Spark 3.0):
// Add this to your SparkSession builder configuration
.config("spark.metrics.conf", "/path/to/metrics.properties")
And in your metrics.properties file:
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
With this setup, Spark serves metrics in Prometheus format through the driver UI (by default at http://localhost:4040/metrics/prometheus), which Prometheus can then scrape. If you also set spark.sql.streaming.metricsEnabled to true, each streaming query registers gauges such as spark.streaming.<queryName>.latency, inputRate-total, and processingRate-total, which you can graph in Grafana or feed into alerting tools.
The true power of these metrics lies in setting up alerts. For instance, you could alert if the triggerExecution duration for a micro-batch exceeds a predefined threshold (e.g. 80% of your trigger interval) for a sustained period. This proactive alerting helps you identify and address potential issues before they lead to significant backlogs or data loss.
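That alert rule can be sketched as a small helper (hypothetical; the 80% fraction and the sustained-batch count are assumptions to tune against your trigger interval):

```scala
object TriggerAlert {
  // Flags when the last `sustained` batch durations all exceeded
  // `fraction` of the trigger interval (e.g. 80% of a 10s trigger).
  def shouldAlert(durationsMs: Seq[Long],
                  triggerIntervalMs: Long,
                  fraction: Double = 0.8,
                  sustained: Int = 3): Boolean = {
    val threshold = triggerIntervalMs * fraction
    durationsMs.size >= sustained &&
      durationsMs.takeRight(sustained).forall(_ > threshold)
  }
}
```

With a 10-second trigger, TriggerAlert.shouldAlert(Seq(5000L, 8500L, 9000L, 9500L), 10000L) fires because the last three batches all exceeded 8 seconds.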
One aspect often overlooked is what the batch durations do and do not cover: the durationMs entries break the trigger down into phases such as getBatch (resolving the data to read), addBatch (processing the data and writing it to the sink), and triggerExecution (the whole trigger end to end). None of them include time spent idle between triggers when no new data is available. If you’re using event-time processing, monitoring the watermark in the progress report’s eventTime map is crucial to ensure your processing is keeping up with the actual event times. If the watermark falls significantly behind the current time, it indicates that your system is struggling to process events within their expected arrival window.
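The watermark reported in the progress data is an ISO-8601 timestamp string, so a lag check is straightforward (a sketch; assumes you have already extracted the watermark value from the progress report):

```scala
import java.time.{Duration, Instant}

object WatermarkLag {
  // Returns how far the watermark trails a reference time, in milliseconds.
  def lagMs(watermark: String, now: Instant = Instant.now()): Long =
    Duration.between(Instant.parse(watermark), now).toMillis
}
```

For example, a watermark of "2024-01-01T00:00:00.000Z" measured at 00:05:00 UTC yields a lag of 300000 ms, which an alerting loop could compare against a tolerance.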
When a query fails with a StreamingQueryException, examining query.exception together with the lastProgress (and recentProgress) captured before termination can provide critical clues about the root cause, often pointing to bottlenecks or errors in a specific stage of the processing pipeline.