Skip to content

radical.asyncflow.backends

StateMapper

Simple state mapper for noop backend.

NoopExecutionBackend

NoopExecutionBackend(name: str = 'default')

A no-operation execution backend for testing and development purposes.

This backend simulates task execution without actually running any tasks. All submitted tasks immediately return dummy output and transition to DONE state. Useful for testing workflow logic without computational overhead.

Initialize the no-op execution backend.

Sets up dummy task storage, and default callback function.

Parameters:

Name Type Description Default
name str

Name used to identify this backend in the multi-backend registry.

'default'
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, name: str = "default"):
    """Initialize the no-op execution backend.

    Sets up dummy task storage, and default callback function.

    Args:
        name: Name used to identify this backend in the multi-backend registry.
    """
    self.name = name
    self.tasks = {}
    self._callback_func: Callable = lambda task, state: None  # default no-op

state

state()

Get the current state of the no-op execution backend.

Returns:

Name Type Description
str

Always returns 'IDLE' as this backend performs no actual work.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
42
43
44
45
46
47
48
def state(self):
    """Get the current state of the no-op execution backend.

    Returns:
        str: Always returns 'IDLE' as this backend performs no actual work.
    """
    return "IDLE"

task_state_cb

task_state_cb(task: dict, state: str) -> None

Callback function invoked when a task's state changes.

Parameters:

Name Type Description Default
task dict

Dictionary containing task information and metadata.

required
state str

The new state of the task.

required
Note

This is a no-op implementation that performs no actions.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
50
51
52
53
54
55
56
57
58
59
60
def task_state_cb(self, task: dict, state: str) -> None:
    """Callback function invoked when a task's state changes.

    Args:
        task: Dictionary containing task information and metadata.
        state: The new state of the task.

    Note:
        This is a no-op implementation that performs no actions.
    """
    pass

get_task_states_map

get_task_states_map()

Retrieve a mapping of task IDs to their current states.

Returns:

Name Type Description
StateMapper

Object containing the mapping of task states for this backend.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
62
63
64
65
66
67
68
def get_task_states_map(self):
    """Retrieve a mapping of task IDs to their current states.

    Returns:
        StateMapper: Object containing the mapping of task states for this backend.
    """
    return StateMapper()

register_callback

register_callback(func: Callable)

Register a callback for task state changes.

Parameters:

Name Type Description Default
func Callable

Function to be called when task states change. Should accept task and state parameters.

required
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
70
71
72
73
74
75
76
77
def register_callback(self, func: Callable):
    """Register a callback for task state changes.

    Args:
        func: Function to be called when task states change. Should accept
            task and state parameters.
    """
    self._callback_func = func

build_task

build_task(uid, task_desc, task_specific_kwargs)

Build or prepare a task for execution.

Parameters:

Name Type Description Default
uid

Unique identifier for the task.

required
task_desc

Dictionary containing task description and metadata.

required
task_specific_kwargs

Backend-specific keyword arguments for the task.

required
Note

This is a no-op implementation that performs no actual task building.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
79
80
81
82
83
84
85
86
87
88
89
90
def build_task(self, uid, task_desc, task_specific_kwargs):
    """Build or prepare a task for execution.

    Args:
        uid: Unique identifier for the task.
        task_desc: Dictionary containing task description and metadata.
        task_specific_kwargs: Backend-specific keyword arguments for the task.

    Note:
        This is a no-op implementation that performs no actual task building.
    """
    pass

submit_tasks async

submit_tasks(tasks)

Submit tasks for mock execution.

Immediately marks all tasks as completed with dummy output without performing any actual computation.

Parameters:

Name Type Description Default
tasks

List of task dictionaries to be processed. Each task will receive dummy stdout and return_value before being marked as DONE.

required
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
async def submit_tasks(self, tasks):
    """Submit tasks for mock execution.

    Immediately marks all tasks as completed with dummy output without
    performing any actual computation.

    Args:
        tasks: List of task dictionaries to be processed. Each task will
            receive dummy stdout and return_value before being marked as DONE.
    """
    for task in tasks:
        if task.get("prompt"):
            task["stdout"] = "Dummy Prompt Output"
            task["return_value"] = "Dummy Prompt Output"
        else:
            task["stdout"] = "Dummy Output"
            task["return_value"] = "Dummy Output"
        self._callback_func(task, "DONE")
link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)

Handle explicit data dependencies between tasks.

Parameters:

Name Type Description Default
src_task

The source task that produces the dependency.

None
dst_task

The destination task that depends on the source.

None
file_name

Name of the file that represents the dependency.

None
file_path

Full path to the file that represents the dependency.

None
Note

This is a no-op implementation as this backend doesn't handle dependencies.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def link_explicit_data_deps(
    self, src_task=None, dst_task=None, file_name=None, file_path=None
):
    """Handle explicit data dependencies between tasks.

    Args:
        src_task: The source task that produces the dependency.
        dst_task: The destination task that depends on the source.
        file_name: Name of the file that represents the dependency.
        file_path: Full path to the file that represents the dependency.

    Note:
        This is a no-op implementation as this backend doesn't handle dependencies.
    """
    pass
link_implicit_data_deps(src_task, dst_task)

Handle implicit data dependencies for a task.

Parameters:

Name Type Description Default
src_task

The source task that produces data.

required
dst_task

The destination task that depends on the source task's output.

required
Note

This is a no-op implementation as this backend doesn't handle dependencies.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
130
131
132
133
134
135
136
137
138
139
140
def link_implicit_data_deps(self, src_task, dst_task):
    """Handle implicit data dependencies for a task.

    Args:
        src_task: The source task that produces data.
        dst_task: The destination task that depends on the source task's output.

    Note:
        This is a no-op implementation as this backend doesn't handle dependencies.
    """
    pass

