Skip to content

radical.asyncflow.backends.execution.dask_parallel

DaskExecutionBackend

DaskExecutionBackend(resources: Optional[Dict] = None)

Bases: BaseExecutionBackend

An async-only Dask execution backend for distributed task execution.

Handles task submission, cancellation, and proper async event loop handling for distributed task execution using Dask. All functions must be async.

Usage

backend = await DaskExecutionBackend(resources)

or

async with DaskExecutionBackend(resources) as backend: await backend.submit_tasks(tasks)

Initialize the Dask execution backend (non-async setup only).

Parameters:

Name Type Description Default
resources Optional[Dict]

Dictionary of resource requirements for tasks. Contains configuration parameters for the Dask client initialization.

None
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@typeguard.typechecked
def __init__(self, resources: Optional[Dict] = None):
    """Initialize the Dask execution backend (non-async setup only).

    Args:
        resources: Dictionary of resource requirements for tasks. Contains
            configuration parameters for the Dask client initialization.
    """
    self.tasks = {}
    self._client = None
    self.session = Session()
    self._callback_func = None
    self._resources = resources or {}
    self._initialized = False

__await__

__await__()

Make DaskExecutionBackend awaitable like Dask Client.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
41
42
43
def __await__(self):
    """Make DaskExecutionBackend awaitable like Dask Client."""
    return self._async_init().__await__()

register_callback

register_callback(callback: Callable) -> None

Register a callback for task state changes.

Parameters:

Name Type Description Default
callback Callable

Function to be called when task states change. Should accept task and state parameters.

required
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
66
67
68
69
70
71
72
73
def register_callback(self, callback: Callable) -> None:
    """Register a callback for task state changes.

    Args:
        callback: Function to be called when task states change. Should accept
            task and state parameters.
    """
    self._callback_func = callback

get_task_states_map

get_task_states_map()

Retrieve a mapping of task IDs to their current states.

Returns:

Name Type Description
StateMapper

Object containing the mapping of task states for this backend.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
75
76
77
78
79
80
81
def get_task_states_map(self):
    """Retrieve a mapping of task IDs to their current states.

    Returns:
        StateMapper: Object containing the mapping of task states for this backend.
    """
    return StateMapper(backend=self)

cancel_task async

cancel_task(uid: str) -> bool

Cancel a task by its UID.

Parameters:

Name Type Description Default
uid str

The UID of the task to cancel.

required

Returns:

Name Type Description
bool bool

True if the task was found and cancellation was attempted, False otherwise.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
async def cancel_task(self, uid: str) -> bool:
    """Cancel a task by its UID.

    Args:
        uid (str): The UID of the task to cancel.

    Returns:
        bool: True if the task was found and cancellation was attempted, False otherwise.
    """
    self._ensure_initialized()
    if uid in self.tasks:
        task = self.tasks[uid]
        future = task.get('future')
        if future:
            return await future.cancel()
    return False

submit_tasks async

submit_tasks(tasks: List[Dict[str, Any]]) -> None

Submit async tasks to Dask cluster.

Processes a list of tasks and submits them to the Dask cluster for execution. Filters out future objects from arguments and validates that all functions are async coroutine functions.

Parameters:

Name Type Description Default
tasks List[Dict[str, Any]]

List of task dictionaries containing: - uid: Unique task identifier - function: Async callable to execute - args: Positional arguments - kwargs: Keyword arguments - executable: Optional executable path (not supported) - task_backend_specific_kwargs: Backend-specific parameters

required
Note

Executable tasks are not supported and will result in FAILED state. Only async functions are supported - sync functions will result in FAILED state. Future objects are filtered out from arguments as they are not picklable.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
async def submit_tasks(self, tasks: List[Dict[str, Any]]) -> None:
    """Submit async tasks to Dask cluster.

    Processes a list of tasks and submits them to the Dask cluster for execution.
    Filters out future objects from arguments and validates that all functions
    are async coroutine functions.

    Args:
        tasks: List of task dictionaries containing:
            - uid: Unique task identifier
            - function: Async callable to execute
            - args: Positional arguments
            - kwargs: Keyword arguments
            - executable: Optional executable path (not supported)
            - task_backend_specific_kwargs: Backend-specific parameters

    Note:
        Executable tasks are not supported and will result in FAILED state.
        Only async functions are supported - sync functions will result in FAILED state.
        Future objects are filtered out from arguments as they are not picklable.
    """
    self._ensure_initialized()

    for task in tasks:

        is_func_task = bool(task.get('function'))
        is_exec_task = bool(task.get('executable'))

        if is_exec_task:
            error_msg = 'DaskExecutionBackend does not support executable tasks'
            task['stderr'] = ValueError(error_msg)
            self._callback_func(task, 'FAILED')
            continue

        # Validate that function is async
        if is_func_task and not asyncio.iscoroutinefunction(task['function']):
            error_msg = 'DaskExecutionBackend only supports async functions'
            task['exception'] = ValueError(error_msg)
            self._callback_func(task, 'FAILED')
            continue

        self.tasks[task['uid']] = task

        # Filter out future objects as they are not picklable
        task['args'] = tuple(arg for arg in task['args'] if not isinstance(arg,
                                           (ConcurrentFuture, asyncio.Future)))
        try:
            await self._submit_async_function(task)
        except Exception as e:
            task['exception'] = e
            self._callback_func(task, 'FAILED')

