Vitess’s stream aggregates can sort and aggregate across shards without pulling every raw row into VTGate, the proxy layer that plans and routes queries.

Let’s watch a stream aggregate in action. Imagine we have a user_stats table sharded by user_id, and we want to find the average login_count for all users, grouped by their country.

-- Schema:
CREATE TABLE user_stats (
    user_id BIGINT NOT NULL,
    country VARCHAR(64) NOT NULL,
    login_count INT NOT NULL,
    PRIMARY KEY (user_id)
);

-- Sharding:
-- Shard 1: user_id BETWEEN 1 AND 1000000
-- Shard 2: user_id BETWEEN 1000001 AND 2000000

A traditional approach would involve:

  1. Reading every country and login_count value from every shard.
  2. Sending those raw rows to a single central node (in Vitess, that would be VTGate).
  3. Performing the GROUP BY country and AVG(login_count) on that node.

This is inefficient for large datasets, because every row crosses the network and has to be grouped in one place. Vitess’s stream aggregate avoids this. When you execute a query like:

SELECT country, AVG(login_count) AS avg_logins
FROM user_stats
GROUP BY country
WITH ROLLUP;

Vitess doesn’t just dump data. It uses a two-phase approach:

Phase 1: Local Aggregation (on each shard)

VTGate rewrites the query so that MySQL on each shard performs a preliminary aggregation. An average cannot be merged directly, so each shard returns a row count and a sum per country instead. For our user_stats table, Shard 1 might compute:

  • ('USA', 150, 5000), in the form (country, row count, sum of login_count)
  • ('CAN', 75, 2000)

And on Shard 2:

  • ('USA', 200, 7000)
  • ('MEX', 120, 3500)

This intermediate data is much smaller than the raw rows.
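
As a rough illustration of what each shard computes in Phase 1, here is a minimal Python sketch. It is not Vitess code: the function name local_partials and the sample rows are hypothetical, and in a real cluster this work would be done by a rewritten SQL query running in MySQL on the shard.

```python
from collections import defaultdict

def local_partials(rows):
    """Collapse raw (country, login_count) rows into sorted
    (country, row_count, login_sum) partials for one shard."""
    acc = defaultdict(lambda: [0, 0])  # country -> [row_count, login_sum]
    for country, login_count in rows:
        acc[country][0] += 1
        acc[country][1] += login_count
    return sorted((c, n, s) for c, (n, s) in acc.items())

# Hypothetical raw rows living on a single shard.
shard_rows = [("USA", 30), ("CAN", 20), ("USA", 40), ("CAN", 10), ("USA", 50)]

partials = local_partials(shard_rows)
print(partials)  # [('CAN', 2, 30), ('USA', 3, 120)]
```

Five raw rows shrink to two partial rows here. The reduction grows with the number of rows per country, since a shard never returns more than one row per group.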

Phase 2: Global Aggregation (on VTGate)

Each shard then streams its intermediate aggregated results back to VTGate, which receives these partial results and merges them.

For our example, VTGate would receive:

  • From Shard 1: ('USA', 150, 5000), ('CAN', 75, 2000)
  • From Shard 2: ('USA', 200, 7000), ('MEX', 120, 3500)

VTGate then performs a final aggregation:

  • For USA: It sees (150, 5000) and (200, 7000). It sums the counts (150 + 200 = 350) and the sums of logins (5000 + 7000 = 12000). The average is 12000 / 350 = 34.29.
  • For CAN: It only sees (75, 2000). Average is 2000 / 75 = 26.67.
  • For MEX: It only sees (120, 3500). Average is 3500 / 120 = 29.17.
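
The arithmetic above can be checked with a short Python sketch. The helper merge_partials is a hypothetical name used only for illustration, and the tuples are the partial results from this example.

```python
from collections import defaultdict

# Partials from the example: (country, row_count, login_sum)
shard_1 = [("USA", 150, 5000), ("CAN", 75, 2000)]
shard_2 = [("USA", 200, 7000), ("MEX", 120, 3500)]

def merge_partials(*shards):
    """Add up row counts and sums per country across all shards."""
    totals = defaultdict(lambda: [0, 0])
    for shard in shards:
        for country, row_count, login_sum in shard:
            totals[country][0] += row_count
            totals[country][1] += login_sum
    return {c: (n, s) for c, (n, s) in totals.items()}

merged = merge_partials(shard_1, shard_2)
averages = {c: round(s / n, 2) for c, (n, s) in merged.items()}
print(averages)  # {'USA': 34.29, 'CAN': 26.67, 'MEX': 29.17}

# Averaging the per-shard averages gives a different, wrong answer for USA.
naive_usa = round((5000 / 150 + 7000 / 200) / 2, 2)
print(naive_usa)  # 34.17
```

The last two lines show why the shards ship counts and sums rather than averages: averaging the two per-shard USA averages ignores that Shard 2 holds more rows, and yields 34.17 instead of 34.29.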

The final per-country result is: ('USA', 34.29), ('CAN', 26.67), ('MEX', 29.17). This is achieved with minimal data transfer and minimal processing on VTGate.

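The WITH ROLLUP clause, if present, also gets handled during this final aggregation phase, producing aggregate rows for higher levels of the group-by hierarchy. For the query above, that means one extra grand-total row with a NULL country, in addition to the per-country rows.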
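
To make the rollup row concrete, the sketch below derives it from the merged counts and sums of this example. It only shows the arithmetic; how Vitess plans WITH ROLLUP internally is not covered here, and the variable names are illustrative.

```python
# Merged per-country totals from the example: country -> (row_count, login_sum)
merged = {"USA": (350, 12000), "CAN": (75, 2000), "MEX": (120, 3500)}

# Regular GROUP BY rows.
rows = [(c, round(s / n, 2)) for c, (n, s) in sorted(merged.items())]

# The rollup row aggregates over every group; MySQL reports its
# grouping column as NULL, represented here as None.
total_count = sum(n for n, _ in merged.values())
total_sum = sum(s for _, s in merged.values())
rows.append((None, round(total_sum / total_count, 2)))

print(rows)
# [('CAN', 26.67), ('MEX', 29.17), ('USA', 34.29), (None, 32.11)]
```

The grand total reuses the same counts and sums (17500 / 545), so no extra data has to be fetched from the shards to produce it.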

The core problem this solves is the network and memory overhead of cross-shard aggregation. Instead of shipping raw rows, Vitess ships pre-aggregated summaries. This works for aggregate functions whose partial results can be combined without losing accuracy or completeness: COUNT and SUM partials are added, MIN and MAX partials are compared, and AVG is rebuilt from a SUM and a COUNT. The "sorting" aspect comes into play during the merge. Each shard returns its partial rows ordered by the grouping columns, and VTGate merge-sorts those streams so that rows belonging to the same group (e.g., all "USA" rows) arrive next to each other. VTGate can then finish one group and emit it before moving on, holding only the current group in memory.
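
The sorted merge can be sketched in a few lines of Python using the standard library’s heapq.merge and itertools.groupby. This is an illustration of the idea under the assumption that each shard returns its partials ordered by country; stream_aggregate is a hypothetical name, not a Vitess function.

```python
import heapq
from itertools import groupby
from operator import itemgetter

# Per-shard partials, each already sorted by country:
# (country, row_count, login_sum)
shard_1 = [("CAN", 75, 2000), ("USA", 150, 5000)]
shard_2 = [("MEX", 120, 3500), ("USA", 200, 7000)]

def stream_aggregate(*sorted_streams):
    """Merge sorted streams and emit one (country, avg) row per group,
    keeping only the current group's running totals in memory."""
    merged = heapq.merge(*sorted_streams, key=itemgetter(0))
    for country, group in groupby(merged, key=itemgetter(0)):
        row_count = login_sum = 0
        for _, part_count, part_sum in group:
            row_count += part_count
            login_sum += part_sum
        yield country, round(login_sum / row_count, 2)

result = list(stream_aggregate(shard_1, shard_2))
print(result)  # [('CAN', 26.67), ('MEX', 29.17), ('USA', 34.29)]
```

Because the inputs are sorted, the merge never needs a hash table of all groups. The trade-off is that each shard has to sort its partial rows before returning them.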

What most people don’t realize is that Vitess doesn’t rely on VTGate to do all the merging. It pushes as much aggregation logic as it can down to the shards themselves, where MySQL aggregates locally before any data for a group is sent back. The pushdown goes furthest when the GROUP BY columns include the sharding key: every group then lives entirely on one shard, so each shard’s result is already final and VTGate only has to combine the streams. This is a crucial optimization that significantly reduces the amount of data that needs to traverse the network.

The next step in optimizing cross-shard operations is understanding how Vitess handles joins across shards, particularly when one side of the join is much larger than the other.
