The most surprising thing about Vector's reduce transform is that it doesn't actually "reduce" data in the map/reduce sense you might expect; it groups events and merges them.

Imagine you’ve got a firehose of log events coming in, each with a timestamp, a source IP, and a message. You want to know, for every minute, how many errors came from each IP. That’s where reduce shines.

Here’s a simplified example:

[sources.my_logs]
type = "file"
include = ["/var/log/app.log"]

[transforms.only_errors]
# The reduce transform has no built-in filter option, so filter first.
# Here, we only care about events where the message contains "error".
type = "filter"
inputs = ["my_logs"]
condition = 'contains(string!(.message), "error")'

[transforms.add_group_fields]
# reduce can only group on fields that already exist on the event,
# so we derive the minute bucket (and seed a counter) with remap.
type = "remap"
inputs = ["only_errors"]
source = '''
# Truncate the timestamp to the minute so it can serve as a group key.
.minute = format_timestamp!(.timestamp, format: "%Y-%m-%dT%H:%M")
# Seed a per-event counter for the reduce transform to sum.
.error_count = 1
'''

[transforms.group_by_ip_and_minute]
type = "reduce"
inputs = ["add_group_fields"]
# This is the key: what fields define a unique group.
group_by = ["source_ip", "minute"]
# This is what we're doing *within* each group: summing the
# per-event counters gives a count of errors per group.
merge_strategies.error_count = "sum"
# Flush a group after 60 seconds without new events.
expire_after_ms = 60000

[sinks.my_output]
type = "console"
inputs = ["group_by_ip_and_minute"]
# The aggregated events are printed here.
encoding.codec = "json"

When Vector processes this, it doesn't just pass each log line through; it buffers them. Filtering happens before the reduce step (reduce itself has no filter option), and the minute bucket is derived from the timestamp upstream, since reduce can only group on fields that already exist on the event. The combination (source_ip + minute) then becomes the key for grouping.

It maintains internal state for each unique key it encounters. For our example, if the first event has source_ip="192.168.1.10" and timestamp="2023-10-27T10:30:15Z", it creates a group for ("192.168.1.10", "2023-10-27T10:30"). The merge_strategies section tells it how to fold each event into that group's state: summing a per-event error_count of 1 turns the field into a counter. If another event arrives with the same source_ip and minute, the counter for that group goes up again.
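The bookkeeping is conceptually simple. Here's a toy Python sketch of the grouping state (illustrative only, not Vector's implementation; field names match the example config):

```python
from collections import defaultdict

def minute_of(timestamp: str) -> str:
    """Truncate an RFC 3339 timestamp string to the minute."""
    return timestamp[:16]  # "2023-10-27T10:30:15Z" -> "2023-10-27T10:30"

def reduce_events(events):
    """Group events by (source_ip, minute) and sum a per-event counter."""
    groups = defaultdict(int)
    for event in events:
        key = (event["source_ip"], minute_of(event["timestamp"]))
        # Each event contributes its counter (seeded to 1 upstream).
        groups[key] += event.get("error_count", 1)
    # Flush: emit one aggregated event per group.
    return [
        {"source_ip": ip, "minute": minute, "error_count": count}
        for (ip, minute), count in groups.items()
    ]

events = [
    {"source_ip": "192.168.1.10", "timestamp": "2023-10-27T10:30:15Z"},
    {"source_ip": "192.168.1.10", "timestamp": "2023-10-27T10:30:42Z"},
    {"source_ip": "10.0.0.5",     "timestamp": "2023-10-27T10:31:01Z"},
]
print(reduce_events(events))
# -> [{'source_ip': '192.168.1.10', 'minute': '2023-10-27T10:30', 'error_count': 2},
#     {'source_ip': '10.0.0.5', 'minute': '2023-10-27T10:31', 'error_count': 1}]
```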

Merge strategies can do more than sum. Vector also provides max and min for numbers, discard (keep the first value seen) and retain (keep the last), concat and concat_newline for strings, array (gather all values into an array), and flat_unique (gather unique values).

For instance, if you wanted to collect all the error messages for a given IP and minute:

merge_strategies.message = "array"

This would result in an event like: { source_ip: "192.168.1.10", minute: "2023-10-27T10:30", message: ["Error: File not found", "Error: Connection refused"] }

The group_by fields are crucial. You can group by any field, including nested ones, but the field must already exist on the event by the time it reaches reduce; there is no special timestamp.minute syntax, which is why time buckets are derived with a remap transform first. Fields you don't list in merge_strategies fall back to Vector's defaults, which generally keep the value from the first event that fell into that group.
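For example, to bucket by hour instead of minute, you'd change the upstream remap (a sketch; the field name hour_bucket is arbitrary):

```toml
[transforms.add_hour_bucket]
type = "remap"
inputs = ["my_logs"]
source = '''
# Truncate the timestamp to the hour for coarser grouping.
.hour_bucket = format_timestamp!(.timestamp, format: "%Y-%m-%dT%H")
'''
```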

The reduce transform is stateful. It holds data in memory. If you have a massive number of unique group_by combinations, you can exhaust memory. Vector has mechanisms to bound this state: expire_after_ms flushes a group that hasn't received new events for a while, max_events caps how many events a single group can absorb, and the ends_when / starts_when conditions let you close a group explicitly. These are critical for keeping memory bounded in long-running systems.
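Those limits are plain options on the transform. A sketch with all three (the thresholds and the FATAL marker are arbitrary choices for illustration):

```toml
[transforms.group_by_ip_and_minute]
type = "reduce"
inputs = ["add_group_fields"]
group_by = ["source_ip", "minute"]
merge_strategies.error_count = "sum"
# Flush a group 60 seconds after its last event.
expire_after_ms = 60000
# Never let a single group absorb more than 10k events.
max_events = 10000
# Close the group immediately when a fatal marker arrives.
ends_when = 'contains(string!(.message), "FATAL")'
```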

When a group expires (or an ends_when condition matches), Vector emits the aggregated event for that group. This is why reduce is often used with time-derived group keys (minute or hour buckets) in conjunction with expire_after_ms to create tumbling windows of aggregated data.

One subtle point is how timestamp fields are handled. By default, the output keeps the timestamp of the earliest event that contributed to the group, and Vector adds a companion timestamp_end field holding the latest one. This is often desirable for understanding when the aggregation period began and ended.

The next logical step after aggregating is often to transform the aggregated data further, perhaps to calculate rates or to enrich it with external data based on the aggregated results.

Want structured learning?

Take the full Vector course →