The Majordomo broker doesn’t actually guarantee message delivery to workers; it guarantees the broker’s ability to deliver messages to workers and the worker’s ability to signal successful processing back to the broker.
Let’s see it in action. Imagine a simple mdworker that just prints a message and sends back a "DONE" reply:
# mdworker.py
import zmq
import sys
import time
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect("tcp://localhost:5555") # Connect to the broker
print("Worker started, connecting to broker...")
while True:
try:
# Receive message from broker
# Format: [empty, service, request]
frames = socket.recv_multipart()
sender_id = frames[0] # The broker's internal ID for this connection
service = frames[1].decode('utf-8')
request = frames[2].decode('utf-8')
print(f"Received request for service '{service}': {request}")
# Simulate work
time.sleep(1)
# Send reply back to broker
# Format: [empty, sender_id, reply]
reply = "DONE"
socket.send_multipart([b'', sender_id, reply.encode('utf-8')])
print("Sent reply: DONE")
except zmq.ZMQError as e:
print(f"Worker error: {e}")
break
except KeyboardInterrupt:
print("Worker shutting down...")
break
socket.close()
context.term()
And a mdbroker that manages these workers. This is the core of Majordomo:
# mdbroker.py
import zmq
import time
# Broker states
STATE_READY = 0
STATE_DEALER_CONNECTED = 1
# Majordomo commands
CMD_REQUEST = b"MD#0"
CMD_REPLY = b"MD#1"
CMD_HEARTBEAT = b"MD#2" # Not implemented in this simple example
class MajordomoBroker:
def __init__(self):
self.context = zmq.Context()
self.frontend = self.context.socket(zmq.ROUTER) # Clients connect here
self.backend = self.context.socket(zmq.ROUTER) # Workers connect here
self.frontend.bind("tcp://*:5555") # Clients
self.backend.bind("tcp://*:5556") # Workers
self.workers = {} # {worker_id: [state, service, socket]}
self.waiting = {} # {service: [worker_id, ...]}
self.client_queue = [] # [(client_id, service, request)]
self.poll = zmq.Poller()
self.poll.register(self.frontend, zmq.POLLIN)
self.poll.register(self.backend, zmq.POLLIN)
print("Majordomo broker started.")
def run(self):
while True:
# Poll for events, with a short timeout to allow periodic checks
socks = dict(self.poll.poll(100)) # 100ms timeout
if self.frontend in socks and socks[self.frontend] == zmq.POLLIN:
self.handle_frontend()
if self.backend in socks and socks[self.backend] == zmq.POLLIN:
self.handle_backend()
# Process queued client requests if workers are available
if self.client_queue:
self.dispatch_client_requests()
def handle_frontend(self):
client_id, service, request = self.frontend.recv_multipart()
print(f"Frontend: Received request from {client_id.decode()} for service {service.decode()}")
self.client_queue.append((client_id, service, request))
self.dispatch_client_requests() # Try to dispatch immediately
def handle_backend(self):
worker_id, empty, command = self.backend.recv_multipart()
print(f"Backend: Received command from worker {worker_id.decode()} ({command.decode()})")
if command == CMD_REPLY:
# Worker sent a reply
if worker_id in self.workers:
_, service, _ = self.workers[worker_id] # Get service worker was handling
# Find the original client request for this worker's task
# In a real system, you'd map worker_id to a specific client request
# For simplicity here, we assume the worker replies to the last task it was assigned
# This is where the broker's "state" for the worker is crucial.
# This simplified example doesn't track specific client requests per worker.
# A more robust broker would store (client_id, service, request) associated with worker_id.
# For now, we'll just look for any client waiting for this service.
# In a real implementation, you'd have a mapping from worker_id to the specific request they just finished.
# For this example, we'll just find *a* client waiting for this service.
# This highlights why robust state management is key.
# Let's assume the worker_id implies a specific task it just completed.
# We need to find the client that initiated that task.
# A better approach: when sending a request to a worker, store the client_id and request.
# When worker replies, use that stored info to send back to the client.
# For this simplified example, we'll just imagine we know which client to reply to.
# Let's assume the worker_id is tied to a specific client task.
# This is a simplification. A real system would track this mapping.
# For demonstration, we'll just use the service to find *a* client.
# This is NOT how a production Majordomo handles it.
# Proper Majordomo: worker_id is a ROUTER socket ID. The broker stores the client_id
# when it sends the request to the worker. When the worker replies, the broker
# receives the worker_id and the reply, and uses the stored client_id to send back.
# Simplified logic: if worker is back, it's ready. The reply itself is not used here,
# but in a real case, the worker would send the actual result.
print(f"Worker {worker_id.decode()} finished task for service {service.decode()}. Making worker available.")
self.workers[worker_id] = (STATE_READY, service, None) # Mark worker as ready
self.dispatch_client_requests() # Try to dispatch new requests
else:
print(f"Warning: Received reply from unknown worker {worker_id.decode()}")
elif command == CMD_REQUEST:
# Worker is requesting to register for a service
service = empty # The service name is in the "empty" frame for registration
if worker_id not in self.workers:
self.workers[worker_id] = (STATE_READY, service.decode('utf-8'), None)
print(f"Worker {worker_id.decode()} registered for service '{service.decode()}'")
self.dispatch_client_requests() # Check if there are pending requests for this service
else:
print(f"Warning: Worker {worker_id.decode()} already registered. Ignoring.")
# We ignore HEARTBEAT for this example
def dispatch_client_requests(self):
while self.client_queue:
client_id, service, request = self.client_queue[0] # Peek at the front
# Find an available worker for this service
available_worker_id = None
for w_id, (state, worker_service, _) in self.workers.items():
if state == STATE_READY and worker_service == service.decode('utf-8'):
available_worker_id = w_id
break
if available_worker_id:
# Found a worker, dequeue the request and send it
self.client_queue.pop(0)
print(f"Dispatching request to worker {available_worker_id.decode()} for service {service.decode()}")
# Update worker state to busy
self.workers[available_worker_id] = (STATE_DEALER_CONNECTED, service.decode('utf-8'), client_id)
# Send request to worker: [worker_id, empty, CMD_REQUEST, client_id, request]
# Note: The CMD_REQUEST here is *not* for registration. It signifies a task request.
# A more robust implementation might use different commands or frame structures.
# The 'empty' frame is essential for ROUTER sockets to correctly identify the sender.
self.backend.send_multipart([available_worker_id, b'', CMD_REQUEST, client_id, request])
else:
# No available worker for this service, stop trying to dispatch for now
break
if __name__ == "__main__":
broker = MajordomoBroker()
broker.run()
When you run mdbroker.py and then mdworker.py, the worker will connect to the broker’s backend. When a client (which we haven’t written here, but imagine it sends a message to tcp://localhost:5555) sends a request, the broker receives it on its frontend, queues it, and then dispatches it to an available worker on the backend. When the worker finishes, it sends a CMD_REPLY back to the broker, which makes the worker available again.
The Majordomo pattern is essentially a stateful proxy that manages a pool of workers. It handles:
- Worker Management: It tracks which workers are available, busy, or have disconnected. Workers register themselves with a service, and the broker notes this.
- Request Queuing: When clients send requests to the broker, if no worker is immediately available for that service, the request is placed in a queue.
- Load Balancing: The broker distributes incoming requests to available workers. The
ROUTERsocket inherently provides a form of load balancing by handing out requests to connectedDEALERsockets. The simple dispatch logic above picks the first available worker. - Reliability (Broker-side): The broker ensures that if a worker successfully signals completion (
CMD_REPLY), that signal is processed. If a worker crashes before signaling completion, the broker will eventually detect the dead connection (e.g., via heartbeats or subsequent polls) and re-queue the task or mark the worker as dead. - Client Abstraction: Clients only need to know about the broker’s address. They don’t need to know about individual workers.
The most surprising thing about Majordomo is that its core strength isn’t guaranteed end-to-end message delivery but rather robust broker-worker communication management. The broker is designed to be resilient, but it relies on workers to signal completion. If a worker dies mid-task and doesn’t send a reply, the task is lost unless the broker has a timeout mechanism or a way to detect dead workers and re-queue.
The exact levers you control are primarily the broker’s frontend and backend binding addresses, and the logic for how it dispatches requests and handles worker replies. The ROUTER and DEALER socket types are critical. ROUTER sockets (used by the broker) can send messages to specific peers (identified by their DEALER socket’s identity) and receive messages with the sender’s identity prepended. DEALER sockets (used by workers) behave like ROUTER sockets but don’t automatically prepend identities.
The one thing most people don’t know is how crucial the ROUTER socket’s ability to retain the sender’s identity is for stateful proxies like Majordomo. When the broker sends a request to a worker (using backend.send_multipart([worker_id, ...])), the worker_id is the identity of the DEALER socket connected to that worker. When the worker replies, the ROUTER socket on the broker receives this reply along with the worker_id that was implicitly attached by the DEALER socket. This allows the broker to know which worker sent the reply and, by extension, which client request that reply belongs to (if the broker stored that association when it sent the request out).
The next concept you’ll run into is implementing client-side resilience and handling broker failures.