Telemetry¶
AsyncFlow exposes the full RHAPSODY Telemetry Abstraction Layer (TAL) through a single call on the WorkflowEngine. This page covers what AsyncFlow adds on top of RHAPSODY's core telemetry and how application-level frameworks can consume the event stream.
For the complete reference on events, OTel instruments, JSONL format, Prometheus/Grafana integration, and custom event creation, see the RHAPSODY Telemetry docs.
Enabling telemetry¶
Note
Requires: pip install rhapsody-py[telemetry]
from concurrent.futures import ProcessPoolExecutor
from radical.asyncflow import WorkflowEngine
from rhapsody.backends import ConcurrentExecutionBackend
backend = await ConcurrentExecutionBackend(ProcessPoolExecutor())
flow = await WorkflowEngine.create(backend)
telemetry = await flow.start_telemetry(
    resource_poll_interval=5.0,      # node CPU/memory/GPU every 5 s
    checkpoint_path="./telemetry/",  # write a JSONL file (optional)
)
start_telemetry() returns a TelemetryManager. Stop it explicitly when done, or it stops automatically when the workflow engine shuts down:
await flow.shutdown() # also stops telemetry
# or explicitly:
await telemetry.stop()
AsyncFlow-specific events¶
AsyncFlow emits one event that RHAPSODY core does not:
asyncflow.TaskResolved¶
Emitted when all upstream dependencies of a task are satisfied — the moment the task becomes eligible for submission. Tasks with dependencies emit it when the last upstream TaskCompleted fires.
asyncflow.TaskResolved is defined using define_event() rather than being hard-coded in RHAPSODY, keeping the core event schema stable:
# event_type = "asyncflow.TaskResolved"
# task_id = "task.0003"
The gap between asyncflow.TaskResolved and TaskStarted is the dependency wait time: how long a task sat in the graph waiting for its inputs.
Full lifecycle from the AsyncFlow perspective¶
-------------------------- User App Layer ---------------------------
CustomEvent1 (User app)
....
CustomEventN (User app)
-------------------------- AsyncFlow Layer --------------------------
TaskCreated ← task registered in DAG (AsyncFlow)
↓
asyncflow.TaskResolved ← all upstream deps satisfied (AsyncFlow)
↓
TaskSubmitted ← handed to RHAPSODY session (AsyncFlow)
↓
-------------------------- RHAPSODY Layer --------------------------
(TaskQueued) ← optional, backend-specific (RHAPSODY Backend)
↓
TaskStarted ← worker begins execution (RHAPSODY Backend)
↓
TaskCompleted | TaskFailed | TaskCanceled (RHAPSODY Backend)
Subscribing to events¶
Use telemetry.subscribe() to receive every event in real time. The callback can be sync or async and runs on the asyncio event loop.
def on_event(event):
    print(event.event_type, event.task_id)

telemetry.subscribe(on_event)
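Callbacks may also be coroutine functions, which the bus awaits on the event loop. A minimal sketch of the pattern, assuming only that events expose `event_type` and `task_id` (here the callback hands events to an `asyncio.Queue` for a consumer task):

```python
import asyncio

events = asyncio.Queue()

async def on_event(event):
    # Awaited on the engine's event loop, so async work is fine here,
    # e.g. forwarding events to a consumer task through a queue.
    await events.put((event.event_type, event.task_id))
```

Register it with `telemetry.subscribe(on_event)` exactly like a sync callback.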
Filter by event type:
def on_event(event):
    if event.event_type == "TaskCompleted":
        print(f"done: {event.task_id} {event.duration_seconds*1000:.0f} ms")
    elif event.event_type == "asyncflow.TaskResolved":
        print(f"resolved: {event.task_id}")
    elif event.event_type == "ResourceUpdate" and event.resource_scope == "per_node":
        print(f"cpu: {event.cpu_percent:.1f}% mem: {event.memory_percent:.1f}%")
Measure dependency wait time:
resolved_at = {}

def on_event(event):
    if event.event_type == "asyncflow.TaskResolved":
        resolved_at[event.task_id] = event.event_time
    elif event.event_type == "TaskStarted":
        dep_wait_ms = (event.event_time - resolved_at.get(event.task_id, event.event_time)) * 1000
        print(f"{event.task_id}: dep_wait={dep_wait_ms:.0f} ms")
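Record the full lifecycle per task. This sketch assumes only that lifecycle events expose `event_type` and a `task_id` attribute (events without a task, such as ResourceUpdate, are skipped), so the recorded sequence can be compared against the lifecycle diagram above:

```python
from collections import defaultdict

lifecycle = defaultdict(list)

def on_event(event):
    # Record only per-task events; resource events carry no task_id.
    if getattr(event, "task_id", None) is not None:
        lifecycle[event.task_id].append(event.event_type)
```

Register it with `telemetry.subscribe(on_event)`; after shutdown, `lifecycle[task_id]` holds the ordered event names for each task.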
Reading results after the run¶
await flow.shutdown()
summary = telemetry.summary()
print(summary["tasks"])
# {'submitted': 200, 'completed': 197, 'failed': 2, 'canceled': 1, 'running': 0}
if summary["duration"]:
    print(f"mean task time: {summary['duration']['mean_seconds']*1000:.1f} ms")
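The summary dict can be post-processed directly. As a sketch, a completion-rate helper that assumes only the "tasks" counters shown above:

```python
def completion_rate(summary):
    # Fraction of submitted tasks that completed, from telemetry.summary().
    tasks = summary["tasks"]
    submitted = tasks.get("submitted", 0)
    return tasks.get("completed", 0) / submitted if submitted else 0.0
```

With the counts above this yields 197 / 200 = 0.985.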
Application-level frameworks¶
If you are building a framework on top of AsyncFlow (e.g. a domain-specific orchestrator, an ML pipeline library, a data processing layer), the telemetry bus is the right place to attach application-domain observability — without leaking implementation details back into AsyncFlow or RHAPSODY.
Defining application events¶
Use define_event() to create typed event classes. Names must be namespaced (contain a dot). Base fields (event_id, session_id, event_time, etc.) are protected.
from rhapsody.telemetry import define_event
from rhapsody.telemetry.events import make_event
# Define at module level
DataQualityChecked = define_event(
    "myframework.DataQualityChecked",
    dataset=str,
    score=float,
    rows_checked=int,
)

ModelCheckpoint = define_event(
    "myframework.ModelCheckpoint",
    step=(int, -1),
    loss=(float, float("inf")),
    metric=str,
)
Emitting application events¶
Emit from your framework code, not from inside task functions (see Emitting from inside tasks below for why):
# After a task result is available
result = await my_pipeline_task()
telemetry.emit(
    make_event(
        DataQualityChecked,
        session_id=telemetry.session_id,
        backend="myframework",
        dataset="train_2024",
        score=result["quality_score"],
        rows_checked=result["row_count"],
    )
)
telemetry.emit() is synchronous — no await.
Subscribing in a framework layer¶
class PipelineMonitor:
    def __init__(self, telemetry):
        self.violations = []
        self.checkpoints = []
        telemetry.subscribe(self._on_event)

    def _on_event(self, event):
        et = event.event_type
        if et == "TaskFailed":
            self.violations.append({
                "task_id": event.task_id,
                "error": event.error_type,
            })
        elif et == "TaskCanceled":
            # Cancellation is intentional — track separately from failures
            pass
        elif et == "myframework.DataQualityChecked":
            if event.score < 0.95:
                print(f"[ALERT] quality {event.score:.2%} on {event.dataset}")
        elif et == "myframework.ModelCheckpoint":
            self.checkpoints.append({"step": event.step, "loss": event.loss})

monitor = PipelineMonitor(telemetry)
Emitting from inside tasks¶
Do not call telemetry.emit() inside a task function. Task functions are serialized by cloudpickle for subprocess execution. Capturing telemetry in the closure serializes its asyncio state, which fails at runtime.
# ✗ WRONG — telemetry captured in closure, fails with cloudpickle
@flow.function_task
async def bad_task(data):
    result = process(data)
    telemetry.emit(...)  # ← do not do this
    return result

# ✓ CORRECT — emit after awaiting the task future, in the orchestration layer
async def run():
    result_fut = my_task(data)
    result = await result_fut
    telemetry.emit(make_event(MyEvent, ..., value=result["score"]))
Further reading¶
- RHAPSODY Telemetry Quick Start — enabling telemetry, reading metrics and spans
- Events & Metrics Reference — all event fields, OTel instruments, JSONL format
- Integrations & Extensions — Prometheus, Grafana, Jaeger, MLflow, pandas