ZeroMQ doesn’t actually do serialization itself; it just shuttles bytes around. The real magic happens in how you choose to encode your data before sending and decode it after receiving.
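To make the "bytes in, bytes out" point concrete, here is a minimal sketch of an encode/decode pair that produces the kind of frames you would hand to `send_multipart()`. It assumes JSON from the standard library as a stand-in serializer, and the helper names `encode_frames` and `decode_frames` are hypothetical; Protobuf or MessagePack would slot into the same two places.

```python
import json

def encode_frames(message_type: str, record: dict) -> list:
    """Return [type tag, payload] as bytes frames (hypothetical helper)."""
    payload = json.dumps(record, sort_keys=True).encode("utf-8")
    return [message_type.encode("ascii"), payload]

def decode_frames(frames: list) -> tuple:
    """Reverse encode_frames: read the tag, then decode the payload."""
    tag, payload = frames
    if tag != b"JSON":
        raise ValueError(f"unsupported message type: {tag!r}")
    return tag.decode("ascii"), json.loads(payload.decode("utf-8"))

# Everything that would cross the socket is plain bytes.
frames = encode_frames("JSON", {"id": 12345, "name": "John Doe"})
kind, record = decode_frames(frames)
print(kind, record)
```

The type tag travels as its own frame so the receiver can pick a decoder before touching the payload, which is the same convention the example that follows uses.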
Here’s a ZeroMQ ROUTER socket receiving messages and DEALER sockets sending them. The ROUTER receives each message as a multipart sequence: the sender’s identity frame, followed by the message content frames. A DEALER sends the same content frames but never prepends its own identity; the ROUTER on the receiving end adds it.
import zmq
import sys
# Protobuf setup
import person_pb2
person = person_pb2.Person()
person.id = 12345
person.name = "John Doe"
person.email = "jdoe@example.com"
serialized_protobuf = person.SerializeToString()
# MessagePack setup
import msgpack
data_msgpack = {'id': 54321, 'name': 'Jane Doe', 'email': 'jane@example.com'}
serialized_msgpack = msgpack.packb(data_msgpack, use_bin_type=True)
context = zmq.Context()
# Socket facing clients and holding messages
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
# Socket facing services
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")
# Worker simulation (e.g., a microservice)
def worker():
    worker_socket = context.socket(zmq.DEALER)
    worker_socket.connect("tcp://localhost:5560")
    print("Worker started, connecting to backend...")
    while True:
        try:
            # Frames arrive as [client identity, message type, payload]
            identity, message_type, payload = worker_socket.recv_multipart()
        except zmq.ContextTerminated:
            # The blocking recv raises this once the main thread calls context.term()
            print("Worker shutting down.")
            break
        kind = message_type.decode()
        print(f"Worker received: {identity.decode()}, {kind}, {len(payload)} bytes")
        if kind == "PROTOBUF":
            received_person = person_pb2.Person()
            received_person.ParseFromString(payload)
            print(f" Decoded Protobuf: ID={received_person.id}, Name={received_person.name}")
            response_payload = f"Processed Protobuf: {received_person.name}".encode()
        elif kind == "MSGPACK":
            decoded_data = msgpack.unpackb(payload, raw=False)
            print(f" Decoded MsgPack: ID={decoded_data['id']}, Name={decoded_data['name']}")
            response_payload = f"Processed MsgPack: {decoded_data['name']}".encode()
        else:
            response_payload = b"Unknown message type"
        # The identity frame goes first so the frontend ROUTER can route the reply
        worker_socket.send_multipart([identity, b"REPLY", response_payload])
        print(f"Worker sent reply for {identity.decode()}")
    worker_socket.close()
# The proxy shuttles frames between frontend and backend until the context is terminated
def run_proxy():
    try:
        zmq.proxy(frontend, backend)
    except zmq.ContextTerminated:
        pass
    finally:
        frontend.close()
        backend.close()
# In a real app the broker and the worker would be separate processes;
# here they run as daemon threads so the example fits in one script
import threading
print("Starting ZeroMQ broker/proxy...")
for target in (run_proxy, worker):
    threading.Thread(target=target, daemon=True).start()
# A client is a DEALER with an explicit identity; the frontend ROUTER
# prepends that identity to every message it receives from this socket
def request(client_id, message_type, payload):
    client = context.socket(zmq.DEALER)
    client.setsockopt(zmq.IDENTITY, client_id)
    client.connect("tcp://localhost:5559")
    client.send_multipart([message_type, payload])
    # The ROUTER strips the identity frame before delivering the reply
    reply_type, reply_payload = client.recv_multipart()
    client.close()
    return reply_type, reply_payload
print("Broker is running. Sending test messages...")
# Send Protobuf message
print("Sending Protobuf message...")
reply_type, reply_payload = request(b"client1", b"PROTOBUF", serialized_protobuf)
print(f"Received reply: {reply_type.decode()}, {reply_payload.decode()}")
# Send MessagePack message
print("\nSending MessagePack message...")
reply_type, reply_payload = request(b"client2", b"MSGPACK", serialized_msgpack)
print(f"Received reply: {reply_type.decode()}, {reply_payload.decode()}")
# Both replies are in, so shut down: term() makes the proxy and worker threads
# exit and returns once they have closed their sockets
print("Main process shutting down.")
context.term()
sys.exit(0)
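This example demonstrates a common ZeroMQ pattern: the ROUTER/DEALER broker. The ROUTER (frontend) accepts requests from clients and tags each one with the sender’s identity. The DEALER (backend) distributes those requests round-robin among the connected workers, which is what makes the broker a simple load balancer. The workers process the requests and send their replies back through the broker, where the ROUTER delivers each one to the client that asked.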
The core problem this pattern solves is decoupling clients from workers. Clients don’t need to know which worker will handle their request, and workers don’t need to know which client sent the request. This allows for scalable, resilient systems where workers can be added or removed dynamically without impacting clients.
Internally, the ROUTER socket is crucial. When it receives a message, it automatically prepends the identity of the sending client to the message frames before handing them to the application. When a DEALER worker sends a reply, it must include the original client identity frame as the first frame in its outgoing message. The ROUTER strips that frame and uses it to route the remaining frames back to the correct client.
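The identity envelope is easiest to see with a single ROUTER and a single DEALER in one process. The following is a minimal sketch, assuming pyzmq is installed; the `inproc://envelope-demo` endpoint and the `client1` identity are illustrative choices, not part of the example above.

```python
import zmq

ctx = zmq.Context()

router = ctx.socket(zmq.ROUTER)
router.setsockopt(zmq.LINGER, 0)
router.setsockopt(zmq.RCVTIMEO, 2000)  # fail instead of hanging if something is off
router.bind("inproc://envelope-demo")

dealer = ctx.socket(zmq.DEALER)
dealer.setsockopt(zmq.LINGER, 0)
dealer.setsockopt(zmq.RCVTIMEO, 2000)
dealer.setsockopt(zmq.IDENTITY, b"client1")  # must be set before connect
dealer.connect("inproc://envelope-demo")

# The DEALER sends two frames; it does not add its own identity.
dealer.send_multipart([b"MSGPACK", b"payload-bytes"])

# The ROUTER hands the application three frames: identity first.
frames = router.recv_multipart()
print(frames)

# To reply, put the identity back in front; the ROUTER strips it on send.
router.send_multipart([frames[0], b"REPLY", b"ok"])
reply = dealer.recv_multipart()
print(reply)

dealer.close()
router.close()
ctx.term()
```

If no identity is set, the ROUTER generates one for the connection, so a reply can still be routed by echoing the first frame back unchanged.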
You control this system primarily through the socket types (ROUTER, DEALER, REQ, REP, PUB, SUB, etc.) and their binding/connecting addresses. The choice of socket type dictates the communication pattern and how messages are routed. For instance, REQ/REP sockets enforce a strict request-reply sequence, while PUB/SUB are for one-to-many message broadcasting.
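As a small illustration of how the socket type enforces the pattern, the sketch below sends twice in a row on a REQ socket. It assumes pyzmq is installed and uses a made-up `inproc://lockstep-demo` endpoint; the second send is rejected because a REQ socket must receive a reply before it may send again.

```python
import zmq

ctx = zmq.Context()

rep = ctx.socket(zmq.REP)
rep.setsockopt(zmq.LINGER, 0)
rep.setsockopt(zmq.RCVTIMEO, 2000)
rep.bind("inproc://lockstep-demo")

req = ctx.socket(zmq.REQ)
req.setsockopt(zmq.LINGER, 0)
req.setsockopt(zmq.RCVTIMEO, 2000)
req.connect("inproc://lockstep-demo")

req.send(b"first request")
try:
    # Out of sequence: no reply has been received yet.
    req.send(b"second request")
    second_send_errno = None
except zmq.ZMQError as exc:
    second_send_errno = exc.errno
print("second send rejected:", second_send_errno == zmq.EFSM)

# Completing the cycle puts the REQ socket back into a sendable state.
request = rep.recv()
rep.send(b"reply")
reply = req.recv()
print(request, reply)

req.close()
rep.close()
ctx.term()
```

A DEALER in the same position would accept both sends, which is why brokers and asynchronous clients tend to use DEALER and ROUTER rather than REQ and REP.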
The most surprising thing about ZeroMQ’s routing is how explicit it is. Unlike higher-level message queues where routing logic is often abstracted away, ZeroMQ exposes the underlying mechanics. The ROUTER socket doesn’t just magically know where to send a message; it explicitly receives and forwards the sender’s identity, giving you fine-grained control over message delivery paths.
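The next concept you’ll likely grapple with is managing message queues and preventing worker overload, often addressed with high-water marks on sockets or with more complex broker designs in which workers signal when they are ready for more work. XSUB/XPUB proxies apply the same broker idea to publish-subscribe fan-out.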