SQS Consumers
TraceStax tracks the processing of SQS messages in your consumer code — not the queue itself. Wrap your message handler to get per-message success/failure tracking, duration metrics, and anomaly detection.
How it works
TraceStax instruments your polling loop. Each message processed becomes a tracked task_event:
SQS Queue → Your Consumer (TraceStax SDK here) → TraceStax
The SDK doesn’t touch the queue configuration or visibility timeout. It only observes what happens after your consumer receives a message.
import osimport socketimport timeimport traceback
import boto3from tracestax.client import TraceStaxClient
# Long-polling SQS consumer that reports one TraceStax task_event per message:
# "started" when handling begins, then "succeeded" or "failed" with the
# processing duration. `process_order` is your own message handler.
client = TraceStaxClient(api_key=os.environ["TRACESTAX_API_KEY"])
client.start()

sqs = boto3.client("sqs")
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789/orders"

hostname = socket.gethostname()
worker_key = f"{hostname}:{os.getpid()}"

while True:
    resp = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,
        # To receive ApproximateReceiveCount, AttributeNames=["ApproximateReceiveCount"]
        # must be passed to receive_message; without it every attempt is reported as 1.
        AttributeNames=["ApproximateReceiveCount"],
    )

    for message in resp.get("Messages", []):
        task_id = message["MessageId"]
        # ApproximateReceiveCount tracks SQS-level retries
        attempt = int(
            message.get("Attributes", {}).get("ApproximateReceiveCount", 1)
        )
        start = time.monotonic()

        base_event = {
            "type": "task_event",
            "framework": "sqs",
            "language": "python",
            "sdk_version": "0.1.0",
            "worker": {
                "key": worker_key,
                "hostname": hostname,
                "pid": os.getpid(),
                "queues": ["orders"],
            },
            "task": {
                "name": "process_order",
                "id": task_id,
                "queue": "orders",
                "attempt": attempt,
            },
        }

        client.send_event({**base_event, "status": "started", "metrics": {}})

        # Only the handler is guarded here, so a "failed" event always means
        # the handler itself raised.
        try:
            process_order(message["Body"])
        except Exception as e:
            duration_ms = round((time.monotonic() - start) * 1000)
            client.send_event({
                **base_event,
                "status": "failed",
                "metrics": {"duration_ms": duration_ms},
                "error": {
                    "type": type(e).__name__,
                    "message": str(e),
                    "stack_trace": traceback.format_exc(),
                },
            })
            # Don't delete — SQS will redeliver after visibility timeout
            continue

        duration_ms = round((time.monotonic() - start) * 1000)
        client.send_event({
            **base_event,
            "status": "succeeded",
            "metrics": {"duration_ms": duration_ms},
        })

        # A failed delete must not be reported as a task failure: "succeeded"
        # has already been sent. Log it and move on; SQS will redeliver the
        # message after the visibility timeout.
        try:
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message["ReceiptHandle"],
            )
        except Exception:
            traceback.print_exc()
import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand,} from "@aws-sdk/client-sqs";import { TraceStaxClient } from "@tracestax/node";import os from "node:os";
// Long-polling SQS consumer that reports one TraceStax task_event per message:
// "started" when handling begins, then "succeeded" or "failed" with the
// processing duration. `processOrder` is your own message handler.
const client = new TraceStaxClient({ apiKey: process.env.TRACESTAX_API_KEY! });
client.start();

const sqs = new SQSClient({});
const queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789/orders";

const hostname = os.hostname();
const workerKey = `${hostname}:${process.pid}`;

