Skip to content

radical.asyncflow.workflow_manager

WorkflowEngine

WorkflowEngine(backend: Any = None, dry_run: bool = False, implicit_data: bool = True, uid: Optional[str] = None, work_dir: Optional[str] = None)

An asynchronous workflow manager that uses asyncio event loops and coroutines to manage and execute workflow components (blocks and/or tasks) within Directed Acyclic Graph (DAG) or Chain Graph (CG) structures.

This class provides async/await operations and handles task dependencies, input/output data staging, and execution.

Attributes:

Name Type Description
loop AbstractEventLoop

The asyncio event loop (current running loop).

backend

The execution backend used for task execution.

dry_run bool

Indicates whether the engine is in dry-run mode.

log Logger

Logger instance for logging workflow events.

prof Profiler

Profiler instance for profiling workflow execution.

Initialize the WorkflowEngine (sync part only).

Note: This is a private constructor. Use WorkflowEngine.create() instead.

Parameters:

Name Type Description Default
backend Any

Execution backend (required, pre-validated)

None
dry_run bool

Whether to run in dry-run mode

False
implicit_data bool

Whether to enable implicit data dependency linking

True
uid Optional[str]

Optional unique identifier for this engine. Defaults to asyncflow.session.{hex8} (UUID-based).

None
work_dir Optional[str]

Working directory for output files (default: cwd)

None
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@typeguard.typechecked
def __init__(
    self,
    backend: Any = None,
    dry_run: bool = False,
    implicit_data: bool = True,
    uid: Optional[str] = None,
    work_dir: Optional[str] = None,
) -> None:
    """Initialize the WorkflowEngine (sync part only).

    Note: This is a private constructor. Use WorkflowEngine.create() instead;
    ``create()`` validates the backend before calling this constructor and
    starts the async components (``_start_async_components``) afterwards.

    Args:
        backend: Execution backend (required, pre-validated). A single backend
            or a list of backends is accepted; the first one becomes the
            default backend. NOTE(review): an empty list fails with IndexError
            at ``backend[0]``.
        dry_run: Whether to run in dry-run mode
        implicit_data: Whether to enable implicit data dependency linking
        uid: Optional unique identifier for this engine. Defaults to
            ``asyncflow.session.{hex8}`` (UUID-based).
        work_dir: Working directory for output files (default: cwd)
    """
    # Get the current running loop - assume it exists
    # (the helper presumably raises when no loop is running — confirm).
    self.loop = get_event_loop_or_raise("WorkflowEngine")

    self.uid = uid or f"asyncflow.session.{uuid.uuid4().hex[:8]}"
    self.work_dir = work_dir or os.getcwd()
    # Per-engine ContextVar avoids cross-engine context leakage in shared coroutines,
    # while asyncio still propagates it across create_task/gather branches.
    self._workflow_id_ctx: ContextVar[str | None] = ContextVar(
        f"asyncflow_workflow_id.{self.uid}", default=None
    )

    # Normalize backend: accept a single backend or a list of backends
    if not isinstance(backend, list):
        backend = [backend]
    self._backends: dict = {}
    for b in backend:
        self._attach_backend(b)
    # The first backend in the list is the default target for tasks that do
    # not name a target backend.
    self._default_backend_name: str = backend[0].name

    # Initialize core attributes
    self.running = []  # UIDs of submitted components whose futures are pending
    self.components = {}  # uid -> component record (description, future, type)
    self.resolved = set()  # UIDs that were submitted or finalized
    self.dependencies = {}  # uid -> list of dependency dicts ("uid", "name")
    self.dry_run = dry_run
    self.queue = asyncio.Queue()
    self.implicit_data_mode = implicit_data

    # Optimization: Track component state changes
    self._ready_queue = deque()  # UIDs popped by run() for processing
    self._dependents_map = defaultdict(set)
    self._dependency_count = {}
    self._component_change_event = asyncio.Event()  # wakes the idle run() loop

    # Uses the default backend (see the ``backend`` property), so this must
    # come after the backends have been registered above.
    self.task_states_map = self.backend.get_task_states_map()

    # Define decorators
    self.block = self._register_decorator(comp_type=BLOCK)
    self.function_task = self._register_decorator(
        comp_type=TASK, task_type=FUNCTION
    )
    self.executable_task = self._register_decorator(
        comp_type=TASK, task_type=EXECUTABLE
    )
    self.prompt_task = self._register_decorator(comp_type=TASK, task_type=PROMPT)

    # Initialize async task references (will be set in _start_async_components)
    self._run_task = None
    self._shutdown_event = asyncio.Event()  # Added shutdown signal

    # Telemetry (opt-in via start_telemetry(), None = disabled, zero cost)
    self._telemetry = None
    self._tel_make_event = None  # cached to avoid per-call import lookup
    self._tel_events: dict = {}  # cached event classes: name -> class
    self._task_submit_times: dict[str, float] = {}
    self._task_start_times: dict[str, float] = {}

    self._setup_signal_handlers()

backend property

backend

Return the default execution backend.

start_telemetry async

start_telemetry(resource_poll_interval: float = 5.0, checkpoint_interval: float | None = None, checkpoint_path: str | None = None, span_processors: list | None = None, metric_readers: list | None = None, resource: object | None = None) -> Any

Create and start a RHAPSODY TelemetryManager for this workflow engine.

Mirrors Session.start_telemetry(): creates the manager, wires all registered backends (silently skipping LocalExecutionBackend and NoopExecutionBackend which have no adapter), starts the async dispatch loop, and returns the manager.

Parameters:

Name Type Description Default
resource_poll_interval float

Seconds between resource metric polls (default: 5.0).

5.0
checkpoint_interval float | None

Seconds between periodic metric+span flushes to disk. None = no periodic flush (file still written at stop).

None
checkpoint_path str | None

Directory for the JSONL checkpoint file. None = no file output.

None
span_processors list | None

Optional list of OTel SpanProcessor instances added to RHAPSODY's TracerProvider alongside the internal SpanBuffer. Callers own exporter construction.

None
metric_readers list | None

Optional list of OTel MetricReader instances added to RHAPSODY's MeterProvider alongside InMemoryMetricReader.

None
resource object | None

Optional opentelemetry.sdk.resources.Resource. When None, Resource.create() reads OTEL_SERVICE_NAME from the environment automatically.

None

Returns:

Type Description
Any

The active TelemetryManager instance.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
async def start_telemetry(
    self,
    resource_poll_interval: float = 5.0,
    checkpoint_interval: float | None = None,
    checkpoint_path: str | None = None,
    span_processors: list | None = None,
    metric_readers: list | None = None,
    resource: object | None = None,
) -> Any:
    """Create and start a RHAPSODY TelemetryManager for this workflow engine.

    Mirrors Session.start_telemetry(): creates the manager, wires all
    registered backends (silently skipping LocalExecutionBackend and
    NoopExecutionBackend which have no adapter), starts the async dispatch
    loop, and returns the manager.

    The manager is published on ``self._telemetry`` only after it has been
    started and the event-class cache (``_tel_make_event`` / ``_tel_events``)
    has been filled. Code that gates emission on ``self._telemetry is not
    None`` (e.g. ``submit``) therefore never observes a manager whose event
    cache is still empty while ``start()`` is suspended. If ``start()``
    raises, ``self._telemetry`` is left unchanged instead of pointing at a
    manager that never started.

    Args:
        resource_poll_interval: Seconds between resource metric polls (default: 5.0).
        checkpoint_interval:    Seconds between periodic metric+span flushes to disk.
                                None = no periodic flush (file still written at stop).
        checkpoint_path:        Directory for the JSONL checkpoint file.
                                None = no file output.
        span_processors:        Optional list of OTel SpanProcessor instances added to
                                RHAPSODY's TracerProvider alongside the internal
                                SpanBuffer. Callers own exporter construction.
        metric_readers:         Optional list of OTel MetricReader instances added to
                                RHAPSODY's MeterProvider alongside InMemoryMetricReader.
        resource:               Optional ``opentelemetry.sdk.resources.Resource``.
                                When None, ``Resource.create()`` reads
                                ``OTEL_SERVICE_NAME`` from the environment automatically.

    Returns:
        The active TelemetryManager instance.
    """
    from rhapsody.telemetry.manager import TelemetryManager  # noqa: PLC0415

    # Build and wire the manager in a local; it becomes visible to the rest
    # of the engine only once it is fully initialized (see docstring).
    manager = TelemetryManager(
        session_id=self.uid,
        checkpoint_interval=checkpoint_interval,
        checkpoint_path=checkpoint_path,
        span_processors=span_processors,
        metric_readers=metric_readers,
        resource=resource,
    )

    for backend in self._backends.values():
        manager.attach_backend(
            backend,
            session_id=self.uid,
            backend_name=backend.name,
            interval=resource_poll_interval,
        )

    # This await suspends: the run loop and submit() may execute meanwhile and
    # must keep seeing the previous (consistent) telemetry state.
    await manager.start()

    # Forward the asyncflow.workflow_id attribute (when an event carries one)
    # as extra span attributes.
    manager.register_span_enricher(
        lambda event: (
            {"asyncflow.workflow_id": event.attributes["asyncflow.workflow_id"]}
            if isinstance(event.attributes, dict)
            and "asyncflow.workflow_id" in event.attributes
            else {}
        )
    )

    from rhapsody.telemetry.events import (  # noqa: PLC0415
        TaskCanceled,
        TaskCompleted,
        TaskCreated,
        TaskFailed,
        TaskQueued,
        TaskStarted,
        TaskSubmitted,
        define_event,
        make_event,
    )

    # asyncflow.TaskResolved is DAG-specific — not a RHAPSODY core event.
    # Defined here with define_event so RHAPSODY stays unaware of DAG semantics.
    TaskResolved = define_event("asyncflow.TaskResolved")

    # Fill the event cache BEFORE publishing the manager so that anything that
    # sees self._telemetry set also finds the event classes it needs.
    self._tel_make_event = make_event
    self._tel_events = {
        "TaskCreated": TaskCreated,
        "TaskQueued": TaskQueued,
        "asyncflow.TaskResolved": TaskResolved,
        "TaskSubmitted": TaskSubmitted,
        "TaskStarted": TaskStarted,
        "TaskCompleted": TaskCompleted,
        "TaskFailed": TaskFailed,
        "TaskCanceled": TaskCanceled,
    }
    # NOTE(review): a second call replaces any previous manager without
    # stopping it.
    self._telemetry = manager

    return manager

workflow_scope async

workflow_scope(workflow_id: str | None = None) -> AsyncIterator[str]

Tag all tasks registered inside this scope with a shared workflow_id.

Also creates an OTel workflow span (when telemetry is active) so that task spans registered inside become structural children in the trace hierarchy.

Parameters:

Name Type Description Default
workflow_id str | None

Explicit id for this workflow instance. If omitted, a short UUID is generated automatically.

None

Yields:

Type Description
AsyncIterator[str]

The workflow_id string in use.

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
@asynccontextmanager
async def workflow_scope(
    self, workflow_id: str | None = None
) -> AsyncIterator[str]:
    """Tag all tasks registered inside this scope with a shared workflow_id.

    Also creates an OTel workflow span (when telemetry is active) so that task spans
    registered inside become structural children in the trace hierarchy.

    Args:
        workflow_id: Explicit id for this workflow instance.
            If omitted, a short UUID is generated automatically.

    Yields:
        The workflow_id string in use.
    """
    wid = workflow_id or f"wf-{uuid.uuid4().hex[:8]}"
    token = self._workflow_id_ctx.set(wid)
    try:
        scope = (
            self._telemetry.span_scope("workflow", {"asyncflow.workflow_id": wid})
            if self._telemetry is not None
            else nullcontext()
        )
        with scope:
            yield wid
    finally:
        self._workflow_id_ctx.reset(token)

create async classmethod

create(backend: Any = None, dry_run: bool = False, implicit_data: bool = True, uid: Optional[str] = None, work_dir: Optional[str] = None) -> 'WorkflowEngine'

Factory method to create and initialize a WorkflowEngine.

Parameters:

Name Type Description Default
backend Any

Execution backend. If None and dry_run=True, uses NoopExecutionBackend

None
dry_run bool

Whether to run in dry-run mode

False
implicit_data bool

Whether to enable implicit data dependency linking

True
uid Optional[str]

Optional unique identifier for this engine. Defaults to asyncflow.session.{hex8} (UUID-based).

None
work_dir Optional[str]

Working directory for output files (default: cwd)

None

Returns:

Name Type Description
WorkflowEngine 'WorkflowEngine'

Fully initialized workflow engine

Example

engine = await WorkflowEngine.create(dry_run=True)

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
@classmethod
async def create(
    cls,
    backend: Any = None,
    dry_run: bool = False,
    implicit_data: bool = True,
    uid: Optional[str] = None,
    work_dir: Optional[str] = None,
) -> "WorkflowEngine":
    """Build a fully initialized WorkflowEngine (the public constructor).

    Construction happens in three steps: the backend is set up and validated
    asynchronously, the synchronous ``__init__`` runs with that validated
    backend, and finally the engine's async components are started.

    Args:
        backend: Execution backend. If None and dry_run=True,
                 uses NoopExecutionBackend
        dry_run: Whether to run in dry-run mode
        implicit_data: Whether to enable implicit data dependency linking
        uid: Optional unique identifier for this engine. Defaults to
            ``asyncflow.session.{hex8}`` (UUID-based).
        work_dir: Working directory for output files (default: cwd)

    Returns:
        WorkflowEngine: Fully initialized workflow engine

    Example:
        engine = await WorkflowEngine.create(dry_run=True)
    """
    init_kwargs = {
        "dry_run": dry_run,
        "implicit_data": implicit_data,
        "uid": uid,
        "work_dir": work_dir,
    }
    # __init__ expects an already-validated backend, so resolve it first.
    init_kwargs["backend"] = await cls._setup_execution_backend(backend, dry_run)

    engine = cls(**init_kwargs)

    # Async start-up cannot run inside the synchronous __init__.
    await engine._start_async_components()
    return engine

run async

run()

Manages asynchronous execution of workflow components.

Continuously monitors and manages workflow components, handling their dependencies and execution states. Performs dependency resolution and prepares components for execution when their dependencies are satisfied.

Workflow Process
  1. Monitors unresolved components
  2. Checks dependency resolution status
  3. Prepares resolved components for execution
  4. Handles data staging between components
  5. Submits ready components to execution

Raises:

Type Description
CancelledError

If the coroutine is cancelled during execution

State Management
  • unresolved (set): Component UIDs with pending dependencies
  • resolved (set): Component UIDs with satisfied dependencies
  • running (list): Currently executing component UIDs
  • dependencies (dict): Maps component UIDs to dependency info
  • components (dict): Maps UIDs to component descriptions and futures
  • queue (asyncio.Queue): Execution queue for ready components
Note
  • Runs indefinitely until cancelled or shutdown is signaled
  • Uses sleep intervals to prevent busy-waiting
  • Handles both implicit and explicit data dependencies
  • Triggers an internal shutdown on loop failure
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
async def run(self):
    """Manages asynchronous execution of workflow components.

    Continuously monitors and manages workflow components, handling their
    dependencies and execution states. Performs dependency resolution and
    prepares components for execution when their dependencies are satisfied.

    Workflow Process:
        1. Pops component UIDs from ``_ready_queue``
        2. Checks dependency resolution status (cancelled/failed dependencies)
        3. Prepares resolved components for execution
        4. Handles data staging between components
        5. Submits ready components to execution
        6. Detects finished components and notifies their dependents

    Raises:
        Nothing under normal operation: ``asyncio.CancelledError`` is caught
        by the loop's outer handler, logged, and ends the loop, so ``run()``
        returns normally instead of propagating the cancellation. Any other
        exception triggers an internal shutdown and ends the loop.

    State Management:
        - _ready_queue (deque): Component UIDs queued for processing
        - resolved (set): Component UIDs already submitted or finalized
          (done, failed or cancelled); they are skipped if seen again
        - running (list): Submitted component UIDs whose futures are not
          yet done
        - dependencies (dict): Maps component UIDs to dependency info
        - components (dict): Maps UIDs to component descriptions and futures
        - queue (asyncio.Queue): Created in ``__init__``; not read by this loop

    Note:
        - Runs indefinitely until cancelled or shutdown is signaled
        - When idle, waits on ``_component_change_event`` or
          ``_shutdown_event`` (1 s timeout) instead of busy-waiting; after a
          pass that did work it sleeps 10 ms
        - Handles both implicit and explicit data dependencies
        - Triggers an internal shutdown on loop failure
    """
    while not self._shutdown_event.is_set():
        try:
            # Descriptions collected in this pass; submitted as one batch
            # after the ready queue has been drained.
            to_submit = []

            # Process ready components
            while self._ready_queue and not self._shutdown_event.is_set():
                comp_uid = self._ready_queue.popleft()

                # Skip if already processed
                if comp_uid in self.resolved or comp_uid in self.running:
                    continue

                # Check if future is already done (could be cancelled/failed)
                if self.components[comp_uid]["future"].done():
                    self.resolved.add(comp_uid)
                    self._notify_dependents(comp_uid)
                    continue

                # Verify dependencies are still met
                dependencies = self.dependencies[comp_uid]
                dep_futures = [
                    self.components[dep["uid"]]["future"] for dep in dependencies
                ]

                failed_deps = []
                cancelled_deps = []

                # NOTE(review): Future.exception() raises InvalidStateError on
                # a pending future, so this assumes every dependency future is
                # done once its dependent reaches _ready_queue (presumably
                # guaranteed by _notify_dependents) — confirm.
                for fut in dep_futures:
                    if fut.cancelled():
                        cancelled_deps.append(fut)
                    elif fut.exception() is not None:
                        failed_deps.append(fut.exception())

                # Handle dependency issues
                # (a cancelled dependency takes precedence over failed ones)
                if cancelled_deps or failed_deps:
                    comp_desc = self.components[comp_uid]["description"]

                    if cancelled_deps:
                        logger.info(
                            f"Cancelling {comp_desc['name']} "
                            "due to cancelled dependencies"
                        )
                        self.handle_task_cancellation(
                            comp_desc, self.components[comp_uid]["future"]
                        )
                    else:  # failed_deps
                        chained_exception = (
                            self._create_dependency_failure_exception(
                                comp_desc, failed_deps
                            )
                        )
                        self.handle_task_failure(
                            comp_desc,
                            self.components[comp_uid]["future"],
                            chained_exception,
                        )

                    # Common cleanup
                    # The component is finalized without running; its own
                    # dependents are released so they can observe the outcome.
                    self.resolved.add(comp_uid)
                    self._notify_dependents(comp_uid)
                    continue

                # Handle data dependencies for tasks
                comp_desc = self.components[comp_uid]["description"]
                if self.components[comp_uid]["type"] == TASK:
                    explicit_files_to_stage = []

                    for dep in dependencies:
                        dep_desc = self.components[dep["uid"]]["description"]

                        # Link implicit data dependencies
                        # (only for dependencies that declare no output files)
                        if self.implicit_data_mode and not dep_desc["metadata"].get(
                            "output_files"
                        ):
                            logger.debug(
                                f"Linking implicit file(s): from {dep_desc['name']} "
                                f"to {comp_desc['name']}"
                            )
                            self.backend.link_implicit_data_deps(
                                dep_desc, comp_desc
                            )

                        # Link explicit data dependencies
                        # (files the dependency outputs AND this task lists as input)
                        for output_file in dep_desc["metadata"]["output_files"]:
                            if output_file in comp_desc["metadata"]["input_files"]:
                                logger.debug(
                                    f"Linking explicit file ({output_file}) "
                                    f"from {dep_desc['name']} "
                                    f"to {comp_desc['name']}"
                                )
                                data_dep = self.backend.link_explicit_data_deps(
                                    src_task=dep_desc,
                                    dst_task=comp_desc,
                                    file_name=output_file,
                                )
                                explicit_files_to_stage.append(data_dep)

                    # Input staging data dependencies
                    # Stage remaining input files that neither an explicit link
                    # above nor a dependency's outputs already provide, compared
                    # by basename (dependency_output_files presumably holds
                    # output basenames — confirm in _get_dependency_output_files).
                    dependency_output_files = self._get_dependency_output_files(
                        dependencies
                    )
                    staged_targets = {
                        Path(item["target"]).name
                        for item in explicit_files_to_stage
                    }

                    for input_file in comp_desc["metadata"]["input_files"]:
                        input_basename = Path(input_file).name
                        if (
                            input_basename not in staged_targets
                            and input_basename not in dependency_output_files
                        ):
                            logger.debug(
                                f"Staging {input_file} "
                                f"to {comp_desc['name']} work dir"
                            )
                            data_dep = self.backend.link_explicit_data_deps(
                                src_task=None,
                                dst_task=comp_desc,
                                file_name=input_basename,
                                file_path=input_file,
                            )
                            explicit_files_to_stage.append(data_dep)

                try:
                    # Update the component description with resolved values
                    # (args/kwargs are replaced by _extract_dependency_values;
                    # this applies to blocks as well as tasks)
                    (
                        comp_desc["args"],
                        comp_desc["kwargs"],
                    ) = await self._extract_dependency_values(comp_desc)
                except Exception as e:
                    logger.error(
                        f"Failed to resolve future for {comp_desc['name']}: {e}"
                    )
                    self.handle_task_failure(
                        comp_desc, self.components[comp_uid]["future"], e
                    )
                    self.resolved.add(comp_uid)
                    self._notify_dependents(comp_uid)
                    continue

                to_submit.append(comp_desc)
                res_deps = [dep["name"] for dep in dependencies]
                msg = f"Ready to submit: {comp_desc['name']}"
                msg += f" with resolved dependencies: {res_deps}"
                logger.debug(msg)

                if self.components[comp_uid]["type"] == TASK:
                    self._emit(
                        "TaskQueued",
                        task_id=comp_uid,
                        backend=comp_desc.get("target_backend"),
                        workflow_id=comp_desc.get("workflow_id"),
                        attributes={
                            "executable": comp_desc["_tel_executable"],
                            "task_type": comp_desc["_tel_task_type"],
                        },
                    )

            # Submit ready components
            # (an exception from submit() reaches the outer handler below,
            # which triggers an internal shutdown)
            if to_submit:
                await self.submit(to_submit)
                for comp_desc in to_submit:
                    comp_uid = comp_desc["uid"]
                    self.running.append(comp_uid)
                    self.resolved.add(comp_uid)

            # Check for completed components and update dependency tracking
            # (iterate over a copy because entries are removed while scanning)
            completed_components = []
            for comp_uid in list(self.running):
                if self.components[comp_uid]["future"].done():
                    completed_components.append(comp_uid)
                    self.running.remove(comp_uid)

            # Notify dependents of completed components
            for comp_uid in completed_components:
                self._notify_dependents(comp_uid)

            # Signal changes
            if completed_components:
                self._component_change_event.set()

            # If nothing is ready/running, wait for changes or shutdown
            if not self._ready_queue and not to_submit and not completed_components:
                try:
                    # Create tasks for event waiting
                    event_task = asyncio.create_task(
                        self._component_change_event.wait(),
                        name="component-event-task",
                    )
                    shutdown_task = asyncio.create_task(
                        self._shutdown_event.wait(), name="shutdown-event-task"
                    )

                    # The 1 s timeout bounds the wait, so running futures that
                    # complete without setting the change event are still
                    # picked up by the next pass.
                    done, pending = await asyncio.wait(
                        [event_task, shutdown_task],
                        return_when=asyncio.FIRST_COMPLETED,
                        timeout=1.0,
                    )
                    # Cancel any pending tasks to clean up
                    for task in pending:
                        task.cancel()
                    # Clear component change event if it was set
                    if event_task in done:
                        self._component_change_event.clear()
                except asyncio.CancelledError:
                    # If we get cancelled, make sure to clean up our tasks
                    for task in [event_task, shutdown_task]:
                        if not task.done():
                            task.cancel()
                    raise
            else:
                await asyncio.sleep(0.01)

        except asyncio.CancelledError:
            # Cancellation is swallowed: the loop ends and run() returns.
            logger.debug("Run component stopped")
            break
        except Exception as e:
            logger.exception(f"Error in run loop: {e}")
            await self._handle_shutdown_signal(signal.SIGUSR1, source="internal")
            break

submit async

submit(objects: list) -> None

Manages asynchronous submission of tasks/blocks to the execution backend.

Retrieves and submits ready tasks and blocks for execution. Separates incoming objects into tasks and blocks based on their UID pattern and dispatches each group to the appropriate backend method.

Submission Process
  1. Receives a batch of objects
  2. Filters objects into tasks and blocks
  3. Submits tasks via backend.submit_tasks
  4. Submits blocks via _submit_blocks asynchronously

Parameters:

Name Type Description Default
objects list

Component descriptions to submit; tasks and blocks are distinguished by the content of their uid field.

required

Returns:

Type Description
None

None

Raises:

Type Description
Exception

If an unexpected error occurs during submission

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
async def submit(self, objects: list) -> None:
    """Manages asynchronous submission of tasks/blocks to the execution backend.

    Retrieves and submits ready tasks and blocks for execution. Falsy entries
    (e.g. ``None``) in ``objects`` are ignored. The remaining objects are
    separated into tasks and blocks based on their UID pattern, and each group
    is dispatched to the appropriate backend method.

    Submission Process:
        1. Receives a batch of objects and drops falsy entries
        2. Filters objects into tasks and blocks
        3. Submits tasks via ``submit_tasks`` on their target backend(s); tasks
           without a ``target_backend`` go to the default backend
        4. Submits blocks via ``_submit_blocks`` asynchronously

    Args:
        objects: Component descriptions to submit; tasks and blocks are
            distinguished by the content of their ``uid`` field.

    Returns:
        None

    Raises:
        Exception: Any error raised during submission is logged and re-raised.
    """
    try:
        # Drop falsy entries once, so classification, logging and telemetry
        # all see the same list. Previously the debug log below indexed every
        # raw entry and raised TypeError on a None that the filters skipped.
        valid = [obj for obj in objects if obj]

        # Separate tasks and blocks
        # NOTE(review): a uid containing neither marker lands in both lists.
        tasks = [t for t in valid if BLOCK not in t["uid"]]
        blocks = [b for b in valid if TASK not in b["uid"]]

        logger.info(f"Submitting {len(valid)} tasks/blocks for execution")
        logger.debug(f"Submitting: {[b['name'] for b in valid]}")

        if tasks:
            if self._telemetry is not None:
                # One shared timestamp for the whole batch.
                now = time.time()
                for t in tasks:
                    self._task_submit_times[t["uid"]] = now
                    self._emit(
                        "TaskSubmitted",
                        task_id=t["uid"],
                        backend=t.get("target_backend"),
                        workflow_id=t.get("workflow_id"),
                        event_time=now,
                        attributes={
                            "executable": t["_tel_executable"],
                            "task_type": t["_tel_task_type"],
                        },
                    )

            if not any(t.get("target_backend") for t in tasks):
                # Fast path: all tasks go to the default backend
                await self.backend.submit_tasks(tasks)
            else:
                by_backend: dict = {}
                for t in tasks:
                    by_backend.setdefault(
                        t.get("target_backend") or self._default_backend_name, []
                    ).append(t)
                # Submit to all target backends in parallel
                await asyncio.gather(
                    *(
                        self._backends[b_name].submit_tasks(b_tasks)
                        for b_name, b_tasks in by_backend.items()
                    )
                )
        if blocks:
            await self._submit_blocks(blocks)
    except Exception as e:
        logger.exception(f"Error in submit component: {e}")
        raise

execute_block async

execute_block(block_fut: Future, func: Callable, *args: Any, **kwargs: Any) -> None

Executes a block function and sets its result on the associated future.

Calls the given function with the provided args and awaits the result, so the function is expected to be a coroutine function. On completion, updates the block_fut with the result or exception.

Parameters:

Name Type Description Default
block_fut Future

Future to store the result or exception.

required
func Callable

Function or coroutine function to execute.

required
*args Any

Positional arguments to pass to the function.

()
**kwargs Any

Keyword arguments to pass to the function.

{}

Returns:

Type Description
None

None

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
async def execute_block(
    self, block_fut: asyncio.Future, func: Callable, *args: Any, **kwargs: Any
) -> None:
    """Executes a block function and sets its result on the associated future.

    Coroutine functions are awaited directly on the event loop. Any other
    callable is run in the loop's default executor so blocking work does not
    stall the loop; if that call returns an awaitable (for example a lambda
    or callable object wrapping a coroutine function), the awaitable is then
    awaited on the loop. While the block runs, the engine's workflow-id
    context variable holds the block's uid and, when telemetry is enabled, a
    ``"block"`` span scope is active. The context is copied into the executor
    thread so synchronous blocks observe both.

    Args:
        block_fut (asyncio.Future): Future of the block. It must carry a
            ``block`` mapping with a ``"uid"`` key. It receives the result or
            exception unless it is already done.
        func (Callable): Coroutine function or plain callable to execute.
        *args: Positional arguments to pass to the function.
        **kwargs: Keyword arguments to pass to the function.

    Returns:
        None
    """
    import contextvars
    import functools
    import inspect

    block_uid = block_fut.block["uid"]
    token = self._workflow_id_ctx.set(block_uid)
    try:
        scope = (
            self._telemetry.span_scope(
                "block", {"asyncflow.workflow_id": block_uid}
            )
            if self._telemetry is not None
            else nullcontext()
        )
        with scope:
            if inspect.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                loop = asyncio.get_running_loop()
                # Copy the context *after* setting the workflow id and entering
                # the span scope so the worker thread inherits both.
                ctx = contextvars.copy_context()
                result = await loop.run_in_executor(
                    None, ctx.run, functools.partial(func, *args, **kwargs)
                )
                # Plain callables may still hand back an awaitable.
                if inspect.isawaitable(result):
                    result = await result
        if not block_fut.done():
            block_fut.set_result(result)
    except Exception as e:
        if not block_fut.done():
            block_fut.set_exception(e)
    finally:
        self._workflow_id_ctx.reset(token)

handle_task_success

handle_task_success(task: dict, task_fut: Future) -> None

Handles successful task completion and updates the associated future.

Sets the result of the task's future to the function's return value or, for a shell command, to its stdout. If the future is already resolved, it logs a warning instead of setting a result.

Parameters:

Name Type Description Default
task dict

Completed task descriptor containing 'uid' (str), the unique task identifier, and 'return_value' (function tasks) or 'stdout' (shell-command tasks), the result of the task execution.

required
task_fut Future

Future to set the result on.

required
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
def handle_task_success(self, task: dict, task_fut: asyncio.Future) -> None:
    """Resolve a task's future with the value produced by a successful run.

    When the registered description's ``FUNCTION`` entry is truthy, the
    future receives ``task["return_value"]``. Otherwise (a shell command) it
    receives ``task["stdout"]``. If the future is already done, nothing is
    set and a warning is logged instead.

    Args:
        task (dict): Completed task descriptor containing
            - 'uid' (str): Unique task identifier
            - 'return_value' / 'stdout': Result of the task execution
        task_fut (asyncio.Future): Future to set the result on.
    """
    uid = task["uid"]
    description = self.components[uid]["description"]

    if task_fut.done():
        logger.warning(f'Attempted to handle an already finished task "{uid}"')
        return

    result_key = "return_value" if description[FUNCTION] else "stdout"
    task_fut.set_result(task[result_key])

handle_task_failure

handle_task_failure(task: dict, task_fut: Future, override_error_message: Union[str, Exception] = None) -> None

Handles task failure and sets the appropriate exception on the future.

Marks the given task's future as failed by setting an exception derived from either a provided override error or the task's own recorded error/stderr. Logs a warning if the future is already resolved.

Parameters:

Name Type Description Default
task dict

Dictionary with task details, including: - 'uid' (str): Unique task identifier - 'exception' or 'stderr': Error information from execution

required
task_fut Future

Future to mark as failed.

required
override_error_message Union[str, Exception]

Custom error message or exception to set instead of the task's recorded error.

None

Returns:

Type Description
None

None

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
def handle_task_failure(
    self,
    task: dict,
    task_fut: asyncio.Future,
    override_error_message: Optional[Union[str, Exception]] = None,
) -> None:
    """Handles task failure and sets the appropriate exception on the future.

    The error comes from ``override_error_message`` when it is given.
    Otherwise it is ``task["exception"]``, then ``task["stderr"]``, then the
    text ``"failed with unknown error"``, taking the first truthy value.
    Exception instances (such as a DependencyFailureError override) are set
    on the future unchanged. Anything else is wrapped in a ``RuntimeError``.
    If the future is already done, only a warning is logged.

    Args:
        task (dict): Dictionary with task details, including:
            - 'uid' (str): Unique task identifier
            - 'exception' or 'stderr': Error information from execution
        task_fut (asyncio.Future): Future to mark as failed.
        override_error_message (Optional[Union[str, Exception]]): Custom
            error message or exception to set instead of the task's recorded
            error.

    Returns:
        None
    """
    if task_fut.done():
        logger.warning(
            f'Attempted to handle an already failed task "{task["uid"]}"'
        )
        return

    if override_error_message is not None:
        error = override_error_message
    else:
        error = (
            task.get("exception")
            or task.get("stderr")
            or "failed with unknown error"
        )

    # Futures need an exception object: wrap stderr text, message strings, or
    # any other non-exception value in a RuntimeError.
    if not isinstance(error, Exception):
        error = RuntimeError(str(error))

    task_fut.set_exception(error)

handle_task_cancellation

handle_task_cancellation(task: dict, task_fut: Future)

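Handle task cancellation. It restores the future's original cancel method and calls it, returning its result. If the future is already done, it logs a warning instead.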

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
def handle_task_cancellation(self, task: dict, task_fut: asyncio.Future):
    """Cancel a task's future through its original ``cancel`` method.

    The future's ``cancel`` attribute is reset to ``task_fut.original_cancel``
    and then invoked. (Presumably that attribute holds the unpatched
    ``cancel`` saved when the future was set up; this is not visible here.)

    Args:
        task (dict): Task descriptor. Only ``task["uid"]`` is read, for the
            warning message.
        task_fut (asyncio.Future): Future to cancel. It must expose an
            ``original_cancel`` attribute unless it is already done.

    Returns:
        The value returned by the restored ``cancel()`` call, or None when
        the future was already done (a warning is logged in that case).
    """
    if not task_fut.done():
        # Put the saved cancel method back before calling it.
        task_fut.cancel = task_fut.original_cancel
        return task_fut.cancel()

    logger.warning(
        f'Attempted to handle an already cancelled task "{task["uid"]}"'
    )
    return None

task_callbacks

task_callbacks(task: Any, state: str, service_callback: Optional[Callable] = None) -> None

Processes task state changes and invokes appropriate handlers.

Handles state transitions for tasks, updates their futures, and triggers relevant state-specific handlers. Supports optional service-specific callbacks for extended functionality.

Parameters:

Name Type Description Default
task Union[dict, object]

Task object or dictionary containing task state information.

required
state str

New state of the task.

required
service_callback Optional[Callable]

Callback function for service tasks. Must be daemon-threaded to avoid blocking. Defaults to None.

None

Returns:

Type Description
None

None

State Transitions
  • DONE: Calls handle_task_success
  • RUNNING: Records the start time and emits a TaskStarted event when telemetry is enabled (the future is not modified)
  • CANCELED: Cancels the future
  • FAILED: Calls handle_task_failure
Logging
  • Debug: Non-relevant state received
  • Info: Task state changes
  • Warning: Unknown task received
Example

::

1
2
3
4
5
6
7
8
def service_ready_callback(future, task, state):
    def wait_and_set():
        try:
            info = ...  # synchronous, backend-specific wait
            future.set_result(info)
        except Exception as e:
            future.set_exception(e)
    threading.Thread(target=wait_and_set, daemon=True).start()
Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
@typeguard.typechecked
def task_callbacks(
    self, task: Any, state: str, service_callback: Optional[Callable] = None
) -> None:
    """Processes task state changes and invokes appropriate handlers.

    Only RUNNING and terminal states are acted upon. Any other state is
    logged at debug level and ignored. Tasks whose uid is not registered in
    ``self.components`` are logged as a warning and skipped.

    Args:
        task (Union[dict, object]): Task dictionary, or an object exposing
            ``as_dict()``, carrying at least a ``"uid"`` key.
        state (str): New state of the task.
        service_callback (Optional[Callable], optional): Called as
            ``service_callback(task_future, task, state)`` before the state is
            dispatched. It is used for service tasks whose future is resolved
            by a backend-specific mechanism. It must not block; do any
            blocking work in a daemon thread. Defaults to None.

    Returns:
        None

    State Transitions:
        - DONE: Calls handle_task_success
        - RUNNING: Records the start time and emits ``TaskStarted`` when
          telemetry is enabled; the future itself is not modified
        - CANCELED: Calls handle_task_cancellation
        - FAILED: Calls handle_task_failure

    Telemetry:
        When telemetry is enabled, DONE/CANCELED/FAILED emit
        ``TaskCompleted``/``TaskCanceled``/``TaskFailed``. The duration is
        measured from the recorded start time, falling back to the submit
        time. The ``_tel_*`` description keys are only read when telemetry
        is enabled.

    Logging:
        - Debug: Non-relevant state received
        - Info: Task state changes
        - Warning: Unknown task received

    Example:
        ::

            def service_ready_callback(future, task, state):
                def wait_and_set():
                    try:
                        info = ...  # blocking, backend-specific wait
                        future.set_result(info)
                    except Exception as e:
                        future.set_exception(e)
                threading.Thread(target=wait_and_set, daemon=True).start()
    """
    states = self.task_states_map
    if state not in states.terminal_states and state != states.RUNNING:
        logger.debug(f"Non-relevant task state received: {state}. Skipping state.")
        return

    task_dct = task if isinstance(task, dict) else task.as_dict()
    uid = task_dct["uid"]

    if uid not in self.components:
        logger.warning(f"Received an unknown task and will skip it: {uid}")
        return

    comp = self.components[uid]
    task_fut = comp["future"]
    comp_desc = comp["description"]
    logger.info(f"{uid} is in {state} state")

    if service_callback:
        # Service tasks are marked done by a backend-specific mechanism
        # that is only provided through this callback.
        service_callback(task_fut, task, state)

    def _tel_attrs():
        # Read lazily, so the future is still resolved when telemetry is off
        # even if these keys were never populated.
        return {
            "executable": comp_desc["_tel_executable"],
            "task_type": comp_desc["_tel_task_type"],
        }

    def _emit_finished(event, **extra):
        # Shared bookkeeping for terminal states: drop the per-task timestamps
        # and report the elapsed time since start (or submit, or now).
        now = time.time()
        start = self._task_start_times.pop(
            uid, self._task_submit_times.get(uid, now)
        )
        self._task_submit_times.pop(uid, None)
        self._emit(
            event,
            task_id=uid,
            workflow_id=task_dct.get("workflow_id"),
            event_time=now,
            duration_seconds=now - start,
            **extra,
            attributes=_tel_attrs(),
        )

    if state == states.DONE:
        self.handle_task_success(task_dct, task_fut)
        if self._telemetry is not None:
            _emit_finished("TaskCompleted")
    elif state == states.RUNNING:
        # NOTE: with asyncio future the running state is
        # implicit: when a coroutine that awaits the future
        # is scheduled and started by the event loop, that's
        # when the "work" is running.
        if self._telemetry is not None:
            now = time.time()
            self._task_start_times[uid] = now
            self._emit(
                "TaskStarted",
                task_id=uid,
                workflow_id=task_dct.get("workflow_id"),
                event_time=now,
                attributes=_tel_attrs(),
            )
    elif state == states.CANCELED:
        self.handle_task_cancellation(task_dct, task_fut)
        if self._telemetry is not None:
            _emit_finished("TaskCanceled")
    elif state == states.FAILED:
        self.handle_task_failure(task_dct, task_fut)
        if self._telemetry is not None:
            _emit_finished("TaskFailed", error_type="unknown")

shutdown async

shutdown(skip_execution_backend: bool = False) -> None

Internal implementation of asynchronous shutdown for the workflow manager.

This method performs the following steps
  1. Sets the shutdown event to signal components to exit.
  2. Cancels every unfinished workflow future (tasks and blocks).
  3. Cancels the internal run task and waits up to 5 seconds for it to finish.
  4. Logs a warning if the run task does not finish within the timeout period.
  5. Shuts down all registered execution backends concurrently (unless skipped) and clears the internal records.

Parameters:

Name Type Description Default
skip_execution_backend bool

If True, skips the shutdown of the execution backend.

False

Returns:

Type Description
None

None

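Raises:

Type Description
CancelledError

May propagate if the shutdown coroutine itself is cancelled (for example while backends are shutting down). A timeout while waiting for the internal run task is logged, not raised.

Exception

Any exception raised by an execution backend's shutdown() is propagated.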

Source code in doc_env/lib/python3.14/site-packages/radical/asyncflow/workflow_manager.py
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
async def shutdown(self, skip_execution_backend: bool = False) -> None:
    """Internal implementation of asynchronous shutdown for the workflow manager.

    This method performs the following steps:
        1. Sets the shutdown event to signal components to exit.
        2. Cancels every unfinished workflow future (tasks and blocks).
        3. Cancels the internal run task and waits up to 5 seconds for it to
           finish. A timeout, or an exception the run task had already raised,
           is logged rather than raised, so the backends still get shut down.
        4. Unless ``skip_execution_backend`` is True, shuts down all
           registered execution backends concurrently and clears the
           engine's internal records.

    Args:
        skip_execution_backend (bool): If True, skips the shutdown of the
            execution backend.

    Returns:
        None

    Raises:
        asyncio.CancelledError: If the shutdown coroutine itself is cancelled.
        Exception: Any exception raised by a backend's ``shutdown()`` is
            propagated by ``asyncio.gather``.
    """
    logger.info("Initiating shutdown")
    # Signal components to exit
    self._shutdown_event.set()

    # Cancel workflow futures (tasks and blocks). Iterate over a snapshot so
    # the loop is unaffected if cancellation ends up mutating the mapping.
    for comp in list(self.components.values()):
        future = comp["future"]
        if not future.done():
            self.handle_task_cancellation(comp["description"], future)

    run_task = self._run_task
    if not run_task.done():
        logger.debug("Shutting down run component")
        run_task.cancel()

    # asyncio.wait never re-raises the awaited task's own outcome. The
    # CancelledError we just injected is therefore not reported as a problem,
    # and a run task that crashed earlier cannot abort the backend shutdown.
    # Cancellation of this coroutine itself still propagates.
    _, pending = await asyncio.wait({run_task}, timeout=5.0)
    if pending:
        logger.warning("Timeout waiting for internal components to shutdown")
    elif run_task.cancelled():
        logger.debug("Internal run component cancelled")
    elif run_task.exception() is not None:
        logger.error(
            "Internal run component exited with an error: %r",
            run_task.exception(),
        )

    # Shutdown all registered execution backends
    if skip_execution_backend:
        logger.warning("Skipping execution backend shutdown as requested")
    elif not self._backends:
        logger.warning("No execution backends registered; nothing to shut down")
    else:
        await asyncio.gather(*(b.shutdown() for b in self._backends.values()))
        self._clear_internal_records()
        logger.debug("Shutting down execution backend completed")

    logger.info("Shutdown completed for all components.")