The most surprising thing about ZeroMQ’s Ventilator-Sink pattern is that it’s not fundamentally about "fan-out" or "collecting results" at all; it’s about managing asynchronous task distribution and acknowledgment across an unpredictable network of workers.

Let’s see this in action. Imagine a simple task: squaring a number. In the middle sits a "ventilator" broker that accepts tasks from clients and dispatches them to workers.

import zmq
from collections import deque

context = zmq.Context()

# Frontend socket: clients connect here to submit tasks
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5557")

# Backend socket: workers connect here to signal READY and return results
backend = context.socket(zmq.ROUTER)
backend.bind("tcp://*:5558")

workers = deque()  # identities of idle workers
tasks = deque()    # (client_id, task) pairs waiting for a free worker

poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

print("Starting Up...")

while True:
    # Poll both sockets for activity, with a 1-second timeout
    socks = dict(poller.poll(1000))

    if socks.get(backend) == zmq.POLLIN:
        # A worker message is either a READY signal or a finished result.
        # Either way, the worker is idle again afterwards.
        frames = backend.recv_multipart()
        worker_id = frames[0]
        workers.append(worker_id)

        if frames[2] != b'READY':
            # Frames: [worker_id, b'', client_id, b'', result]
            client_id, result = frames[2], frames[4]
            frontend.send_multipart([client_id, b'', result])
            print(f"Worker {worker_id.decode()} returned result: {result.decode()}")

    if socks.get(frontend) == zmq.POLLIN:
        # A client task request: [client_id, b'', task]
        client_id, _, task = frontend.recv_multipart()
        tasks.append((client_id, task))

    # Dispatch buffered tasks while idle workers are available
    while workers and tasks:
        worker_id = workers.popleft()
        client_id, task = tasks.popleft()
        # client_id travels with the task so the reply can be routed back
        backend.send_multipart([worker_id, b'', client_id, b'', task])

And here’s the worker:

import zmq
import time
import random

context = zmq.Context()

# A single REQ socket to the ventilator's backend: announce READY,
# receive a task, send the result (which doubles as the next READY)
worker_id = f"worker-{random.randint(1000, 9999)}".encode()
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.IDENTITY, worker_id)
socket.connect("tcp://localhost:5558")

print(f"Worker {worker_id.decode()} starting...")

# Announce availability; the ventilator answers with the first task
socket.send(b'READY')

while True:
    # Frames: [client_id, b'', task]; client_id rides along for routing
    client_id, _, task = socket.recv_multipart()

    num = int(task.decode())
    result = num * num
    time.sleep(random.uniform(0.1, 1.0))  # simulate variable work time

    # Sending the result also tells the ventilator this worker is idle again
    socket.send_multipart([client_id, b'', str(result).encode()])
    print(f"Worker {worker_id.decode()} processed {num} -> {result}")

Finally, a client to send tasks and receive results:

import zmq
import random

context = zmq.Context()

# A single DEALER socket: unlike REQ, it can send every task up front
# and then collect results as they arrive, in any order
client_id = f"client-{random.randint(1000, 9999)}".encode()
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, client_id)
socket.connect("tcp://localhost:5557")

print(f"Client {client_id.decode()} starting...")

# Send tasks
for i in range(10):
    socket.send_multipart([b'', str(i).encode()])
    print(f"Sent task {i}")

# Collect results; they may arrive out of order, so don't assume
# task i comes back i-th
for _ in range(10):
    _, result = socket.recv_multipart()
    print(f"Received result: {result.decode()}")

print("All tasks sent and results collected.")
socket.close()
context.term()

The core problem this solves is decoupling the task producers (clients) from the task consumers (workers). Neither side needs to know how many peers exist on the other; the ventilator mediates. The ROUTER sockets on the ventilator are key here: a ROUTER socket prepends the sender’s identity to every message it receives, and uses the first frame of every message it sends as the destination address. That identity is exactly what’s needed to route results back.
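
To make the identity mechanics concrete, here is a minimal pure-Python model of that behavior. These function names and frame layouts are illustrative only, not the pyzmq API:

```python
def router_recv(peer_id: bytes, frames: list) -> list:
    """Model of ROUTER receive: the socket prepends the peer's identity."""
    return [peer_id] + frames

def router_send(frames: list):
    """Model of ROUTER send: the first frame selects the destination peer."""
    return frames[0], frames[1:]

# A client request arrives tagged with the client's identity...
inbound = router_recv(b'client-7', [b'', b'9'])
# ...and that same identity later addresses the reply.
dest, payload = router_send([b'client-7', b'', b'81'])
```

The application never configures routing tables; the identity frame carried in each message is the entire routing mechanism.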

When a worker starts, it announces itself by sending READY on the backend. The ventilator adds that worker’s identity to its idle pool; whenever it also has a buffered task, it pops a worker and dispatches. When the ventilator sends a task to a worker, it prepends the worker’s identity (so the backend ROUTER knows where to deliver it) plus the original client’s identity (so the eventual reply can be routed home). When the worker sends a result back, it echoes that client identity in front of the result.
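
The backend’s two cases can be sketched as one function over plain frame lists. This is a simplified model: `idle_workers` and `frontend_out` are ordinary lists standing in for the deque and the frontend socket:

```python
def on_backend_message(frames, idle_workers, frontend_out):
    """Model of the ventilator's backend handler: READY signal or result."""
    worker_id = frames[0]
    idle_workers.append(worker_id)  # the worker is idle again either way
    if frames[2] != b'READY':
        # Frames: [worker_id, b'', client_id, b'', result]
        client_id, result = frames[2], frames[4]
        frontend_out.append([client_id, b'', result])

idle, out = [], []
on_backend_message([b'worker-1', b'', b'READY'], idle, out)
on_backend_message([b'worker-2', b'', b'client-7', b'', b'81'], idle, out)
# idle == [b'worker-1', b'worker-2']; out == [[b'client-7', b'', b'81']]
```

Note that a result implicitly carries the same "I'm free" information as a READY signal, which is why the handler re-pools the worker in both branches.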

The ROUTER socket on the backend of the ventilator receives both READY signals and results. It preserves the worker’s identity, so the ventilator knows which worker is reporting in and can return it to the available pool. The DEALER socket on the client is what allows pipelining: it can send all of its tasks up front and collect results as they trickle back, something a REQ socket’s strict send/receive alternation would forbid.

The mental model is that the ventilator is a dispatcher. It has a queue of tasks and a pool of workers. When a task arrives from a client, it’s put into the ventilator’s internal queue. When a worker is ready, it signals the ventilator. The ventilator then takes a task from its queue and a worker from its pool, and sends the task to the worker. When a worker finishes, it sends the result back to the ventilator, which then routes it to the original client and marks the worker as available again.
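
That pairing step — pop one task, pop one worker — can be sketched with plain deques standing in for the sockets (the identities here are made up for illustration):

```python
from collections import deque

idle_workers = deque([b'worker-1', b'worker-2'])
pending_tasks = deque([(b'client-A', b'3'), (b'client-B', b'4'), (b'client-A', b'5')])

dispatched = []
while idle_workers and pending_tasks:
    worker_id = idle_workers.popleft()
    client_id, task = pending_tasks.popleft()
    # In the real ventilator this is:
    # backend.send_multipart([worker_id, b'', client_id, b'', task])
    dispatched.append((worker_id, client_id, task))

# Two tasks go out; the third waits until some worker returns a result.
```

Because both sides are queues, bursts on either side are absorbed: extra tasks buffer until workers free up, and extra workers idle until tasks arrive.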

The surprising part is how the client_id is "stamped" onto the task as it travels. The ventilator receives [client_id, b'', task] from the client and sends [worker_id, b'', client_id, b'', task] to the worker; the backend ROUTER consumes the worker_id frame to pick the destination, and the worker’s REQ socket strips the empty delimiter, so the worker simply sees [client_id, b'', task]. The worker replies with [client_id, b'', result], which arrives back at the ventilator as [worker_id, b'', client_id, b'', result]. The ventilator strips off the worker envelope and sends [client_id, b'', result] to the original client. This is how results are correlated with, and returned to, the correct originator.
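
The whole round trip can be traced as arithmetic on plain lists of frames. This is a model of the envelope transformations, not pyzmq calls, with made-up identities:

```python
# Client -> ventilator frontend (ROUTER prepends the client identity on recv)
at_frontend = [b'client-7', b'', b'9']

# Ventilator -> worker: stamp the chosen worker's identity on the front
client_id, _, task = at_frontend
to_worker = [b'worker-3', b'', client_id, b'', task]

# The backend ROUTER consumes b'worker-3' for routing, and the worker's
# REQ socket strips one empty delimiter, so the worker sees:
seen_by_worker = to_worker[2:]   # [b'client-7', b'', b'9']

# The worker replies [client_id, b'', result]; en route back, REQ adds a
# delimiter and the backend ROUTER prepends the worker identity:
at_backend = [b'worker-3', b''] + [seen_by_worker[0], b'', b'81']

# The ventilator strips the worker envelope and routes to the client:
to_client = at_backend[2:]       # [b'client-7', b'', b'81']
```

Each hop either adds one layer of envelope (on receive) or peels one off (on send), so the innermost frames — the task and the result — pass through untouched.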

What most people don’t realize is that a ROUTER socket fair-queues across its peers but strictly preserves per-peer ordering: messages from any single peer are delivered in the order that peer sent them. The real deadlock hazard is the lockstep sockets: a REQ socket must alternate send and receive, so if a request or reply is ever lost, the worker blocks forever waiting for a message that will never come. Production designs add receive timeouts (ZMQ_RCVTIMEO) and reconnect logic for exactly this reason.
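
Here is a small model of that delivery rule — round-robin across peers, FIFO within each peer — with deques standing in for the socket’s internal inbound queues:

```python
from collections import deque

# One inbound FIFO per connected peer, serviced round-robin
peer_queues = {
    b'worker-1': deque([b'first', b'second']),
    b'worker-2': deque([b'only']),
}

delivered = []
while any(peer_queues.values()):
    for peer, queue in peer_queues.items():
        if queue:
            delivered.append((peer, queue.popleft()))

# The interleaving across peers is unspecified, but b'first' always
# precedes b'second' for worker-1.
```

This is why the ventilator can safely assume a worker’s READY arrives before that worker’s first result, even though messages from different workers interleave arbitrarily.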

The next concept you’ll likely run into is handling worker failures and ensuring task durability, which leads to reliability patterns like ZeroMQ’s Paranoid Pirate (heartbeating plus retries) and more robust task-acknowledgment mechanisms.

Want structured learning?

Take the full ZeroMQ course →