An asynchronous workflow manager that uses asyncio event loops and coroutines to manage and execute workflow components (blocks and/or tasks) within Directed Acyclic Graph (DAG) or Chain Graph (CG) structures.
This class provides async/await operations and handles task dependencies, input/output data staging, and execution.
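For orientation, a minimal end-to-end sketch. It assumes the package re-exports WorkflowEngine from radical.asyncflow and that passing one task's future as an argument to another declares a dependency; both are consistent with the internals documented below, but treat the snippet as illustrative rather than canonical:

import asyncio

from radical.asyncflow import WorkflowEngine

async def main():
    # Dry-run: no real execution backend is required
    flow = await WorkflowEngine.create(dry_run=True)

    @flow.function_task
    async def produce():
        return 21

    @flow.function_task
    async def consume(x):
        return x

    # consume() depends on produce(): its future is passed as an argument
    result = await consume(produce())
    print(result)

    await flow.shutdown()

asyncio.run(main())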
Attributes:
| Name | Type | Description |
| --- | --- | --- |
| loop | AbstractEventLoop | The asyncio event loop (the currently running loop). |
| backend | BaseExecutionBackend | The execution backend used for task execution. |
| dry_run | bool | Indicates whether the engine is in dry-run mode. |
| work_dir | str | The working directory for the workflow session. |
| log | Logger | Logger instance for logging workflow events. |
| prof | Profiler | Profiler instance for profiling workflow execution. |
__init__
Initialize the WorkflowEngine (sync part only).
Note: This is a private constructor. Use WorkflowEngine.create() instead.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| backend | BaseExecutionBackend | Execution backend (required, pre-validated) | required |
| dry_run | bool | Whether to run in dry-run mode | False |
| implicit_data | bool | Whether to enable implicit data dependency linking | True |
Source code in radical/asyncflow/workflow_manager.py
@typeguard.typechecked
def __init__(self, backend: BaseExecutionBackend,
             dry_run: bool = False, implicit_data: bool = True) -> None:
    """
    Initialize the WorkflowEngine (sync part only).

    Note: This is a private constructor. Use WorkflowEngine.create() instead.

    Args:
        backend: Execution backend (required, pre-validated)
        dry_run: Whether to run in dry-run mode
        implicit_data: Whether to enable implicit data dependency linking
    """
    # Get the current running loop - assume it exists
    self.loop = _get_event_loop_or_raise("WorkflowEngine")

    # Store backend (already validated by create method)
    self.backend = backend

    # Initialize core attributes
    self.running = []
    self.components = {}
    self.resolved = set()
    self.dependencies = {}
    self.dry_run = dry_run
    self.queue = asyncio.Queue()
    self.implicit_data_mode = implicit_data

    # Optimization: Track component state changes
    self._ready_queue = deque()
    self._dependents_map = defaultdict(set)
    self._dependency_count = {}
    self._component_change_event = asyncio.Event()

    self.task_states_map = self.backend.get_task_states_map()

    # Setup working directory
    self.work_dir = self.backend.session.path or os.getcwd()

    # Setup logging and profiling
    self.log = ru.Logger(name='workflow_manager', ns='radical.asyncflow', path=self.work_dir)
    self.prof = ru.Profiler(name='workflow_manager', ns='radical.asyncflow', path=self.work_dir)

    # Register callback with backend
    self.backend.register_callback(self.task_callbacks)

    # Define decorators
    self.block = self._register_decorator(comp_type=BLOCK)
    self.function_task = self._register_decorator(comp_type=TASK, task_type=FUNCTION)
    self.executable_task = self._register_decorator(comp_type=TASK, task_type=EXECUTABLE)

    # Initialize async task references (will be set in _start_async_components)
    self._run_task = None
    self._submit_task = None
    self._shutdown_event = asyncio.Event()  # Added shutdown signal

    self._setup_signal_handlers()
create async classmethod
Factory method to create and initialize a WorkflowEngine.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| backend | Optional[BaseExecutionBackend] | Execution backend. If None and dry_run=True, uses NoopExecutionBackend | None |
| dry_run | bool | Whether to run in dry-run mode | False |
| implicit_data | bool | Whether to enable implicit data dependency linking | True |
Returns:
| Type | Description |
| --- | --- |
| WorkflowEngine | Fully initialized workflow engine |
Example
engine = await WorkflowEngine.create(dry_run=True)
Source code in radical/asyncflow/workflow_manager.py
@classmethod
async def create(cls, backend: Optional[BaseExecutionBackend] = None,
                 dry_run: bool = False, implicit_data: bool = True) -> 'WorkflowEngine':
    """
    Factory method to create and initialize a WorkflowEngine.

    Args:
        backend: Execution backend. If None and dry_run=True, uses NoopExecutionBackend
        dry_run: Whether to run in dry-run mode
        implicit_data: Whether to enable implicit data dependency linking

    Returns:
        WorkflowEngine: Fully initialized workflow engine

    Example:
        engine = await WorkflowEngine.create(dry_run=True)
    """
    # Setup and validate backend first
    validated_backend = cls._setup_execution_backend(backend, dry_run)

    # Create instance with validated backend
    instance = cls(backend=validated_backend, dry_run=dry_run, implicit_data=implicit_data)

    # Initialize async components
    await instance._start_async_components()

    return instance
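Beyond the one-liner above, a typical lifecycle pairs create() with shutdown(); a short sketch (the try/finally structure is a recommendation, not an API requirement):

import asyncio

from radical.asyncflow import WorkflowEngine

async def main():
    engine = await WorkflowEngine.create(dry_run=True)
    try:
        pass  # define tasks/blocks and await their futures here
    finally:
        # Always release the backend, even if the workflow fails
        await engine.shutdown()

asyncio.run(main())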
run async
Manages asynchronous execution of workflow components.
Continuously monitors and manages workflow components, handling their dependencies and execution states. Performs dependency resolution and prepares components for execution when their dependencies are satisfied.
Workflow Process
- Monitors unresolved components
- Checks dependency resolution status
- Prepares resolved components for execution
- Handles data staging between components
- Submits ready components to execution queue
Returns: None
Raises:
| Type | Description |
| --- | --- |
| CancelledError | If the coroutine is cancelled during execution |
State Management
- unresolved (set): Component UIDs with pending dependencies
- resolved (set): Component UIDs with satisfied dependencies
- running (list): Currently executing component UIDs
- dependencies (dict): Maps component UIDs to dependency info
- components (dict): Maps UIDs to component descriptions and futures
- queue (asyncio.Queue): Execution queue for ready components
Note
- Runs indefinitely until cancelled or shutdown is signaled
- Uses sleep intervals to prevent busy-waiting
- Handles both implicit and explicit data dependencies
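Stripped of data staging and error handling, the loop documented above is event-driven topological scheduling over a ready queue and per-node dependency counts. A self-contained sketch of that pattern (hypothetical names, not the engine's internals):

import asyncio
from collections import defaultdict, deque

async def schedule(dag):
    """Run each node once all of its dependencies have finished.

    dag maps node -> set of dependency nodes.
    """
    dep_count = {n: len(deps) for n, deps in dag.items()}
    dependents = defaultdict(set)
    for node, deps in dag.items():
        for dep in deps:
            dependents[dep].add(node)

    ready = deque(n for n, c in dep_count.items() if c == 0)
    change = asyncio.Event()
    finished = set()
    pending_tasks = []

    async def run_node(node):
        await asyncio.sleep(0)            # stand-in for real work
        finished.add(node)
        for child in dependents[node]:    # notify dependents
            dep_count[child] -= 1
            if dep_count[child] == 0:
                ready.append(child)
        change.set()

    while len(finished) < len(dag):
        while ready:
            pending_tasks.append(asyncio.create_task(run_node(ready.popleft())))
        await change.wait()               # block instead of busy-waiting
        change.clear()
    return finished

asyncio.run(schedule({'a': set(), 'b': {'a'}, 'c': {'a'}, 'd': {'b', 'c'}}))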
Source code in radical/asyncflow/workflow_manager.py
async def run(self):
    """Manages asynchronous execution of workflow components.

    Continuously monitors and manages workflow components, handling their
    dependencies and execution states. Performs dependency resolution and
    prepares components for execution when their dependencies are satisfied.

    Workflow Process:
        1. Monitors unresolved components
        2. Checks dependency resolution status
        3. Prepares resolved components for execution
        4. Handles data staging between components
        5. Submits ready components to execution queue

    Args:
        None

    Returns:
        None

    Raises:
        asyncio.CancelledError: If the coroutine is cancelled during execution

    State Management:
        - unresolved (set): Component UIDs with pending dependencies
        - resolved (set): Component UIDs with satisfied dependencies
        - running (list): Currently executing component UIDs
        - dependencies (dict): Maps component UIDs to dependency info
        - components (dict): Maps UIDs to component descriptions and futures
        - queue (asyncio.Queue): Execution queue for ready components

    Note:
        - Runs indefinitely until cancelled or shutdown is signaled
        - Uses sleep intervals to prevent busy-waiting
        - Handles both implicit and explicit data dependencies
    """
    while not self._shutdown_event.is_set():
        try:
            to_submit = []

            # Process ready components
            while self._ready_queue and not self._shutdown_event.is_set():
                comp_uid = self._ready_queue.popleft()

                # Skip if already processed
                if comp_uid in self.resolved or comp_uid in self.running:
                    continue

                # Check if future is already done (could be cancelled/failed)
                if self.components[comp_uid]['future'].done():
                    self.resolved.add(comp_uid)
                    self._notify_dependents(comp_uid)
                    continue

                # Verify dependencies are still met
                dependencies = self.dependencies[comp_uid]
                dep_futures = [self.components[dep['uid']]['future'] for dep in dependencies]

                failed_deps = []
                cancelled_deps = []
                for fut in dep_futures:
                    if fut.cancelled():
                        cancelled_deps.append(fut)
                    elif fut.exception() is not None:
                        failed_deps.append(fut.exception())

                # Handle dependency issues
                if cancelled_deps or failed_deps:
                    comp_desc = self.components[comp_uid]['description']
                    if cancelled_deps:
                        self.log.info(f"Cancelling {comp_desc['name']} due to cancelled dependencies")
                        self.handle_task_cancellation(comp_desc, self.components[comp_uid]['future'])
                    else:  # failed_deps
                        chained_exception = self._create_dependency_failure_exception(comp_desc, failed_deps)
                        self.log.error(f"Dependency failure for {comp_desc['name']}: {chained_exception}")
                        self.handle_task_failure(comp_desc, self.components[comp_uid]['future'], chained_exception)

                    # Common cleanup
                    self.resolved.add(comp_uid)
                    self._notify_dependents(comp_uid)
                    continue

                # Handle data dependencies for tasks
                comp_desc = self.components[comp_uid]['description']
                if self.components[comp_uid]['type'] == TASK:
                    explicit_files_to_stage = []
                    for dep in dependencies:
                        dep_desc = self.components[dep['uid']]['description']

                        # Link implicit data dependencies
                        if self.implicit_data_mode and not dep_desc['metadata'].get('output_files'):
                            self.log.debug(f'Linking implicit file(s): from {dep_desc["name"]} to {comp_desc["name"]}')
                            self.backend.link_implicit_data_deps(dep_desc, comp_desc)

                        # Link explicit data dependencies
                        for output_file in dep_desc['metadata']['output_files']:
                            if output_file in comp_desc['metadata']['input_files']:
                                self.log.debug(f'Linking explicit file ({output_file}) from {dep_desc["name"]} to {comp_desc["name"]}')
                                data_dep = self.backend.link_explicit_data_deps(
                                    src_task=dep_desc,
                                    dst_task=comp_desc,
                                    file_name=output_file
                                )
                                explicit_files_to_stage.append(data_dep)

                    # Input staging data dependencies
                    dependency_output_files = self._get_dependency_output_files(dependencies)
                    staged_targets = {Path(item['target']).name for item in explicit_files_to_stage}
                    for input_file in comp_desc['metadata']['input_files']:
                        input_basename = Path(input_file).name
                        if input_basename not in staged_targets and input_basename not in dependency_output_files:
                            self.log.debug(f'Staging {input_file} to {comp_desc["name"]} work dir')
                            data_dep = self.backend.link_explicit_data_deps(
                                src_task=None,
                                dst_task=comp_desc,
                                file_name=input_basename,
                                file_path=input_file
                            )
                            explicit_files_to_stage.append(data_dep)

                try:
                    # Update the component description with resolved values
                    comp_desc['args'], comp_desc['kwargs'] = await self._extract_dependency_values(comp_desc)
                except Exception as e:
                    self.log.error(f"Failed to resolve future for {comp_desc['name']}: {e}")
                    self.handle_task_failure(comp_desc, self.components[comp_uid]['future'], e)
                    self.resolved.add(comp_uid)
                    self._notify_dependents(comp_uid)
                    continue

                to_submit.append(comp_desc)
                msg = f"Ready to submit: {comp_desc['name']}"
                msg += f" with resolved dependencies: {[dep['name'] for dep in dependencies]}"
                self.log.debug(msg)

            # Submit ready components
            if to_submit:
                await self.queue.put(to_submit)
                for comp_desc in to_submit:
                    comp_uid = comp_desc['uid']
                    self.running.append(comp_uid)
                    self.resolved.add(comp_uid)

            # Check for completed components and update dependency tracking
            completed_components = []
            for comp_uid in list(self.running):
                if self.components[comp_uid]['future'].done():
                    completed_components.append(comp_uid)
                    self.running.remove(comp_uid)

            # Notify dependents of completed components
            for comp_uid in completed_components:
                self._notify_dependents(comp_uid)

            # Signal changes
            if completed_components:
                self._component_change_event.set()

            # If nothing is ready and nothing is running, wait for changes or shutdown
            if not self._ready_queue and not to_submit and not completed_components:
                try:
                    # Create tasks for event waiting
                    event_task = asyncio.create_task(
                        self._component_change_event.wait(),
                        name='component-event-task')
                    shutdown_task = asyncio.create_task(
                        self._shutdown_event.wait(),
                        name='shutdown-event-task')

                    done, pending = await asyncio.wait(
                        [event_task, shutdown_task],
                        return_when=asyncio.FIRST_COMPLETED,
                        timeout=1.0
                    )

                    # Cancel any pending tasks to clean up
                    for task in pending:
                        task.cancel()

                    # Clear component change event if it was set
                    if event_task in done:
                        self._component_change_event.clear()
                except asyncio.CancelledError:
                    # If we get cancelled, make sure to clean up our tasks
                    for task in [event_task, shutdown_task]:
                        if not task.done():
                            task.cancel()
                    raise
            else:
                await asyncio.sleep(0.01)

        except asyncio.CancelledError:
            self.log.debug("Run component cancelled")
            break
        except Exception as e:
            self.log.exception(f"Error in run loop: {e}")
            await asyncio.sleep(0.1)
submit async
Manages asynchronous submission of tasks and blocks to the execution backend.
Continuously monitors the internal queue, retrieves ready tasks and blocks, and submits them for execution. Separates incoming objects into tasks and blocks based on their UID pattern, and dispatches each to the appropriate backend method.
Submission Process
- Waits for a batch of objects from the queue
- Filters objects into tasks and blocks
- Submits tasks via backend.submit_tasks
- Submits blocks via _submit_blocks asynchronously
- Retries on timeout with a short delay
Returns: None
Raises:
| Type | Description |
| --- | --- |
| Exception | If an unexpected error occurs during submission |
Queue Management
- queue (asyncio.Queue): Holds lists of objects ready for execution
- Each queue item is a list of dicts representing tasks and/or blocks
- Tasks and blocks are identified by the content of their uid field
Note
- Runs indefinitely until cancelled or shutdown is signaled
- Uses a 1-second timeout to avoid blocking indefinitely
- Handles asyncio.TimeoutError gracefully with a sleep interval
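The consumer half of this pattern in isolation: the one-second timeout exists purely so the loop can re-check the shutdown event. A simplified, runnable sketch (the uid substrings stand in for the engine's BLOCK/TASK markers):

import asyncio

async def consume(queue: asyncio.Queue, shutdown: asyncio.Event):
    # Drain batches until shutdown is signaled
    while not shutdown.is_set():
        try:
            batch = await asyncio.wait_for(queue.get(), timeout=1)
        except asyncio.TimeoutError:
            continue  # loop back and re-check the shutdown event
        # Partition the batch by uid content, as submit() does
        tasks = [o for o in batch if 'task' in o['uid']]
        blocks = [o for o in batch if 'block' in o['uid']]
        print(f'submitting {len(tasks)} tasks, {len(blocks)} blocks')

async def main():
    queue, shutdown = asyncio.Queue(), asyncio.Event()
    worker = asyncio.create_task(consume(queue, shutdown))
    await queue.put([{'uid': 'task.000000'}, {'uid': 'block.000000'}])
    await asyncio.sleep(0.1)
    shutdown.set()
    await worker

asyncio.run(main())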
Source code in radical/asyncflow/workflow_manager.py
async def submit(self):
    """Manages asynchronous submission of tasks and blocks to the execution backend.

    Continuously monitors the internal queue, retrieves ready tasks and blocks,
    and submits them for execution. Separates incoming objects into tasks and
    blocks based on their UID pattern, and dispatches each to the appropriate
    backend method.

    Submission Process:
        1. Waits for a batch of objects from the queue
        2. Filters objects into tasks and blocks
        3. Submits tasks via `backend.submit_tasks`
        4. Submits blocks via `_submit_blocks` asynchronously
        5. Retries on timeout with a short delay

    Args:
        None

    Returns:
        None

    Raises:
        Exception: If an unexpected error occurs during submission

    Queue Management:
        - queue (asyncio.Queue): Holds lists of objects ready for execution
        - Each queue item is a list of dicts representing tasks and/or blocks
        - Tasks and blocks are identified by `uid` field content

    Note:
        - Runs indefinitely until cancelled or shutdown is signaled
        - Uses a 1-second timeout to avoid blocking indefinitely
        - Handles `asyncio.TimeoutError` gracefully with a sleep interval
    """
    while not self._shutdown_event.is_set():
        try:
            objects = await asyncio.wait_for(self.queue.get(), timeout=1)

            # Separate tasks and blocks
            tasks = [t for t in objects if t and BLOCK not in t['uid']]
            blocks = [b for b in objects if b and TASK not in b['uid']]

            self.log.debug(f'Submitting {[b["name"] for b in objects]} for execution')

            if tasks:
                await self.backend.submit_tasks(tasks)
            if blocks:
                await self._submit_blocks(blocks)

        except asyncio.TimeoutError:
            # Check shutdown signal during timeout
            if self._shutdown_event.is_set():
                self.log.debug("Submit component exiting due to shutdown signal")
                break
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            self.log.debug("Submit component cancelled")
            break
        except Exception as e:
            self.log.exception(f"Error in submit component: {e}")
            raise
execute_block async
execute_block(block_fut: Future, func: Callable, *args, **kwargs)
Executes a block function and sets its result on the associated future.
Calls the given function with the provided arguments, awaiting it if it is a coroutine function, or running it in the executor otherwise. On completion, updates block_fut with the result or exception.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| block_fut | Future | Future to store the result or exception. | required |
| func | Callable | Function or coroutine function to execute. | required |
| *args | | Positional arguments to pass to the function. | () |
| **kwargs | | Keyword arguments to pass to the function. | {} |
Returns: None
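The dispatch rule generalizes to any callable: await coroutine functions directly, and hand synchronous ones to an executor thread so they cannot block the event loop. A standalone sketch (note functools.partial, since run_in_executor itself forwards only positional arguments):

import asyncio
import functools
from typing import Callable

async def call_any(func: Callable, *args, **kwargs):
    if asyncio.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    loop = asyncio.get_running_loop()
    # run_in_executor only takes positional args; bind kwargs first
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))

async def main():
    print(await call_any(lambda x: x + 1, 41))  # sync path, runs in a thread

    async def coro(x):
        return x + 1
    print(await call_any(coro, 41))             # async path, awaited directly

asyncio.run(main())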
Source code in radical/asyncflow/workflow_manager.py
async def execute_block(self, block_fut: asyncio.Future, func: Callable, *args, **kwargs):
    """Executes a block function and sets its result on the associated future.

    Calls the given function with provided arguments, awaiting it if it's a coroutine,
    or running it in the executor otherwise. On completion, updates the `block_fut`
    with the result or exception.

    Args:
        block_fut (asyncio.Future): Future to store the result or exception.
        func (Callable): Function or coroutine function to execute.
        *args: Positional arguments to pass to the function.
        **kwargs: Keyword arguments to pass to the function.

    Returns:
        None
    """
    try:
        if asyncio.iscoroutinefunction(func):
            result = await func(*args, **kwargs)
        else:
            # Run sync function in executor
            result = await self.loop.run_in_executor(None, func, *args, **kwargs)

        if not block_fut.done():
            block_fut.set_result(result)
    except Exception as e:
        if not block_fut.done():
            block_fut.set_exception(e)
handle_task_success
handle_task_success(task: dict, task_fut: Future)
Handles successful task completion and updates the associated future.
Sets the result of the task's future based on whether the task was a function or a shell command. Logs a warning if the future is already resolved.
Parameters:
Name | Type | Description | Default |
task | dict | Completed task descriptor containing - 'uid' (str): Unique task identifier - 'return_value' / 'stdout': Result of the task execution | required |
task_fut | Future | Future to set the result on. | required |
Returns: None
Source code in radical/asyncflow/workflow_manager.py
def handle_task_success(self, task: dict, task_fut: asyncio.Future):
    """Handles successful task completion and updates the associated future.

    Sets the result of the task's future based on whether the task was a function
    or a shell command. Raises an error if the future is already resolved.

    Args:
        task (dict): Completed task descriptor containing
            - 'uid' (str): Unique task identifier
            - 'return_value' / 'stdout': Result of the task execution
        task_fut (asyncio.Future): Future to set the result on.

    Returns:
        None

    Raises:
        None
    """
    internal_task = self.components[task['uid']]['description']

    if not task_fut.done():
        if internal_task[FUNCTION]:
            task_fut.set_result(task['return_value'])
        else:
            task_fut.set_result(task['stdout'])
    else:
        self.log.warning(f'Attempted to handle an already finished task "{task["uid"]}"')
handle_task_failure
handle_task_failure(task: dict, task_fut: Future, override_error_message: Union[str, Exception] = None) -> None
Handles task failure and sets the appropriate exception on the future.
Marks the given task's future as failed by setting an exception derived from either a provided override error or the task's own recorded error/stderr. Logs a warning if the future is already resolved.
Parameters:
Name | Type | Description | Default |
task | dict | Dictionary with task details, including: - 'uid' (str): Unique task identifier - 'exception' or 'stderr': Error information from execution | required |
task_fut | Union[SyncFuture, AsyncFuture] | Future to mark as failed. | required |
override_error_message | Union[str, Exception] | Custom error message or exception to set instead of the task's recorded error. | None |
Returns: None
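The selection logic reduces to a small normalization rule: an explicit override wins, otherwise the task's recorded exception or stderr is used, and anything that is not already an Exception is wrapped in a RuntimeError. A sketch of that rule with a hypothetical helper name:

from typing import Optional, Union

def normalize_error(task: dict,
                    override: Optional[Union[str, Exception]] = None) -> Exception:
    # Explicit override wins; wrap plain strings in RuntimeError
    if override is not None:
        return override if isinstance(override, Exception) else RuntimeError(str(override))
    original = task.get('exception') or task.get('stderr') or 'failed with unknown error'
    return original if isinstance(original, Exception) else RuntimeError(str(original))

assert isinstance(normalize_error({'stderr': 'boom'}), RuntimeError)
assert isinstance(normalize_error({}, ValueError('bad')), ValueError)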
Source code in radical/asyncflow/workflow_manager.py
def handle_task_failure(self, task: dict, task_fut: asyncio.Future,
                        override_error_message: Union[str, Exception] = None) -> None:
    """Handles task failure and sets the appropriate exception on the future.

    Marks the given task's future as failed by setting an exception derived from
    either a provided override error or the task's own recorded error/stderr. Logs
    a warning if the future is already resolved.

    Args:
        task (dict): Dictionary with task details, including:
            - 'uid' (str): Unique task identifier
            - 'exception' or 'stderr': Error information from execution
        task_fut (Union[SyncFuture, AsyncFuture]): Future to mark as failed.
        override_error_message (Union[str, Exception], optional): Custom error message
            or exception to set instead of the task's recorded error.

    Returns:
        None
    """
    if task_fut.done():
        self.log.warning(f'Attempted to handle an already failed task "{task["uid"]}"')
        return

    internal_task = self.components[task['uid']]['description']

    # Determine the appropriate exception to set
    if override_error_message is not None:
        # If it's already an exception (like DependencyFailure), use it directly
        if isinstance(override_error_message, Exception):
            exception = override_error_message
        else:
            # If it's a string, wrap it in RuntimeError
            exception = RuntimeError(str(override_error_message))
    else:
        # Use the task's original exception or stderr
        original_error = task.get('exception') or task.get('stderr') or 'failed with unknown error'
        # Ensure we have an Exception object
        if isinstance(original_error, Exception):
            exception = original_error
        else:
            # If it's a string (stderr) or any other type, wrap it in RuntimeError
            exception = RuntimeError(str(original_error))

    task_fut.set_exception(exception)
handle_task_cancellation
handle_task_cancellation(task: dict, task_fut: Future)
Handles task cancellation by restoring the future's original cancel method and invoking it. Logs a warning if the future is already resolved.
Source code in radical/asyncflow/workflow_manager.py
def handle_task_cancellation(self, task: dict, task_fut: asyncio.Future):
    """Handle task cancellation."""
    if task_fut.done():
        self.log.warning(f'Attempted to handle an already cancelled task "{task["uid"]}"')
        return

    # Restore original cancel method
    task_fut.cancel = task_fut.original_cancel
    return task_fut.cancel()
task_callbacks
task_callbacks(task, state: str, service_callback: Optional[Callable] = None)
Processes task state changes and invokes appropriate handlers.
Handles state transitions for tasks, updates their futures, and triggers relevant state-specific handlers. Supports optional service-specific callbacks for extended functionality.
Parameters:
Name | Type | Description | Default |
task | Union[dict, object] | Task object or dictionary containing task state information. | required |
state | str | | required |
service_callback | Optional[Callable] | | None |
Returns: None
State Transitions
- DONE: Calls handle_task_success
- RUNNING: Marks future as running
- CANCELED: Cancels the future
- FAILED: Calls handle_task_failure
Logging
- Debug: Non-relevant state received
- Info: Task state changes
- Warning: Unknown task received
Example
def service_ready_callback(future, task, state):
    def wait_and_set():
        try:
            # Synchronous operation
            future.set_result(info)
        except Exception as e:
            future.set_exception(e)
    threading.Thread(target=wait_and_set, daemon=True).start()
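A runnable variant of the example above (info is replaced with a concrete stand-in value). Because the daemon thread completes an asyncio future owned by the event loop, this sketch routes the set_result/set_exception calls through call_soon_threadsafe; that detail is an assumption about safe usage, not part of the documented API:

import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def service_ready_callback(fut, task, state):
        def wait_and_set():
            try:
                info = {'endpoint': 'tcp://localhost:5555'}  # stand-in result
                loop.call_soon_threadsafe(fut.set_result, info)
            except Exception as e:
                loop.call_soon_threadsafe(fut.set_exception, e)
        threading.Thread(target=wait_and_set, daemon=True).start()

    service_ready_callback(future, task=None, state='RUNNING')
    print(await future)

asyncio.run(main())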
Source code in radical/asyncflow/workflow_manager.py
@typeguard.typechecked
def task_callbacks(self, task, state: str, service_callback: Optional[Callable] = None):
    """Processes task state changes and invokes appropriate handlers.

    Handles state transitions for tasks, updates their futures, and triggers
    relevant state-specific handlers. Supports optional service-specific
    callbacks for extended functionality.

    Args:
        task (Union[dict, object]): Task object or dictionary containing task state information.
        state (str): New state of the task.
        service_callback (Optional[Callable], optional): Callback function
            for service tasks. Must be daemon-threaded to avoid blocking.
            Defaults to None.

    Returns:
        None

    State Transitions:
        - DONE: Calls handle_task_success
        - RUNNING: Marks future as running
        - CANCELED: Cancels the future
        - FAILED: Calls handle_task_failure

    Logging:
        - Debug: Non-relevant state received
        - Info: Task state changes
        - Warning: Unknown task received

    Example:
        ::

            def service_ready_callback(future, task, state):
                def wait_and_set():
                    try:
                        # Synchronous operation
                        future.set_result(info)
                    except Exception as e:
                        future.set_exception(e)
                threading.Thread(target=wait_and_set, daemon=True).start()
    """
    if state not in self.task_states_map.terminal_states and \
       state != self.task_states_map.RUNNING:
        self.log.debug(f"Non-relevant task state received: {state}. Skipping state.")
        return

    task_obj = task
    if isinstance(task, dict):
        task_dct = task
    else:
        task_dct = task.as_dict()

    if task_dct['uid'] not in self.components:
        self.log.warning(f'Received an unknown task and will skip it: {task_dct["uid"]}')
        return

    task_fut = self.components[task_dct['uid']]['future']
    self.log.info(f'{task_dct["uid"]} is in {state} state')

    if service_callback:
        # service tasks are marked done by a backend specific
        # mechanism that are provided during the callbacks only
        service_callback(task_fut, task_obj, state)

    if state == self.task_states_map.DONE:
        self.handle_task_success(task_dct, task_fut)
    elif state == self.task_states_map.RUNNING:
        # NOTE: with asyncio future the running state is
        # implicit: when a coroutine that awaits the future
        # is scheduled and started by the event loop, that's
        # when the "work" is running.
        pass
    elif state == self.task_states_map.CANCELED:
        self.handle_task_cancellation(task_dct, task_fut)
    elif state == self.task_states_map.FAILED:
        self.handle_task_failure(task_dct, task_fut)
shutdown async
shutdown(skip_execution_backend: bool = False)
Internal implementation of asynchronous shutdown for the workflow manager.
This method performs the following steps:
- Sets the shutdown event to signal components to exit
- Cancels the background tasks responsible for running and submitting workflows
- Waits for the cancellation and completion of these tasks, with a timeout of 5 seconds
- Cancels outstanding workflow tasks and blocks
- Logs a warning if the tasks do not complete within the timeout period
- Shuts down the execution backend, unless skipped
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| skip_execution_backend | bool | If True, skips the shutdown of the execution backend. | False |
Returns: None
Raises:
| Type | Description |
| --- | --- |
| TimeoutError | If the background tasks do not complete within the timeout period. |
| CancelledError | If the shutdown is cancelled before completion. |
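The cancel-then-gather pattern described above, in isolation (hypothetical names; the 5-second bound mirrors the timeout documented here):

import asyncio

async def shutdown_components(components, timeout=5.0):
    # Cancel first, then wait; return_exceptions keeps one failure
    # from masking the others
    for task in components:
        if not task.done():
            task.cancel()
    try:
        await asyncio.wait_for(
            asyncio.gather(*components, return_exceptions=True),
            timeout=timeout)
    except asyncio.TimeoutError:
        print('timeout waiting for components to shut down')

async def main():
    async def worker():
        while True:
            await asyncio.sleep(0.1)
    components = [asyncio.create_task(worker()) for _ in range(2)]
    await asyncio.sleep(0.2)
    await shutdown_components(components)

asyncio.run(main())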
Source code in radical/asyncflow/workflow_manager.py
async def shutdown(self, skip_execution_backend: bool = False):
    """
    Internal implementation of asynchronous shutdown for the workflow manager.

    This method performs the following steps:
    1. Sets the shutdown event to signal components to exit
    2. Cancels background tasks responsible for running and
       submitting workflows.
    3. Waits for the cancellation and completion of these tasks,
       with a timeout of 5 seconds.
    4. Cancel workflow tasks.
    5. Logs a warning if the tasks do not complete within the timeout
       period.
    6. Shuts down the backend using an executor to avoid blocking the
       event loop.

    Args:
        skip_execution_backend (bool): If True, skips the shutdown of the
            execution backend.

    Returns:
        None

    Raises:
        asyncio.TimeoutError: If the background tasks do not complete
            within the timeout period.
        asyncio.CancelledError: If the shutdown is cancelled before
            completion.
    """
    self.log.debug("Initiating shutdown")

    # Signal components to exit
    self._shutdown_event.set()

    internal_components = [t for t in (self._run_task, self._submit_task) if t]

    # Cancel internal components tasks
    for component in internal_components:
        if component and not component.done():
            component_name = component.get_coro().__name__
            self.log.debug(f"Shutting down {component_name} component")
            component.cancel()

    # Wait for internal components shutdown to complete
    try:
        await asyncio.wait_for(
            asyncio.gather(*internal_components, return_exceptions=True),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        self.log.warning("Timeout waiting for internal components to shutdown")
    except asyncio.CancelledError:
        self.log.warning("Internal components shutdown cancelled")

    # cancel workflow futures (tasks and blocks)
    for comp in self.components.values():
        future = comp['future']
        comp_desc = comp['description']
        if not future.done():
            self.handle_task_cancellation(comp_desc, future)

    # Shutdown execution backend
    if not skip_execution_backend and self.backend:
        await self.backend.shutdown()
        self.log.debug("Shutting down execution backend")
    else:
        self.log.warning("Skipping execution backend shutdown as requested")