Changelog¶
All notable changes to RADICAL AsyncFlow will be documented in this file.
The format is based on Keep a Changelog.
[Unreleased]¶
[0.4.0] - 2026-05-18¶
Added¶
-
WorkflowEngine.workflow_scope(workflow_id=None)— new async context manager that groups all tasks registered inside under a sharedasyncflow.workflow_id. Internally sets_workflow_id_ctx(a module-levelContextVar), which asyncio copies into everycreate_task/gatherbranch automatically so concurrent workflow instances remain isolated. When telemetry is active, also opens an OTel"workflow"span, making all task spans inside structural children in the trace hierarchy. Auto-generates a short UUID ifworkflow_idisNone. -
_workflow_id_ctxContextVar — per-instanceContextVar(defaultNone) that carries the active workflow ID across asyncio task boundaries without explicit argument passing. EachWorkflowEngineinstance owns its own uniquely-named ContextVar (asyncflow_workflow_id.<uid>), preventing context leakage when multiple engines are used within the same coroutine.@flow.blockexecution sets it to the block's UID so tasks inside a block inherit the workflow ID automatically. -
WorkflowEngine.start_telemetry()new parameters —span_processors,metric_readers,resource— forwarded toTelemetryManager.__init__(), mirroringSession.start_telemetry(). Enables passing pre-built OTel exporters (OTLP, Prometheus, Jaeger) without any RHAPSODY code changes. -
Span enricher for
asyncflow.workflow_id— registered automatically bystart_telemetry()viaregister_span_enricher(). Stampsasyncflow.workflow_idonto every task OTel span when the task was created inside aworkflow_scope()or@flow.block. Enables per-workflow Gantt views and span filtering in Jaeger / Grafana Tempo. -
_emit()workflow_idkwarg — whenworkflow_idis set, injectsasyncflow.workflow_idinto the event'sattributesdict. All task lifecycle events emitted byWorkflowEngine(TaskCreated, asyncflow.TaskResolved, TaskSubmitted) carry the active workflow ID. -
Example
01-workflow_grouping.py— complete rewrite as an HPC Campaign Manager simulation. Models 4 workflow types with resource tracking and dependency chains: simulate(4 tasks, GPU, no deps) — molecular dynamics runsanalyze(4 tasks, GPU, deps=simulate) — post-processing per simulationtrain(8 tasks, GPU, no deps) — distributed ML training-
evaluate(8 tasks, CPU, deps=train) — lightweight model evaluation UsesResourcePool(asyncio-queue-based GPU/CPU slot tracking) and emitscampaign.ResourceAssignedcustom events to record per-instance resource assignments in the JSONL checkpoint. -
plot_campaign.py— new plotting script producing a two-panel Campaign Manager timeline figure: - Top panel: Gantt chart with rows per workflow instance, bars coloured by workflow type and labelled with the assigned resource (
gpu:N/cpu:N). Right-margin annotations show priority / cpu / gpu per type. Right column renders a config table, scheduler description box, and dependency graph. -
Bottom panel: GPU (left axis) and CPU (right axis) utilisation as step functions over elapsed time, with total-capacity dashed reference lines.
-
capture_stdiodecorator parameter —@flow.executable_task(capture_stdio=True)redirects stdout/stderr from executable tasks directly to files instead of collecting them in memory. The awaited future resolves to the stdout file path ({work_dir}/{uid}/{task_uid}.stdout) instead of a decoded string. Honoured by all RHAPSODY backends (ConcurrentExecutionBackend,DaskExecutionBackend,DragonExecutionBackendV3); silently ignored for function tasks.capture_stdiois stored as a top-level field incomp_desc(not nested insidetask_backend_specific_kwargs) so RHAPSODY backends can read it astask.get("capture_stdio"). WorkflowEngineuidandwork_dirparameters —WorkflowEngine.create(uid=..., work_dir=...)sets the engine's unique identifier (UUID) and working directory. The engine is now the single authority forbackend._work_dir; it setsbackend._work_dir = {work_dir}/{uid}and creates the directory before any tasks run (mirrorsSession.add_backend()in RHAPSODY).WorkflowEngine._attach_backend()— new internal method that replaces the inlineregister_callbackloop. Handles_work_dirinjection,is_attachedflag, and callback registration in one place.- Unit tests for
capture_stdiofield placement, default value,_work_dirauthority, and end-to-end file I/O withConcurrentExecutionBackend.
Fixed¶
-
Block spans parented correctly —
execute_block()now benefits from thespan_scope()session-span fallback added in RHAPSODY: block spans that run in a context with no active OTel span (e.g. spawned fromrun()beforestart_telemetry()was awaited) now correctly nest under the session root span instead of floating as unrooted traces. -
execute_block()dead branch removed — therun_in_executorcode path (used when the wrapped function was sync) is removed.WorkflowEngineenforces a strict async API; all block functions must beasync def.
Changed¶
-
execute_block()usesnullcontext— the no-telemetry code path uses stdlibnullcontext(Python >= 3.7) instead of the former custom_null_context()helper, which is removed. -
execute_block()sets_workflow_id_ctx— the block's UID is set as the active_workflow_id_ctxfor the duration of block execution so every task registered inside the block inherits it asasyncflow.workflow_idwithout requiring an explicitworkflow_scope()call.
Docs¶
docs/telemetry.md:- Added "Forwarding to an external backend" section with corrected
span_processors/metric_readerscode examples (replaces the brokenset_tracer_provider()pattern). - Added "
workflow_scope()context manager" section with usage examples and auto-ID generation. - Added "OTel span hierarchy" section showing the four-level
session -> workflow -> block -> tasktree and explaining howasyncflow.workflow_idpropagates to every span attribute and JSONL event. - Updated
start_telemetry()signature to includespan_processors,metric_readers, andresourceparameters.
[0.3.1] - 2026-03-09¶
Fixed¶
executable_taskcommand parsing —shlex.splitis now applied correctly to the command string returned by the decorated function, ensuring commands with arguments and quoted strings are parsed properly.
[0.3.0] - 2026-02-24¶
Added¶
prompt_taskdecorator — new first-class task type alongsidefunction_taskandexecutable_task. The decorated async function returns a prompt string that is forwarded to an AI execution backend (e.g.DragonVllmInferenceBackendfrom RHAPSODY).NoopExecutionBackendreturns"Dummy Prompt Output"for testing;LocalExecutionBackendraisesNotImplementedErrorwith a clear message directing users to register an AI backend.- Multi-backend registry —
WorkflowEngine.create(backend=...)now accepts either a single backend or a list of pre-initialized named backends (each must expose a.nameattribute). The first backend in the list is the default. Built-in backends (LocalExecutionBackend,NoopExecutionBackend) gain anameparameter defaulting to"default". - Per-task backend routing — task decorators (
function_task,executable_task,prompt_task) accept an optionalbackend="<name>"parameter to route a specific task to a named backend. Tasks withoutbackend=are sent to the default backend. - Parallel backend submission — when a batch contains tasks routed to multiple backends,
submit()dispatches all backend calls concurrently viaasyncio.gather.shutdown()similarly drains all registered backends in parallel. - Example
04-concurrent_backends.py— demonstrates a compute + AI dual-backend workflow running document processing pipelines concurrently. - Tests — unit tests (
tests/unit/test_prompt_task.py) and integration tests (tests/integration/test_multi_backend.py) coveringprompt_taskregistration, validation, multi-backend routing, parallel submission, and graceful shutdown.
Fixed¶
async_wrapperexception propagation — errors raised during task registration (e.g.prompt_taskreturningNone, orexecutable_taskreturning a non-string) are now propagated to the task'sasyncio.Futureinstead of being silently swallowed by the detachedasyncio.create_task. Previously, such errors would causeawait task(...)to hang indefinitely.
Changed¶
- Documentation — all HPC backend references across docs, examples, and the Jupyter tutorial now point to the official RHAPSODY documentation instead of the GitHub repository, and include a direct link to the AsyncFlow integration guide.
- Examples — HPC example scripts (
04-dask_execution_backend.py,05-radical_execution_backend.py,06-dragon_execution_backend.py) updated with module-level docstrings referencing RHAPSODY docs and the AsyncFlow integration guide. - Proof-of-concept examples — all basic/getting-started examples (
basic.md,async_workflows.md,composite_workflow.md, examples 01–03) consistently useLocalExecutionBackend; RHAPSODY backends are only shown in dedicated HPC sections. - Install command — RHAPSODY install command corrected to
pip install rhapsody-pyacross all docs and examples.
[0.2.0] - 2026-02-12¶
Added¶
- Local execution backend —
LocalExecutionBackendships built-in, supporting bothThreadPoolExecutorandProcessPoolExecutorwithcloudpickleserialization. - No-op backend —
NoopExecutionBackendfor testing anddry_runworkflows without computational overhead. - HPC execution via RHAPSODY — HPC backends (
RadicalExecutionBackend,DaskExecutionBackend,DragonExecutionBackendV3) are now provided by the separate RHAPSODY package and plug into AsyncFlow via simple import. - Composite workflows —
@flow.blockdecorator for grouping tasks into logical units with support for nested blocks and inter-block dependencies. - Task dependency resolution — automatic DAG-based scheduling with implicit and explicit data dependencies between
function_taskandexecutable_tasktypes. - Pluggable backend architecture — any backend conforming to the execution interface can be passed to
WorkflowEngine.create(), enabling custom and third-party backends. - Pre-commit pipeline — ruff, docformatter, typos, actionlint, detect-secrets, and standard pre-commit hooks.
- CI/CD — GitHub Actions workflows for unit tests, integration tests, pre-commit checks, and documentation deployment across Python 3.9-3.13.
Changed¶
- Backend decoupling — HPC backends (Dask, RadicalPilot, Concurrent, Dragon) moved from
radical.asyncflowto the standalonerhapsodypackage. AsyncFlow now only shipsLocalExecutionBackendandNoopExecutionBackend. - Imports for HPC backends — HPC backends are now imported from
rhapsody.backendsinstead ofradical.asyncflow. - Documentation — all docs and examples updated to reflect the new architecture, with clear separation between built-in local backends and RHAPSODY HPC backends.
[0.1.0] - 2025-12-07¶
Initial public release of RADICAL AsyncFlow.
Added¶
- Async-first workflow engine with DAG-based task scheduling.
- Support for async and sync function execution within workflows.
- Execution backend abstraction with pluggable backend interface.
- RADICAL-based HPC execution backend integration.
- Agentic workflow support.
- Event-driven shutdown mechanism for backend lifecycle management.
- Task cancellation signaling and cooperative cancellation support.
- Enhanced async context manager support for workflow execution.
- Unit and integration testing framework.
- Initial project documentation.
- Migration to
pyproject.tomlwithtox-based test environments.
Changed¶
- Renamed project from
flowtoasyncflow. - Removed legacy synchronous execution paradigm in favor of fully async architecture.
- Refactored execution loop for thread-based backend.
- Updated RADICAL execution backend callback mechanism to support service-style operation.
- Improved error propagation for failed tasks.
- Refactored internal utilities to remove dependency on
radical.utils. - Eliminated redundant data dependency linking logic.
Fixed¶
- Corrected handling of task failure without explicit
wait(). - Fixed
handle_task_failurelogic for consistent error reporting. - Resolved process-based function execution launch issues.
- Ensured
pre_execdirectives are properly applied. - Addressed multiple execution edge cases in RADICAL backend.