Vitess’s rowlog is a surprisingly powerful mechanism for turning database table changes into a distributed message queue, without needing a separate message broker like Kafka or Pulsar.
Let’s see it in action. Imagine we have a users table and we want to trigger an action every time a new user is created.
CREATE TABLE users (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100) UNIQUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Now, we’ll enable rowlog for this table. This is done through Vitess’s vtctlclient, targeting the keyspace that holds the table (ApplySchema operates at the keyspace level, not on individual tablets).
vtctlclient ApplySchema --sql 'ALTER TABLE users WITH rowlog' my_keyspace
This ALTER TABLE command doesn’t just modify the table schema; it instructs Vitess to set up internal mechanisms to capture row changes. Specifically, it creates a _vt_rowlog_users table behind the scenes. This table will store a record of every insert, update, and delete operation on the users table.
When a new user is inserted, Vitess doesn’t just write to the users table. It also writes a corresponding entry into the _vt_rowlog_users table. This entry contains metadata about the change: the type of operation (INSERT, UPDATE, DELETE), the primary key of the affected row, and a timestamp.
INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com');
After this insert, the _vt_rowlog_users table might look something like this:
| timestamp | operation | primary_key |
|---|---|---|
| 2023-10-27 10:00:00 | INSERT | 1 |
Now, how do we consume these changes? Vitess provides a RowLogReader API. Applications can connect to Vitess, specify the table they are interested in (users in this case), and a starting timestamp or a specific row ID.
Here’s a conceptual Go snippet using the Vitess client library:
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"vitess.io/vitess/go/vt/proto/query"
	"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

func main() {
	ctx := context.Background()

	conn, err := vtgateconn.Dial(ctx, "localhost:15991") // Replace with your vtgate address
	if err != nil {
		log.Fatalf("Failed to connect to vtgate: %v", err)
	}
	defer conn.Close()

	// Assuming 'my_keyspace' and the 'users' table.
	keyspace := "my_keyspace"
	tableName := "users"

	// Start reading from now, or a specific timestamp/rowid.
	startRowID := int64(0)                       // Or a specific ID if you know it
	startTime := time.Now().Add(-24 * time.Hour) // Or a specific time

	// Create a RowLogReader.
	reader, err := conn.RowLogReader(ctx, keyspace, tableName, startRowID, startTime)
	if err != nil {
		log.Fatalf("Failed to create RowLogReader: %v", err)
	}

	fmt.Printf("Starting to read row logs for table '%s'...\n", tableName)

	for {
		row, err := reader.ReadRow(ctx)
		if err != nil {
			log.Printf("Error reading row: %v. Retrying...", err)
			time.Sleep(5 * time.Second) // Simple backoff
			continue
		}

		// Process the row change. The 'row' variable contains details like:
		// row.Timestamp, row.Operation, row.PrimaryKey, row.Before, row.After.
		// For an INSERT, row.Before is nil and row.After has the new values.
		// For an UPDATE, both row.Before and row.After are populated.
		// For a DELETE, row.After is nil and row.Before has the old values.
		fmt.Printf("Received change: Timestamp=%v, Operation=%v, PrimaryKey=%v\n",
			row.Timestamp, row.Operation, row.PrimaryKey)

		// Example: if it's an INSERT, send a welcome email.
		if row.Operation == query.RowChange_INSERT {
			// Extract data from row.After and send the email.
			fmt.Printf("New user created: %v\n", row.After)
			// sendWelcomeEmail(row.After["email"].ToString())
		}

		// Acknowledge the read row to advance the cursor.
		if err := reader.Commit(ctx, row.PrimaryKey); err != nil {
			log.Printf("Failed to commit row %v: %v", row.PrimaryKey, err)
			// Handle commit failure - this might mean re-reading the row later.
		}
	}
}
The RowLogReader allows you to specify startRowID and startTime. If startRowID is 0 and startTime is in the past, it will start from the earliest available log entry. If you want to catch up from a specific point, you’d pass the PrimaryKey of the last processed row and its Timestamp. The Commit call is crucial; it tells Vitess that you’ve successfully processed a given row change, so it can advance the internal cursor for that consumer. Without commits, you’d re-read the same changes.
The row object returned by ReadRow is rich. It includes the Timestamp of the change, the Operation type (INSERT, UPDATE, DELETE), the PrimaryKey of the affected row, and importantly, Before and After maps. For an INSERT, Before is nil and After contains the new row’s values. For a DELETE, After is nil and Before contains the old row’s values. For an UPDATE, both Before and After are populated, allowing you to see exactly what changed. This makes rowlog incredibly versatile for implementing asynchronous workflows, auditing, or replication.
The most surprising thing about Vitess rowlog is that it doesn’t require a separate coordination service to manage consumer offsets. Vitess itself, through its internal metadata tables and the RowLogReader API, tracks the state of each consumer, giving you effectively exactly-once processing when commits are handled correctly. The Commit operation in the RowLogReader is not merely an acknowledgment; it’s a transactional write that updates the consumer’s progress marker within Vitess, so subsequent reads pick up where the consumer left off, even across application restarts.
The next logical step after consuming row changes is to implement robust error handling and dead-letter queues for changes that cannot be processed, or to explore how rowlog can be used for distributed transactions across multiple tables or shards.