Skip to content
TraceStax Docs

Kafka Consumers

TraceStax tracks the processing of Kafka messages in your consumer code — not the broker itself. Wrap your message handler to get per-message success/failure tracking, duration metrics, and anomaly detection.

TraceStax instruments your consumer loop. Each message processed becomes a tracked task_event:

Kafka Broker → Your Consumer (TraceStax SDK here) → TraceStax

The SDK doesn’t touch the broker or consumer group coordination. It only observes what happens after your consumer receives a message.


import os
import socket
import time
import traceback
from kafka import KafkaConsumer
from tracestax.client import TraceStaxClient
# Start the TraceStax client before consuming so the very first message is tracked.
client = TraceStaxClient(api_key=os.environ["TRACESTAX_API_KEY"])
client.start()

consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:9092",
    group_id="order-processor",
)

# Per-process identity is loop-invariant — compute it once, not per message.
hostname = socket.gethostname()
pid = os.getpid()
worker_key = f"{hostname}:{pid}"

for message in consumer:
    # topic-partition-offset uniquely identifies a message within the cluster,
    # so it doubles as a stable task id.
    task_id = f"{message.topic}-{message.partition}-{message.offset}"
    start = time.monotonic()
    base_event = {
        "type": "task_event",
        "framework": "kafka",
        "language": "python",
        "sdk_version": "0.1.0",
        "worker": {
            "key": worker_key,
            "hostname": hostname,
            "pid": pid,
            "queues": [message.topic],
        },
        "task": {
            "name": "process_order",
            "id": task_id,
            "queue": message.topic,
            "attempt": 1,
        },
    }
    client.send_event({**base_event, "status": "started", "metrics": {}})

    # Run the handler, capturing any failure so the terminal event (duration +
    # status) is built exactly once instead of being duplicated per branch.
    error = None
    try:
        process_order(message.value)
    except Exception as e:  # report the failure, then keep consuming
        error = {
            "type": type(e).__name__,
            "message": str(e),
            # format_exc() only has the traceback while inside the handler.
            "stack_trace": traceback.format_exc(),
        }
    duration_ms = round((time.monotonic() - start) * 1000)
    terminal = {**base_event, "metrics": {"duration_ms": duration_ms}}
    if error is None:
        terminal["status"] = "succeeded"
    else:
        terminal["status"] = "failed"
        terminal["error"] = error
    client.send_event(terminal)

To avoid repeating the instrumentation boilerplate, create a small wrapper.

def tracked(client, topic, handler_name):
    """Decorator factory: wrap a Kafka message handler with TraceStax tracking.

    Args:
        client: a started TraceStax client exposing ``send_event(dict)``.
        topic: the Kafka topic this worker consumes (reported as its queue).
        handler_name: logical task name reported in each task_event.

    The wrapped handler emits a "started" event, runs, then emits a
    "succeeded" or "failed" event carrying the processing duration.
    Failures are reported but swallowed (the wrapper returns None) so the
    consumer loop keeps going.
    """
    import functools  # local import keeps this snippet self-contained

    def decorator(fn):
        @functools.wraps(fn)  # preserve fn's name/docstring for debugging
        def wrapper(message):
            task_id = f"{message.topic}-{message.partition}-{message.offset}"
            start = time.monotonic()
            base_event = {
                "type": "task_event",
                "framework": "kafka",
                "language": "python",
                "sdk_version": "0.1.0",
                "worker": {
                    "key": f"{socket.gethostname()}:{os.getpid()}",
                    "hostname": socket.gethostname(),
                    "pid": os.getpid(),
                    "queues": [topic],
                },
                "task": {
                    "name": handler_name,
                    "id": task_id,
                    "queue": message.topic,
                    "attempt": 1,
                },
            }
            client.send_event({**base_event, "status": "started", "metrics": {}})
            try:
                result = fn(message)
            except Exception as e:
                duration_ms = round((time.monotonic() - start) * 1000)
                client.send_event({
                    **base_event,
                    "status": "failed",
                    "metrics": {"duration_ms": duration_ms},
                    "error": {
                        "type": type(e).__name__,
                        "message": str(e),
                        "stack_trace": traceback.format_exc(),
                    },
                })
                return None  # failure reported; let the consumer continue
            duration_ms = round((time.monotonic() - start) * 1000)
            client.send_event({
                **base_event,
                "status": "succeeded",
                "metrics": {"duration_ms": duration_ms},
            })
            return result
        return wrapper
    return decorator
# Usage: decorate the handler and each processed message is reported as a
# task_event automatically — the body only needs the domain logic.
@tracked(client, "orders", "process_order")
def process_order(message):
    # Placeholder — replace with your real handler logic.
    ...

  • Per-message success/failure tracking — every processed message is a task_event
  • Processing duration — p50, p95, p99 latency per handler
  • Failure rate anomaly detection — alerts when error rates spike
  • Consumer lag visibility — combine with worker heartbeats for lag tracking