radical.asyncflow.backends.execution.base

BaseExecutionBackend

Bases: ABC

Abstract base class for execution backends that manage task execution and state.

This class defines the interface for execution backends that handle task submission, state management, and dependency linking in a distributed or parallel execution environment.
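As a rough illustration of what implementing this interface involves, the sketch below defines a concrete backend that only records state and runs nothing. So that it runs without the package installed, it subclasses a local stand-in ABC (`StandInBackend`) that mirrors the method names listed on this page; real code would subclass `BaseExecutionBackend` instead. The `InMemoryBackend` class, the `"uid"` task key, and the chosen state strings are assumptions made for the example, not part of the documented API.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Optional


class StandInBackend(ABC):
    """Local stand-in mirroring the abstract methods listed on this page."""

    @abstractmethod
    def submit_tasks(self, tasks: List[dict]) -> None: ...
    @abstractmethod
    def shutdown(self) -> None: ...
    @abstractmethod
    def state(self) -> str: ...
    @abstractmethod
    def task_state_cb(self, task: dict, state: str) -> None: ...
    @abstractmethod
    def register_callback(self, func) -> None: ...
    @abstractmethod
    def get_task_states_map(self): ...
    @abstractmethod
    def build_task(self, task: dict) -> None: ...
    @abstractmethod
    def link_implicit_data_deps(self, src_task, dst_task): ...
    @abstractmethod
    def link_explicit_data_deps(self, src_task=None, dst_task=None,
                                file_name=None, file_path=None): ...
    @abstractmethod
    def cancel_task(self, uid: str) -> bool: ...


class InMemoryBackend(StandInBackend):
    """Hypothetical backend: marks every submitted task as completed."""

    def __init__(self) -> None:
        self._state = "idle"
        self._callback: Optional[Callable[[dict, str], None]] = None
        self._states: Dict[str, str] = {}

    def submit_tasks(self, tasks: List[dict]) -> None:
        self._state = "running"
        for task in tasks:
            self.build_task(task)
            self.task_state_cb(task, "completed")  # nothing is actually run
        self._state = "idle"

    def shutdown(self) -> None:
        self._state = "shutting_down"

    def state(self) -> str:
        return self._state

    def task_state_cb(self, task: dict, state: str) -> None:
        self._states[task["uid"]] = state  # "uid" is an assumed key
        if self._callback is not None:
            self._callback(task, state)

    def register_callback(self, func) -> None:
        self._callback = func

    def get_task_states_map(self) -> Dict[str, str]:
        return dict(self._states)

    def build_task(self, task: dict) -> None:
        self._states[task["uid"]] = "pending"

    def link_implicit_data_deps(self, src_task, dst_task):
        pass  # dependencies are ignored in this sketch

    def link_explicit_data_deps(self, src_task=None, dst_task=None,
                                file_name=None, file_path=None):
        pass  # dependencies are ignored in this sketch

    def cancel_task(self, uid: str) -> bool:
        return False  # tasks finish immediately, so nothing can be cancelled


backend = InMemoryBackend()
events = []
backend.register_callback(lambda task, state: events.append((task["uid"], state)))
backend.submit_tasks([{"uid": "task.0001"}, {"uid": "task.0002"}])
```

Because every method on the stand-in is abstract, leaving any of them out of the subclass makes instantiation fail with `TypeError`. A real ABC behaves the same way.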

submit_tasks abstractmethod

submit_tasks(tasks: List[dict]) -> None

Submit a list of tasks for execution.

Parameters:

Name Type Description Default
tasks List[dict]

A list of dictionaries containing task definitions and metadata. Each task dictionary should contain the necessary information for task execution.

required
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/base.py
@abstractmethod
def submit_tasks(self, tasks: List[dict]) -> None:
    """Submit a list of tasks for execution.

    Args:
        tasks: A list of dictionaries containing task definitions and metadata.
            Each task dictionary should contain the necessary information for
            task execution.
    """
    pass

shutdown abstractmethod

shutdown() -> None

Gracefully shut down the execution backend.

This method should clean up resources, terminate running tasks if necessary, and prepare the backend for termination.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/base.py
@abstractmethod
def shutdown(self) -> None:
    """Gracefully shutdown the execution backend.

    This method should clean up resources, terminate running tasks if necessary,
    and prepare the backend for termination.
    """
    pass
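
One way a caller might make sure `shutdown` always runs, even when task submission raises, is to wrap the backend in a context manager. The helper name `managed` and the `RecordingBackend` class are hypothetical and exist only for this sketch; any object with a `shutdown()` method would work.

```python
from contextlib import contextmanager


@contextmanager
def managed(backend):
    """Yield the backend and call its shutdown() on exit, error or not."""
    try:
        yield backend
    finally:
        backend.shutdown()


class RecordingBackend:
    """Hypothetical stand-in that only remembers whether shutdown ran."""

    def __init__(self) -> None:
        self.shut_down = False

    def shutdown(self) -> None:
        self.shut_down = True


backend = RecordingBackend()
try:
    with managed(backend):
        raise RuntimeError("simulated failure while submitting tasks")
except RuntimeError:
    pass  # the error still propagates; shutdown has already been called
```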

state abstractmethod

state() -> str

Get the current state of the execution backend.

Returns:

Type Description
str

A string representing the current state of the backend (e.g., 'running', 'idle', 'shutting_down', 'error').

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/base.py
@abstractmethod
def state(self) -> str:
    """Get the current state of the execution backend.

    Returns:
        A string representing the current state of the backend (e.g., 'running',
        'idle', 'shutting_down', 'error').
    """
    pass

task_state_cb abstractmethod

task_state_cb(task: dict, state: str) -> None

Callback function invoked when a task's state changes.

Parameters:

Name Type Description Default
task dict

Dictionary containing task information and metadata.

required
state str

The new state of the task (e.g., 'pending', 'running', 'completed', 'failed').

required
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/base.py
@abstractmethod
def task_state_cb(self, task: dict, state: str) -> None:
    """Callback function invoked when a task's state changes.

    Args:
        task: Dictionary containing task information and metadata.
        state: The new state of the task (e.g., 'pending', 'running', 'completed',
            'failed').
    """
    pass

register_callback abstractmethod

register_callback(func) -> None

Register a callback function for task state changes.

Parameters:

Name Type Description Default
func

A callable that will be invoked when task states change. The function should accept task and state parameters.

required
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/base.py
@abstractmethod
def register_callback(cls, func) -> None:
    """Register a callback function for task state changes.

    Args:
        func: A callable that will be invoked when task states change.
            The function should accept task and state parameters.
    """
    pass
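
The docstring only requires that the registered function accept task and state parameters. A minimal callback of that shape might look like the sketch below, which records the sequence of states seen for each task. The `"uid"` key and the loop that stands in for a backend invoking the callback are assumptions for illustration.

```python
from collections import defaultdict
from typing import DefaultDict, List

# States observed for each task, keyed by a hypothetical "uid" field.
history: DefaultDict[str, List[str]] = defaultdict(list)


def on_task_state(task: dict, state: str) -> None:
    """Record each state change; matches the (task, state) callback shape."""
    history[task["uid"]].append(state)


# A backend would invoke the registered function itself; this loop simulates it.
task = {"uid": "task.0001"}
for new_state in ("pending", "running", "completed"):
    on_task_state(task, new_state)
```

A function like `on_task_state` could then be passed to `register_callback` on a concrete backend.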

get_task_states_map abstractmethod

get_task_states_map() -> None

Retrieve a mapping of task IDs to their current states.

Returns:

Type Description
None

A dictionary mapping task identifiers to their current execution states.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/base.py
@abstractmethod
def get_task_states_map(self) -> None:
    """Retrieve a mapping of task IDs to their current states.

    Returns:
        A dictionary mapping task identifiers to their current execution states.
    """
    pass
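
Assuming the returned object behaves like a plain `dict` of task identifier to state string (the docstring describes a dictionary, although the annotation is `None`), a caller could summarize it as sketched below. The helper names, the sample identifiers, and the set of terminal states are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, List


def summarize(states: Dict[str, str]) -> Counter:
    """Count how many tasks are in each state."""
    return Counter(states.values())


def unfinished(states: Dict[str, str],
               terminal: Iterable[str] = ("completed", "failed")) -> List[str]:
    """Return the sorted ids of tasks whose state is not terminal."""
    done = set(terminal)
    return sorted(uid for uid, state in states.items() if state not in done)


# Sample data standing in for the result of get_task_states_map().
states_map = {
    "task.0001": "completed",
    "task.0002": "running",
    "task.0003": "failed",
}
counts = summarize(states_map)
still_running = unfinished(states_map)
```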

build_task abstractmethod

build_task(task: dict) -> None

Build or prepare a task for execution.

Parameters:

Name Type Description Default
task dict

Dictionary containing task definition, parameters, and metadata required for task construction.

required
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/base.py
@abstractmethod
def build_task(cls, task: dict) -> None:
    """Build or prepare a task for execution.

    Args:
        task: Dictionary containing task definition, parameters, and metadata
            required for task construction.
    """
    pass

link_implicit_data_deps abstractmethod

link_implicit_data_deps(src_task, dst_task)

Link implicit data dependencies between two tasks.

Creates a dependency relationship where the destination task depends on data produced by the source task, with the dependency being inferred automatically.

Parameters:

Name Type Description Default
src_task

The source task that produces data.

required
dst_task

The destination task that depends on the source task's output.

required
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/base.py
@abstractmethod
def link_implicit_data_deps(self, src_task, dst_task):
    """Link implicit data dependencies between two tasks.

    Creates a dependency relationship where the destination task depends on
    data produced by the source task, with the dependency being inferred
    automatically.

    Args:
        src_task: The source task that produces data.
        dst_task: The destination task that depends on the source task's output.
    """
    pass
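
How a backend records such a dependency is left to the implementation. One possible approach, sketched here under the assumption that tasks are dictionaries with a `"uid"` key, is to keep a map from each task to the tasks it depends on and derive an execution order with the standard library's `graphlib.TopologicalSorter`. The function `link_implicit` and the `deps` map are hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Dict, Set

# Maps each task uid to the uids of the tasks it depends on.
deps: Dict[str, Set[str]] = {}


def link_implicit(src_task: dict, dst_task: dict) -> None:
    """Record that dst_task depends on data produced by src_task."""
    deps.setdefault(src_task["uid"], set())
    deps.setdefault(dst_task["uid"], set()).add(src_task["uid"])


a, b, c = {"uid": "a"}, {"uid": "b"}, {"uid": "c"}
link_implicit(a, b)  # b consumes a's output
link_implicit(b, c)  # c consumes b's output

# Producers come before their consumers in the resulting order.
order = list(TopologicalSorter(deps).static_order())
```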

link_explicit_data_deps abstractmethod

link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)

Link explicit data dependencies between tasks or files.

Creates explicit dependency relationships based on specified file names or paths, allowing for more precise control over task execution order.

Parameters:

Name Type Description Default
src_task

The source task that produces the dependency. Defaults to None.

None
dst_task

The destination task that depends on the source. Defaults to None.

None
file_name

Name of the file that represents the dependency. Defaults to None.

None
file_path

Full path to the file that represents the dependency. Defaults to None.

None
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/base.py
@abstractmethod
def link_explicit_data_deps(self, src_task=None, dst_task=None, file_name=None, file_path=None):
    """Link explicit data dependencies between tasks or files.

    Creates explicit dependency relationships based on specified file names
    or paths, allowing for more precise control over task execution order.

    Args:
        src_task: The source task that produces the dependency. Defaults to None.
        dst_task: The destination task that depends on the source. Defaults to None.
        file_name: Name of the file that represents the dependency. Defaults to None.
        file_path: Full path to the file that represents the dependency. Defaults to None.
    """
    pass
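
Since all four parameters default to `None`, an implementation has to decide which combinations are meaningful. The sketch below shows one possible interpretation, not the library's: a `file_path` alone identifies an existing file, while `src_task` together with `file_name` identifies a file produced by another task. The `FileDependency` record, the `link_explicit` function, and the `"uid"` key are all assumptions.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Optional


@dataclass(frozen=True)
class FileDependency:
    source: str            # a filesystem path or the uid of the producing task
    target: Optional[str]  # uid of the consuming task, if one was given
    name: str              # file name the dependency refers to


def link_explicit(src_task=None, dst_task=None,
                  file_name=None, file_path=None) -> FileDependency:
    """Build a dependency record from whichever arguments were supplied."""
    if file_path is not None:
        source = file_path
        name = file_name or PurePosixPath(file_path).name
    elif src_task is not None and file_name is not None:
        source, name = src_task["uid"], file_name
    else:
        raise ValueError("need file_path, or src_task together with file_name")
    target = dst_task["uid"] if dst_task is not None else None
    return FileDependency(source=source, target=target, name=name)


from_path = link_explicit(file_path="/data/in.csv", dst_task={"uid": "t2"})
from_task = link_explicit(src_task={"uid": "t1"}, dst_task={"uid": "t2"},
                          file_name="out.csv")
```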

cancel_task abstractmethod

cancel_task(uid: str) -> bool

Cancel a task in the execution backend.

Parameters:

Name Type Description Default
uid str

Task identifier

required

Raises:

Type Description
NotImplementedError

If the backend doesn't support cancellation

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/base.py
@abstractmethod
def cancel_task(self, uid: str) -> bool:
    """
    Cancel a task in the execution backend.

    Args:
        uid: Task identifier

    Raises:
        NotImplementedError: If the backend doesn't support cancellation
    """
    raise NotImplementedError("Task cancellation not implemented in the base backend")
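
Because a backend may raise `NotImplementedError` here, calling code may want to treat cancellation as best-effort. The sketch below wraps the call accordingly. It assumes that a return value of `True` means the cancellation was accepted, which the docstring does not state; `try_cancel` and both stand-in backend classes are hypothetical.

```python
def try_cancel(backend, uid: str) -> bool:
    """Attempt to cancel a task; report False if the backend cannot cancel."""
    try:
        return bool(backend.cancel_task(uid))
    except NotImplementedError:
        return False


class NoCancelBackend:
    """Stand-in for a backend that does not support cancellation."""

    def cancel_task(self, uid: str) -> bool:
        raise NotImplementedError("cancellation not supported")


class SimpleCancelBackend:
    """Stand-in that remembers which task ids were cancelled."""

    def __init__(self) -> None:
        self.cancelled = set()

    def cancel_task(self, uid: str) -> bool:
        self.cancelled.add(uid)
        return True


unsupported = try_cancel(NoCancelBackend(), "task.0001")
supporting = SimpleCancelBackend()
accepted = try_cancel(supporting, "task.0001")
```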

Session

Session()

Manages execution session state and working directory.

This class maintains session-specific information including the current working directory path for task execution.

Initialize a new session with the current working directory.

Sets the session path to the current working directory at the time of initialization.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/base.py
def __init__(self):
    """Initialize a new session with the current working directory.

    Sets the session path to the current working directory at the time
    of initialization.
    """
    self.path = os.getcwd()
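
One consequence of calling `os.getcwd()` in the constructor is that the path is captured once and does not follow later directory changes. The sketch below demonstrates this with a local stand-in class that reproduces only the constructor shown above, so it runs without the package installed.

```python
import os
import tempfile


class Session:
    """Stand-in reproducing the constructor shown above."""

    def __init__(self):
        self.path = os.getcwd()


original = os.getcwd()
session = Session()  # captures the working directory at this moment

with tempfile.TemporaryDirectory() as tmp:
    os.chdir(tmp)
    try:
        moved = os.getcwd()  # the process now reports a different directory
    finally:
        os.chdir(original)   # restore before the temporary directory is removed
```

After the `chdir`, `session.path` still holds the original directory, so a session should be created from the directory in which tasks are meant to run.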