Skip to content

RADICAL AsyncFlow (RAF)ΒΆ

RADICAL AsyncFlow (RAF) is a fast asynchronous scripting library built on top of asyncio for building powerful asynchronous workflows on HPC, clusters, and local machines. It supports pluggable execution backends with intuitive task dependencies and workflow composition.

  • ⚑ Powerful asynchronous workflows β€” Compose complex async and sync workflows easily, with intuitive task dependencies.

  • 🌐 Portable across environments β€” Run seamlessly on HPC systems, clusters, and local machines with pluggable execution backends.

  • 🧩 Flexible and extensible β€” Supports composite workflows management.

AsyncFlow ships with the following built-in execution backends:

  • LocalExecutionBackend β€” local execution using Python's concurrent.futures (ThreadPoolExecutor / ProcessPoolExecutor)
  • NoopExecutionBackend β€” no-op backend for testing and dry_run mode

For HPC execution, install RHAPSODY (pip install rhapsody-py) which provides additional backends β€” see Execution Backends & HPC Integration for the full guide:

  • Radical.Pilot β€” distributed HPC execution
  • Dask β€” parallel computing with Dask distributed
  • Concurrent β€” extended thread/process pool execution
  • Dragon β€” high-performance distributed execution

Basic Usage

import asyncio

from concurrent.futures import ThreadPoolExecutor
from radical.asyncflow import WorkflowEngine, LocalExecutionBackend

async def run():
    # Create backend and workflow
    backend = await LocalExecutionBackend(ThreadPoolExecutor(max_workers=3))
    flow = await WorkflowEngine.create(backend=backend)

    @flow.executable_task
    async def task1():
        return "echo $RANDOM"

    @flow.function_task
    async def task2(t1_result):
        return int(t1_result.strip()) * 2 * 2

    # create the workflow
    t1_fut = task1()
    t2_result = await task2(t1_fut) # t2 depends on t1 (waits for it)

    # shutdown the execution backend
    await flow.shutdown()

if __name__ == "__main__":
    asyncio.run(run())