TraceStax Docs
Kafka Consumers
TraceStax tracks the processing of Kafka messages in your consumer code — not the broker itself. Wrap your message handler to get per-message success/failure tracking, duration metrics, and anomaly detection.
How it works
Section titled “How it works”

TraceStax instruments your consumer loop. Each message processed becomes a tracked task_event:
Kafka Broker → Your Consumer (TraceStax SDK here) → TraceStax

The SDK doesn’t touch the broker or consumer group coordination. It only observes what happens after your consumer receives a message.
import osimport socketimport timeimport traceback
from kafka import KafkaConsumerfrom tracestax.client import TraceStaxClient
client = TraceStaxClient(api_key=os.environ["TRACESTAX_API_KEY"])client.start()
consumer = KafkaConsumer( "orders", bootstrap_servers="localhost:9092", group_id="order-processor",)
worker_key = f"{socket.gethostname()}:{os.getpid()}"
for message in consumer: task_id = f"{message.topic}-{message.partition}-{message.offset}" start = time.monotonic()
base_event = { "type": "task_event", "framework": "kafka", "language": "python", "sdk_version": "0.1.0", "worker": { "key": worker_key, "hostname": socket.gethostname(), "pid": os.getpid(), "queues": [message.topic], }, "task": { "name": "process_order", "id": task_id, "queue": message.topic, "attempt": 1, }, }
client.send_event({**base_event, "status": "started", "metrics": {}})
try: process_order(message.value) duration_ms = round((time.monotonic() - start) * 1000) client.send_event({ **base_event, "status": "succeeded", "metrics": {"duration_ms": duration_ms}, }) except Exception as e: duration_ms = round((time.monotonic() - start) * 1000) client.send_event({ **base_event, "status": "failed", "metrics": {"duration_ms": duration_ms}, "error": { "type": type(e).__name__, "message": str(e), "stack_trace": traceback.format_exc(), }, })import { Kafka } from "kafkajs";import { TraceStaxClient } from "@tracestax/node";import os from "node:os";
const client = new TraceStaxClient({ apiKey: process.env.TRACESTAX_API_KEY! });client.start();
const kafka = new Kafka({ brokers: ["localhost:9092"] });const consumer = kafka.consumer({ groupId: "order-processor" });
await consumer.connect();await consumer.subscribe({ topic: "orders" });
const workerKey = `${os.hostname()}:${process.pid}`;
await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const taskId = `${topic}-${partition}-${message.offset}`; const start = performance.now();
const baseEvent = { type: "task_event" as const, framework: "kafka", language: "typescript", sdk_version: "0.1.0", worker: { key: workerKey, hostname: os.hostname(), pid: process.pid, queues: [topic], }, task: { name: "processOrder", id: taskId, queue: topic, attempt: 1, }, };
client.sendEvent({ ...baseEvent, status: "started", metrics: {} });
try { await processOrder(message.value); const durationMs = Math.round(performance.now() - start); client.sendEvent({ ...baseEvent, status: "succeeded", metrics: { duration_ms: durationMs }, }); } catch (err) { const durationMs = Math.round(performance.now() - start); client.sendEvent({ ...baseEvent, status: "failed", metrics: { duration_ms: durationMs }, error: { type: (err as Error).name, message: (err as Error).message, stack_trace: (err as Error).stack, }, }); throw err; // re-throw so kafkajs handles the failure } },});package main
import ( "context" "fmt" "os" "time"
"github.com/IBM/sarama" "github.com/tracestax/tracestax-go")
func main() { client := tracestax.NewClient(os.Getenv("TRACESTAX_API_KEY")) client.Start() defer client.Shutdown()
hostname, _ := os.Hostname() workerKey := fmt.Sprintf("%s:%d", hostname, os.Getpid())
// Implement sarama.ConsumerGroupHandler handler := &OrderHandler{ client: client, workerKey: workerKey, hostname: hostname, }
group, _ := sarama.NewConsumerGroup( []string{"localhost:9092"}, "order-processor", sarama.NewConfig(), ) for { group.Consume(context.Background(), []string{"orders"}, handler) }}
// OrderHandler implements sarama.ConsumerGroupHandler and emits a
// TraceStax task_event for every message it processes.
type OrderHandler struct {
	client    *tracestax.Client // TraceStax SDK client used to send events
	workerKey string            // "hostname:pid" identity for this consumer process
	hostname  string            // cached hostname, repeated in every event
}
func (h *OrderHandler) ConsumeClaim( sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim,) error { for msg := range claim.Messages() { taskID := fmt.Sprintf("%s-%d-%d", msg.Topic, msg.Partition, msg.Offset) start := time.Now()
base := tracestax.TaskEvent{ Type: "task_event", Framework: "kafka", Language: "go", SDKVersion: "0.1.0", Worker: tracestax.WorkerInfo{ Key: h.workerKey, Hostname: h.hostname, PID: os.Getpid(), Queues: []string{msg.Topic}, }, Task: tracestax.TaskInfo{ Name: "processOrder", ID: taskID, Queue: msg.Topic, Attempt: 1, }, }
h.client.SendEvent(base.WithStatus("started"))
if err := processOrder(msg.Value); err != nil { dur := time.Since(start).Milliseconds() h.client.SendEvent(base.WithStatus("failed"). WithMetrics(tracestax.Metrics{DurationMs: dur}). WithError(err)) } else { dur := time.Since(start).Milliseconds() h.client.SendEvent(base.WithStatus("succeeded"). WithMetrics(tracestax.Metrics{DurationMs: dur})) }
sess.MarkMessage(msg, "") } return nil}
// Setup runs before consuming begins; this handler keeps no per-session state.
func (h *OrderHandler) Setup(sarama.ConsumerGroupSession) error { return nil }

// Cleanup runs after the session ends; there is nothing to release.
func (h *OrderHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

import com.tracestax.TraceStaxClient;
import com.tracestax.TaskEvent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class OrderConsumer {

    // TraceStax SDK client, injected by Spring.
    private final TraceStaxClient client;

    public OrderConsumer(TraceStaxClient client) {
        this.client = client;
    }

    /**
     * Processes one Kafka record, emitting TraceStax started/succeeded/failed
     * events around the business logic. Failures are re-thrown so Spring
     * Kafka's error handling (retry, DLT) still sees them.
     *
     * NOTE(review): workerKey() and hostname() helpers are assumed to be
     * defined elsewhere on this class — confirm before copying verbatim.
     */
    @KafkaListener(topics = "orders", groupId = "order-processor")
    public void handle(ConsumerRecord<String, String> record) {
        // topic-partition-offset uniquely identifies the record.
        String taskId = record.topic() + "-" + record.partition() + "-" + record.offset();
        long start = System.nanoTime();

        // Fields shared by every event emitted for this record.
        TaskEvent base = TaskEvent.builder()
            .framework("kafka")
            .language("java")
            .sdkVersion("0.1.0")
            .workerKey(workerKey())
            .hostname(hostname())
            .pid(ProcessHandle.current().pid())
            .queues(List.of(record.topic()))
            .taskName("processOrder")
            .taskId(taskId)
            .queue(record.topic())
            .attempt(1)
            .build();

        client.sendEvent(base.withStatus("started"));

        try {
            processOrder(record.value());
            long durationMs = (System.nanoTime() - start) / 1_000_000;
            client.sendEvent(base.withStatus("succeeded")
                .withMetrics(Map.of("duration_ms", durationMs)));
        } catch (Exception e) {
            long durationMs = (System.nanoTime() - start) / 1_000_000;
            client.sendEvent(base.withStatus("failed")
                .withMetrics(Map.of("duration_ms", durationMs))
                .withError(e));
            // A bare `throw e;` does not compile here when processOrder can
            // throw a checked exception, because handle() declares no
            // `throws` clause. Wrap checked exceptions so the rethrow always
            // compiles and Spring Kafka's error handling is still triggered.
            throw (e instanceof RuntimeException)
                ? (RuntimeException) e
                : new RuntimeException(e);
        }
    }
}
Helper: wrap any handler
Section titled “Helper: wrap any handler”

To avoid repeating the instrumentation boilerplate, create a small wrapper.
def tracked(client, topic, handler_name):
    """Decorator that wraps a Kafka message handler with TraceStax tracking.

    Args:
        client: A started TraceStaxClient used to emit task events.
        topic: The Kafka topic the handler consumes (reported as the queue).
        handler_name: Task name reported in each task_event.

    Returns:
        A decorator that surrounds ``fn(message)`` with the same
        started/succeeded/failed event pattern shown above.
    """
    def decorator(fn):
        def wrapper(message):
            task_id = f"{message.topic}-{message.partition}-{message.offset}"
            # ... same start/succeed/fail pattern as above, using `client`,
            # `topic`, `handler_name`, and `task_id` ...
            # BUG FIX: the original sketch never invoked the wrapped handler,
            # so the decorated function would silently do nothing.
            return fn(message)
        return wrapper
    return decorator
# Apply the decorator to your handler; tracking is now automatic.
@tracked(client, "orders", "process_order")
def process_order(message): ...

// TypeScript equivalent: a higher-order function instead of a decorator.
function tracked(
  client: TraceStaxClient,
  handlerName: string,
  fn: (message: KafkaMessage) => Promise<void>,
) {
  // Returned function matches kafkajs's eachMessage signature.
  return async ({ topic, partition, message }) => {
    const taskId = `${topic}-${partition}-${message.offset}`;
    // ... same start/succeed/fail pattern as above
    await fn(message);
  };
}
await consumer.run({
  // Wrap the business logic once; tracked() emits all TraceStax events.
  eachMessage: tracked(client, "processOrder", async (msg) => {
    // your logic here
  }),
});
What you get
Section titled “What you get”

- Per-message success/failure tracking — every processed message is a task_event
- Processing duration — p50, p95, p99 latency per handler
- Failure rate anomaly detection — alerts when error rates spike
- Consumer lag visibility — combine with worker heartbeats for lag tracking