shutdown async

shutdown() -> None

Shutdown the no-op execution backend.

Performs cleanup operations. Since this is a no-op backend, no actual resources need to be cleaned up.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
142
143
144
145
146
147
148
async def shutdown(self) -> None:
    """Shutdown the no-op execution backend.

    Performs cleanup operations. Since this is a no-op backend, no actual resources
    need to be cleaned up.
    """
    pass

LocalExecutionBackend

LocalExecutionBackend(executor: Executor = None, name: str = 'default')

Simple async-only concurrent execution backend.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def __init__(self, executor: Executor = None, name: str = "default"):
    if not executor:
        executor = ThreadPoolExecutor()
        logger.info(
            "No executor was provided. Falling back to default ThreadPoolExecutor"
        )

    if not isinstance(executor, Executor):
        err = "Executor must be ThreadPoolExecutor or ProcessPoolExecutor"
        raise TypeError(err)

    if isinstance(executor, ProcessPoolExecutor) and cloudpickle is None:
        raise ImportError(
            "ProcessPoolExecutor requires 'cloudpickle'. "
            "Install it with: pip install cloudpickle"
        )

    self.name = name
    self.executor = executor
    self.tasks: dict[str, asyncio.Task] = {}
    self._callback_func: Callable = lambda t, s: None
    self._initialized = False

__await__

__await__()

Make backend awaitable.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
177
178
179
def __await__(self):
    """Make backend awaitable."""
    return self._async_init().__await__()

register_callback

register_callback(func: Callable)

Register a callback for task state changes.

Parameters:

Name Type Description Default
func Callable

Function to be called when task states change. Should accept task and state parameters.

required
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
211
212
213
214
215
216
217
218
def register_callback(self, func: Callable):
    """Register a callback for task state changes.

    Args:
        func: Function to be called when task states change. Should accept
            task and state parameters.
    """
    self._callback_func = func

submit_tasks async

submit_tasks(tasks: list[dict[str, Any]]) -> list[Task]

Submit tasks for execution.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
329
330
331
332
333
334
335
336
337
338
339
340
341
async def submit_tasks(self, tasks: list[dict[str, Any]]) -> list[asyncio.Task]:
    """Submit tasks for execution."""

    submitted_tasks = []

    for task in tasks:
        future = asyncio.create_task(self._handle_task(task))
        submitted_tasks.append(future)

        self.tasks[task["uid"]] = task
        self.tasks[task["uid"]]["future"] = future

    return submitted_tasks

cancel_task async

cancel_task(uid: str) -> bool

Cancel a task by its UID.

Parameters:

Name Type Description Default
uid str

The UID of the task to cancel.

required

Returns:

Name Type Description
bool bool

True if the task was found and cancellation was attempted, False otherwise.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
async def cancel_task(self, uid: str) -> bool:
    """Cancel a task by its UID.

    Args:
        uid (str): The UID of the task to cancel.

    Returns:
        bool: True if the task was found and cancellation was attempted,
              False otherwise.
    """
    if uid in self.tasks:
        task = self.tasks[uid]
        future = task["future"]
        if future and future.cancel():
            # Set state on the task object itself before callback
            self._callback_func(task, "CANCELED")
            return True
    return False

cancel_all_tasks async

cancel_all_tasks() -> int

Cancel all running tasks.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
362
363
364
365
366
367
368
369
370
async def cancel_all_tasks(self) -> int:
    """Cancel all running tasks."""
    cancelled_count = 0
    for task in self.tasks.values():
        future = task["future"]
        future.cancel()
        cancelled_count += 1
    self.tasks.clear()
    return cancelled_count

shutdown async

shutdown() -> None

Shutdown the executor.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
372
373
374
375
376
377
async def shutdown(self) -> None:
    """Shutdown the executor."""
    # Set backend state to SHUTDOWN
    await self.cancel_all_tasks()
    self.executor.shutdown(wait=True)
    logger.info("Concurrent execution backend shutdown complete")

state

state() -> str

Get backend state.

Returns:

Name Type Description
str str

Current backend state (INITIALIZED, RUNNING, SHUTDOWN)

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
390
391
392
393
394
395
396
def state(self) -> str:
    """Get backend state.

    Returns:
        str: Current backend state (INITIALIZED, RUNNING, SHUTDOWN)
    """
    return "INITIALIZED" if self._initialized else "IDLE"

__aenter__ async

__aenter__()

Async context manager entry.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
401
402
403
404
405
async def __aenter__(self):
    """Async context manager entry."""
    if not self._initialized:
        await self._async_init()
    return self

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
407
408
409
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit."""
    await self.shutdown()

create async classmethod

create(executor: Executor) -> LocalExecutionBackend

Alternative factory method for creating initialized backend.

Parameters:

Name Type Description Default
executor Executor

A concurrent.Executor instance (ThreadPoolExecutor or ProcessPoolExecutor).

required

Returns:

Type Description
LocalExecutionBackend

Fully initialized LocalExecutionBackend instance.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
411
412
413
414
415
416
417
418
419
420
421
422
423
@classmethod
async def create(cls, executor: Executor) -> "LocalExecutionBackend":
    """Alternative factory method for creating initialized backend.

    Args:
        executor: A concurrent.Executor instance (ThreadPoolExecutor
                  or ProcessPoolExecutor).

    Returns:
        Fully initialized LocalExecutionBackend instance.
    """
    backend = cls(executor)
    return await backend