radical.asyncflow.backends.execution.radical_pilot
RadicalExecutionBackend
RadicalExecutionBackend(resources: Dict, raptor_config: Optional[Dict] = None)
Bases: BaseExecutionBackend
Radical Pilot-based execution backend for large-scale HPC task execution.
The RadicalExecutionBackend manages computing resources and task execution using the Radical Pilot framework. It interfaces with various resource management systems (SLURM, FLUX, etc.) on diverse HPC machines, providing capabilities for session management, task lifecycle control, and resource allocation.
This backend supports both traditional task execution and advanced features such as Raptor mode for high-throughput computing scenarios. It handles pilot submission and task management, and provides mechanisms for linking data dependencies between tasks.
Attributes:
Name | Type | Description |
---|---|---|
session | Session | Primary session for managing task execution context, uniquely identified by a generated ID. |
task_manager | TaskManager | Manages task lifecycle including submission, tracking, and completion within the session. |
pilot_manager | PilotManager | Coordinates computing resources (pilots) that are dynamically allocated based on resource requirements. |
resource_pilot | Pilot | Submitted computing resources configured according to the provided resource specifications. |
tasks | Dict | Dictionary storing task descriptions indexed by UID. |
raptor_mode | bool | Flag indicating whether Raptor mode is enabled. |
masters | list | List of master tasks when Raptor mode is enabled. |
workers | list | List of worker tasks when Raptor mode is enabled. |
master_selector | callable | Generator for load balancing across masters. |
_callback_func | Callable | Registered callback function for task events. |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resources | Dict | Resource requirements for the pilot including CPU, GPU, and memory specifications. | required |
raptor_config | Optional[Dict] | Configuration for enabling Raptor mode. Contains master and worker task specifications. | None |
Raises:
Type | Description |
---|---|
Exception | If session creation, pilot submission, or task manager setup fails. |
SystemExit | If KeyboardInterrupt or SystemExit occurs during initialization. |
Example
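```python
import asyncio

from radical.asyncflow.backends.execution.radical_pilot import RadicalExecutionBackend


async def main():
    resources = {
        "resource": "local.localhost",
        "runtime": 30,
        "exit_on_error": True,
        "cores": 4,
    }
    # Awaiting the instance completes session, manager, and pilot setup.
    backend = await RadicalExecutionBackend(resources)
    # ... submit tasks here ...
    await backend.shutdown()


asyncio.run(main())
```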
Initialize the RadicalExecutionBackend with resources and optional Raptor config.
Creates a new Radical Pilot session, initializes task and pilot managers, submits pilots based on resource configuration, and optionally enables Raptor mode for high-throughput computing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resources | Dict | Resource configuration for the Radical Pilot session. Must contain valid pilot description parameters such as resource (target resource, e.g. "local.localhost"), runtime (maximum runtime in minutes), cores (number of CPU cores), and gpus (number of GPUs, optional). | required |
raptor_config | Optional[Dict] | Configuration for Raptor mode containing a masters key: a list of master task configurations, each of which can have associated workers. Defaults to None (Raptor mode disabled). | None |
Raises:
Type | Description |
---|---|
Exception | If RadicalPilot backend fails to initialize properly. |
SystemExit | If keyboard interrupt or system exit occurs during setup, with session path information for debugging. |
Note
- Automatically registers backend states with the global StateMapper
- Prints status messages for successful initialization or failures
- Session UID is generated using radical.utils for uniqueness
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
__await__
__await__()
Make RadicalExecutionBackend awaitable.
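The awaitable constructor used in the class example can be sketched with a common Python pattern: `__await__` delegates to an async initializer coroutine, so `await SomeClass(...)` runs setup and evaluates to the ready instance. `AwaitableBackend` and `_async_init` are hypothetical names; this illustrates the pattern and is not the backend's actual source.

```python
import asyncio


class AwaitableBackend:
    """Hypothetical stand-in illustrating the awaitable-initialization pattern."""

    def __init__(self, resources: dict):
        self.resources = resources
        self.ready = False

    async def _async_init(self):
        # Placeholder for asynchronous setup (session, managers, pilot).
        await asyncio.sleep(0)
        self.ready = True
        return self

    def __await__(self):
        # Delegate to the coroutine's iterator so `await obj` runs setup
        # and evaluates to the initialized instance.
        return self._async_init().__await__()


async def main():
    return await AwaitableBackend({"resource": "local.localhost"})


backend = asyncio.run(main())
print(backend.ready)  # True
```

Because `__await__` returns the coroutine's iterator, the value of the `await` expression is whatever `_async_init` returns, which here is the instance itself.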
get_task_states_map
get_task_states_map()
Get the state mapper for this backend.
Returns:
Type | Description |
---|---|
StateMapper | StateMapper instance configured for the RadicalPilot backend with the appropriate state mappings (DONE, FAILED, CANCELED, AGENT_EXECUTING). |
setup_raptor_mode
setup_raptor_mode(raptor_config)
Set up Raptor mode by configuring and submitting master and worker tasks.
Initializes Raptor mode by creating master tasks and their associated worker tasks based on the provided configuration. Masters coordinate work distribution while workers execute the actual computations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
raptor_config | Dict | Configuration dictionary with the structure shown below. | required |

```python
{
    'masters': [
        {
            'executable': str,    # Path to master executable
            'arguments': list,    # Arguments for master
            'ranks': int,         # Number of CPU processes
            'workers': [          # Worker configurations
                {
                    'executable': str,   # Worker executable path
                    'arguments': list,   # Worker arguments
                    'ranks': int,        # Worker CPU processes
                    'worker_type': str,  # Optional worker class
                },
                ...
            ],
        },
        ...
    ]
}
```
Raises:
Type | Description |
---|---|
Exception | If task description creation or submission fails. |
Note
- Creates unique UIDs for masters and workers using session namespace
- Sets up master selector for load balancing across masters
- Workers default to 'DefaultWorker' class if not specified
- All master and worker tasks are stored in respective class attributes
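The structure and defaults described above can be illustrated with a small, self-contained sketch that flattens a raptor_config into master and worker entries. `expand_raptor_config`, the `master.NNNN` / `worker.NNNN` UID format, and the default of 1 rank are assumptions for illustration; the real backend derives UIDs from the session namespace and submits rp.TaskDescription objects rather than plain dicts.

```python
from typing import Dict, List, Tuple

raptor_config = {
    "masters": [
        {
            "executable": "/path/to/master.py",  # placeholder path
            "arguments": [],
            "ranks": 1,
            "workers": [
                {"executable": "/path/to/worker.py", "arguments": [], "ranks": 4},
                {"executable": "/path/to/worker.py", "arguments": [], "ranks": 4,
                 "worker_type": "CustomWorker"},
            ],
        }
    ]
}


def expand_raptor_config(config: Dict) -> Tuple[List[Dict], List[Dict]]:
    """Flatten a raptor_config into master and worker entries (hypothetical)."""
    masters, workers = [], []
    worker_idx = 0
    for m_idx, master_cfg in enumerate(config.get("masters", [])):
        master_uid = f"master.{m_idx:04d}"  # hypothetical UID format
        masters.append({
            "uid": master_uid,
            "executable": master_cfg["executable"],
            "arguments": master_cfg.get("arguments", []),
            "ranks": master_cfg.get("ranks", 1),  # assumed default
        })
        for worker_cfg in master_cfg.get("workers", []):
            workers.append({
                "uid": f"worker.{worker_idx:04d}",
                "master": master_uid,  # each worker is attached to its master
                "executable": worker_cfg["executable"],
                "arguments": worker_cfg.get("arguments", []),
                "ranks": worker_cfg.get("ranks", 1),  # assumed default
                # Mirrors the documented fallback worker class.
                "worker_type": worker_cfg.get("worker_type", "DefaultWorker"),
            })
            worker_idx += 1
    return masters, workers


masters, workers = expand_raptor_config(raptor_config)
```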
select_master
select_master()
Create a generator for load balancing task submission across masters.
Provides a round-robin generator that cycles through available master UIDs to distribute tasks evenly across all masters in Raptor mode.
Returns:
Type | Description |
---|---|
Generator[str] | Generator yielding master UIDs in round-robin fashion. |
Raises:
Type | Description |
---|---|
RuntimeError | If Raptor mode is not enabled or no masters are available. |
Example
```python
# Assumes `backend` is an initialized RadicalExecutionBackend with Raptor mode enabled.
selector = backend.select_master()
master_uid = next(selector)  # Get next master for task assignment
```
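A round-robin master selector can be sketched with `itertools.cycle`. The helper name `make_master_selector` and its `raptor_mode` argument are hypothetical; the sketch only mirrors the documented behavior: an endless round-robin over master UIDs, and a RuntimeError when Raptor mode is off or no masters exist.

```python
import itertools
from typing import Iterator, Sequence


def make_master_selector(master_uids: Sequence[str], raptor_mode: bool) -> Iterator[str]:
    """Return an endless round-robin iterator over master UIDs (hypothetical helper)."""
    # Validate eagerly so the error surfaces when the selector is created,
    # not on the first next() call.
    if not raptor_mode or not master_uids:
        raise RuntimeError("Raptor mode is not enabled or no masters are available")
    return itertools.cycle(list(master_uids))


selector = make_master_selector(["master.0000", "master.0001"], raptor_mode=True)
picks = [next(selector) for _ in range(5)]
print(picks)  # ['master.0000', 'master.0001', 'master.0000', 'master.0001', 'master.0000']
```

Validating before returning the iterator means the RuntimeError is raised when the selector is created. A generator function containing the same check would defer the error until the first `next()` call.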
register_callback
register_callback(func)
Register a callback function for task state changes.
Sets up a callback mechanism that handles task state transitions, with special handling for service tasks that require additional readiness confirmation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func | Callable | Callback function that will be invoked on task state changes. Should accept parameters: (task, state, service_callback=None). | required |
Note
- Service tasks in AGENT_EXECUTING state get special service_ready_callback
- All other tasks use the standard callback mechanism
- The callback is registered with the underlying task manager
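The dispatch described in the notes can be sketched as a wrapper around the user callback: service tasks entering AGENT_EXECUTING receive an extra `service_callback`, and every other transition calls `func(task, state)` directly. Representing tasks as dicts with an `is_service` flag, the name `make_dispatcher`, and binding the readiness callback with `functools.partial` are assumptions for illustration.

```python
from functools import partial
from typing import Callable, Dict, List


def make_dispatcher(func: Callable, service_ready_callback: Callable) -> Callable:
    """Wrap `func` so service tasks in AGENT_EXECUTING get a readiness hook (hypothetical)."""

    def on_state_change(task: Dict, state: str) -> None:
        if task.get("is_service") and state == "AGENT_EXECUTING":
            # Service tasks need extra readiness confirmation: hand the consumer
            # a callback that only still needs the future to resolve.
            hook = partial(service_ready_callback, task=task, state=state)
            func(task, state, service_callback=hook)
        else:
            func(task, state)

    return on_state_change


events: List[tuple] = []


def record(task, state, service_callback=None):
    events.append((task["uid"], state, service_callback is not None))


dispatch = make_dispatcher(record, service_ready_callback=lambda future, task, state: None)
dispatch({"uid": "t1", "is_service": False}, "AGENT_EXECUTING")
dispatch({"uid": "s1", "is_service": True}, "AGENT_EXECUTING")
dispatch({"uid": "s1", "is_service": True}, "DONE")
```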
build_task
build_task(uid, task_desc, task_backend_specific_kwargs) -> TaskDescription
Build a RadicalPilot task description from workflow task parameters.
Converts a workflow task description into a RadicalPilot TaskDescription, handling different task modes (executable, function, service) and applying appropriate configurations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uid | str | Unique identifier for the task. | required |
task_desc | Dict | Task description containing: executable (path to the executable, for executable tasks); function (Python function, for function tasks); args (function arguments); kwargs (function keyword arguments); is_service (boolean indicating a service task). | required |
task_backend_specific_kwargs | Dict | RadicalPilot-specific parameters for the task description. | required |
Returns:
Type | Description |
---|---|
TaskDescription | Configured RadicalPilot task description (rp.TaskDescription), or None if task creation failed. |
Note
- Function tasks require Raptor mode to be enabled
- Service tasks cannot be Python functions
- Failed tasks trigger callback with FAILED state
- Raptor tasks are assigned to masters via load balancing
Example
```python
# Assumes `backend` is an initialized RadicalExecutionBackend.
task_desc = {
    'executable': '/bin/echo',
    'args': ['Hello World'],
    'is_service': False,
}
rp_task = backend.build_task('task_001', task_desc, {})
```
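The mode rules listed in the notes can be summarized as a small validation function. `classify_task` and its `(mode, error)` return value are hypothetical; the real method builds an rp.TaskDescription and reports failures by invoking the registered callback with the FAILED state.

```python
from typing import Dict, Optional, Tuple


def classify_task(task_desc: Dict, raptor_mode: bool) -> Tuple[Optional[str], Optional[str]]:
    """Return (mode, error) for a workflow task description (hypothetical)."""
    is_function = task_desc.get("function") is not None
    is_service = bool(task_desc.get("is_service"))

    if is_service and is_function:
        return None, "service tasks cannot be Python functions"
    if is_function and not raptor_mode:
        return None, "function tasks require Raptor mode"
    if is_service:
        return "service", None
    if is_function:
        return "function", None
    return "executable", None


echo = {"executable": "/bin/echo", "args": ["Hello World"], "is_service": False}
print(classify_task(echo, raptor_mode=False))  # ('executable', None)
```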
link_explicit_data_deps
link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)
Link explicit data dependencies between tasks or from external sources.
Creates data staging entries to establish explicit dependencies where files are transferred or linked from source to destination tasks. Supports both task-to-task dependencies and external file staging.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src_task | Optional[Dict] | Source task dictionary containing the file. None when staging from external path. | None |
dst_task | Dict | Destination task dictionary that will receive the file. Must contain 'task_backend_specific_kwargs' key. | None |
file_name | Optional[str] | Name of the file to stage. Defaults to the src_task UID when staging from a task, or to the basename of file_path when staging from an external path. | None |
file_path | Optional[str] | External file path to stage (alternative to task-sourced files). | None |
Returns:
Name | Type | Description |
---|---|---|
Dict | The data dependency dictionary that was added to input staging. |
Raises:
Type | Description |
---|---|
ValueError | If neither file_name nor file_path is provided, or if src_task is missing when file_path is not specified. |
Note
- External files use TRANSFER action
- Task-to-task dependencies use LINK action
- Files are staged to task:/// namespace in destination
- Input staging list is created if it doesn't exist
Example
```python
# Assumes `backend` is an initialized RadicalExecutionBackend.
# Link output from task1 to task2
backend.link_explicit_data_deps(
    src_task={'uid': 'task1'},
    dst_task={'task_backend_specific_kwargs': {}},
    file_name='output.dat',
)
```
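The staging logic described above can be sketched in plain Python. The `source`/`target`/`action` keys follow RadicalPilot's staging-directive convention, but the plain strings "TRANSFER" and "LINK" stand in for the library's action constants, the `pilot:///<uid>/<file>` source URL for task-to-task links is an assumption, and `make_staging_entry` is a hypothetical name. This sketch raises ValueError only when neither src_task nor file_path is given.

```python
import os
from typing import Dict, Optional


def make_staging_entry(src_task: Optional[Dict] = None,
                       dst_task: Optional[Dict] = None,
                       file_name: Optional[str] = None,
                       file_path: Optional[str] = None) -> Dict:
    """Append an input-staging directive to dst_task (hypothetical sketch)."""
    if dst_task is None:
        raise ValueError("dst_task is required")
    if file_path is None and src_task is None:
        raise ValueError("either src_task or file_path must be provided")

    if file_path is not None:
        # External file: copied into the destination task sandbox.
        name = file_name or os.path.basename(file_path)
        entry = {"source": file_path, "target": f"task:///{name}", "action": "TRANSFER"}
    else:
        # Task-to-task: linked from the producer's sandbox (source URL is assumed).
        name = file_name or src_task["uid"]
        entry = {"source": f"pilot:///{src_task['uid']}/{name}",
                 "target": f"task:///{name}", "action": "LINK"}

    # Create the input staging list if it does not exist yet.
    kwargs = dst_task.setdefault("task_backend_specific_kwargs", {})
    kwargs.setdefault("input_staging", []).append(entry)
    return entry


dst = {"task_backend_specific_kwargs": {}}
e1 = make_staging_entry(src_task={"uid": "task1"}, dst_task=dst, file_name="output.dat")
e2 = make_staging_entry(dst_task=dst, file_path="/data/input.csv")
```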
link_implicit_data_deps
link_implicit_data_deps(src_task, dst_task)
Add implicit data dependencies through symbolic links in task sandboxes.
Creates pre-execution commands that establish symbolic links from the source task's sandbox to the destination task's sandbox, simulating implicit data dependencies without explicit file specifications.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src_task | Dict | Source task dictionary containing 'uid' key. | required |
dst_task | Dict | Destination task dictionary with 'task_backend_specific_kwargs'. | required |
Note
- Links all files from source sandbox except the task UID file itself
- Uses environment variables for source task identification
- Commands are added to the destination task's pre_exec list
- Symbolic links are created in the destination task's sandbox
Implementation Details
- Sets SRC_TASK_ID environment variable
- Sets SRC_TASK_SANDBOX path variable
- Creates symbolic links for all files except the task ID file
Example
```python
# Assumes `backend` is an initialized RadicalExecutionBackend.
src_task = {'uid': 'producer_task'}
dst_task = {'task_backend_specific_kwargs': {}}
backend.link_implicit_data_deps(src_task, dst_task)
```
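A sketch of the kind of pre-exec commands such a method might generate. The sandbox location `$RP_PILOT_SANDBOX/$SRC_TASK_ID` and the rule of skipping only an entry named exactly after the task UID are assumptions; the commands and exclusion pattern used by the backend may differ. `add_implicit_link_cmds` is a hypothetical name.

```python
from typing import Dict, List


def add_implicit_link_cmds(src_task: Dict, dst_task: Dict) -> List[str]:
    """Append shell pre_exec commands that symlink a producer's files (hypothetical)."""
    cmds = [
        f"export SRC_TASK_ID={src_task['uid']}",
        # Assumed sandbox layout: <pilot sandbox>/<task uid>.
        'export SRC_TASK_SANDBOX="$RP_PILOT_SANDBOX/$SRC_TASK_ID"',
        # Link every entry except the one named after the task ID itself.
        'for f in "$SRC_TASK_SANDBOX"/*; do '
        'if [ "$(basename "$f")" != "$SRC_TASK_ID" ]; then ln -sf "$f" .; fi; '
        'done',
    ]
    kwargs = dst_task.setdefault("task_backend_specific_kwargs", {})
    kwargs.setdefault("pre_exec", []).extend(cmds)
    return cmds


dst_task = {"task_backend_specific_kwargs": {}}
add_implicit_link_cmds({"uid": "producer_task"}, dst_task)
for line in dst_task["task_backend_specific_kwargs"]["pre_exec"]:
    print(line)
```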
submit_tasks async
submit_tasks(tasks: list)
Submit a list of tasks for execution.
Processes a list of workflow tasks, builds RadicalPilot task descriptions, and submits them to the task manager for execution. Handles task building failures gracefully by skipping invalid tasks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | List of task dictionaries, each containing uid (unique task identifier), task_backend_specific_kwargs (RadicalPilot-specific parameters), and other task description fields. | required |
Returns:
Description |
---|
The result of task_manager.submit_tasks() called with the successfully built tasks. |
Note
- Failed task builds are skipped (build_task returns None)
- Only successfully built tasks are submitted to the task manager
- Task building includes validation and error handling
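The build-then-filter flow can be sketched with the build and submit steps passed in as plain callables, so the example runs without RadicalPilot. `submit_built_tasks`, `fake_build`, and `fake_submit` are hypothetical stand-ins for build_task and task_manager.submit_tasks().

```python
import asyncio
from typing import Callable, Dict, List, Optional


async def submit_built_tasks(tasks: List[Dict],
                             build: Callable[[Dict], Optional[object]],
                             submit: Callable[[List[object]], object]):
    """Build every task, skip those that failed to build, submit the rest."""
    built = []
    for task in tasks:
        desc = build(task)
        if desc is None:  # build failed; the failure is reported elsewhere
            continue
        built.append(desc)
    return submit(built)


def fake_build(task: Dict) -> Optional[Dict]:
    return None if task.get("bad") else {"uid": task["uid"]}


def fake_submit(descs: List[Dict]) -> List[str]:
    return [d["uid"] for d in descs]


result = asyncio.run(submit_built_tasks(
    [{"uid": "t1"}, {"uid": "t2", "bad": True}, {"uid": "t3"}],
    fake_build, fake_submit,
))
print(result)  # ['t1', 't3']
```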
cancel_task async
cancel_task(uid: str) -> bool
Cancel a task in the execution backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uid | str | The UID of the task to cancel. | required |
Returns:
Type | Description |
---|---|
bool | True if the task was found and cancellation was attempted, False otherwise. |
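A minimal sketch of the documented return contract, with the task registry and the cancel call passed in as stand-ins for the backend's internal state and the task manager. `cancel_by_uid` is a hypothetical name. True means cancellation was requested, not that the task ended in the CANCELED state.

```python
import asyncio
from typing import Callable, Dict, List


async def cancel_by_uid(uid: str, tasks: Dict[str, object],
                        cancel: Callable[[str], None]) -> bool:
    """Return True if the task is known and cancellation was requested."""
    if uid not in tasks:
        return False
    cancel(uid)  # only a request; the task may still finish first
    return True


cancelled: List[str] = []
known = {"task.0001": object()}
ok = asyncio.run(cancel_by_uid("task.0001", known, cancelled.append))
missing = asyncio.run(cancel_by_uid("task.9999", known, cancelled.append))
```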
get_nodelist
get_nodelist()
Get information about allocated compute nodes.
Retrieves the nodelist from the active resource pilot, providing details about the compute nodes allocated for task execution.
Returns:
Type | Description |
---|---|
rp.NodeList | NodeList object containing information about the allocated nodes; each node in nodelist.nodes is of type rp.NodeResource. Returns None if the pilot is not in the PMGR_ACTIVE state. |
Note
- Only returns nodelist when pilot is in active state
- Nodelist provides detailed resource information for each node
- Useful for resource-aware task scheduling and monitoring
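Since the nodelist is only available once the pilot is active, callers should treat the return value as optional. The sketch below uses a hypothetical `FakePilot` dataclass and returns a plain list instead of an rp.NodeList, only to illustrate the state check.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakePilot:
    """Hypothetical stand-in for a resource pilot."""
    state: str
    nodes: List[str] = field(default_factory=list)


def nodelist_if_active(pilot: FakePilot) -> Optional[List[str]]:
    # Node information is only meaningful once the pilot is active.
    if pilot.state != "PMGR_ACTIVE":
        return None
    return pilot.nodes


pending = nodelist_if_active(FakePilot("PMGR_LAUNCHING", ["node1"]))
active = nodelist_if_active(FakePilot("PMGR_ACTIVE", ["node1", "node2"]))
```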
state
state()
Retrieve the current state of the resource pilot.
Returns:
Description |
---|
The current state of the resource pilot. |
Note
This method is currently not implemented and serves as a placeholder.
task_state_cb
task_state_cb(task, state)
Callback function for handling task state changes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |  | The task object whose state has changed. | required |
state |  | The new state of the task. | required |
Note
This method is currently not implemented and serves as a placeholder for custom task state change handling.
shutdown async
shutdown() -> None
Gracefully shutdown the backend and clean up resources.
Closes the RadicalPilot session with session data download enabled, ensuring that all resources, including pilots, tasks, and session data, are cleaned up properly.
Note
- Downloads session data before closing
- Ensures graceful termination of all backend resources
- Prints confirmation message when shutdown is triggered
__aenter__ async
__aenter__()
Async context manager entry.
__aexit__ async
__aexit__(exc_type, exc_val, exc_tb)
Async context manager exit.
create async classmethod
create(resources: Dict, raptor_config: Optional[Dict] = None)
Alternative factory method that creates and returns an initialized backend.
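The factory method and the async context-manager hooks are typically combined so that setup and cleanup are paired. `ManagedBackend` is a hypothetical class illustrating this pattern: `create()` awaits an initializer, and `__aexit__` calls `shutdown()` whether or not the body raised. It is a sketch of the pattern, not the backend's source.

```python
import asyncio


class ManagedBackend:
    """Hypothetical backend showing create() plus async-with cleanup."""

    def __init__(self, resources: dict):
        self.resources = resources
        self.active = False

    async def _initialize(self):
        await asyncio.sleep(0)  # placeholder for real async setup
        self.active = True
        return self

    @classmethod
    async def create(cls, resources: dict):
        # Factory alternative to awaiting the instance directly.
        return await cls(resources)._initialize()

    async def shutdown(self):
        self.active = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Always release resources, even if the body raised.
        await self.shutdown()
        return False  # do not suppress exceptions


async def main():
    async with await ManagedBackend.create({"resource": "local.localhost"}) as backend:
        was_active = backend.active
    return was_active, backend.active


was_active, still_active = asyncio.run(main())
print(was_active, still_active)  # True False
```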
service_ready_callback
service_ready_callback(future, task, state)
Callback function for handling service task readiness.
This callback is specifically designed for service tasks that need to wait for additional information before being considered ready. It runs the wait_info() call in a separate daemon thread to avoid blocking the main execution flow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
future | Future | The future object to set the result or exception on. | required |
task |  | The task object that has the wait_info() method. | required |
state |  | The current state of the task (unused in this callback). | required |
Note
The wait_info() call is synchronous and potentially blocking, so it's executed in a daemon thread. The future will be set with either the result or an exception based on the outcome.
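A sketch of the daemon-thread approach described in the note. It assumes a `concurrent.futures.Future`, which is safe to resolve from another thread; resolving an `asyncio.Future` from a worker thread would instead require `loop.call_soon_threadsafe`. `FakeServiceTask` is a hypothetical stand-in for a RadicalPilot task.

```python
import threading
from concurrent.futures import Future


def service_ready_callback(future: Future, task, state) -> None:
    """Resolve `future` with task.wait_info() without blocking the caller."""

    def wait_and_resolve():
        try:
            info = task.wait_info()  # synchronous, may block
            future.set_result(info)
        except Exception as exc:  # surface failures to whoever waits on the future
            future.set_exception(exc)

    threading.Thread(target=wait_and_resolve, daemon=True).start()


class FakeServiceTask:
    """Hypothetical task whose wait_info() returns endpoint details."""

    def wait_info(self):
        return {"url": "http://localhost:8000"}


fut = Future()
service_ready_callback(fut, FakeServiceTask(), "AGENT_EXECUTING")
print(fut.result(timeout=5))  # {'url': 'http://localhost:8000'}
```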