ZeroMQ’s "high water mark" is actually a sophisticated backpressure mechanism that prevents slower consumers from drowning faster producers in a flood of messages.
Let’s watch it in action. Imagine a publisher sending messages as fast as it can, and a subscriber that’s a bit sluggish, perhaps doing some disk I/O or complex processing per message.
# Publisher (fast)
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
message_count = 0
while True:
message = f"message {message_count}"
socket.send_string(message)
print(f"Sent: {message}")
message_count += 1
time.sleep(0.01) # Simulate sending rate
# Subscriber (slow)
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, "") # Subscribe to all
socket.connect("tcp://localhost:5556")
message_count = 0
while True:
message = socket.recv_string()
print(f"Received: {message}")
message_count += 1
time.sleep(0.1) # Simulate slow processing
If you run these two scripts, you’ll notice the publisher sending messages much faster than the subscriber can process them. Without a mechanism to slow the publisher down, the subscriber’s internal message queue would grow indefinitely, eventually consuming all available memory. This is where the high water mark (HWM) comes in.
The HWM is a limit on the number of messages that can be queued in the socket’s send buffer (for the sender) or receive buffer (for the receiver) before ZeroMQ starts dropping messages or blocking. When a socket reaches its HWM, it signals backpressure to the other end. For a PUB socket, reaching its HWM means it will stop sending new messages until space becomes available in the buffer. For a SUB socket, reaching its HWM means it will start dropping incoming messages if the publisher is too fast.
The zmq.SNDHWM and zmq.RCVHWM socket options control this behavior. You set them on the socket before binding or connecting.
Here’s how you’d configure the HWM on the publisher to prevent it from overwhelming the subscriber:
# Publisher with HWM
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
# Set send high water mark to 10 messages
socket.setsockopt(zmq.SNDHWM, 10)
socket.bind("tcp://*:5556")
message_count = 0
while True:
message = f"message {message_count}"
try:
socket.send_string(message)
print(f"Sent: {message}")
message_count += 1
# A small sleep to make the output readable,
# but the HWM is the real limiter.
time.sleep(0.001)
except zmq.Again:
print("HWM reached, blocking or dropping messages.")
# In a real app, you might wait or log this.
time.sleep(0.1) # Wait a bit before retrying
And on the subscriber, to ensure it doesn’t drop messages if the publisher is too fast and the subscriber can’t keep up:
# Subscriber with HWM
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, "")
# Set receive high water mark to 100 messages
socket.setsockopt(zmq.RCVHWM, 100)
socket.connect("tcp://localhost:5556")
message_count = 0
while True:
try:
message = socket.recv_string()
print(f"Received: {message}")
message_count += 1
time.sleep(0.1) # Simulate slow processing
except zmq.Again:
print("HWM reached, no messages available to receive.")
# This usually means the publisher is slower than the receiver.
time.sleep(0.1)
By setting zmq.SNDHWM on the publisher to 10, the publisher will block if its outgoing message queue exceeds 10 messages. This forces the publisher to wait for the subscriber to consume messages, effectively slowing down the producer to match the consumer’s pace. Setting zmq.RCVHWM on the subscriber to 100 means that if the publisher somehow still sends messages faster than the subscriber can process them and the subscriber’s incoming buffer exceeds 100 messages, ZeroMQ will start dropping messages from the subscriber’s end to prevent memory exhaustion. The goal is usually to set the HWM on the receiver to prevent drops, and on the sender to prevent blocking indefinitely or overwhelming the network.
The most effective way to tune HWM is to monitor your application’s message throughput and latency. If you see messages being dropped (which ZeroMQ doesn’t explicitly signal on recv unless DONTWAIT is used, but you’d infer it from out-of-order or missing messages) or your publisher is blocking unexpectedly, you’ll need to adjust the HWM. A common starting point for HWM is a few hundred or a few thousand messages, depending on message size and network conditions. However, it’s crucial to understand that setting an HWM is a limit, not a guarantee of a specific buffer size. ZeroMQ may use less than the HWM.
The default HWM for most transports in ZeroMQ is 0, which means no limit, and thus no backpressure. This is fine for fire-and-forget scenarios or when producers and consumers are guaranteed to be perfectly matched in speed, but it’s a recipe for disaster in most real-world distributed systems where speeds inevitably diverge.
Understanding the interplay between SNDHWM and RCVHWM is key to building robust message queues.