radical.asyncflow.backends.execution.dask_parallel¶
DaskExecutionBackend ¶
DaskExecutionBackend(resources: Optional[dict] = None)
Bases: BaseExecutionBackend
An async-only Dask execution backend for distributed task execution.
Handles task submission, cancellation, and proper async event loop handling for distributed task execution using Dask. All functions must be async.
Usage
backend = await DaskExecutionBackend(resources)
or¶
async with DaskExecutionBackend(resources) as backend: await backend.submit_tasks(tasks)
Initialize the Dask execution backend (non-async setup only).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resources | Optional[dict] | Dictionary of resource requirements for tasks. Contains configuration parameters for the Dask client initialization. | None |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
|
__await__ ¶
__await__()
Make DaskExecutionBackend awaitable like Dask Client.
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
52 53 54 |
|
register_callback ¶
register_callback(callback: Callable) -> None
Register a callback for task state changes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback | Callable | Function to be called when task states change. Should accept task and state parameters. | required |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
78 79 80 81 82 83 84 85 |
|
get_task_states_map ¶
get_task_states_map()
Retrieve a mapping of task IDs to their current states.
Returns:
Name | Type | Description |
---|---|---|
StateMapper | Object containing the mapping of task states for this backend. |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
87 88 89 90 91 92 93 |
|
cancel_task async
¶
cancel_task(uid: str) -> bool
Cancel a task by its UID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uid | str | The UID of the task to cancel. | required |
Returns:
Name | Type | Description |
---|---|---|
bool | bool | True if the task was found and cancellation was attempted, |
bool | False otherwise. |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
|
submit_tasks async
¶
submit_tasks(tasks: list[dict[str, Any]]) -> None
Submit async tasks to Dask cluster.
Processes a list of tasks and submits them to the Dask cluster for execution. Filters out future objects from arguments and validates that all functions are async coroutine functions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list[dict[str, Any]] | List of task dictionaries containing: - uid: Unique task identifier - function: Async callable to execute - args: Positional arguments - kwargs: Keyword arguments - executable: Optional executable path (not supported) - task_backend_specific_kwargs: Backend-specific parameters | required |
Note
Executable tasks are not supported and will result in FAILED state. Only async functions are supported - sync functions will result in FAILED state. Future objects are filtered out from arguments as they are not picklable.
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
|
cancel_all_tasks async
¶
cancel_all_tasks() -> int
Cancel all currently running/pending tasks.
Returns:
Type | Description |
---|---|
int | Number of tasks that were successfully cancelled |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
|
link_explicit_data_deps ¶
link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)
Handle explicit data dependencies between tasks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src_task | The source task that produces the dependency. | None | |
dst_task | The destination task that depends on the source. | None | |
file_name | Name of the file that represents the dependency. | None | |
file_path | Full path to the file that represents the dependency. | None |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
238 239 240 241 242 243 244 245 246 247 248 249 |
|
link_implicit_data_deps ¶
link_implicit_data_deps(src_task, dst_task)
Handle implicit data dependencies for a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src_task | The source task that produces data. | required | |
dst_task | The destination task that depends on the source task's output. | required |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
251 252 253 254 255 256 257 258 |
|
state async
¶
state() -> str
Get the current state of the Dask execution backend.
Returns:
Type | Description |
---|---|
str | Current state of the backend as a string. |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
|
task_state_cb async
¶
task_state_cb(task: dict, state: str) -> None
Callback function invoked when a task's state changes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | dict | Dictionary containing task information and metadata. | required |
state | str | The new state of the task. | required |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
276 277 278 279 280 281 282 283 |
|
build_task async
¶
build_task(task: dict) -> None
Build or prepare a task for execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | dict | Dictionary containing task definition, parameters, and metadata required for task construction. | required |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
285 286 287 288 289 290 291 292 |
|
shutdown async
¶
shutdown() -> None
Shutdown the Dask client and clean up resources.
Closes the Dask client connection, clears task storage, and handles any cleanup exceptions gracefully.
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 |
|
__aenter__ async
¶
__aenter__()
Async context manager entry.
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
324 325 326 327 328 |
|
__aexit__ async
¶
__aexit__(exc_type, exc_val, exc_tb)
Async context manager exit.
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
330 331 332 |
|
create async
classmethod
¶
create(resources: Optional[dict] = None)
Alternative factory method for creating initialized backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resources | Optional[dict] | Configuration parameters for Dask client initialization. | None |
Returns:
Type | Description |
---|---|
Fully initialized DaskExecutionBackend instance. |
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
335 336 337 338 339 340 341 342 343 344 345 346 |
|