cancel_all_tasks async

cancel_all_tasks() -> int

Cancel all currently running/pending tasks.

Returns:

Type Description
int

Number of tasks that were successfully cancelled

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
async def cancel_all_tasks(self) -> int:
    """Cancel all currently running/pending tasks.

    Returns:
        Number of tasks that were successfully cancelled
    """
    self._ensure_initialized()
    cancelled_count = 0
    task_uids = list(self.tasks.keys())

    for task_uid in task_uids:
        if await self.cancel_task(task_uid):
            cancelled_count += 1

    return cancelled_count
link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)

Handle explicit data dependencies between tasks.

Parameters:

Name Type Description Default
src_task

The source task that produces the dependency. Defaults to None.

None
dst_task

The destination task that depends on the source. Defaults to None.

None
file_name

Name of the file that represents the dependency. Defaults to None.

None
file_path

Full path to the file that represents the dependency. Defaults to None.

None
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
221
222
223
224
225
226
227
228
229
230
def link_explicit_data_deps(self, src_task=None, dst_task=None, file_name=None, file_path=None):
    """Handle explicit data dependencies between tasks.

    Args:
        src_task: The source task that produces the dependency. Defaults to None.
        dst_task: The destination task that depends on the source. Defaults to None.
        file_name: Name of the file that represents the dependency. Defaults to None.
        file_path: Full path to the file that represents the dependency. Defaults to None.
    """
    pass
link_implicit_data_deps(src_task, dst_task)

Handle implicit data dependencies for a task.

Parameters:

Name Type Description Default
src_task

The source task that produces data.

required
dst_task

The destination task that depends on the source task's output.

required
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
232
233
234
235
236
237
238
239
def link_implicit_data_deps(self, src_task, dst_task):
    """Handle implicit data dependencies for a task.

    Args:
        src_task: The source task that produces data.
        dst_task: The destination task that depends on the source task's output.
    """
    pass

state async

state() -> str

Get the current state of the Dask execution backend.

Returns:

Type Description
str

Current state of the backend as a string.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
async def state(self) -> str:
    """Get the current state of the Dask execution backend.

    Returns:
        Current state of the backend as a string.
    """
    if not self._initialized or self._client is None:
        return "DISCONNECTED"

    try:
        # Check if client is still connected
        await self._client.scheduler_info()
        return "CONNECTED"
    except Exception:
        return "DISCONNECTED"

task_state_cb async

task_state_cb(task: dict, state: str) -> None

Callback function invoked when a task's state changes.

Parameters:

Name Type Description Default
task dict

Dictionary containing task information and metadata.

required
state str

The new state of the task.

required
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
257
258
259
260
261
262
263
264
async def task_state_cb(self, task: dict, state: str) -> None:
    """Callback function invoked when a task's state changes.

    Args:
        task: Dictionary containing task information and metadata.
        state: The new state of the task.
    """
    pass

build_task async

build_task(task: dict) -> None

Build or prepare a task for execution.

Parameters:

Name Type Description Default
task dict

Dictionary containing task definition, parameters, and metadata required for task construction.

required
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
266
267
268
269
270
271
272
273
async def build_task(self, task: dict) -> None:
    """Build or prepare a task for execution.

    Args:
        task: Dictionary containing task definition, parameters, and metadata
            required for task construction.
    """
    pass

shutdown async

shutdown() -> None

Shutdown the Dask client and clean up resources.

Closes the Dask client connection, clears task storage, and handles any cleanup exceptions gracefully.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
async def shutdown(self) -> None:
    """Shutdown the Dask client and clean up resources.

    Closes the Dask client connection, clears task storage, and handles
    any cleanup exceptions gracefully.
    """
    if self._client is not None:
        try:
            # Cancel all running tasks first
            await self.cancel_all_tasks()

            # Close the client
            await self._client.close()
            print("Dask client shutdown complete")
        except Exception as e:
            print(f"Error during shutdown: {str(e)}")
        finally:
            self._client = None
            self.tasks.clear()
            self._initialized = False

__aenter__ async

__aenter__()

Async context manager entry.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
304
305
306
307
308
async def __aenter__(self):
    """Async context manager entry."""
    if not self._initialized:
        await self._async_init()
    return self

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
310
311
312
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit."""
    await self.shutdown()

create async classmethod

create(resources: Optional[Dict] = None)

Alternative factory method for creating initialized backend.

Parameters:

Name Type Description Default
resources Optional[Dict]

Configuration parameters for Dask client initialization.

None

Returns:

Type Description

Fully initialized DaskExecutionBackend instance.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/dask_parallel.py
315
316
317
318
319
320
321
322
323
324
325
326
@classmethod
async def create(cls, resources: Optional[Dict] = None):
    """Alternative factory method for creating initialized backend.

    Args:
        resources: Configuration parameters for Dask client initialization.

    Returns:
        Fully initialized DaskExecutionBackend instance.
    """
    backend = cls(resources)
    return await backend