Skip to content

radical.asyncflow.constants

TasksMainStates

Bases: Enum

Enumeration of standard task states used across all backends.

This enum defines the canonical task states that are common to all execution backends, providing a unified interface for task state management.

Attributes:

Name Type Description
DONE

Task completed successfully.

FAILED

Task execution failed.

CANCELED

Task was canceled before completion.

RUNNING

Task is currently executing.

StateMapper

StateMapper(backend: Union[str, Any])

Unified interface for mapping task states between main workflow and backend systems.

StateMapper provides a centralized mechanism for translating task states between the main workflow system and various backend execution systems (e.g., 'radical', 'dask'). It supports dynamic registration of backend-specific state mappings and bidirectional conversion between main states and backend-specific states.

The class maintains a registry of backend state mappings and provides methods for state conversion, backend detection, and direct state access through attribute notation.

Attributes:

Name Type Description
_backend_registry Dict[str, Dict[TasksMainStates, Any]]

Class-level registry mapping backend identifiers to their state mappings.

backend_name str

Name of the current backend.

backend_module Optional[Any]

Reference to the backend module/object.

_state_map Dict[TasksMainStates, Any]

Current backend's state mappings.

_reverse_map Dict[Any, TasksMainStates]

Reverse mapping from backend states to main states.

Parameters:

Name Type Description Default
backend Union[str, Any]

The backend identifier, either as a string (e.g., 'radical', 'dask') or as a backend module/object instance.

required

Raises:

Type Description
ValueError

If the specified backend is not registered or cannot be detected.

Example

:: # Register a backend with custom states StateMapper.register_backend_states( backend='my_backend', done_state='COMPLETED', failed_state='ERROR', canceled_state='ABORTED', running_state='ACTIVE' )

1
2
3
4
# Use the mapper
mapper = StateMapper('my_backend')
backend_state = mapper.DONE  # Returns 'COMPLETED'
main_state = mapper.to_main_state('COMPLETED')  # Returns TasksMainStates.DONE

Initialize StateMapper with a specific backend.

Creates a StateMapper instance configured for the specified backend. The backend can be provided as either a string identifier or a module/object instance.

Parameters:

Name Type Description Default
backend Union[str, Any]

Backend identifier. Can be: - String: Backend name like 'radical', 'dask', etc. - Object: Backend module or instance from which name is detected.

required

Raises:

Type Description
ValueError

If the backend is not registered in the registry or if backend name cannot be detected from the provided object.

Note

The backend must be registered using register_backend_states() or register_backend_states_with_defaults() before initialization.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/constants.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __init__(self, backend: Union[str, Any]):
    """Initialize StateMapper with a specific backend.

    Creates a StateMapper instance configured for the specified backend.
    The backend can be provided as either a string identifier or a
    module/object instance.

    Args:
        backend (Union[str, Any]): Backend identifier. Can be:
            - String: Backend name like 'radical', 'dask', etc.
            - Object: Backend module or instance from which name is detected.

    Raises:
        ValueError: If the backend is not registered in the registry or
            if backend name cannot be detected from the provided object.

    Note:
        The backend must be registered using register_backend_states() or
        register_backend_states_with_defaults() before initialization.
    """
    self.backend_name: str
    self.backend_module: Optional[Any] = None

    if isinstance(backend, str):
        self.backend_name = backend.lower()
    else:
        self.backend_module = backend
        self.backend_name = self._detect_backend_name()

    if self.backend_module not in self._backend_registry:
        raise ValueError(
            f"Backend '{self.backend_module}' not registered. "
            f"Available backends: {list(self._backend_registry.keys())}")

    self._state_map = self._backend_registry[self.backend_module]
    self._reverse_map = {v: k for k, v in self._state_map.items()}

terminal_states property

terminal_states: tuple

Get all terminal states for the current backend.

Returns a tuple containing the backend-specific representations of all terminal states (DONE, FAILED, CANCELED). These are states that indicate a task has finished execution and will not transition further.

Returns:

Name Type Description
tuple tuple

Backend-specific terminal state values (DONE, FAILED, CANCELED).

Example

:: mapper = StateMapper('my_backend') terminals = mapper.terminal_states # Returns ('COMPLETED', 'ERROR', 'ABORTED') for example backend

