
RabbitMQ Consumers

TraceStax tracks the processing of RabbitMQ messages in your consumer code — not the broker itself. Wrap your message handler to get per-message success/failure tracking, duration metrics, and anomaly detection.

TraceStax instruments your consumer callback. Each processed message becomes a tracked task_event:

RabbitMQ Broker → Your Consumer (TraceStax SDK here) → TraceStax

The SDK doesn’t touch the broker, exchanges, or queue bindings. It only observes what happens after your consumer receives a message.


import os
import socket
import time
import traceback

import pika

from tracestax.client import TraceStaxClient

# Start the TraceStax client before consuming.
client = TraceStaxClient(api_key=os.environ["TRACESTAX_API_KEY"])
client.start()

connection = pika.BlockingConnection(
    pika.ConnectionParameters("localhost")
)
channel = connection.channel()
channel.queue_declare(queue="orders")

# Uniquely identifies this consumer process.
worker_key = f"{socket.gethostname()}:{os.getpid()}"

def on_message(ch, method, properties, body):
    # Prefer the publisher-set message_id; fall back to the delivery tag.
    task_id = properties.message_id or method.delivery_tag
    start = time.monotonic()
    base_event = {
        "type": "task_event",
        "framework": "rabbitmq",
        "language": "python",
        "sdk_version": "0.1.0",
        "worker": {
            "key": worker_key,
            "hostname": socket.gethostname(),
            "pid": os.getpid(),
            "queues": [method.routing_key],
        },
        "task": {
            "name": "process_order",
            "id": str(task_id),
            "queue": method.routing_key,
            "attempt": 1,
        },
    }
    client.send_event({**base_event, "status": "started", "metrics": {}})
    try:
        process_order(body)  # your application's handler (not shown)
        duration_ms = round((time.monotonic() - start) * 1000)
        client.send_event({
            **base_event,
            "status": "succeeded",
            "metrics": {"duration_ms": duration_ms},
        })
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        duration_ms = round((time.monotonic() - start) * 1000)
        client.send_event({
            **base_event,
            "status": "failed",
            "metrics": {"duration_ms": duration_ms},
            "error": {
                "type": type(e).__name__,
                "message": str(e),
                "stack_trace": traceback.format_exc(),
            },
        })
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue="orders", on_message_callback=on_message)
channel.start_consuming()

If your consumer implements retry logic (e.g. via dead-letter exchanges), track the attempt number using a message header:

headers = properties.headers or {}
attempt = headers.get("x-retry-count", 0) + 1
base_event["task"]["attempt"] = attempt

For DLX-based retries, increment a counter header each time you republish the message, then read it back in your consumer.


  • Per-message success/failure tracking — every processed message is a task_event
  • Processing duration — p50, p95, p99 latency per handler
  • Failure rate anomaly detection — alerts when error rates spike
  • Consumer visibility — combine with worker heartbeats for fleet monitoring
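One way to emit those heartbeats is a small background thread that sends a periodic event through the same client. The "worker_heartbeat" event type and its fields below are assumptions for illustration; check the worker-heartbeat docs for the schema your TraceStax version expects.

```python
import os
import socket
import threading
import time

def start_heartbeat(client, interval_s=30.0):
    """Send a periodic heartbeat event on a daemon thread.
    Returns a threading.Event; set it to stop the loop."""
    stop = threading.Event()
    worker_key = f"{socket.gethostname()}:{os.getpid()}"

    def beat():
        while not stop.is_set():
            client.send_event({
                "type": "worker_heartbeat",  # assumed event type
                "worker": {
                    "key": worker_key,
                    "hostname": socket.gethostname(),
                    "pid": os.getpid(),
                },
                "timestamp": time.time(),
            })
            stop.wait(interval_s)  # sleep, but wake early on stop

    threading.Thread(target=beat, daemon=True).start()
    return stop
```

Running it as a daemon thread keeps heartbeats flowing even while `start_consuming()` blocks the main thread.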