Getting Started: Run a Single Workflow with Task Dependencies¶
This guide walks you through running a single workflow with task dependencies using radical.asyncflow.
You’ll learn how to define tasks, set dependencies, execute the workflow, and shut down the engine gracefully.
Prerequisites¶
- Make sure you have installed radical.asyncflow in your Python environment.
- You also need a working Jupyter Notebook or Python >=3.8.
Import the necessary modules¶
You’ll need time, asyncio, ThreadPoolExecutor from concurrent.futures, and the key classes from radical.asyncflow.
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from radical.asyncflow import WorkflowEngine, ConcurrentExecutionBackend
Set up the workflow engine¶
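We initialize the workflow engine with a ConcurrentExecutionBackend, which takes either Python’s ThreadPoolExecutor or ProcessPoolExecutor.
The snippet uses top-level await, which works in a Jupyter notebook; in a plain Python script, these lines must run inside an async function.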
backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
flow = await WorkflowEngine.create(backend=backend)
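To build intuition for what an executor from concurrent.futures does, the sketch below uses only the standard library, not radical.asyncflow, and is not a description of how ConcurrentExecutionBackend works internally. It shows a thread pool running a blocking function off the asyncio event loop. The names blocking_square and run_in_pool are hypothetical and exist only for this illustration.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_square(x):
    """Hypothetical blocking function standing in for real work."""
    time.sleep(0.1)
    return x * x


async def run_in_pool(values):
    loop = asyncio.get_running_loop()
    # ThreadPoolExecutor is the usual choice for I/O-bound work.
    # ProcessPoolExecutor offers the same interface and is the usual
    # choice for CPU-bound work, but its callables must be picklable.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [loop.run_in_executor(pool, blocking_square, v) for v in values]
        # gather() returns results in the order the futures were passed in.
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(run_in_pool([1, 2, 3])))  # prints [1, 4, 9]
```

The three calls overlap in the pool, so the whole batch takes about as long as one call rather than three.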
Define the tasks¶
Define your tasks and specify dependencies by passing other tasks as arguments.
@flow.function_task
async def task1(*args):
    print(f"Running task1 at {time.time()}")
    await asyncio.sleep(1)
    return time.time()

@flow.function_task
async def task2(*args):
    print(f"Running task2 at {time.time()}")
    await asyncio.sleep(1)
    return time.time()

@flow.function_task
async def task3(t1_result, t2_result):
    print(f"Running task3 after task1 and task2 at {time.time()}")
    await asyncio.sleep(1)
    return time.time()
Note
- task3 depends on the outputs of task1 and task2.
- You express this dependency by calling task3(task1(), task2()).
- task1 and task2 will be automatically resolved at runtime, and their values will be passed to task3 accordingly.
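The idea of expressing a dependency by passing one task call into another can be illustrated with plain asyncio. The following is a minimal sketch under stated assumptions, not radical.asyncflow's implementation: toy_task is a hypothetical decorator invented for this example, and the functions a, b, and add are placeholders.

```python
import asyncio
import functools


def toy_task(func):
    """Hypothetical decorator: calling the wrapped coroutine function
    schedules it immediately and returns an asyncio.Task."""

    @functools.wraps(func)
    def wrapper(*args):
        async def runner():
            resolved = []
            for arg in args:
                # An argument that is itself a pending task is awaited
                # first, and its result is substituted in its place.
                if isinstance(arg, asyncio.Future):
                    arg = await arg
                resolved.append(arg)
            return await func(*resolved)

        # create_task() needs a running event loop, so call decorated
        # functions from inside a coroutine.
        return asyncio.create_task(runner())

    return wrapper


@toy_task
async def a():
    await asyncio.sleep(0.1)
    return 1


@toy_task
async def b():
    await asyncio.sleep(0.1)
    return 2


@toy_task
async def add(x, y):
    return x + y


async def demo():
    # a() and b() start right away and overlap; add() waits for both.
    return await add(a(), b())


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints 3
```

Because each call returns a handle instead of a value, the dependency graph is implied by how the calls are nested, and independent tasks are free to run at the same time.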
Run the workflow¶
We now submit the tasks with their dependencies and wait for the final task to complete.
async def main():
    start_time = time.time()

    # Define the task graph
    t1 = task1()
    t2 = task2()
    t3 = task3(t1, t2)

    # Wait for the final task to complete
    await t3

    end_time = time.time()
    print(f"\nWorkflow completed in: {end_time - start_time:.2f} seconds")

    # Shutdown the workflow engine
    await flow.shutdown()

# In a plain script, start the event loop with asyncio.run().
# In a Jupyter notebook a loop is already running, so use `await main()` instead.
asyncio.run(main())
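The elapsed time that main() prints can be estimated ahead of time from the task graph. As a rough sketch, assuming enough workers for every ready task and ignoring scheduling overhead, the lower bound on wall time is the longest dependency chain. The dictionaries and the earliest_finish helper below are hypothetical and simply mirror the three 1-second tasks in this guide.

```python
from functools import lru_cache

# Durations (seconds) and dependencies mirroring the three tasks in this guide.
durations = {"task1": 1.0, "task2": 1.0, "task3": 1.0}
depends_on = {"task1": [], "task2": [], "task3": ["task1", "task2"]}


@lru_cache(maxsize=None)
def earliest_finish(task):
    """Earliest finish time if a task starts as soon as its inputs are ready."""
    start = max((earliest_finish(dep) for dep in depends_on[task]), default=0.0)
    return start + durations[task]


critical_path = max(earliest_finish(t) for t in durations)
print(f"Lower bound on wall time: {critical_path:.1f} s")  # prints 2.0 s
```

task1 and task2 overlap, so together they cost one second, and task3 adds another. A measured total well below 2 seconds or well above it would suggest the tasks are not running the way the graph describes.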
Example Output¶
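Here’s an example of the output you might see. Timestamps will differ on your machine; because task1 and task2 run concurrently and task3 waits for both, the total is roughly 2 seconds:
Running task1 at 1721847632.3501
Running task2 at 1721847632.3505
Running task3 after task1 and task2 at 1721847633.3534

Workflow completed in: 2.01 seconds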
Warning
Make sure to await the shutdown of the WorkflowEngine before your script exits. Otherwise, resources may leak.
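One common way to make sure shutdown is always awaited, even when a task raises, is a try/finally block. The sketch below is self-contained and uses a hypothetical FakeEngine class as a stand-in; it only assumes that the real engine exposes an awaitable shutdown() method, as shown in this guide.

```python
import asyncio


class FakeEngine:
    """Hypothetical stand-in for a workflow engine; only shutdown() matters here."""

    def __init__(self):
        self.closed = False

    async def shutdown(self):
        await asyncio.sleep(0)  # pretend to release workers
        self.closed = True


async def run_with_cleanup(engine, fail=False):
    try:
        if fail:
            raise RuntimeError("task failed")
        return "done"
    finally:
        # Runs on success and on error, so the engine is always shut down.
        await engine.shutdown()


async def demo():
    engine = FakeEngine()
    try:
        await run_with_cleanup(engine, fail=True)
    except RuntimeError:
        pass  # the error still propagates to the caller after cleanup
    return engine.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints True
```

Applied to this guide, the same shape would place the awaits on the tasks inside try and the await on flow.shutdown() inside finally.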
Summary¶
You now know how to:
- Define a set of tasks with dependencies.
- Submit them to the workflow engine.
- Run the workflow asynchronously.
- Shut down the engine properly.
Next Steps¶
- Learn how to run multiple workflows concurrently.
- Explore advanced backends like DaskExecutionBackend or RadicalExecutionBackend.
- Integrate with HPC schedulers.