RADICAL AsyncFlow (RAF)
RADICAL AsyncFlow (RAF) is a fast asynchronous scripting library built on top of asyncio for building powerful asynchronous workflows on HPC, clusters, and local machines. It supports pluggable execution backends with intuitive task dependencies and workflow composition.
- Powerful asynchronous workflows: Compose complex async and sync workflows easily, with intuitive task dependencies.
- Portable across environments: Run seamlessly on HPC systems, clusters, and local machines with pluggable execution backends.
- Flexible and extensible: Supports composite workflow management.
AsyncFlow ships with the following built-in execution backends:
- LocalExecutionBackend: local execution using Python's concurrent.futures (ThreadPoolExecutor / ProcessPoolExecutor)
- NoopExecutionBackend: no-op backend for testing and dry_run mode
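For readers unfamiliar with the standard-library executors that the local backend wraps, the following is a minimal sketch in plain asyncio and concurrent.futures. It does not use AsyncFlow at all; the names `square` and `run_in_pool` are hypothetical and exist only for illustration.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def square(x: int) -> int:
    """A plain blocking function that will run inside the pool."""
    return x * x


async def run_in_pool(values):
    loop = asyncio.get_running_loop()
    # A thread pool suits I/O-bound work; ProcessPoolExecutor is the usual
    # alternative for CPU-bound work, since it runs in separate processes.
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [loop.run_in_executor(pool, square, v) for v in values]
        # gather() returns results in the order the futures were passed in.
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(run_in_pool([1, 2, 3])))
```

Swapping `ThreadPoolExecutor` for `ProcessPoolExecutor` in this sketch requires the submitted function to be picklable, which is why `square` is defined at module level.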
For HPC execution, install RHAPSODY (pip install rhapsody-py), which provides additional backends. See Execution Backends & HPC Integration for the full guide:
- Radical.Pilot: distributed HPC execution
- Dask: parallel computing with Dask distributed
- Concurrent: extended thread/process pool execution
- Dragon: high-performance distributed execution
Basic Usage
import asyncio
from concurrent.futures import ThreadPoolExecutor

from radical.asyncflow import WorkflowEngine, LocalExecutionBackend


async def run():
    # Create backend and workflow
    backend = await LocalExecutionBackend(ThreadPoolExecutor(max_workers=3))
    flow = await WorkflowEngine.create(backend=backend)

    @flow.executable_task
    async def task1():
        return "echo $RANDOM"

    @flow.function_task
    async def task2(t1_result):
        return int(t1_result.strip()) * 2 * 2

    # create the workflow
    t1_fut = task1()
    t2_result = await task2(t1_fut)  # t2 depends on t1 (waits for it)

    # shutdown the execution backend
    await flow.shutdown()


if __name__ == "__main__":
    asyncio.run(run())
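The dependency pattern in the example above (passing one task's future into another task) can be illustrated with a plain-asyncio analogue. This is a sketch only, not how AsyncFlow implements dependency resolution: here the downstream coroutine awaits the upstream future explicitly, whereas in the example above task2 receives the already-resolved result. The names `produce`, `consume`, and `main` are hypothetical, and the fixed string stands in for the shell output.

```python
import asyncio


async def produce() -> str:
    # Stand-in for an upstream task; returns text with stray whitespace,
    # similar to what a shell command's stdout might look like.
    await asyncio.sleep(0.01)
    return " 21 \n"


async def consume(upstream: asyncio.Future) -> int:
    # Awaiting the upstream future is what makes this task wait for it.
    text = await upstream
    return int(text.strip()) * 2 * 2


async def main() -> int:
    # create_task schedules produce() right away; consume() then blocks
    # on its result, so the ordering follows from the data dependency.
    t1_fut = asyncio.create_task(produce())
    return await consume(t1_fut)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The point of the design is that ordering is never stated separately: a task runs after whatever futures it takes as arguments.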