Best Practices for AsyncFlow¶
AsyncFlow is built on top of Python’s asyncio, combining asynchronous execution and task dependency management with a simple API.
This page outlines recommended practices for using AsyncFlow effectively in your projects.
Why Best Practices Matter¶
Async programming can easily lead to:
- Hard-to-debug concurrency bugs 🚨
- Deadlocks or race conditions 💥
- Inefficient task scheduling ⚠️
By following these best practices, you can:
- Make your workflows reliable 🏅
- Maximize concurrency 🔥
- Keep code maintainable 🔧
Structure Your Tasks Clearly¶
- Define small, composable tasks that each do one thing well.
- Prefer pure functions or side-effect-free coroutines as tasks.
- Use the @flow.function_task or @flow.executable_task decorators consistently.
Tip
Name your tasks clearly to improve logs and debugging.
@flow.function_task
async def fetch_data(url):
    ...
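As a rough illustration of "small, composable, side-effect-free" tasks, the sketch below uses plain asyncio coroutines instead of AsyncFlow decorators so that it runs on its own. The names parse_record and total_value are hypothetical and do not come from AsyncFlow.

```python
import asyncio


async def parse_record(line):
    """Do one thing: turn a 'name, value' string into a dict (no side effects)."""
    name, value = line.split(",")
    return {"name": name.strip(), "value": float(value)}


async def total_value(records):
    """Do one thing: sum the 'value' field of already-parsed records."""
    return sum(record["value"] for record in records)


async def main():
    lines = ["a, 1.5", "b, 2.5"]
    # The parse steps are independent of each other, so they can be gathered.
    records = await asyncio.gather(*(parse_record(line) for line in lines))
    return await total_value(records)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because each coroutine depends only on its arguments, the same functions could in principle be wrapped as tasks and rearranged without hidden coupling.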
Use Dependencies Correctly¶
Tasks can depend on the output of other tasks:
- Make your dependency graph explicit by passing tasks as arguments.
- Don’t block unnecessarily: let AsyncFlow schedule dependencies.
Tip
Tasks that don’t depend on each other run in parallel automatically.
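asyncflow = await WorkflowManager.create(dry_run=True)

# Register each coroutine as a task so AsyncFlow can track its dependencies
@asyncflow.function_task
async def task_a():
    await asyncio.sleep(2)  # (1)!

@asyncflow.function_task
async def task_b():
    await asyncio.sleep(2)  # (2)!

@asyncflow.function_task
async def task_c(task_a_fut, task_b_fut):
    await asyncio.sleep(2)

result = task_c(task_a(), task_b())  # (3)!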
- (1) Task A runs asynchronously, independently of Task B.
- (2) Task B runs asynchronously, independently of Task A.
- (3) Task C waits implicitly until both Task A and Task B have finished.
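The same idea can be sketched with plain asyncio, as an analogy only: this is not how AsyncFlow is implemented, and the events list is a hypothetical helper used to record ordering. Two independent tasks are scheduled up front, and the dependent task receives them as arguments and resolves them itself.

```python
import asyncio

events = []  # hypothetical trace of what ran and when


async def task_a():
    events.append("a:start")
    await asyncio.sleep(0.05)
    events.append("a:done")
    return "A"


async def task_b():
    events.append("b:start")
    await asyncio.sleep(0.05)
    events.append("b:done")
    return "B"


async def task_c(a_fut, b_fut):
    # The dependent task waits for its inputs; the caller never awaits
    # task_a or task_b directly.
    a, b = await asyncio.gather(a_fut, b_fut)
    events.append("c:run")
    return a + b


async def main():
    # Scheduling both independent tasks before awaiting lets them overlap.
    a_fut = asyncio.create_task(task_a())
    b_fut = asyncio.create_task(task_b())
    return await task_c(a_fut, b_fut)


if __name__ == "__main__":
    print(asyncio.run(main()), events)
```

In the recorded trace, both start events appear before either task finishes, and the dependent step runs last.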
Await Only at the Top Level¶
- Inside your workflow logic, don’t await intermediate tasks.
- Let AsyncFlow build the graph; only await the final or root tasks you care about.
- Awaiting early forces serialization and kills concurrency.
Warning
Avoid this pattern. Each await blocks until its task finishes, so the tasks run one after another:
await task_a()
await task_b()
result = await task_c(task_a(), task_b())
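The cost of awaiting early can be measured with a small plain-asyncio sketch. It does not use AsyncFlow; work, serialized, and concurrent are hypothetical names, and asyncio.gather stands in for letting a scheduler run independent tasks together.

```python
import asyncio
import time


async def work(delay):
    await asyncio.sleep(delay)
    return delay


async def serialized():
    start = time.perf_counter()
    await work(0.1)  # the second call cannot start until this one finishes
    await work(0.1)
    return time.perf_counter() - start


async def concurrent():
    start = time.perf_counter()
    await asyncio.gather(work(0.1), work(0.1))  # both sleeps overlap
    return time.perf_counter() - start


async def main():
    return await serialized(), await concurrent()


if __name__ == "__main__":
    slow, fast = asyncio.run(main())
    print(f"serialized: {slow:.2f}s, concurrent: {fast:.2f}s")
```

The serialized version takes roughly the sum of the delays, while the concurrent version takes roughly the longest single delay.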
Use await flow.shutdown()¶
Always shut down the flow explicitly when finished:
- Releases resources (e.g., thread pools, processes).
- Ensures a clean exit.
At the end of your async main:
await flow.shutdown()
Logging & Debugging¶
Enable detailed logs to diagnose issues:
export RADICAL_LOG_LVL=DEBUG
Logs show task dependencies, execution order, and errors.
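If exporting the variable in the shell is inconvenient, it can also be set from Python. This is a sketch under one assumption that the source does not confirm: that the library reads RADICAL_LOG_LVL from the process environment when it sets up logging, so the assignment has to happen before the library is imported.

```python
import os

# Assumption: this must run before the AsyncFlow import so that the
# logger sees the value when it is configured.
os.environ["RADICAL_LOG_LVL"] = "DEBUG"
```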
Clean Shutdown¶
- Use try/finally in your async main to ensure flow.shutdown() is always called, even on exceptions.
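A minimal sketch of the try/finally pattern follows. FakeFlow is a hypothetical stand-in with an async shutdown() method, used only so the example runs without AsyncFlow installed; it is not the library's API.

```python
import asyncio


class FakeFlow:
    """Hypothetical stand-in for a workflow manager."""

    def __init__(self):
        self.closed = False

    async def shutdown(self):
        self.closed = True


async def run_workflow(flow, fail):
    try:
        if fail:
            raise RuntimeError("task failed")
        return "ok"
    finally:
        # Runs on success and on failure alike.
        await flow.shutdown()


async def main():
    ok_flow, bad_flow = FakeFlow(), FakeFlow()
    await run_workflow(ok_flow, fail=False)
    try:
        await run_workflow(bad_flow, fail=True)
    except RuntimeError:
        pass  # the error still reaches the caller after shutdown has run
    return ok_flow.closed, bad_flow.closed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The exception is not swallowed by the finally block. It propagates to the caller once shutdown has completed.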
Success
- Define tasks clearly and concisely.
- Pass tasks as arguments to express dependencies.
- Only await at the top level.
- Shut down cleanly.
- Log at DEBUG level when needed.