TraceStax Docs

SQS Consumers

TraceStax tracks the processing of SQS messages in your consumer code — not the queue itself. Wrap your message handler to get per-message success/failure tracking, duration metrics, and anomaly detection.

You instrument your polling loop with the TraceStax SDK. Each message processed becomes a tracked task_event:

SQS Queue → Your Consumer (TraceStax SDK here) → TraceStax

The SDK doesn’t touch the queue configuration or visibility timeout. It only observes what happens after your consumer receives a message.


import os
import socket
import time
import traceback

import boto3

from tracestax.client import TraceStaxClient

client = TraceStaxClient(api_key=os.environ["TRACESTAX_API_KEY"])
client.start()

sqs = boto3.client("sqs")
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789/orders"
worker_key = f"{socket.gethostname()}:{os.getpid()}"

while True:
    resp = sqs.receive_message(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateReceiveCount"],
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,
    )
    for message in resp.get("Messages", []):
        task_id = message["MessageId"]
        # ApproximateReceiveCount tracks SQS-level retries
        attempt = int(
            message.get("Attributes", {}).get("ApproximateReceiveCount", 1)
        )
        start = time.monotonic()
        base_event = {
            "type": "task_event",
            "framework": "sqs",
            "language": "python",
            "sdk_version": "0.1.0",
            "worker": {
                "key": worker_key,
                "hostname": socket.gethostname(),
                "pid": os.getpid(),
                "queues": ["orders"],
            },
            "task": {
                "name": "process_order",
                "id": task_id,
                "queue": "orders",
                "attempt": attempt,
            },
        }
        client.send_event({**base_event, "status": "started", "metrics": {}})
        try:
            process_order(message["Body"])  # your message handler
            duration_ms = round((time.monotonic() - start) * 1000)
            client.send_event({
                **base_event,
                "status": "succeeded",
                "metrics": {"duration_ms": duration_ms},
            })
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message["ReceiptHandle"],
            )
        except Exception as e:
            duration_ms = round((time.monotonic() - start) * 1000)
            client.send_event({
                **base_event,
                "status": "failed",
                "metrics": {"duration_ms": duration_ms},
                "error": {
                    "type": type(e).__name__,
                    "message": str(e),
                    "stack_trace": traceback.format_exc(),
                },
            })
            # Don't delete — SQS will redeliver after visibility timeout
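
If you process messages in several places, the per-message bookkeeping above is easy to factor into a small helper. This is a sketch, not part of the SDK; `build_base_event` is our name for it:

```python
import os
import socket


def build_base_event(message, queue_name, task_name, worker_key):
    """Build the shared task_event fields for one SQS message.

    `message` is a dict from receive_message; attempt falls back to 1
    when ApproximateReceiveCount was not requested.
    """
    attempt = int(
        message.get("Attributes", {}).get("ApproximateReceiveCount", 1)
    )
    return {
        "type": "task_event",
        "framework": "sqs",
        "language": "python",
        "sdk_version": "0.1.0",
        "worker": {
            "key": worker_key,
            "hostname": socket.gethostname(),
            "pid": os.getpid(),
            "queues": [queue_name],
        },
        "task": {
            "name": task_name,
            "id": message["MessageId"],
            "queue": queue_name,
            "attempt": attempt,
        },
    }
```

The "started", "succeeded", and "failed" events can then all spread this dict and differ only in `status`, `metrics`, and `error`.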

ApproximateReceiveCount is only returned when you request it: include AttributeNames=["ApproximateReceiveCount"] in your receive_message call, otherwise the attribute is omitted and attempt will always read 1.


SQS provides ApproximateReceiveCount as a message attribute, which maps directly to task.attempt. Request it in your receive_message call:

resp = sqs.receive_message(
    QueueUrl=queue_url,
    AttributeNames=["ApproximateReceiveCount"],
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,
)

When a message exceeds its redrive policy and moves to a dead-letter queue, SQS stops delivering it. If you consume from the DLQ separately, treat those as new tasks with queue set to the DLQ name.
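
For example, a DLQ consumer might derive the queue label from the queue URL so its events are tagged separately from the original queue's. The URL and helper below are illustrative, not part of the SDK:

```python
def queue_name_from_url(queue_url):
    # "https://sqs.us-east-1.amazonaws.com/123456789/orders-dlq" -> "orders-dlq"
    return queue_url.rstrip("/").rsplit("/", 1)[-1]


dlq_url = "https://sqs.us-east-1.amazonaws.com/123456789/orders-dlq"
queue_name = queue_name_from_url(dlq_url)

# Each DLQ message starts a fresh task: queue is the DLQ's name, and
# attempt restarts from the DLQ's own ApproximateReceiveCount.
task = {
    "name": "process_order",
    "id": "<MessageId from the DLQ message>",
    "queue": queue_name,  # "orders-dlq", not "orders"
    "attempt": 1,
}
```

This keeps failure-rate and latency metrics for the DLQ consumer from mixing with those of the primary queue.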


What you get:

  • Per-message success/failure tracking — every processed message is a task_event
  • Processing duration — p50, p95, p99 latency per handler
  • Failure rate anomaly detection — alerts when error rates spike
  • Retry visibility — ApproximateReceiveCount maps to task.attempt for retry tracking