
radical.asyncflow.backends

StateMapper

A simple state mapper for the no-op backend.

NoopExecutionBackend

NoopExecutionBackend(name: str = 'default')

A no-operation execution backend for testing and development purposes.

This backend simulates task execution without actually running any tasks. All submitted tasks immediately return dummy output and transition to DONE state. Useful for testing workflow logic without computational overhead.

Attributes:

Name Type Description
_work_dir str

Output directory for capture_stdio files. Defaults to cwd; overwritten by WorkflowEngine._attach_backend.

is_attached bool

True once registered with a WorkflowEngine or Session.

attached_to list[str]

Ordered list of engine/session UIDs this backend has been attached to (most recent last).

Initialize the no-op execution backend.

Sets up dummy task storage and a default no-op callback function.

Parameters:

Name Type Description Default
name str

Name used to identify this backend in the multi-backend registry.

'default'
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def __init__(self, name: str = "default"):
    """Initialize the no-op execution backend.

    Sets up dummy task storage and a default no-op callback function.

    Args:
        name: Name used to identify this backend in the multi-backend registry.
    """
    self.name = name
    self.tasks = {}
    self._callback_func: Callable = lambda task, state: None  # default no-op
    self._work_dir: str = os.getcwd()
    self.is_attached: bool = False
    self.attached_to: list[str] = []

state

state() -> str

Get the current state of the no-op execution backend.

Returns:

Name Type Description
str str

Always returns 'IDLE' as this backend performs no actual work.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def state(self) -> str:
    """Get the current state of the no-op execution backend.

    Returns:
        str: Always returns 'IDLE' as this backend performs no actual work.
    """
    return "IDLE"

task_state_cb

task_state_cb(task: dict, state: str) -> None

Callback function invoked when a task's state changes.

Parameters:

Name Type Description Default
task dict

Dictionary containing task information and metadata.

required
state str

The new state of the task.

required
Note

This is a no-op implementation that performs no actions.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def task_state_cb(self, task: dict, state: str) -> None:
    """Callback function invoked when a task's state changes.

    Args:
        task: Dictionary containing task information and metadata.
        state: The new state of the task.

    Note:
        This is a no-op implementation that performs no actions.
    """
    pass

get_task_states_map

get_task_states_map() -> StateMapper

Retrieve a mapping of task IDs to their current states.

Returns:

Name Type Description
StateMapper StateMapper

Object containing the mapping of task states for this backend.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def get_task_states_map(self) -> StateMapper:
    """Retrieve a mapping of task IDs to their current states.

    Returns:
        StateMapper: Object containing the mapping of task states for this backend.
    """
    return StateMapper()

register_callback

register_callback(func: Callable) -> None

Register a callback for task state changes.

Parameters:

Name Type Description Default
func Callable

Function to be called when task states change. Should accept task and state parameters.

required
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def register_callback(self, func: Callable) -> None:
    """Register a callback for task state changes.

    Args:
        func: Function to be called when task states change. Should accept
            task and state parameters.
    """
    self._callback_func = func

build_task

build_task(uid: str, task_desc: dict, task_specific_kwargs: dict) -> None

Build or prepare a task for execution.

Parameters:

Name Type Description Default
uid str

Unique identifier for the task.

required
task_desc dict

Dictionary containing task description and metadata.

required
task_specific_kwargs dict

Backend-specific keyword arguments for the task.

required
Note

This is a no-op implementation that performs no actual task building.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def build_task(self, uid: str, task_desc: dict, task_specific_kwargs: dict) -> None:
    """Build or prepare a task for execution.

    Args:
        uid: Unique identifier for the task.
        task_desc: Dictionary containing task description and metadata.
        task_specific_kwargs: Backend-specific keyword arguments for the task.

    Note:
        This is a no-op implementation that performs no actual task building.
    """
    pass

submit_tasks async

submit_tasks(tasks: list) -> None

Submit tasks for mock execution.

Immediately marks all tasks as completed with dummy output without performing any actual computation.

Parameters:

Name Type Description Default
tasks list

List of task dictionaries to be processed. Each task will receive dummy stdout and return_value before being marked as DONE.

required
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
async def submit_tasks(self, tasks: list) -> None:
    """Submit tasks for mock execution.

    Immediately marks all tasks as completed with dummy output without
    performing any actual computation.

    Args:
        tasks: List of task dictionaries to be processed. Each task will
            receive dummy stdout and return_value before being marked as DONE.
    """
    for task in tasks:
        if task.get("prompt"):
            task["stdout"] = "Dummy Prompt Output"
            task["return_value"] = "Dummy Prompt Output"
        else:
            task["stdout"] = "Dummy Output"
            task["return_value"] = "Dummy Output"
        self._callback_func(task, "DONE")
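The no-op submit path above can be exercised end to end with a self-contained stand-in. `NoopBackendSketch` below is assembled from the source shown on this page (it is not the real class), so the dummy-output behavior can be run without `radical.asyncflow` installed:

```python
import asyncio
import os
from typing import Callable

class NoopBackendSketch:
    """Stand-in assembled from the source listings on this page."""

    def __init__(self, name: str = "default"):
        self.name = name
        self.tasks = {}
        self._callback_func: Callable = lambda task, state: None
        self._work_dir: str = os.getcwd()

    def register_callback(self, func: Callable) -> None:
        self._callback_func = func

    async def submit_tasks(self, tasks: list) -> None:
        # Every task gets dummy output and an immediate DONE callback.
        for task in tasks:
            if task.get("prompt"):
                task["stdout"] = "Dummy Prompt Output"
                task["return_value"] = "Dummy Prompt Output"
            else:
                task["stdout"] = "Dummy Output"
                task["return_value"] = "Dummy Output"
            self._callback_func(task, "DONE")

events = []
backend = NoopBackendSketch()
backend.register_callback(lambda task, state: events.append((task["uid"], state)))

tasks = [{"uid": "t1"}, {"uid": "t2", "prompt": "hi"}]
asyncio.run(backend.submit_tasks(tasks))
```

After the run, both tasks carry dummy output and the callback has fired once per task with state `"DONE"`.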

link_explicit_data_deps

link_explicit_data_deps(src_task: dict | None = None, dst_task: dict | None = None, file_name: str | None = None, file_path: str | None = None) -> None

Handle explicit data dependencies between tasks.

Parameters:

Name Type Description Default
src_task dict | None

The source task that produces the dependency.

None
dst_task dict | None

The destination task that depends on the source.

None
file_name str | None

Name of the file that represents the dependency.

None
file_path str | None

Full path to the file that represents the dependency.

None
Note

This is a no-op implementation as this backend doesn't handle dependencies.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def link_explicit_data_deps(
    self,
    src_task: dict | None = None,
    dst_task: dict | None = None,
    file_name: str | None = None,
    file_path: str | None = None,
) -> None:
    """Handle explicit data dependencies between tasks.

    Args:
        src_task: The source task that produces the dependency.
        dst_task: The destination task that depends on the source.
        file_name: Name of the file that represents the dependency.
        file_path: Full path to the file that represents the dependency.

    Note:
        This is a no-op implementation as this backend doesn't handle dependencies.
    """
    pass

link_implicit_data_deps

link_implicit_data_deps(src_task: dict, dst_task: dict) -> None

Handle implicit data dependencies for a task.

Parameters:

Name Type Description Default
src_task dict

The source task that produces data.

required
dst_task dict

The destination task that depends on the source task's output.

required
Note

This is a no-op implementation as this backend doesn't handle dependencies.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def link_implicit_data_deps(self, src_task: dict, dst_task: dict) -> None:
    """Handle implicit data dependencies for a task.

    Args:
        src_task: The source task that produces data.
        dst_task: The destination task that depends on the source task's output.

    Note:
        This is a no-op implementation as this backend doesn't handle dependencies.
    """
    pass

shutdown async

shutdown() -> None

Shutdown the no-op execution backend.

Performs cleanup operations. Since this is a no-op backend, no actual resources need to be cleaned up.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
async def shutdown(self) -> None:
    """Shutdown the no-op execution backend.

    Performs cleanup operations. Since this is a no-op backend, no actual resources
    need to be cleaned up.
    """
    pass

LocalExecutionBackend

LocalExecutionBackend(executor: Executor = None, name: str = 'default')

Simple async-only concurrent execution backend.

Attributes:

Name Type Description
_work_dir str

Output directory for capture_stdio files. Defaults to cwd; overwritten by WorkflowEngine._attach_backend.

is_attached bool

True once registered with a WorkflowEngine or Session.

attached_to list[str]

Ordered list of engine/session UIDs this backend has been attached to (most recent last).

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def __init__(self, executor: Executor = None, name: str = "default"):
    if not executor:
        executor = ThreadPoolExecutor()
        logger.info(
            "No executor was provided. Falling back to default ThreadPoolExecutor"
        )

    if not isinstance(executor, Executor):
        err = "Executor must be ThreadPoolExecutor or ProcessPoolExecutor"
        raise TypeError(err)

    if isinstance(executor, ProcessPoolExecutor) and cloudpickle is None:
        raise ImportError(
            "ProcessPoolExecutor requires 'cloudpickle'. "
            "Install it with: pip install cloudpickle"
        )

    self.name = name
    self.executor = executor
    self.tasks: dict[str, asyncio.Task] = {}
    self._callback_func: Callable = lambda t, s: None
    self._initialized = False
    self._work_dir: str = os.getcwd()
    self.is_attached: bool = False
    self.attached_to: list[str] = []
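The executor validation at the top of `__init__` can be sketched in isolation. `resolve_executor` is a hypothetical helper mirroring the checks above (the cloudpickle availability check is omitted):

```python
from concurrent.futures import Executor, ThreadPoolExecutor

def resolve_executor(executor=None):
    # Fall back to a ThreadPoolExecutor when none is given,
    # and reject anything that is not a concurrent.futures.Executor.
    if not executor:
        executor = ThreadPoolExecutor()
    if not isinstance(executor, Executor):
        raise TypeError("Executor must be ThreadPoolExecutor or ProcessPoolExecutor")
    return executor

pool = resolve_executor()  # no executor given: ThreadPoolExecutor fallback
try:
    resolve_executor("not-an-executor")
    rejected = False
except TypeError:
    rejected = True
pool.shutdown(wait=False)
```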

__await__

__await__()

Make backend awaitable.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def __await__(self):
    """Make backend awaitable."""
    return self._async_init().__await__()
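The awaitable pattern behind `__await__` can be illustrated with a hypothetical minimal class. `AwaitableBackend` and its `_async_init` are illustrative only (the real `_async_init` is not shown on this page); awaiting the instance runs the async initializer and yields the ready backend:

```python
import asyncio

class AwaitableBackend:
    def __init__(self):
        self._initialized = False

    async def _async_init(self):
        # Real backends would acquire resources here.
        self._initialized = True
        return self

    def __await__(self):
        # Delegating to the coroutine's __await__ makes
        # `backend = await AwaitableBackend()` work.
        return self._async_init().__await__()

async def main():
    backend = await AwaitableBackend()
    return backend._initialized

ready = asyncio.run(main())  # True
```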

register_callback

register_callback(func: Callable) -> None

Register a callback for task state changes.

Parameters:

Name Type Description Default
func Callable

Function to be called when task states change. Should accept task and state parameters.

required
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def register_callback(self, func: Callable) -> None:
    """Register a callback for task state changes.

    Args:
        func: Function to be called when task states change. Should accept
            task and state parameters.
    """
    self._callback_func = func

submit_tasks async

submit_tasks(tasks: list[dict[str, Any]]) -> list[Task]

Submit tasks for execution.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
async def submit_tasks(self, tasks: list[dict[str, Any]]) -> list[asyncio.Task]:
    """Submit tasks for execution."""

    submitted_tasks = []

    for task in tasks:
        future = asyncio.create_task(self._handle_task(task))
        submitted_tasks.append(future)

        self.tasks[task["uid"]] = task
        self.tasks[task["uid"]]["future"] = future

    return submitted_tasks
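The fan-out shape of submit_tasks can be sketched with a hypothetical handler. `handle_task` below stands in for `_handle_task`, whose body is not shown on this page; the registry bookkeeping mirrors the listing above:

```python
import asyncio

async def handle_task(task):
    # Hypothetical stand-in for _handle_task.
    task["return_value"] = task["uid"].upper()
    return task

async def submit(tasks):
    # Wrap each task in an asyncio.Task and keep the future
    # alongside the task record, as submit_tasks does.
    registry = {}
    futures = []
    for task in tasks:
        fut = asyncio.create_task(handle_task(task))
        futures.append(fut)
        registry[task["uid"]] = task
        task["future"] = fut
    await asyncio.gather(*futures)
    return registry

registry = asyncio.run(submit([{"uid": "a"}, {"uid": "b"}]))
```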

cancel_task async

cancel_task(uid: str) -> bool

Cancel a task by its UID.

Parameters:

Name Type Description Default
uid str

The UID of the task to cancel.

required

Returns:

Name Type Description
bool bool

True if the task was found and cancellation was attempted, False otherwise.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
async def cancel_task(self, uid: str) -> bool:
    """Cancel a task by its UID.

    Args:
        uid (str): The UID of the task to cancel.

    Returns:
        bool: True if the task was found and cancellation was attempted,
              False otherwise.
    """
    if uid in self.tasks:
        task = self.tasks[uid]
        future = task["future"]
        if future and future.cancel():
            # notify the registered callback that the task was canceled
            self._callback_func(task, "CANCELED")
            return True
    return False
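The cancel semantics can be reproduced with plain asyncio. This hypothetical sketch mirrors the `future.cancel()` plus callback pattern above:

```python
import asyncio

async def main():
    events = []
    tasks = {}

    async def sleeper():
        await asyncio.sleep(10)

    fut = asyncio.create_task(sleeper())
    tasks["t1"] = {"uid": "t1", "future": fut}
    await asyncio.sleep(0)  # let the task start before cancelling

    task = tasks["t1"]
    # Only report CANCELED when cancellation was actually requested.
    if task["future"].cancel():
        events.append((task["uid"], "CANCELED"))
    return events

events = asyncio.run(main())  # [("t1", "CANCELED")]
```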

cancel_all_tasks async

cancel_all_tasks() -> int

Cancel all running tasks.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
async def cancel_all_tasks(self) -> int:
    """Cancel all running tasks."""
    cancelled_count = 0
    for task in self.tasks.values():
        future = task["future"]
        future.cancel()
        cancelled_count += 1
    self.tasks.clear()
    return cancelled_count

shutdown async

shutdown() -> None

Shutdown the executor.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
async def shutdown(self) -> None:
    """Shutdown the executor."""
    # cancel any outstanding tasks before shutting the executor down
    await self.cancel_all_tasks()
    self.executor.shutdown(wait=True)
    logger.info("Concurrent execution backend shutdown complete")

state

state() -> str

Get backend state.

Returns:

Name Type Description
str str

'INITIALIZED' once the backend has been initialized, otherwise 'IDLE'.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
def state(self) -> str:
    """Get backend state.

    Returns:
        str: 'INITIALIZED' once the backend has been initialized, otherwise 'IDLE'.
    """
    return "INITIALIZED" if self._initialized else "IDLE"

__aenter__ async

__aenter__()

Async context manager entry.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
async def __aenter__(self):
    """Async context manager entry."""
    if not self._initialized:
        await self._async_init()
    return self
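The enter/exit pair can be illustrated with a hypothetical minimal backend. `BackendCM` is illustrative only; its `_async_init` and `shutdown` simply flip flags, where the real backend would acquire and release resources:

```python
import asyncio

class BackendCM:
    def __init__(self):
        self._initialized = False
        self.shut_down = False

    async def _async_init(self):
        self._initialized = True
        return self

    async def shutdown(self):
        self.shut_down = True

    async def __aenter__(self):
        # Initialize lazily on entry, as the listing above does.
        if not self._initialized:
            await self._async_init()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Shut down unconditionally on exit.
        await self.shutdown()

async def main():
    async with BackendCM() as backend:
        inside = backend._initialized
    return inside, backend.shut_down

inside, shut = asyncio.run(main())  # (True, True)
```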

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit."""
    await self.shutdown()

create async classmethod

create(executor: Executor) -> LocalExecutionBackend

Alternative factory method for creating initialized backend.

Parameters:

Name Type Description Default
executor Executor

A concurrent.Executor instance (ThreadPoolExecutor or ProcessPoolExecutor).

required

Returns:

Type Description
LocalExecutionBackend

Fully initialized LocalExecutionBackend instance.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/backends.py
443
444
445
446
447
448
449
450
451
452
453
454
455
@classmethod
async def create(cls, executor: Executor) -> LocalExecutionBackend:
    """Alternative factory method for creating initialized backend.

    Args:
        executor: A concurrent.Executor instance (ThreadPoolExecutor
                  or ProcessPoolExecutor).

    Returns:
        Fully initialized LocalExecutionBackend instance.
    """
    backend = cls(executor)
    return await backend