TraceStax Docs
RabbitMQ Consumers
TraceStax tracks the processing of RabbitMQ messages in your consumer code — not the broker itself. Wrap your message handler to get per-message success/failure tracking, duration metrics, and anomaly detection.
How it works
Section titled “How it works”

TraceStax instruments your consumer callback. Each message processed becomes a tracked task_event:
RabbitMQ Broker → Your Consumer (TraceStax SDK here) → TraceStax

The SDK doesn’t touch the broker, exchanges, or queue bindings. It only observes what happens after your consumer receives a message.
import os
import socket
import time
import traceback

import pika
from tracestax.client import TraceStaxClient

client = TraceStaxClient(api_key=os.environ["TRACESTAX_API_KEY"])
client.start()

connection = pika.BlockingConnection(
    pika.ConnectionParameters("localhost")
)
channel = connection.channel()
channel.queue_declare(queue="orders")

# Stable identity for this consumer process (host:pid).
worker_key = f"{socket.gethostname()}:{os.getpid()}"


def on_message(ch, method, properties, body):
    """Process one message and report started/succeeded/failed to TraceStax."""
    # Prefer the publisher-assigned message id; fall back to the delivery tag.
    task_id = properties.message_id or method.delivery_tag
    start = time.monotonic()

    # Fields shared by every event emitted about this message.
    base_event = {
        "type": "task_event",
        "framework": "rabbitmq",
        "language": "python",
        "sdk_version": "0.1.0",
        "worker": {
            "key": worker_key,
            "hostname": socket.gethostname(),
            "pid": os.getpid(),
            "queues": [method.routing_key],
        },
        "task": {
            "name": "process_order",
            "id": str(task_id),
            "queue": method.routing_key,
            "attempt": 1,
        },
    }

    client.send_event({**base_event, "status": "started", "metrics": {}})

    try:
        process_order(body)
        duration_ms = round((time.monotonic() - start) * 1000)
        client.send_event({
            **base_event,
            "status": "succeeded",
            "metrics": {"duration_ms": duration_ms},
        })
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        duration_ms = round((time.monotonic() - start) * 1000)
        client.send_event({
            **base_event,
            "status": "failed",
            "metrics": {"duration_ms": duration_ms},
            "error": {
                "type": type(e).__name__,
                "message": str(e),
                "stack_trace": traceback.format_exc(),
            },
        })
        # Requeue so another consumer (or this one) can retry the message.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_consume(queue="orders", on_message_callback=on_message)channel.start_consuming()import amqplib from "amqplib";import { TraceStaxClient } from "@tracestax/node";import os from "node:os";
const client = new TraceStaxClient({ apiKey: process.env.TRACESTAX_API_KEY! });client.start();
const conn = await amqplib.connect("amqp://localhost");const ch = await conn.createChannel();await ch.assertQueue("orders");
const workerKey = `${os.hostname()}:${process.pid}`;
ch.consume("orders", async (msg) => { if (!msg) return;
const taskId = msg.properties.messageId || msg.fields.deliveryTag.toString(); const start = performance.now();
const baseEvent = { type: "task_event" as const, framework: "rabbitmq", language: "typescript", sdk_version: "0.1.0", worker: { key: workerKey, hostname: os.hostname(), pid: process.pid, queues: [msg.fields.routingKey], }, task: { name: "processOrder", id: taskId, queue: msg.fields.routingKey, attempt: 1, }, };
client.sendEvent({ ...baseEvent, status: "started", metrics: {} });
try { await processOrder(msg.content); const durationMs = Math.round(performance.now() - start); client.sendEvent({ ...baseEvent, status: "succeeded", metrics: { duration_ms: durationMs }, }); ch.ack(msg); } catch (err) { const durationMs = Math.round(performance.now() - start); client.sendEvent({ ...baseEvent, status: "failed", metrics: { duration_ms: durationMs }, error: { type: (err as Error).name, message: (err as Error).message, stack_trace: (err as Error).stack, }, }); ch.nack(msg, false, true); // requeue throw err; }});package main
import ( "fmt" "os" "time"
amqp "github.com/rabbitmq/amqp091-go" "github.com/tracestax/tracestax-go")
func main() { client := tracestax.NewClient(os.Getenv("TRACESTAX_API_KEY")) client.Start() defer client.Shutdown()
hostname, _ := os.Hostname() workerKey := fmt.Sprintf("%s:%d", hostname, os.Getpid())
conn, _ := amqp.Dial("amqp://localhost:5672") ch, _ := conn.Channel() ch.QueueDeclare("orders", true, false, false, false, nil)
msgs, _ := ch.Consume("orders", "", false, false, false, false, nil)
for msg := range msgs { taskID := msg.MessageId if taskID == "" { taskID = fmt.Sprintf("%d", msg.DeliveryTag) } start := time.Now()
base := tracestax.TaskEvent{ Type: "task_event", Framework: "rabbitmq", Language: "go", SDKVersion: "0.1.0", Worker: tracestax.WorkerInfo{ Key: workerKey, Hostname: hostname, PID: os.Getpid(), Queues: []string{msg.RoutingKey}, }, Task: tracestax.TaskInfo{ Name: "processOrder", ID: taskID, Queue: msg.RoutingKey, Attempt: 1, }, }
client.SendEvent(base.WithStatus("started"))
if err := processOrder(msg.Body); err != nil { dur := time.Since(start).Milliseconds() client.SendEvent(base.WithStatus("failed"). WithMetrics(tracestax.Metrics{DurationMs: dur}). WithError(err)) msg.Nack(false, true) } else { dur := time.Since(start).Milliseconds() client.SendEvent(base.WithStatus("succeeded"). WithMetrics(tracestax.Metrics{DurationMs: dur})) msg.Ack(false) } }}Retry tracking
Section titled “Retry tracking”

If your consumer implements retry logic (e.g. via dead-letter exchanges), track the attempt number using a message header:
headers = properties.headers or {}attempt = headers.get("x-retry-count", 0) + 1
base_event["task"]["attempt"] = attemptFor DLX-based retries, increment a counter header each time you republish the message, then read it back in your consumer.
What you get
Section titled “What you get”

- Per-message success/failure tracking — every processed message is a task_event
- Processing duration — p50, p95, p99 latency per handler
- Failure rate anomaly detection — alerts when error rates spike
- Consumer visibility — combine with worker heartbeats for fleet monitoring