# Python
The TraceStax Python SDK supports Celery, Dramatiq, and RQ. One package, multiple integrations — use whichever submodule matches your queue.
## Requirements

- Python 3.9+
- One of: Celery 5.x, Dramatiq 1.x, RQ 1.x
## Installation

```shell
pip install tracestax
```

### Celery

```python
from celery import Celery
from tracestax import configure

app = Celery("myapp", broker="redis://localhost:6379/0")
configure(app, api_key="ts_live_xxxxxxxxxxxx")
```

`configure()` attaches to Celery’s signal system automatically — no changes to your task definitions required.
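Attaching via signals is what makes this zero-touch: handlers are registered once on the worker's pre/post-run hooks, and every task fires them without being modified. A minimal, self-contained sketch of the pattern, using a toy signal registry rather than Celery itself (the `Signal` class and `events` buffer are illustrative, not SDK internals):

```python
class Signal:
    """Toy stand-in for a Celery-style signal (e.g. task_prerun/task_postrun)."""

    def __init__(self):
        self.handlers = []

    def connect(self, fn):
        self.handlers.append(fn)

    def send(self, **kwargs):
        for fn in self.handlers:
            fn(**kwargs)


task_prerun = Signal()
task_postrun = Signal()

events = []  # what a configure()-style call would buffer for ingest


def configure_signals():
    # Attach handlers once; existing task definitions stay untouched.
    task_prerun.connect(lambda task_name, **kw: events.append(("started", task_name)))
    task_postrun.connect(lambda task_name, **kw: events.append(("finished", task_name)))


def run_task(name, fn):
    # A worker fires these signals around each task execution.
    task_prerun.send(task_name=name)
    try:
        return fn()
    finally:
        task_postrun.send(task_name=name)


configure_signals()
run_task("add", lambda: 1 + 1)
```

Because the hooks live on the signal registry, every task run through the worker is observed, including tasks defined in third-party packages.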
### Django + Celery

```python
import os

from celery import Celery
from tracestax import configure

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

configure(app, api_key=os.environ["TRACESTAX_API_KEY"])
```

Enable duration tracking by setting this in your Celery config (it defaults to off; without it, durations show as 0):
```python
app.conf.task_track_started = True
```

### Celery Beat

```python
from celery.beat import PersistentScheduler

from tracestax.beat import TraceStaxBeatScheduler


class MonitoredScheduler(TraceStaxBeatScheduler, PersistentScheduler):
    """Celery Beat scheduler that reports dispatches to TraceStax."""

    def apply_async(self, entry, producer=None, advance=True, **kwargs):
        result = super().apply_async(entry, producer, advance, **kwargs)
        self.on_task_dispatched(entry.name, result.id)
        return result
```

Then point Celery at your scheduler:

```python
CELERY_BEAT_SCHEDULER = "myapp.celerybeat_scheduler:MonitoredScheduler"
TRACESTAX_API_KEY = "ts_live_xxxxxxxxxxxx"
```

Or configure programmatically:

```python
from tracestax.beat import TraceStaxBeatScheduler

scheduler = TraceStaxBeatScheduler(api_key="ts_live_xxxxxxxxxxxx")
```

### Dramatiq

```python
import dramatiq
from dramatiq.brokers.redis import RedisBroker

from tracestax.dramatiq import configure

broker = RedisBroker(url="redis://localhost:6379")
broker.add_middleware(configure(api_key="ts_live_xxxxxxxxxxxx"))
dramatiq.set_broker(broker)
```

The middleware hooks into Dramatiq’s `before_process_message` and `after_process_message` lifecycle — no per-actor changes needed.
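The before/after middleware pattern can be sketched without Dramatiq itself. The `TimingMiddleware` and `ToyBroker` classes and the message fields below are illustrative stand-ins, not the SDK's actual classes:

```python
import time


class TimingMiddleware:
    """Records a duration event around each processed message."""

    def __init__(self):
        self.events = []
        self._started = {}

    def before_process_message(self, broker, message):
        self._started[message["id"]] = time.monotonic()

    def after_process_message(self, broker, message, *, result=None, exception=None):
        started = self._started.pop(message["id"])
        self.events.append({
            "actor": message["actor"],
            "duration": time.monotonic() - started,
            "ok": exception is None,
        })


class ToyBroker:
    """Minimal broker loop that invokes the middleware hooks like Dramatiq does."""

    def __init__(self, middleware):
        self.middleware = middleware

    def process(self, message, handler):
        self.middleware.before_process_message(self, message)
        try:
            result = handler()
            self.middleware.after_process_message(self, message, result=result)
        except Exception as exc:
            self.middleware.after_process_message(self, message, exception=exc)


mw = TimingMiddleware()
broker = ToyBroker(mw)
broker.process({"id": "1", "actor": "send_email"}, lambda: "sent")
```

Because the hooks wrap the broker's processing loop, every actor is measured without decorating or editing actor code.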
With periodiq (scheduled actors): periodic actors are automatically tracked, and the `scheduled: true` flag is set on events from actors registered with periodiq.
### RQ

```python
from redis import Redis

from tracestax import configure
from tracestax.rq import TraceStaxWorker

conn = Redis()
client = configure(api_key="ts_live_xxxxxxxxxxxx")

# Drop-in replacement for rq.Worker
w = TraceStaxWorker(client=client, queues=["default", "high"], connection=conn)
w.work()
```

`TraceStaxWorker` extends `rq.Worker` at runtime, so `isinstance(w, rq.Worker)` is always `True`.
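The drop-in guarantee comes from plain subclassing rather than wrapping: a subclass instance passes `isinstance` checks that a composed wrapper object would fail. A generic sketch of that design choice, with rq not required (`Worker` here is a stand-in, not the real class):

```python
class Worker:
    """Stand-in for a framework worker class such as rq.Worker."""

    def perform_job(self, job):
        return job()


class TracingWorker(Worker):
    """Subclass adds reporting around the inherited behaviour."""

    def __init__(self):
        self.reported = []

    def perform_job(self, job):
        result = super().perform_job(job)
        self.reported.append(result)  # where an event would be recorded
        return result


w = TracingWorker()
w.perform_job(lambda: 42)
# Framework code that type-checks the worker keeps working:
assert isinstance(w, Worker)
```

A wrapper built by composition (holding a `Worker` as an attribute) would record the same data but fail any `isinstance(w, Worker)` check inside the framework.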
## Configuration

All options can be passed as keyword arguments to `configure()` or set via environment variables.
| Option | Env var | Default | Description |
|---|---|---|---|
| `api_key` | `TRACESTAX_API_KEY` | — | **Required.** Project API key |
| `endpoint` | `TRACESTAX_INGEST_URL` | `https://ingest.tracestax.com` | Override for proxying or self-hosted |
| `enabled` | `TRACESTAX_ENABLED` | `true` | Set to `false` to disable ingest (e.g. in tests) |
| `flush_interval` | — | `5.0` | Seconds between background flushes |
| `max_batch_size` | — | `100` | Max events per HTTP request |
| `enable_lineage` | — | `true` | Track task chains and parent/child relationships (Celery only) |
| `enable_snapshots` | — | `true` | Periodically snapshot queue depth (Celery only) |
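A keyword argument passed to `configure()` would typically override the corresponding environment variable, which in turn overrides the built-in default. That resolution order can be sketched as follows (the `resolve` helper is illustrative, and kwarg-over-env precedence is an assumption, not something this page confirms):

```python
import os

# Defaults and env-var names mirror the table above.
DEFAULTS = {"enabled": "true", "flush_interval": "5.0"}
ENV_VARS = {"enabled": "TRACESTAX_ENABLED"}


def resolve(option, **kwargs):
    """Resolve an option: explicit kwarg > environment variable > default."""
    if option in kwargs:
        return kwargs[option]
    env_name = ENV_VARS.get(option)
    if env_name and env_name in os.environ:
        return os.environ[env_name]
    return DEFAULTS[option]


os.environ["TRACESTAX_ENABLED"] = "false"
resolve("enabled")                  # env var wins over the default
resolve("enabled", enabled="true")  # explicit kwarg wins over the env var
resolve("flush_interval")           # falls back to the default
```

If you rely on a specific precedence in production, verify it against the SDK rather than this sketch.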
## Per-task metadata

Celery:

```python
@app.task(tracestax={"tags": ["critical", "payments"]})
def process_payment(order_id: int): ...
```

Dramatiq:

```python
@dramatiq.actor(tracestax_tags=["critical", "payments"])
def process_payment(order_id: int): ...
```

RQ:

```python
# Pass as job_kwargs when enqueuing
queue.enqueue(process_payment, job_kwargs={"tracestax_tags": ["critical"]})
```

## Testing
Set `TRACESTAX_ENABLED=false` in your test environment:

```python
# pytest conftest.py
import os

os.environ.setdefault("TRACESTAX_ENABLED", "false")
```

For Celery, `task_always_eager` suppresses signals so events won’t fire anyway — but being explicit is good practice.
## Troubleshooting

### Events not appearing after deploy
Check your worker process can reach ingest.tracestax.com:443. Ingest calls are made in a background thread and will log a warning if they fail — check your worker logs.
### Duration showing as 0 (Celery)
`task_track_started = True` must be set in your Celery config. Without it the `task_started` signal doesn’t fire and duration can’t be calculated.
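The arithmetic behind this failure mode is simple: duration is the gap between the "started" and "succeeded" timestamps, and with `task_track_started` off there is no start timestamp to subtract. A toy illustration (the event shapes below are made up for the example):

```python
def duration_seconds(events):
    """Duration from started/succeeded timestamps; 0.0 if the start is missing."""
    started = events.get("started")
    succeeded = events.get("succeeded")
    if started is None or succeeded is None:
        return 0.0
    return succeeded - started


duration_seconds({"started": 100.0, "succeeded": 102.5})  # normal case: 2.5
duration_seconds({"succeeded": 102.5})  # task_track_started off: 0.0
```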
### High event volume
The SDK batches events and flushes asynchronously. If you’re processing thousands of tasks per second, increase max_batch_size in configure() and ensure your workers have outbound HTTPS access.
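The batching behaviour can be pictured as a buffer that ships at most `max_batch_size` events per request. A synchronous sketch of that idea (the real SDK also flushes from a background thread on `flush_interval`; `Batcher` is illustrative, not the SDK's class):

```python
class Batcher:
    """Buffers events and sends them in batches of at most max_batch_size."""

    def __init__(self, send, max_batch_size=100):
        self.send = send  # callable taking a list of events (one HTTP request)
        self.max_batch_size = max_batch_size
        self.buffer = []

    def record(self, event):
        self.buffer.append(event)
        if len(self.buffer) >= self.max_batch_size:
            self.flush()

    def flush(self):
        # Drain the buffer in max_batch_size chunks.
        while self.buffer:
            batch = self.buffer[: self.max_batch_size]
            self.buffer = self.buffer[self.max_batch_size:]
            self.send(batch)


requests_made = []
b = Batcher(requests_made.append, max_batch_size=100)
for i in range(250):
    b.record({"task": "process_payment", "n": i})
b.flush()  # ship the trailing partial batch
# 250 events -> three requests: 100 + 100 + 50
```

Raising `max_batch_size` trades request count for payload size, which is why it helps at thousands of tasks per second.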