register_backend_states classmethod

register_backend_states(backend: Any, done_state: Any, failed_state: Any, canceled_state: Any, running_state: Any, **additional_states) -> None

Register state mappings for a new backend.

Associates a backend identifier with its corresponding task state values, mapping the standard task states to backend-specific representations. Supports additional custom state mappings beyond the four core states.

Parameters:

Name Type Description Default
backend Any

The identifier for the backend to register. Can be a string, module, or any hashable object.

required
done_state Any

Backend's representation of the DONE state.

required
failed_state Any

Backend's representation of the FAILED state.

required
canceled_state Any

Backend's representation of the CANCELED state.

required
running_state Any

Backend's representation of the RUNNING state.

required
**additional_states

Additional state mappings where the key is the state name (str) and the value is the backend's representation. Keys will be converted to uppercase and matched against TasksMainStates.

{}

Returns:

Type Description
None

None

Example

:: StateMapper.register_backend_states( backend='slurm', done_state='COMPLETED', failed_state='FAILED', canceled_state='CANCELLED', running_state='RUNNING', pending='PENDING', timeout='TIMEOUT' )

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/constants.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@classmethod
def register_backend_states(
    cls,
    backend: Any,
    done_state: Any,
    failed_state: Any,
    canceled_state: Any,
    running_state: Any,
    **additional_states) -> None:
    """Register state mappings for a new backend.

    Associates a backend identifier with its corresponding task state values,
    mapping the standard task states to backend-specific representations.
    Supports additional custom state mappings beyond the four core states.

    Args:
        backend (Any): The identifier for the backend to register. Can be
            a string, module, or any hashable object.
        done_state (Any): Backend's representation of the DONE state.
        failed_state (Any): Backend's representation of the FAILED state.
        canceled_state (Any): Backend's representation of the CANCELED state.
        running_state (Any): Backend's representation of the RUNNING state.
        **additional_states: Additional state mappings where the key is the
            state name (str) and the value is the backend's representation.
            Keys will be converted to uppercase and matched against TasksMainStates.

    Returns:
        None

    Example:
        ::
            StateMapper.register_backend_states(
                backend='slurm',
                done_state='COMPLETED',
                failed_state='FAILED',
                canceled_state='CANCELLED',
                running_state='RUNNING',
                pending='PENDING',
                timeout='TIMEOUT'
            )
    """
    cls._backend_registry[backend] = {
        TasksMainStates.DONE: done_state,
        TasksMainStates.FAILED: failed_state,
        TasksMainStates.CANCELED: canceled_state,
        TasksMainStates.RUNNING: running_state,
        **{TasksMainStates[k.upper()]: v for k, v in additional_states.items()}}

register_backend_states_with_defaults classmethod

register_backend_states_with_defaults(backend: Any)

Register a backend using default main state values.

Convenience method that registers a backend where the backend-specific states are identical to the main state values (i.e., the string values of the TasksMainStates enum).

Parameters:

Name Type Description Default
backend Any

The backend identifier to register.

required

Returns:

Type Description

The result of register_backend_states() with default values.

Example

:: # This registers backend states as: # DONE -> "DONE", FAILED -> "FAILED", etc. StateMapper.register_backend_states_with_defaults('thread_backend')

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/constants.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@classmethod
def register_backend_states_with_defaults(cls, backend: Any):
    """Register a backend using default main state values.

    Convenience method that registers a backend where the backend-specific
    states are identical to the main state values (i.e., the string values
    of the TasksMainStates enum).

    Args:
        backend (Any): The backend identifier to register.

    Returns:
        The result of register_backend_states() with default values.

    Example:
        ::
            # This registers backend states as:
            # DONE -> "DONE", FAILED -> "FAILED", etc.
            StateMapper.register_backend_states_with_defaults('thread_backend')
    """
    return cls.register_backend_states(backend,
                                       done_state=TasksMainStates.DONE.value,
                                       failed_state=TasksMainStates.FAILED.value,
                                       canceled_state=TasksMainStates.CANCELED.value,
                                       running_state=TasksMainStates.RUNNING.value)

__getattr__

__getattr__(name: str) -> Any

Access backend-specific states directly via attribute notation.

