A custom gRPC source for Spark Structured Streaming lets you ingest data into Spark by calling gRPC services directly, without routing events through an intermediate broker such as Kafka.

Let’s see how this works with a simple example. Imagine we have a gRPC service running that simulates a stream of sensor readings.

syntax = "proto3";

package sensor;

message SensorReading {
  string sensor_id = 1;
  double value = 2;
  int64 timestamp = 3;
}

service SensorStreamer {
  rpc StreamReadings(Empty) returns (stream SensorReading);
}

message Empty {}

We’ll build a Spark Source that connects to this service and pulls data.

First, the gRPC service needs to be running. For this example, let’s assume it’s accessible at localhost:50051.

The core of our custom source will be a class extending the org.apache.spark.sql.execution.streaming.Source trait (an internal Structured Streaming API, so it can change between Spark versions). This trait requires a few key members: schema, getOffset, getBatch(), and stop().

The schema member defines the schema of the data our source produces. It must match the fields of the messages received from the gRPC stream.

import org.apache.spark.sql.types._

override def schema: StructType = StructType(Seq(
  StructField("sensor_id", StringType, false),
  StructField("value", DoubleType, false),
  StructField("timestamp", LongType, false)
))

The getOffset method returns the latest offset currently available from the source, wrapped in an Option; Spark compares it with the last processed offset to decide whether to trigger a new batch. For a streaming gRPC source, this could be the timestamp of the last received message or a sequence number. If no data has arrived yet, return None and Spark will skip the trigger. Reliable offsets are also a prerequisite for exactly-once semantics, together with a replayable source and an idempotent sink.

import org.apache.spark.sql.execution.streaming.Offset

// In a real scenario, this would track the last offset observed from the stream.
// For simplicity, we simulate an incrementing offset; a robust implementation
// would derive it from the data actually received.
private var currentOffset: Map[String, String] = Map.empty

override def getOffset: Option[Offset] = {
  if (currentOffset.isEmpty) {
    None // No data yet; Spark will not trigger a batch
  } else {
    val latestTimestamp = currentOffset.getOrElse("timestamp", "0").toLong + 1
    Some(MyStreamingOffset(Map("timestamp" -> latestTimestamp.toString)))
  }
}

The getBatch(start: Option[Offset], end: Offset) method is where the actual data fetching happens. Spark calls it to retrieve the data between the two offsets (start is None for the very first batch). Our implementation connects to the gRPC service, consumes messages up to the end offset, and returns them as a DataFrame.

import scala.jdk.CollectionConverters._
import org.apache.spark.sql.{DataFrame, Row}
import io.grpc.ManagedChannelBuilder
import sensor._ // Assuming your generated gRPC classes are in this package

// Thin wrapper around the generated gRPC client (grpc-java style stubs)
class GrpcClient(host: String, port: Int) {
  private val channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()
  // The blocking stub exposes a server-streaming RPC as a java.util.Iterator
  private val stub = SensorStreamerGrpc.newBlockingStub(channel)

  def streamReadings(empty: Empty): Iterator[SensorReading] =
    stub.streamReadings(empty).asScala

  def shutdown(): Unit = channel.shutdown()
}

// A custom Offset holding our offset information. json() must emit valid JSON,
// since Spark persists it to the checkpoint log.
case class MyStreamingOffset(offsetValues: Map[String, String]) extends Offset {
  override def json(): String =
    offsetValues.map { case (k, v) => s""""$k":"$v"""" }.mkString("{", ",", "}")
}

override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
  val client = new GrpcClient("localhost", 50051)

  // Determine the range to fetch; start is None for the very first batch
  val startTimestamp = start
    .map(_.asInstanceOf[MyStreamingOffset].offsetValues.getOrElse("timestamp", "0").toLong)
    .getOrElse(0L)
  val endTimestamp = end.asInstanceOf[MyStreamingOffset]
    .offsetValues.getOrElse("timestamp", Long.MaxValue.toString).toLong

  val rows = scala.collection.mutable.ArrayBuffer[Row]()
  val stream = client.streamReadings(Empty.newBuilder().build())

  try {
    // Consume until we pass the end offset for this batch. A message at or
    // beyond the end offset belongs to a later batch; a production source
    // would buffer it for the next batch rather than discard it as we do here.
    stream
      .takeWhile(_.getTimestamp < endTimestamp)
      .filter(_.getTimestamp >= startTimestamp)
      .foreach { reading =>
        rows += Row(reading.getSensorId, reading.getValue, reading.getTimestamp)
      }
  } finally {
    client.shutdown() // Always close the channel, even if consumption fails
  }

  // Assumes a SparkSession named spark is in scope. Note: a production Source
  // would use the internal sqlContext.internalCreateDataFrame(...) so the result
  // is flagged as a streaming DataFrame; createDataFrame keeps the example short.
  spark.createDataFrame(rows.asJava, schema)
}
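
One detail worth knowing about offsets: when a query restarts from a checkpoint, Spark hands the persisted offset back to the source as raw JSON (a SerializedOffset), so whatever json() writes must be parseable by the source itself. Below is a minimal, dependency-free sketch of that round trip; OffsetState and fromJson are hypothetical names, and the parser only handles the flat one-level JSON produced here (no escaped quotes or nesting).

```scala
// Hypothetical offset state with a JSON round trip, mirroring what a real
// source needs when Spark replays a checkpointed offset as raw JSON.
case class OffsetState(values: Map[String, String]) {
  // Serialize to a flat JSON object, e.g. {"timestamp":"42"}
  def json: String =
    values.map { case (k, v) => s""""$k":"$v"""" }.mkString("{", ",", "}")
}

object OffsetState {
  // Parse the flat JSON written above; assumes no escaped quotes in keys/values
  def fromJson(s: String): OffsetState = {
    val body = s.trim.stripPrefix("{").stripSuffix("}")
    val pairs =
      if (body.isEmpty) Map.empty[String, String]
      else body.split(",").map { kv =>
        val Array(k, v) = kv.split(":", 2)
        k.trim.stripPrefix("\"").stripSuffix("\"") ->
          v.trim.stripPrefix("\"").stripSuffix("\"")
      }.toMap
    OffsetState(pairs)
  }
}
```

In getBatch, a robust source would first check whether the incoming offset is a SerializedOffset and, if so, rebuild its typed offset from the JSON before casting.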

The stop() method is called when the streaming query is stopped to clean up resources, like closing the gRPC channel.

override def stop(): Unit = {
  // Clean up any resources, e.g. close long-lived gRPC channels
}

To use this custom source, you’d configure it in your Spark application.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("gRPCSourceExample")
  .master("local[*]") // Or your cluster manager
  .getOrCreate()

import spark.implicits._

// Note: we never instantiate the source directly; Spark constructs it
// through the provider class named in format() below.

val streamingQuery = spark.readStream
  .format("my.custom.grpc.source.GrpcSourceProvider") // fully qualified provider class, or a registered alias
  .option("grpc.host", "localhost")
  .option("grpc.port", "50051")
  // Add any other options your source needs
  .load()
  .writeStream
  .format("console") // Or your desired sink
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

streamingQuery.awaitTermination()

The format() method takes the fully qualified class name of a provider, not of the Source itself: for this legacy API that means implementing org.apache.spark.sql.sources.StreamSourceProvider (optionally registering a short alias via DataSourceRegister). Options like grpc.host and grpc.port arrive in the parameters map passed to the provider's createSource method, which constructs your Source.

The most surprising thing about this setup is that Structured Streaming’s Source interface is designed to be pull-based, even when dealing with inherently push-based data sources like gRPC streams. Spark asks your getBatch method for data when Spark is ready for it, rather than your gRPC client actively pushing data into Spark.
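
One common way to bridge this push/pull mismatch is a small buffer: a background gRPC reader pushes messages into a thread-safe queue as they arrive, and Spark's pull (getBatch) drains whatever has accumulated. Here is a dependency-free sketch of that bridge; PushPullBridge and Reading are hypothetical stand-ins for the real stream observer and generated message type.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Stand-in for the generated SensorReading message
case class Reading(sensorId: String, value: Double, timestamp: Long)

class PushPullBridge {
  private val queue = new LinkedBlockingQueue[Reading]()

  // Push side: called as messages arrive (e.g. from a StreamObserver.onNext callback)
  def onNext(r: Reading): Unit = queue.put(r)

  // Pull side: drain everything currently buffered without blocking
  def drainBatch(): Seq[Reading] = {
    val buf = new ArrayBuffer[Reading]()
    var r = queue.poll()
    while (r != null) {
      buf += r
      r = queue.poll()
    }
    buf.toSeq
  }
}
```

With this shape, getOffset can report the highest timestamp seen by the push side, and getBatch simply drains the queue instead of opening a fresh stream on every trigger.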

The mental model here is that Spark is the conductor, and your custom source is an orchestra player. The conductor (Spark) decides when to play (ask for data) and how much to play (which batch). Your player (gRPC source) just waits for the conductor’s cue and then plays its part (fetches and returns a batch of data). The offsets are like sheet music markers, telling the player where to start and stop for each cue.

A critical detail often missed is how to handle messages that arrive after the end offset for a batch but before the next getBatch call. A naive implementation might drop these, leading to data loss. A robust solution would involve buffering these "late" messages within the source itself, so they are available for the subsequent batch request.
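
A dependency-free sketch of that buffering idea: messages at or beyond the batch's end offset are held in a carry-over buffer and prepended to the next batch. BatchBuffer and Msg are hypothetical names, and timestamps stand in for whatever offset field the real source uses.

```scala
// Stand-in for a decoded gRPC message
case class Msg(timestamp: Long)

class BatchBuffer {
  private var carryOver: Vector[Msg] = Vector.empty

  // Returns the messages belonging to [startTs, endTs); anything at or past
  // endTs is buffered for the next call instead of being dropped.
  def takeBatch(incoming: Seq[Msg], startTs: Long, endTs: Long): Seq[Msg] = {
    val all = carryOver ++ incoming
    val (ready, rest) = all.partition(m => m.timestamp >= startTs && m.timestamp < endTs)
    // Keep only the genuinely late messages; anything before startTs was
    // already processed by an earlier batch and can be discarded.
    carryOver = rest.filter(_.timestamp >= endTs)
    ready
  }
}
```

Note that the carry-over buffer lives inside the source instance, so it is lost on driver failure; a fully fault-tolerant source must be able to re-read the dropped range from the upstream service instead.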

The next concept you’ll likely encounter is implementing a custom Sink to write the processed data to another system, completing the end-to-end streaming pipeline.
