The most surprising thing about Vector’s Kafka source and sink is that they don’t actually transform the data themselves; they’re thin wrappers around Kafka’s producer and consumer APIs (via librdkafka), designed as pluggable pipeline components.
Let’s see them in action. Imagine you have a Kafka topic named raw_data and you want to process it with a simple script that just prints each message.
First, the source. This is how you’d configure a Vector agent to read from raw_data:
[sources.my_kafka_source]
type = "kafka"
bootstrap_servers = "kafka-broker-1:9092,kafka-broker-2:9092"
topics = ["raw_data"]
group_id = "vector_processor_group"
auto_offset_reset = "earliest"
When this source starts, Vector connects to the configured brokers, subscribes to the raw_data topic, and joins the consumer group vector_processor_group. auto_offset_reset = "earliest" means that if no committed offset exists for this group, consumption starts from the very beginning of the topic. Each message received from Kafka becomes a Vector event.
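To complete the “just print each message” scenario from above, you can wire the source straight into Vector’s console sink, which writes each event to stdout. A minimal sketch:

```toml
# Prints every event consumed from raw_data to stdout, one JSON object per line.
[sinks.debug_print]
type = "console"
inputs = ["my_kafka_source"]
encoding.codec = "json"
```

This is a handy way to verify the source is consuming before adding real processing.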
Now, let’s say you want to send processed data to another Kafka topic called processed_data. Here’s a sink configuration:
[sinks.my_kafka_sink]
type = "kafka"
inputs = ["my_kafka_source"]
bootstrap_servers = "kafka-broker-1:9092,kafka-broker-2:9092"
topic = "processed_data"
encoding.codec = "json"
This sink will take incoming Vector events, serialize them into JSON (because we specified encoding.codec = "json"), and produce them to the processed_data topic on the configured Kafka brokers.
The beauty here is how Vector wires these up. You’d typically have a processing component in between. For example, if you wanted to add a timestamp to each message:
[sources.my_kafka_source]
type = "kafka"
bootstrap_servers = "kafka-broker-1:9092,kafka-broker-2:9092"
topics = ["raw_data"]
group_id = "vector_processor_group"
auto_offset_reset = "earliest"
[transforms.add_timestamp]
type = "remap"
inputs = ["my_kafka_source"]
source = '''
.timestamp = now()
'''
[sinks.my_kafka_sink]
type = "kafka"
bootstrap_servers = "kafka-broker-1:9092,kafka-broker-2:9092"
topic = "processed_data"
encoding.codec = "json"
inputs = ["add_timestamp"]
In this setup, events flow from my_kafka_source to add_timestamp, where a .timestamp field is added to each event, and then on to my_kafka_sink, which publishes them to processed_data. This forms a simple, robust pipeline entirely within Vector, with Kafka handling both ingestion and egress. The source and sink hold no durable state of their own; consumption progress lives in Kafka’s consumer group offsets.
The encoding.codec setting determines how events are serialized before being produced: text writes just the event’s message field as a plain string, while json serializes the whole event. For structured events, json (or a schema-based format such as Avro or Protobuf) is the usual choice, with matching schema definitions or conventions on the consumer side. Producer acknowledgments are crucial for durability: with acks=all semantics, the leader broker waits for every in-sync replica to acknowledge a write before confirming it to the producer, so an acknowledged record is not lost even if the leader fails immediately afterward.
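In Vector, low-level producer settings like acknowledgments are passed straight through to librdkafka. A hedged sketch (the option names follow librdkafka’s configuration reference; exact pass-through support may vary by Vector version):

```toml
[sinks.my_kafka_sink]
type = "kafka"
inputs = ["add_timestamp"]
bootstrap_servers = "kafka-broker-1:9092,kafka-broker-2:9092"
topic = "processed_data"
encoding.codec = "json"

# Options handed directly to the underlying librdkafka producer.
[sinks.my_kafka_sink.librdkafka_options]
"request.required.acks" = "-1"   # -1 means "all in-sync replicas" in librdkafka
"enable.idempotence" = "true"    # avoid duplicate records on producer retries
```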
The Kafka source’s group_id is fundamental to distributed consumption. Multiple Vector agents running with the same group_id will collectively consume partitions of the subscribed topics. If one agent crashes, Kafka rebalances the partitions among the remaining active agents in that group, ensuring continuous processing.
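Scaling out horizontally is therefore just a matter of deploying the identical source configuration to more agents; Kafka’s group protocol handles partition assignment. A sketch, assuming raw_data has multiple partitions:

```toml
# Identical config deployed on agent-1, agent-2, ..., agent-N.
# Kafka assigns each agent a disjoint subset of raw_data's partitions,
# and rebalances automatically if an agent joins or crashes.
[sources.my_kafka_source]
type = "kafka"
bootstrap_servers = "kafka-broker-1:9092,kafka-broker-2:9092"
topics = ["raw_data"]
group_id = "vector_processor_group"   # the same group_id on every agent
```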
By default the Kafka sink produces records without a message key, so librdkafka’s default partitioner spreads them across the topic’s partitions. If messages sharing a key must always land on the same partition (e.g., for ordered processing of related events), set the sink’s key_field option to an event field; the sink uses that field’s value as the Kafka message key, and Kafka’s key-hash partitioning does the rest.
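A sketch of keyed partitioning; the user_id field is an assumption here, standing in for whatever identifier your events actually carry:

```toml
[sinks.my_kafka_sink]
type = "kafka"
inputs = ["add_timestamp"]
bootstrap_servers = "kafka-broker-1:9092,kafka-broker-2:9092"
topic = "processed_data"
encoding.codec = "json"
# Use the event's user_id field (hypothetical) as the Kafka message key;
# records with the same key hash to the same partition, preserving order.
key_field = "user_id"
```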
A subtle but useful property of the Kafka source is that new partitions are picked up automatically: the underlying librdkafka client periodically refreshes topic metadata and begins consuming partitions added to subscribed topics, so your pipeline scales dynamically with your Kafka topic. The source also accepts regex topic patterns (entries beginning with ^), which extends the same behavior to newly created topics matching the pattern.
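The refresh cadence can be tuned through librdkafka pass-through options; a sketch (option name per librdkafka’s configuration reference, pass-through support may vary by Vector version):

```toml
[sources.my_kafka_source]
type = "kafka"
bootstrap_servers = "kafka-broker-1:9092,kafka-broker-2:9092"
topics = ["^raw_.*"]                  # regex: subscribe to all topics starting with raw_
group_id = "vector_processor_group"

[sources.my_kafka_source.librdkafka_options]
# How often librdkafka re-fetches topic metadata (in ms); new partitions
# and newly matching topics are picked up on each refresh.
"topic.metadata.refresh.interval.ms" = "30000"
```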
The next concept you’ll likely encounter is managing schema evolution when producing or consuming data from Kafka topics that use formats like Avro or Protobuf.