while (true) {
  const { Messages = [] } = await sqs.send(
    new ReceiveMessageCommand({
      QueueUrl: queueUrl,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 20,
      // Required, otherwise SQS omits ApproximateReceiveCount from the response.
      AttributeNames: ["ApproximateReceiveCount"],
    }),
  );

  for (const message of Messages) {
    const taskId = message.MessageId!;
    // ApproximateReceiveCount tracks SQS-level retries.
    const attempt = Number(
      message.Attributes?.ApproximateReceiveCount ?? 1,
    );
    const start = performance.now();

    const baseEvent = {
      type: "task_event" as const,
      framework: "sqs",
      language: "typescript",
      sdk_version: "0.1.0",
      worker: {
        key: workerKey,
        hostname,
        pid: process.pid,
        queues: ["orders"],
      },
      task: {
        name: "processOrder",
        id: taskId,
        queue: "orders",
        attempt,
      },
    };

    client.sendEvent({ ...baseEvent, status: "started", metrics: {} });

    // Only the handler is guarded here, so a "failed" event always means the
    // handler itself threw.
    try {
      await processOrder(message.Body!);
    } catch (err) {
      // Anything can be thrown in JS; normalise so name/message/stack exist.
      const error = err instanceof Error ? err : new Error(String(err));
      const durationMs = Math.round(performance.now() - start);
      client.sendEvent({
        ...baseEvent,
        status: "failed",
        metrics: { duration_ms: durationMs },
        error: {
          type: error.name,
          message: error.message,
          stack_trace: error.stack,
        },
      });
      // Don't delete — SQS will redeliver after visibility timeout
      continue;
    }

    const durationMs = Math.round(performance.now() - start);
    client.sendEvent({
      ...baseEvent,
      status: "succeeded",
      metrics: { duration_ms: durationMs },
    });

    // A failed delete must not be reported as a task failure: "succeeded" has
    // already been sent. Log it and move on; SQS will redeliver the message
    // after the visibility timeout.
    try {
      await sqs.send(
        new DeleteMessageCommand({
          QueueUrl: queueUrl,
          ReceiptHandle: message.ReceiptHandle!,
        }),
      );
    } catch (err) {
      console.error(`Failed to delete SQS message ${taskId}`, err);
    }
  }
}
package main
import ( "context" "fmt" "os" "strconv" "time"
"github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/tracestax/tracestax-go")
func main() { rw := tracestax.NewClient(os.Getenv("TRACESTAX_API_KEY")) rw.Start() defer rw.Shutdown()
cfg, _ := config.LoadDefaultConfig(context.TODO()) sqsClient := sqs.NewFromConfig(cfg)
queueURL := "https://sqs.us-east-1.amazonaws.com/123456789/orders" hostname, _ := os.Hostname() workerKey := fmt.Sprintf("%s:%d", hostname, os.Getpid())
for { out, _ := sqsClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{ QueueUrl: &queueURL, MaxNumberOfMessages: 10, WaitTimeSeconds: 20, AttributeNames: []types.QueueAttributeName{"ApproximateReceiveCount"}, })
for _, msg := range out.Messages { taskID := *msg.MessageId attempt, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"]) if attempt == 0 { attempt = 1 } start := time.Now()
base := tracestax.TaskEvent{ Type: "task_event", Framework: "sqs", Language: "go", SDKVersion: "0.1.0", Worker: tracestax.WorkerInfo{ Key: workerKey, Hostname: hostname, PID: os.Getpid(), Queues: []string{"orders"}, }, Task: tracestax.TaskInfo{ Name: "processOrder", ID: taskID, Queue: "orders", Attempt: attempt, }, }
rw.SendEvent(base.WithStatus("started"))
if err := processOrder(*msg.Body); err != nil { dur := time.Since(start).Milliseconds() rw.SendEvent(base.WithStatus("failed"). WithMetrics(tracestax.Metrics{DurationMs: dur}). WithError(err)) // Don't delete — SQS will redeliver } else { dur := time.Since(start).Milliseconds() rw.SendEvent(base.WithStatus("succeeded"). WithMetrics(tracestax.Metrics{DurationMs: dur})) sqsClient.DeleteMessage(context.TODO(), &sqs.DeleteMessageInput{ QueueUrl: &queueURL, ReceiptHandle: msg.ReceiptHandle, }) } } }}Retry tracking
SQS provides ApproximateReceiveCount as a message system attribute, which maps directly to task.attempt. Request it in your receive_message call:
resp = sqs.receive_message(
    QueueUrl=queue_url,
    AttributeNames=["ApproximateReceiveCount"],
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,
)

const { Messages } = await sqs.send(
  new ReceiveMessageCommand({
    QueueUrl: queueUrl,
    AttributeNames: ["ApproximateReceiveCount"],
    MaxNumberOfMessages: 10,
    WaitTimeSeconds: 20,
  }),
);

When a message's receive count exceeds the maxReceiveCount set in the queue's redrive policy, SQS moves it to the dead-letter queue and stops delivering it from the source queue. If you consume from the DLQ separately, treat those as new tasks with queue set to the DLQ name.
What you get
- Per-message success/failure tracking — every processed message is a task_event
- Processing duration — p50, p95, p99 latency per handler
- Failure rate anomaly detection — alerts when error rates spike
- Retry visibility — ApproximateReceiveCount maps to task.attempt for retry tracking