radical.asyncflow.backends.execution.dask_parallel¶
DaskExecutionBackend ¶
DaskExecutionBackend(resources: Optional[Dict] = None)
Bases: BaseExecutionBackend
An async-only Dask execution backend for distributed task execution.
Handles task submission, cancellation, and proper async event loop handling for distributed task execution using Dask. All functions must be async.
Usage
backend = await DaskExecutionBackend(resources)
or¶
async with DaskExecutionBackend(resources) as backend: await backend.submit_tasks(tasks)
Initialize the Dask execution backend (non-async setup only).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resources | Optional[Dict] | Dictionary of resource requirements for tasks. Contains configuration parameters for the Dask client initialization. | None |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
|
__await__ ¶
__await__()
Make DaskExecutionBackend awaitable like Dask Client.
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
41 42 43 |
|
register_callback ¶
register_callback(callback: Callable) -> None
Register a callback for task state changes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback | Callable | Function to be called when task states change. Should accept task and state parameters. | required |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
66 67 68 69 70 71 72 73 |
|
get_task_states_map ¶
get_task_states_map()
Retrieve a mapping of task IDs to their current states.
Returns:
Name | Type | Description |
---|---|---|
StateMapper | Object containing the mapping of task states for this backend. |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
75 76 77 78 79 80 81 |
|
cancel_task async
¶
cancel_task(uid: str) -> bool
Cancel a task by its UID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uid | str | The UID of the task to cancel. | required |
Returns:
Name | Type | Description |
---|---|---|
bool | bool | True if the task was found and cancellation was attempted, False otherwise. |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
|
submit_tasks async
¶
submit_tasks(tasks: List[Dict[str, Any]]) -> None
Submit async tasks to Dask cluster.
Processes a list of tasks and submits them to the Dask cluster for execution. Filters out future objects from arguments and validates that all functions are async coroutine functions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | List[Dict[str, Any]] | List of task dictionaries containing: - uid: Unique task identifier - function: Async callable to execute - args: Positional arguments - kwargs: Keyword arguments - executable: Optional executable path (not supported) - task_backend_specific_kwargs: Backend-specific parameters | required |
Note
Executable tasks are not supported and will result in FAILED state. Only async functions are supported - sync functions will result in FAILED state. Future objects are filtered out from arguments as they are not picklable.
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
|
cancel_all_tasks async
¶
cancel_all_tasks() -> int
Cancel all currently running/pending tasks.
Returns:
Type | Description |
---|---|
int | Number of tasks that were successfully cancelled |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
|
link_explicit_data_deps ¶
link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)
Handle explicit data dependencies between tasks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src_task | The source task that produces the dependency. Defaults to None. | None | |
dst_task | The destination task that depends on the source. Defaults to None. | None | |
file_name | Name of the file that represents the dependency. Defaults to None. | None | |
file_path | Full path to the file that represents the dependency. Defaults to None. | None |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
221 222 223 224 225 226 227 228 229 230 |
|
link_implicit_data_deps ¶
link_implicit_data_deps(src_task, dst_task)
Handle implicit data dependencies for a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src_task | The source task that produces data. | required | |
dst_task | The destination task that depends on the source task's output. | required |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
232 233 234 235 236 237 238 239 |
|
state async
¶
state() -> str
Get the current state of the Dask execution backend.
Returns:
Type | Description |
---|---|
str | Current state of the backend as a string. |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
|
task_state_cb async
¶
task_state_cb(task: dict, state: str) -> None
Callback function invoked when a task's state changes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | dict | Dictionary containing task information and metadata. | required |
state | str | The new state of the task. | required |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
257 258 259 260 261 262 263 264 |
|
build_task async
¶
build_task(task: dict) -> None
Build or prepare a task for execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | dict | Dictionary containing task definition, parameters, and metadata required for task construction. | required |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
266 267 268 269 270 271 272 273 |
|
shutdown async
¶
shutdown() -> None
Shutdown the Dask client and clean up resources.
Closes the Dask client connection, clears task storage, and handles any cleanup exceptions gracefully.
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 |
|
__aenter__ async
¶
__aenter__()
Async context manager entry.
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
304 305 306 307 308 |
|
__aexit__ async
¶
__aexit__(exc_type, exc_val, exc_tb)
Async context manager exit.
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
310 311 312 |
|
create async
classmethod
¶
create(resources: Optional[Dict] = None)
Alternative factory method for creating initialized backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resources | Optional[Dict] | Configuration parameters for Dask client initialization. | None |
Returns:
Type | Description |
---|---|
Fully initialized DaskExecutionBackend instance. |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
315 316 317 318 319 320 321 322 323 324 325 326 |
|