ZeroMQ’s PUB-SUB is a fire-and-forget messaging pattern: publishers broadcast messages, and each subscriber receives the ones that match its subscription. The "late joiner syndrome" (the ZeroMQ Guide calls it the "slow joiner" symptom) is the common problem where subscribers that start after a publisher has already sent messages miss those initial messages entirely.

Here’s a common scenario:

# Publisher
import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
print("Publisher started. Sending messages...")

for i in range(10):
    message = f"Message {i}"
    print(f"Sending: {message}")
    socket.send_string(message)
    time.sleep(0.5)

socket.close()
context.term()

# Subscriber
import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt_string(zmq.SUBSCRIBE, "") # Subscribe to all topics
print("Subscriber started. Waiting for messages...")

while True:
    try:
        message = socket.recv_string(zmq.NOBLOCK)
        print(f"Received: {message}")
    except zmq.Again:
        time.sleep(0.1) # Avoid busy-waiting

If you run the subscriber after the publisher has already sent a few messages, the subscriber only receives messages published after its connection and subscription reached the publisher. The earlier messages are gone for good. This is a feature of the PUB-SUB pattern, not a bug: it is designed for high throughput and low latency, not for guaranteed delivery to late joiners.
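To see this without juggling two terminals, here is a minimal single-process sketch. It assumes pyzmq is installed and uses a hypothetical `inproc://late-joiner-demo` endpoint instead of TCP so both sockets can live in one script; the 0.2 s sleep is an illustrative allowance for the subscription to reach the publisher, not a tuned value.

```python
import time

import zmq


def demo_late_joiner():
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind("inproc://late-joiner-demo")  # hypothetical endpoint name

    # No subscriber exists yet, so the PUB socket silently drops these.
    for i in range(3):
        pub.send_string(f"early {i}")

    sub = ctx.socket(zmq.SUB)
    sub.connect("inproc://late-joiner-demo")
    sub.setsockopt_string(zmq.SUBSCRIBE, "")  # subscribe to everything
    time.sleep(0.2)  # give the subscription time to reach the publisher

    for i in range(3):
        pub.send_string(f"late {i}")

    received = []
    while sub.poll(timeout=200):  # stop once nothing arrives for 200 ms
        received.append(sub.recv_string())

    ctx.destroy(linger=0)  # close both sockets and terminate the context
    return received
```

Calling `demo_late_joiner()` should return only the three `late` messages; the `early` ones were dropped because no subscription existed when they were sent.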

The Problem: Lost Initial State

The core issue arises when your application state is embodied in those initial messages. For example, if the publisher is sending configuration updates, sensor readings that establish a baseline, or initial user presence information, a subscriber joining late will start with incomplete or stale data, potentially leading to incorrect behavior or crashes.

The Mental Model: What’s Happening Under the Hood

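In ZeroMQ’s PUB-SUB, data messages flow in one direction only, and a PUB socket keeps no message history. It does hold an outgoing queue for each subscriber that is already connected (bounded by the send high-water mark), but nothing for subscribers that have not connected yet. When a PUB socket sends a message, it is delivered to every currently connected SUB socket whose subscription prefix matches, and dropped for everyone else. With tcp and ipc, filtering happens on the publisher side (since ZeroMQ 3.x), so the subscriber’s subscription must also have reached the publisher. Connecting and propagating the subscription takes a little time, which is why even a subscriber started before the publisher’s first send can miss the first few messages. The SUB socket does have an internal receive buffer, but it only holds messages after the PUB socket has delivered them; it cannot catch up on missed ones.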
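The delivery rule can be captured in a deliberately simplified, pure-Python model. `ToyPublisher` is hypothetical and is not how libzmq works (it has no queues, HWMs, or transports); it only illustrates the two properties that matter here: no history, and prefix filtering at send time.

```python
class ToyPublisher:
    """Toy model of PUB fan-out; NOT how libzmq is implemented.

    It keeps no history and has no queues or HWMs: send() reaches only the
    subscribers registered at that moment whose prefix filter matches.
    """

    def __init__(self):
        self.subscribers = []  # (prefix, inbox) pairs, in subscription order

    def subscribe(self, prefix):
        inbox = []
        self.subscribers.append((prefix, inbox))
        return inbox

    def send(self, message):
        for prefix, inbox in self.subscribers:
            if message.startswith(prefix):  # ZeroMQ subscriptions are prefixes too
                inbox.append(message)


pub = ToyPublisher()
pub.send("temp 20")            # no subscribers yet: lost
early = pub.subscribe("temp")
pub.send("temp 21")
pub.send("humidity 40")        # does not match "temp": filtered out
late = pub.subscribe("")       # empty prefix matches everything
pub.send("temp 22")
print(early)  # ['temp 21', 'temp 22']
print(late)   # ['temp 22']
```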

The Solution: Synchronization Strategies

To overcome late joiner syndrome, you need to implement a synchronization mechanism. The most common approach involves a handshake or a specific initial message exchange before the main data flow begins.

1. The "Hello, I’m here" Handshake with a Sync Socket

This is a common pattern. The subscriber first connects and subscribes its SUB socket, then sends a "ready" signal to the publisher over a separate REQ/REP socket pair. The publisher, upon receiving this signal, knows the subscriber has started attaching, so it sends the current state and then starts its regular messages.

Publisher Side (Modified):

# Publisher with Sync
import zmq
import time

context = zmq.Context()
# Publisher socket for regular messages
pub_socket = context.socket(zmq.PUB)
pub_socket.bind("tcp://*:5555")

# Socket to signal subscribers are ready
sync_socket = context.socket(zmq.REP) # Using REP for explicit sync signal
sync_socket.bind("tcp://*:5556")

print("Publisher started. Waiting for sync signals...")

# We need to know how many subscribers are ready before sending initial state
# A simple way is to just wait for one, or use a counter if multiple are expected.
# For simplicity, this example waits for one.

# Wait for the sync signal from the subscriber
sync_socket.recv_string() # Blocks until a signal is received
sync_socket.send_string("OK") # A REP socket must reply before it can receive again
print("Sync signal received. Sending initial state...")

# Simulate sending initial state or configuration
for i in range(5): # Initial state messages
    message = f"STATE: Initial {i}"
    print(f"Sending: {message}")
    pub_socket.send_string(message)
    time.sleep(0.1)

# Now, send regular updates
print("Initial state sent. Starting regular updates...")
for i in range(10):
    message = f"UPDATE: Message {i}"
    print(f"Sending: {message}")
    pub_socket.send_string(message)
    time.sleep(0.5)

sync_socket.close()
pub_socket.close()
context.term()

Subscriber Side (Modified):

# Subscriber with Sync
import zmq
import time

context = zmq.Context()
# Subscriber socket for regular messages
sub_socket = context.socket(zmq.SUB)
sub_socket.connect("tcp://localhost:5555")
sub_socket.setsockopt_string(zmq.SUBSCRIBE, "") # Subscribe to all topics

# Socket to signal readiness to the publisher
sync_socket = context.socket(zmq.REQ) # Using REQ for explicit sync signal
sync_socket.connect("tcp://localhost:5556")

print("Subscriber started. Sending sync signal...")
# ZeroMQ does not guarantee that the SUB connection and subscription finish
# before the REQ/REP exchange below, so give them a moment (illustrative value).
time.sleep(0.5)
sync_socket.send_string("READY") # Send the sync signal
sync_socket.recv_string() # Wait for the publisher's reply

print("Sync signal acknowledged. Waiting for initial state and updates...")

# First, receive the initial state messages
sub_socket.setsockopt(zmq.RCVTIMEO, 2000) # recv raises zmq.Again after 2 s without data
for _ in range(5): # Expecting 5 initial state messages
    try:
        message = sub_socket.recv_string()
        print(f"Received initial state: {message}")
    except zmq.Again:
        print("Timeout waiting for initial state.")
        break

# Then, receive regular updates until interrupted with Ctrl-C
sub_socket.setsockopt(zmq.RCVTIMEO, -1) # Block indefinitely from here on
try:
    while True:
        message = sub_socket.recv_string()
        print(f"Received update: {message}")
except KeyboardInterrupt:
    pass

sync_socket.close()
sub_socket.close()
context.term()

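Why it works: REQ/REP sockets enforce a strict request-reply lockstep: a REQ socket must receive a reply before it can send again, and a REP socket must send a reply before it can receive the next request. The publisher blocks in recv until "READY" arrives. Because the subscriber connected and subscribed its SUB socket before sending "READY", the publisher can then send its initial state before proceeding with the stream of updates, and the subscriber reads that state first. One caveat: on transports other than inproc, ZeroMQ does not guarantee that the SUB connection and its subscription complete before the REQ/REP exchange does. In practice, the subscriber should pause briefly before signalling, or the publisher should keep sending probe messages until the subscriber confirms that one has arrived.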
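Because ZeroMQ does not promise that the SUB connection completes before the REQ/REP exchange, a more defensive variant, along the lines the ZeroMQ Guide suggests for node coordination, has the publisher send throwaway probe messages until the subscriber confirms over REQ/REP that one arrived. A received probe proves the subscription is live at the publisher. This sketch assumes pyzmq and runs both sides in one process over hypothetical `inproc://` endpoints with threads, purely so it is self-contained; the `HELLO`/`READY`/`OK` strings, the 100 ms probe interval, and the three state messages are illustrative choices.

```python
import threading

import zmq

# Hypothetical inproc endpoints so both sides fit in one process; between
# processes you would use tcp:// addresses as in the example above.
PUB_ADDR = "inproc://updates"
SYNC_ADDR = "inproc://sync"


def run_publisher(ctx, bound, done):
    pub = ctx.socket(zmq.PUB)
    pub.bind(PUB_ADDR)
    rep = ctx.socket(zmq.REP)
    rep.bind(SYNC_ADDR)
    bound.set()  # endpoints exist; the subscriber may connect now

    # Send a throwaway probe every 100 ms until a READY request arrives.
    while not rep.poll(timeout=100):
        pub.send_string("HELLO")
    rep.recv_string()      # "READY": a probe got through, so the subscription is live
    rep.send_string("OK")

    for i in range(3):
        pub.send_string(f"STATE {i}")

    done.wait()            # keep sockets open until the subscriber has read everything
    pub.close(linger=0)
    rep.close(linger=0)


def run_subscriber(ctx):
    sub = ctx.socket(zmq.SUB)
    sub.connect(PUB_ADDR)
    sub.setsockopt_string(zmq.SUBSCRIBE, "")
    req = ctx.socket(zmq.REQ)
    req.connect(SYNC_ADDR)

    while sub.recv_string() != "HELLO":  # block until the first probe arrives
        pass
    req.send_string("READY")
    req.recv_string()                    # "OK"

    state = []
    while len(state) < 3:
        message = sub.recv_string()
        if message != "HELLO":           # probes may still be queued; skip them
            state.append(message)
    sub.close(linger=0)
    req.close(linger=0)
    return state


def run_demo():
    ctx = zmq.Context()
    bound, done = threading.Event(), threading.Event()
    publisher = threading.Thread(target=run_publisher, args=(ctx, bound, done))
    publisher.start()
    bound.wait()
    try:
        return run_subscriber(ctx)
    finally:
        done.set()
        publisher.join()
        ctx.term()
```

The cost is a little extra traffic before the real data starts; in return, the publisher never sends state into a subscription that is not yet in place.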

2. Using a Snapshotting Mechanism (for stateful publishers)

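If your publisher’s state is complex or large, you might not want to replay it on the PUB socket every time a subscriber joins. Instead, the publisher can expose a separate mechanism (e.g., another ZeroMQ socket or an HTTP endpoint) from which a new subscriber requests a snapshot of the current state. To avoid a gap, the subscriber should connect and subscribe to the main PUB socket first and buffer the updates it receives, then request the snapshot, and finally apply only the buffered updates that are newer than the snapshot. This requires the publisher to stamp each update, and the snapshot, with a sequence number. The ZeroMQ Guide develops this approach as its "Clone" pattern.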
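The merge step of the snapshot approach can be sketched in plain Python. It assumes a hypothetical message format in which every update is a `(seq, key, value)` tuple and the snapshot is a dict tagged with the sequence number it reflects; `rebuild_state` is an illustrative name, and the gap check is an extra safeguard rather than part of the description above.

```python
def rebuild_state(snapshot, snapshot_seq, buffered_updates):
    """Combine a snapshot with updates buffered on the SUB socket.

    snapshot:         dict of key -> value, valid as of sequence number snapshot_seq
    buffered_updates: (seq, key, value) tuples in the order they were received
    Returns the merged state and the last sequence number applied.
    """
    state = dict(snapshot)
    last_seq = snapshot_seq
    for seq, key, value in buffered_updates:
        if seq <= snapshot_seq:
            continue  # already reflected in the snapshot; discard
        if seq != last_seq + 1:
            # A missing sequence number means an update was dropped
            # (for example at the HWM), so the state cannot be trusted.
            raise ValueError(f"gap in updates: expected {last_seq + 1}, got {seq}")
        state[key] = value
        last_seq = seq
    return state, last_seq


# Updates 4 and 5 were buffered before the snapshot (taken at seq 5) arrived.
snapshot = {"temp": 19, "mode": "auto"}
updates = [(4, "temp", 19), (5, "mode", "auto"), (6, "temp", 21), (7, "fan", "on")]
state, last = rebuild_state(snapshot, 5, updates)
print(state, last)  # {'temp': 21, 'mode': 'auto', 'fan': 'on'} 7
```

After the merge, the subscriber keeps applying live updates with the same "next sequence number" check, using `last` as its starting point.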

3. Using High Water Marks (HWM) and Durable Queues (with caveats)

ZeroMQ sockets have high-water mark (HWM) settings. zmq.SNDHWM and zmq.RCVHWM cap the number of messages that can be queued per connection for sending or receiving, respectively (1000 by default). When the limit is reached, the socket either blocks or drops messages depending on its type: a PUB socket drops further messages for the affected subscriber, while types such as PUSH and DEALER block.

For PUB-SUB, SNDHWM on the publisher matters when the publisher produces messages faster than the network or a slow subscriber can absorb them. It does not solve the late joiner problem, though: the queue exists only for subscribers that are already connected, so nothing is retained for one that has not joined yet.
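The drop behaviour of a PUB socket at its HWM can be observed in a small experiment. This sketch assumes pyzmq and a hypothetical `inproc://hwm-demo` endpoint; `count_delivered` is an illustrative name. It lowers both HWMs to 10 for a subscriber that is not reading yet, publishes 100 messages without blocking, and then counts what actually arrived.

```python
import time

import zmq


def count_delivered(n_messages=100, hwm=10):
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.SNDHWM, hwm)   # set before bind so the connection uses it
    pub.bind("inproc://hwm-demo")     # hypothetical endpoint

    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.RCVHWM, hwm)   # set before connect, for the same reason
    sub.connect("inproc://hwm-demo")
    sub.setsockopt_string(zmq.SUBSCRIBE, "")
    time.sleep(0.2)                   # let the subscription reach the publisher

    # The subscriber is not reading, so its queue fills up. A PUB socket
    # never blocks or raises here: once the HWM is hit it drops messages.
    for i in range(n_messages):
        pub.send_string(f"msg {i}", zmq.NOBLOCK)

    delivered = 0
    while sub.poll(timeout=200):      # drain whatever made it into the queue
        sub.recv_string()
        delivered += 1

    ctx.destroy(linger=0)
    return delivered
```

`count_delivered()` should return well under 100 without the publisher ever blocking or raising; the exact count depends on how libzmq sizes the inproc pipe, so treat it as illustrative.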

Changing the transport does not help either: tcp, ipc, and inproc all drop messages sent before a subscriber has connected, and none of them persists messages for later delivery.

For true durability and guaranteed delivery for late joiners, you’d typically look at patterns involving queues (like ZeroMQ’s ROUTER/DEALER with a queueing agent) or external message brokers (Kafka, RabbitMQ).

The Key Takeaway: PUB-SUB is for broadcasting, not for guaranteed state delivery to late joiners. You must explicitly build a synchronization and state-transfer mechanism around it.

The next problems you are likely to hit after adding synchronization are hangs or timeouts when one side of the handshake never shows up, and mismatched message counts when the subscriber’s expectation of how many state messages to read drifts from what the publisher actually sends.
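A handshake that waits on a REQ socket with no deadline hangs forever if the other side never answers. A common remedy, similar to the ZeroMQ Guide’s "Lazy Pirate" client, is to poll the REQ socket with a timeout and, on timeout, close and recreate it, because a REQ socket that has sent a request cannot send again until it gets a reply. This is a sketch assuming pyzmq; `sync_with_publisher` and its parameters are hypothetical.

```python
import zmq


def sync_with_publisher(ctx, sync_addr, timeout_ms=1000, retries=3):
    """Send READY over REQ/REP and wait for the reply, retrying on timeout.

    Returns True once the publisher has replied, False if every attempt
    timed out.
    """
    for _ in range(retries):
        req = ctx.socket(zmq.REQ)
        req.connect(sync_addr)
        req.send_string("READY")
        if req.poll(timeout=timeout_ms):  # POLLIN set: a reply is waiting
            req.recv_string()
            req.close(linger=0)
            return True
        # No reply in time. A REQ socket cannot send again until it gets one,
        # so discard this socket (linger=0 drops the queued request) and retry
        # with a fresh one.
        req.close(linger=0)
    return False
```

In the subscriber, a call such as `sync_with_publisher(context, "tcp://localhost:5556")` would take the place of the READY exchange; on `False`, the subscriber can log the failure and exit instead of hanging.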