Enables direct access to backend states using the main state names as attributes (e.g., mapper.DONE, mapper.FAILED).

Parameters:

Name Type Description Default
name str

The main state name to access (DONE, FAILED, CANCELED, RUNNING).

required

Returns:

Name Type Description
Any Any

The backend-specific state value corresponding to the main state.

Raises:

Type Description
AttributeError

If the specified state name is not valid.

Example

:: mapper = StateMapper('my_backend') done_state = mapper.DONE # Returns backend's DONE state running_state = mapper.RUNNING # Returns backend's RUNNING state

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/constants.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def __getattr__(self, name: str) -> Any:
    """Access backend-specific states directly via attribute notation.

    Enables direct access to backend states using the main state names
    as attributes (e.g., mapper.DONE, mapper.FAILED).

    Args:
        name (str): The main state name to access (DONE, FAILED, CANCELED, RUNNING).

    Returns:
        Any: The backend-specific state value corresponding to the main state.

    Raises:
        AttributeError: If the specified state name is not valid.

    Example:
        ::
            mapper = StateMapper('my_backend')
            done_state = mapper.DONE  # Returns backend's DONE state
            running_state = mapper.RUNNING  # Returns backend's RUNNING state
    """
    try:
        main_state = TasksMainStates[name]
        return self._state_map[main_state]
    except KeyError:
        raise AttributeError(f"'{self.__class__.__name__}' has no state '{name}'")

to_main_state

to_main_state(backend_state: Any) -> TasksMainStates

Convert backend-specific state to main state.

Translates a backend-specific state value back to the corresponding TasksMainStates enum value.

Parameters:

Name Type Description Default
backend_state Any

The backend-specific state value to convert.

required

Returns:

Name Type Description
TasksMainStates TasksMainStates

The corresponding main state enum value.

Raises:

Type Description
ValueError

If the backend state is not recognized.

Example

:: mapper = StateMapper('slurm') main_state = mapper.to_main_state('COMPLETED') # Returns TasksMainStates.DONE

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/constants.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def to_main_state(self, backend_state: Any) -> TasksMainStates:
    """Convert backend-specific state to main state.

    Translates a backend-specific state value back to the corresponding
    TasksMainStates enum value.

    Args:
        backend_state (Any): The backend-specific state value to convert.

    Returns:
        TasksMainStates: The corresponding main state enum value.

    Raises:
        ValueError: If the backend state is not recognized.

    Example:
        ::
            mapper = StateMapper('slurm')
            main_state = mapper.to_main_state('COMPLETED')  # Returns TasksMainStates.DONE
    """
    try:
        return self._reverse_map[backend_state]
    except KeyError:
        raise ValueError(f"Unknown backend state: {backend_state}")

get_backend_state

get_backend_state(main_state: Union[TasksMainStates, str]) -> Any

Get backend-specific state for a main state.

Retrieves the backend-specific state value that corresponds to the given main state. Accepts both TasksMainStates enum values and string representations.

Parameters:

Name Type Description Default
main_state Union[TasksMainStates, str]

The main state to convert. Can be a TasksMainStates enum value or its string representation.

required

Returns:

Name Type Description
Any Any

The backend-specific state value.

Raises:

Type Description
KeyError

If the main state is not found in the mapping.

Example

:: mapper = StateMapper('my_backend') backend_state = mapper.get_backend_state(TasksMainStates.DONE) # Or using string backend_state = mapper.get_backend_state('DONE')

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/constants.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def get_backend_state(self, main_state: Union[TasksMainStates, str]) -> Any:
    """Get backend-specific state for a main state.

    Retrieves the backend-specific state value that corresponds to the
    given main state. Accepts both TasksMainStates enum values and
    string representations.

    Args:
        main_state (Union[TasksMainStates, str]): The main state to convert.
            Can be a TasksMainStates enum value or its string representation.

    Returns:
        Any: The backend-specific state value.

    Raises:
        KeyError: If the main state is not found in the mapping.

    Example:
        ::
            mapper = StateMapper('my_backend')
            backend_state = mapper.get_backend_state(TasksMainStates.DONE)
            # Or using string
            backend_state = mapper.get_backend_state('DONE')
    """
    if isinstance(main_state, str):
        main_state = TasksMainStates[main_state]
    return self._state_map[main_state]