Events, Metrics & Spans Reference¶
This page is the authoritative reference for every piece of data that RHAPSODY telemetry emits: lifecycle events, resource updates, OTel metrics instruments, and span structure.
Event model¶
All telemetry originates as frozen Python dataclasses (BaseEvent subclasses). They are:
- Immutable — safe for zero-copy fan-out to multiple subscribers.
- Timestamped — every event carries two wall-clock times: `event_time` (when it occurred in the task's history) and `emit_time` (when it entered the event bus). Their difference is the queue latency.
- Identified — every event carries a unique `event_id` (UUID4 hex), a `session_id`, and a `backend` name.
- OTel-enriched in the JSONL file — the serializer adds `trace_id` and `span_id` fields that link the event to the correct OTel span.
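The queue latency mentioned above is simply the difference of the two timestamps. A minimal sketch, using a stand-in dataclass rather than the real `BaseEvent` subclasses:

```python
from dataclasses import dataclass

# Stand-in for a BaseEvent subclass: only the two timing fields matter here.
@dataclass(frozen=True)
class FakeEvent:
    event_time: float  # when it occurred in the task's history
    emit_time: float   # when it entered the event bus

def queue_latency(event) -> float:
    """Seconds the event spent between occurrence and bus insertion."""
    return event.emit_time - event.event_time

e = FakeEvent(event_time=100.0, emit_time=100.25)
print(queue_latency(e))  # 0.25
```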
Base fields (all events)¶
| Field | Type | Description |
|---|---|---|
| `event_id` | `str` | UUID4 hex, unique per event |
| `event_type` | `str` | Discriminator string (matches the class name) |
| `event_time` | `float` | Wall-clock time the event occurred (`time.time()`) |
| `emit_time` | `float` | Wall-clock time of queue insertion (`time.time()`) |
| `session_id` | `str` | Owning session identifier |
| `backend` | `str` | Backend name (e.g. `"dragon_v3"`, `"dask"`, `"concurrent"`) |
| `task_id` | `str \| None` | Task UID — `None` for session-level events |
| `node_id` | `str \| None` | Hostname or worker address — `None` when not applicable |
| `attributes` | `dict` | Optional metadata (see per-event notes below) |
Session lifecycle events¶
SessionStarted¶
Emitted once when `await telemetry.start()` activates the dispatch loop.
In OTel terms this event coincides with the root session span being opened. Its span_id in the JSONL file is the session span's ID — the parent of every task span.
SessionEnded¶
Emitted once when `await telemetry.stop()` is called.
| Extra field | Type | Description |
|---|---|---|
| `duration_seconds` | `float` | Total wall-clock duration of the session |
The span_id in the JSONL file matches the session span that is closed at this point.
Task lifecycle events¶
The canonical lifecycle is:
```
(TaskCreated) → TaskSubmitted → (TaskQueued) → TaskStarted → TaskCompleted
                                                           ↘ TaskFailed
                                                           ↘ TaskCanceled
```
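A subscriber can fold this stream into outcome counts. A minimal sketch over stand-in event objects (only `event_type` is assumed here):

```python
from collections import Counter
from types import SimpleNamespace

# The three terminal states of the canonical lifecycle.
TERMINAL = {"TaskCompleted", "TaskFailed", "TaskCanceled"}

def count_outcomes(events):
    """Tally terminal lifecycle events from an event stream."""
    return dict(Counter(e.event_type for e in events if e.event_type in TERMINAL))

stream = [SimpleNamespace(event_type=t) for t in
          ("TaskSubmitted", "TaskStarted", "TaskCompleted",
           "TaskSubmitted", "TaskStarted", "TaskFailed")]
print(count_outcomes(stream))  # {'TaskCompleted': 1, 'TaskFailed': 1}
```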
TaskCreated¶
Emitted by Session.submit_tasks() when the future is bound and the task is registered, before backend routing and submission.
```python
attributes = {
    "executable": str,  # function name or binary path
    "task_type": str,   # "compute", "function", etc.
}
```
Because TaskCreated fires before routing, its backend field is empty (""). The assigned backend appears on the subsequent TaskSubmitted event.
TaskSubmitted¶
Emitted by the session immediately after routing assigns a backend to the task.
```python
attributes = {
    "executable": str,  # binary path, function name, or model name
    "task_type": str,   # "compute", "function", "ai", …
}
```
OTel correlation
At submission time no task span exists yet. The JSONL trace_id is the session trace ID (so the event is queryable within the right trace), but span_id is null.
TaskQueued¶
Emitted just before the task is handed to the backend's internal queue. Marks the boundary between session-level routing latency and backend-level scheduling latency.
Same OTel correlation as TaskSubmitted — trace-linked but no task span yet.
TaskStarted¶
Emitted when the task transitions to RUNNING state. This is the authoritative "execution began" timestamp.
OTel correlation
This event opens the task OTel span (named "task") as a child of the session span. The JSONL `trace_id` and `span_id` are those of the freshly opened task span.
TaskCompleted¶
Emitted when the task reaches DONE state.
| Extra field | Type | Description |
|---|---|---|
| `duration_seconds` | `float` | Wall-clock time from RUNNING → DONE. `0.0` if RUNNING was never recorded (`incomplete_lifecycle=True` in `attributes`). |
```python
attributes = {
    "executable": str,
    "task_type": str,
    # optionally:
    "incomplete_lifecycle": True,  # STARTED was not recorded
}
```
The task span is closed at this point. span_id in the JSONL is the closed span's ID.
TaskFailed¶
Emitted when the task reaches FAILED state.
| Extra field | Type | Description |
|---|---|---|
| `duration_seconds` | `float` | Same semantics as TaskCompleted. |
| `error_type` | `str` | Python exception class name, or `"unknown"`. |
```python
attributes = {
    "executable": str,
    "task_type": str,
    "error_type": str,
    # optionally:
    "incomplete_lifecycle": True,
}
```
TaskCanceled¶
Emitted when a task reaches CANCELED state (user-initiated cancellation or backend-level abort). Distinct from TaskFailed — cancellation is intentional, not an error.
| Extra field | Type | Description |
|---|---|---|
| `duration_seconds` | `float` | Wall-clock time from RUNNING → CANCELED. `0.0` if RUNNING was never recorded. |
```python
attributes = {
    "executable": str,
    "task_type": str,
    # optionally:
    "incomplete_lifecycle": True,  # task canceled before RUNNING was seen
}
```
There is no `error_type` field. To distinguish cancellations from failures in a subscriber, filter on `event.event_type == "TaskCanceled"`.
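A sketch of that filter, using stand-in event objects that expose only the fields discussed above:

```python
from types import SimpleNamespace

def classify(event):
    """Route terminal events; cancellation is intentional, not an error."""
    if event.event_type == "TaskCanceled":
        return "canceled"
    if event.event_type == "TaskFailed":
        return f"failed:{event.error_type}"
    return None  # not a terminal failure/cancellation event

print(classify(SimpleNamespace(event_type="TaskCanceled")))  # canceled
print(classify(SimpleNamespace(event_type="TaskFailed",
                               error_type="RuntimeError")))  # failed:RuntimeError
```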
Resource update events¶
ResourceUpdate¶
Emitted periodically by backend adapters (default every 5 s). Every ResourceUpdate carries a resource_scope discriminator that tells consumers exactly what the event represents and which fields are populated.
| `resource_scope` | `gpu_id` | Populated fields |
|---|---|---|
| `"per_node"` | `null` | `cpu_percent`, `memory_percent`, `gpu_percent` (aggregate), `disk_*`, `net_*` |
| `"per_gpu"` | `0..N` | `gpu_percent` and `gpu_id` only — all others are `null` |
`resource_scope` is computed automatically from `gpu_id` and is always present. Consumers should use it as the primary discriminator instead of checking `gpu_id is None`.
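A sketch of scope-based dispatch in a subscriber, with stand-in objects in place of real ResourceUpdate events:

```python
from types import SimpleNamespace

def route_resource_update(event):
    """Dispatch on resource_scope instead of checking gpu_id is None."""
    if event.resource_scope == "per_node":
        return ("node", event.cpu_percent, event.memory_percent)
    if event.resource_scope == "per_gpu":
        return ("gpu", event.gpu_id, event.gpu_percent)
    raise ValueError(f"unknown scope: {event.resource_scope}")

node = SimpleNamespace(resource_scope="per_node",
                       cpu_percent=41.2, memory_percent=28.7)
gpu = SimpleNamespace(resource_scope="per_gpu", gpu_id=2, gpu_percent=75.2)
print(route_resource_update(node))  # ('node', 41.2, 28.7)
print(route_resource_update(gpu))   # ('gpu', 2, 75.2)
```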
Full field reference¶
| Field | Type | per_node | per_gpu | Description |
|---|---|---|---|---|
| `resource_scope` | `str` | `"per_node"` | `"per_gpu"` | Scope discriminator, always set |
| `gpu_id` | `int \| None` | `null` | `0..N` | Device index; `null` on node-level events |
| `cpu_percent` | `float \| None` | ✅ float 0–100 | `null` | Node CPU utilization |
| `memory_percent` | `float \| None` | ✅ float 0–100 | `null` | Node memory utilization |
| `gpu_percent` | `float \| None` | ✅ or `null` | ✅ float 0–100 | GPU utilization; `null` on per_node when backend has no GPU |
| `disk_read_bytes` | `float \| None` | ✅ or `null` | `null` | Bytes read since previous poll (delta); `null` on first poll |
| `disk_write_bytes` | `float \| None` | ✅ or `null` | `null` | Bytes written since previous poll |
| `net_sent_bytes` | `float \| None` | ✅ or `null` | `null` | Bytes sent since previous poll |
| `net_recv_bytes` | `float \| None` | ✅ or `null` | `null` | Bytes received since previous poll |
OTel correlation
ResourceUpdate events are anchored to the session span, not to a task span. Their trace_id and span_id in the JSONL file match the root session span, making all resource data queryable within the session's trace in any OTel-compatible tool.
Schema examples¶
per_node event (gpu_id=null, no GPU on this node):
```json
{
  "resource_scope": "per_node",
  "gpu_id": null,
  "cpu_percent": 41.2,
  "memory_percent": 28.7,
  "gpu_percent": null,
  "disk_read_bytes": 4096.0,
  "disk_write_bytes": 0.0,
  "net_sent_bytes": 512.0,
  "net_recv_bytes": 1024.0
}
```
per_node event (gpu_id=null, aggregate GPU across 4 devices):
```json
{
  "resource_scope": "per_node",
  "gpu_id": null,
  "cpu_percent": 78.3,
  "memory_percent": 62.1,
  "gpu_percent": 87.0,
  "disk_read_bytes": 8192.0,
  "disk_write_bytes": 2048.0,
  "net_sent_bytes": 1024.0,
  "net_recv_bytes": 4096.0
}
```
per_gpu event (one per device):
```json
{
  "resource_scope": "per_gpu",
  "gpu_id": 2,
  "cpu_percent": null,
  "memory_percent": null,
  "gpu_percent": 75.2,
  "disk_read_bytes": null,
  "disk_write_bytes": null,
  "net_sent_bytes": null,
  "net_recv_bytes": null
}
```
Node-level vs per-device semantics¶
One poll cycle on a 4-GPU node emits 5 ResourceUpdate events:
```
resource_scope="per_node", gpu_id=null → cpu, mem, disk, net, max-GPU%
resource_scope="per_gpu",  gpu_id=0    → gpu_percent only
resource_scope="per_gpu",  gpu_id=1    → gpu_percent only
resource_scope="per_gpu",  gpu_id=2    → gpu_percent only
resource_scope="per_gpu",  gpu_id=3    → gpu_percent only
```
To get CPU/memory for a specific GPU node, join per_gpu events to the per_node event on (session_id, node_id) within the same poll window.
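That join can be sketched in plain Python. The events below are stand-in objects, and the poll window is approximated by bucketing `event_time` into 5-second intervals — an approximation for illustration, not a RHAPSODY API:

```python
from types import SimpleNamespace

POLL_S = 5.0  # assumed default poll interval

def bucket(t: float) -> int:
    """Collapse a timestamp into its poll-window index."""
    return int(t // POLL_S)

def join_gpu_to_node(events):
    """Attach node CPU% to each per_gpu event from the same poll window."""
    nodes = {}
    for e in events:
        if e.resource_scope == "per_node":
            nodes[(e.session_id, e.node_id, bucket(e.event_time))] = e
    joined = []
    for e in events:
        if e.resource_scope == "per_gpu":
            n = nodes.get((e.session_id, e.node_id, bucket(e.event_time)))
            joined.append((e.gpu_id, e.gpu_percent,
                           n.cpu_percent if n else None))
    return joined

evts = [
    SimpleNamespace(resource_scope="per_node", session_id="s", node_id="n0",
                    event_time=10.1, cpu_percent=78.3),
    SimpleNamespace(resource_scope="per_gpu", session_id="s", node_id="n0",
                    event_time=10.2, gpu_id=0, gpu_percent=87.0),
]
print(join_gpu_to_node(evts))  # [(0, 87.0, 78.3)]
```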
Adapter coverage matrix¶
| Metric | `resource_scope` | Concurrent | Dask | Dragon |
|---|---|---|---|---|
| `cpu_percent` | per_node only | ✅ | ✅ | ✅ |
| `memory_percent` | per_node only | ✅ | ✅ | ✅ |
| `gpu_percent` (aggregate) | per_node | ✅ if NVIDIA present | ❌ | ✅ |
| `gpu_percent` + `gpu_id` | per_gpu | ✅ if NVIDIA present | ❌ | ✅ |
| `disk_read/write_bytes` | per_node only | ✅ (delta) | ✅ (delta, first poll = null) | ✅ (delta) |
| `net_sent/recv_bytes` | per_node only | ✅ (delta) | ❌ | ✅ (delta) |
Custom events — define_event()¶
Application code and orchestration layers can define their own event types and emit them through the same telemetry bus — without modifying RHAPSODY source.
```python
from rhapsody.telemetry import define_event
from rhapsody.telemetry.events import make_event

# Define a typed event class at module level
LineTimer = define_event(
    "myapp.LineTimer",
    label=str,
    duration_ms=float,
)
```
`define_event(name, **fields)` creates a frozen `BaseEvent` subclass at runtime via `dataclasses.make_dataclass`. It enforces two rules:

- Namespace required — `name` must contain a `.` (e.g. `"myapp.LineTimer"`). This prevents collisions with RHAPSODY's own event names.
- Base fields protected — fields declared on `BaseEvent` (`event_id`, `event_time`, `session_id`, etc.) cannot be shadowed. Attempting to do so raises `ValueError` at definition time.
Emitting a custom event¶
```python
import time

from rhapsody.telemetry.events import make_event

t0 = time.time()
# ... your code ...
duration_ms = (time.time() - t0) * 1000

telemetry.emit(
    make_event(
        LineTimer,
        session_id=telemetry.session_id,
        backend="app",
        label="my_block",
        duration_ms=duration_ms,
    )
)
```
`make_event(cls, *, session_id, backend, **fields)` stamps the event with a fresh `event_id`, `event_time`, and `emit_time`. Pass `event_time=t0` explicitly if you want the event timestamped to when the block started rather than when it was emitted.
`telemetry.emit()` is synchronous — no `await` needed.
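The emit pattern shown above lends itself to a small context manager. A sketch where a plain list stands in for `telemetry.emit()` so the block is self-contained:

```python
import time
from contextlib import contextmanager

emitted = []  # stand-in sink; a real app would call telemetry.emit() instead

@contextmanager
def timed(label: str):
    """Measure a code block and record a LineTimer-style event on exit."""
    t0 = time.time()
    try:
        yield
    finally:
        duration_ms = (time.time() - t0) * 1000
        emitted.append({"event_type": "myapp.LineTimer",
                        "label": label, "duration_ms": duration_ms})

with timed("my_block"):
    sum(range(1000))  # ... your code ...

print(emitted[0]["label"])  # my_block
```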
Receiving custom events in a subscriber¶
```python
def on_event(event):
    if event.event_type == "myapp.LineTimer":
        print(f"{event.label}: {event.duration_ms:.1f} ms")

telemetry.subscribe(on_event)
```
Custom events flow through the same dispatch loop as system events and appear in the JSONL checkpoint file with their full field set.
Field type defaults¶
When `define_event` is called with a bare type (not a `(type, default)` tuple), defaults are assigned automatically:

| Field type | Default |
|---|---|
| `float` | `0.0` |
| `int` | `0` |
| `str` | `""` |
| `bool` | `False` |
| Any other type | `None` |
To specify a custom default, pass a `(type, default)` tuple:

```python
Checkpoint = define_event(
    "myapp.Checkpoint",
    step=(int, -1),             # default -1
    loss=(float, float("inf")),
    label=str,                  # default ""
)
```
OTel metrics instruments¶
The TelemetryManager registers the following instruments on a rhapsody meter:
| Instrument name | Type | Unit | Description |
|---|---|---|---|
| `tasks_submitted` | Counter | tasks | Total tasks submitted to the session |
| `tasks_started` | Counter | tasks | Total tasks that transitioned to RUNNING |
| `tasks_completed` | Counter | tasks | Total tasks that reached DONE |
| `tasks_failed` | Counter | tasks | Total tasks that reached FAILED |
| `tasks_canceled` | Counter | tasks | Total tasks that reached CANCELED |
| `tasks_running` | UpDownCounter | tasks | Tasks currently in RUNNING state |
| `task_duration_seconds` | Histogram | seconds | Per-task execution duration (RUNNING → DONE/FAILED) |
| `node_cpu_utilization` | UpDownCounter | % | Current CPU % per node (last-seen delta tracking) |
| `node_memory_utilization` | UpDownCounter | % | Current memory % per node |
| `node_gpu_utilization` | UpDownCounter | % | Current GPU % per node |
All instruments carry session_id and backend as label dimensions. Task instruments additionally carry task_id, executable, and task_type.
OTel span structure¶
RHAPSODY emits two span types, both in the "rhapsody" tracer scope:
Session span (name="session")¶
```
session span
├── attributes:
│     session_id = "session.0001"
│     backend    = "rhapsody"
└── parent: none (this is the trace root)
```
All task spans and resource events are linked to this trace via the same trace_id.
Task span (name="task")¶
```
task span
├── parent: session span
├── attributes:
│     session_id = "session.0001"
│     backend    = "dragon_v3"
│     task_id    = "task.0042"
│     executable = "/usr/bin/python"
│     task_type  = "compute"
│     status     = "completed" | "failed"
│     error_type = "RuntimeError"   (failed tasks only)
└── duration: TaskStarted.event_time → TaskCompleted.event_time
```
The span duration is derived from the backend's own task history timestamps — not from event queue latency — so it reflects true execution time.
Trace–span correlation in the JSONL file¶
Every line in the checkpoint file carries two extra keys added at serialization time:
| JSONL key | Source |
|---|---|
| `trace_id` | Hex string of the OTel trace ID |
| `span_id` | Hex string of the correlated span ID (`null` for TaskSubmitted/TaskQueued) |
| `name` | Alias for `event_type` (OTel-compatible) |
This makes the JSONL file directly importable into any OTel-aware tool as a timeline.
JSONL checkpoint format¶
Each line in the .telemetry.jsonl file is a JSON object with a "section" discriminator:
```
{"section": "event", "event_type": "SessionStarted", "trace_id": "0x…", "span_id": "0x…", …}
{"section": "event", "event_type": "TaskStarted", "task_id": "…", "trace_id": "0x…", "span_id": "0x…", …}
{"section": "event", "event_type": "ResourceUpdate", "resource_scope": "per_node", "gpu_id": null, "cpu_percent": 42.3, "memory_percent": 28.7, "gpu_percent": null, "disk_read_bytes": 4096.0, …}
{"section": "event", "event_type": "ResourceUpdate", "resource_scope": "per_gpu", "gpu_id": 0, "cpu_percent": null, "memory_percent": null, "gpu_percent": 75.2, "disk_read_bytes": null, …}
{"section": "metric", "tasks_submitted": 100, "tasks_completed": 98, "tasks_running": 2, …}
{"section": "span", "name": "task", "span_id": "0x…", "trace_id": "0x…", "parent_span_id": "0x…", "duration_ms": 1234.5, …}
```
The file is written line-buffered, so it is readable during a live run.
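Because each line is an independent JSON object, the file can be parsed incrementally. A sketch that groups records by their `section` key, run here against two illustrative sample lines rather than a real checkpoint file:

```python
import json

# Two sample lines mirroring the format above (values are illustrative).
SAMPLE = "\n".join([
    '{"section": "event", "event_type": "TaskStarted", "task_id": "task.0042"}',
    '{"section": "metric", "tasks_submitted": 100, "tasks_completed": 98}',
])

def by_section(text):
    """Group JSONL records by their 'section' discriminator."""
    out = {}
    for line in text.splitlines():
        if not line.strip():
            continue  # tolerate blank lines in a live file
        rec = json.loads(line)
        out.setdefault(rec["section"], []).append(rec)
    return out

sections = by_section(SAMPLE)
print(sorted(sections))                    # ['event', 'metric']
print(sections["event"][0]["event_type"])  # TaskStarted
```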