
radical.asyncflow.backends.execution.radical_pilot

RadicalExecutionBackend

RadicalExecutionBackend(resources: Dict, raptor_config: Optional[Dict] = None)

Bases: BaseExecutionBackend

Radical Pilot-based execution backend for large-scale HPC task execution.

The RadicalExecutionBackend manages computing resources and task execution using the Radical Pilot framework. It interfaces with various resource management systems (SLURM, FLUX, etc.) on diverse HPC machines, providing capabilities for session management, task lifecycle control, and resource allocation.

This backend supports both traditional task execution and advanced features like Raptor mode for high-throughput computing scenarios. It handles pilot submission, task management, and provides data dependency linking mechanisms.

Attributes:

  • session (Session): Primary session for managing task execution context, uniquely identified by a generated ID.
  • task_manager (TaskManager): Manages task lifecycle including submission, tracking, and completion within the session.
  • pilot_manager (PilotManager): Coordinates computing resources (pilots) that are dynamically allocated based on resource requirements.
  • resource_pilot (Pilot): Submitted computing resources configured according to the provided resource specifications.
  • tasks (Dict): Dictionary storing task descriptions indexed by UID.
  • raptor_mode (bool): Flag indicating whether Raptor mode is enabled.
  • masters (list): List of master tasks when Raptor mode is enabled.
  • workers (list): List of worker tasks when Raptor mode is enabled.
  • master_selector (callable): Generator for load balancing across masters.
  • _callback_func (Callable): Registered callback function for task events.

Parameters:

  • resources (Dict, required): Resource requirements for the pilot, including CPU, GPU, and memory specifications.
  • raptor_config (Optional[Dict], default None): Configuration for enabling Raptor mode. Contains master and worker task specifications.

Raises:

  • Exception: If session creation, pilot submission, or task manager setup fails.
  • SystemExit: If KeyboardInterrupt or SystemExit occurs during initialization.

Example

resources = {
    "resource": "local.localhost",
    "runtime": 30,
    "exit_on_error": True,
    "cores": 4
}
backend = await RadicalExecutionBackend(resources)

# With Raptor mode
raptor_config = {
    "masters": [{
        "executable": "/path/to/master",
        "arguments": ["--config", "master.conf"],
        "ranks": 1,
        "workers": [{
            "executable": "/path/to/worker",
            "arguments": ["--mode", "compute"],
            "ranks": 4
        }]
    }]
}
backend = await RadicalExecutionBackend(resources, raptor_config)

Initialize the RadicalExecutionBackend with resources and optional Raptor config.

Creates a new Radical Pilot session, initializes task and pilot managers, submits pilots based on resource configuration, and optionally enables Raptor mode for high-throughput computing.

Parameters:

  • resources (Dict, required): Resource configuration for the Radical Pilot session. Must contain valid pilot description parameters such as:
      - resource: Target resource (e.g., "local.localhost")
      - runtime: Maximum runtime in minutes
      - cores: Number of CPU cores
      - gpus: Number of GPUs (optional)
  • raptor_config (Optional[Dict], default None): Configuration for Raptor mode containing:
      - masters: List of master task configurations
      - Each master can have associated workers
    Defaults to None (Raptor mode disabled).

Raises:

  • Exception: If the RadicalPilot backend fails to initialize properly.
  • SystemExit: If a keyboard interrupt or system exit occurs during setup, with session path information for debugging.

Note
  • Automatically registers backend states with the global StateMapper
  • Prints status messages for successful initialization or failures
  • Session UID is generated using radical.utils for uniqueness
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
@typeguard.typechecked
def __init__(self, resources: Dict, raptor_config: Optional[Dict] = None) -> None:
    """Initialize the RadicalExecutionBackend with resources and optional Raptor config.

    Creates a new Radical Pilot session, initializes task and pilot managers,
    submits pilots based on resource configuration, and optionally enables
    Raptor mode for high-throughput computing.

    Args:
        resources (Dict): Resource configuration for the Radical Pilot session.
            Must contain valid pilot description parameters such as:
            - resource: Target resource (e.g., "local.localhost")
            - runtime: Maximum runtime in minutes
            - cores: Number of CPU cores
            - gpus: Number of GPUs (optional)
        raptor_config (Optional[Dict]): Configuration for Raptor mode containing:
            - masters: List of master task configurations
            - Each master can have associated workers
            Defaults to None (Raptor mode disabled).

    Raises:
        Exception: If RadicalPilot backend fails to initialize properly.
        SystemExit: If keyboard interrupt or system exit occurs during setup,
            with session path information for debugging.

    Note:
        - Automatically registers backend states with the global StateMapper
        - Prints status messages for successful initialization or failures
        - Session UID is generated using radical.utils for uniqueness
    """
    self.resources = resources
    self.raptor_config = raptor_config or {}
    self._initialized = False

__await__

__await__()

Make RadicalExecutionBackend awaitable.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def __await__(self):
    """Make RadicalExecutionBackend awaitable."""
    return self._async_init().__await__()

get_task_states_map

get_task_states_map()

Get the state mapper for this backend.

Returns:

  • StateMapper: StateMapper instance configured for the RadicalPilot backend with appropriate state mappings (DONE, FAILED, CANCELED, AGENT_EXECUTING).

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def get_task_states_map(self):
    """Get the state mapper for this backend.

    Returns:
        StateMapper: StateMapper instance configured for RadicalPilot backend
            with appropriate state mappings (DONE, FAILED, CANCELED, AGENT_EXECUTING).
    """
    return StateMapper(backend=self)

setup_raptor_mode

setup_raptor_mode(raptor_config)

Set up Raptor mode by configuring and submitting master and worker tasks.

Initializes Raptor mode by creating master tasks and their associated worker tasks based on the provided configuration. Masters coordinate work distribution while workers execute the actual computations.

Parameters:

  • raptor_config (Dict, required): Configuration dictionary with the following structure:

        {
            'masters': [
                {
                    'executable': str,  # Path to master executable
                    'arguments': list,  # Arguments for master
                    'ranks': int,       # Number of CPU processes
                    'workers': [        # Worker configurations
                        {
                            'executable': str,    # Worker executable path
                            'arguments': list,    # Worker arguments
                            'ranks': int,         # Worker CPU processes
                            'worker_type': str    # Optional worker class
                        },
                        ...
                    ]
                },
                ...
            ]
        }

Raises:

  • Exception: If task description creation or submission fails.

Note
  • Creates unique UIDs for masters and workers using session namespace
  • Sets up master selector for load balancing across masters
  • Workers default to 'DefaultWorker' class if not specified
  • All master and worker tasks are stored in respective class attributes
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def setup_raptor_mode(self, raptor_config):
    """Set up Raptor mode by configuring and submitting master and worker tasks.

    Initializes Raptor mode by creating master tasks and their associated
    worker tasks based on the provided configuration. Masters coordinate
    work distribution while workers execute the actual computations.

    Args:
        raptor_config (Dict): Configuration dictionary with the following structure:
            {
                'masters': [
                    {
                        'executable': str,  # Path to master executable
                        'arguments': list,  # Arguments for master
                        'ranks': int,       # Number of CPU processes
                        'workers': [        # Worker configurations
                            {
                                'executable': str,    # Worker executable path
                                'arguments': list,    # Worker arguments
                                'ranks': int,         # Worker CPU processes
                                'worker_type': str    # Optional worker class
                            },
                            ...
                        ]
                    },
                    ...
                ]
            }

    Raises:
        Exception: If task description creation or submission fails.

    Note:
        - Creates unique UIDs for masters and workers using session namespace
        - Sets up master selector for load balancing across masters
        - Workers default to 'DefaultWorker' class if not specified
        - All master and worker tasks are stored in respective class attributes
    """

    self.masters = []
    self.workers = []
    self.master_selector = self.select_master()

    cfg = copy.deepcopy(raptor_config)
    masters = cfg['masters']

    for master_description in masters:
        workers = master_description.pop('workers')

        md = rp.TaskDescription(master_description)
        md.uid = ru.generate_id('flow.master.%(item_counter)06d', ru.ID_CUSTOM,
                                 ns=self.session.uid)
        md.mode = rp.RAPTOR_MASTER
        master = self.resource_pilot.submit_raptors(md)[0]
        self.masters.append(master)

        for worker_description in workers:
            # Set default worker class and override if specified
            raptor_class = worker_description.pop('worker_type', 'DefaultWorker')

            # Create and configure worker
            worker = master.submit_workers(
                rp.TaskDescription({
                    **worker_description,
                    'raptor_id': md.uid,
                    'mode': rp.RAPTOR_WORKER,
                    'raptor_class': raptor_class,
                    'uid': ru.generate_id('flow.worker.%(item_counter)06d', 
                                          ru.ID_CUSTOM, ns=self.session.uid)}))

            self.workers.append(worker)

select_master

select_master()

Create a generator for load balancing task submission across masters.

Provides a round-robin generator that cycles through available master UIDs to distribute tasks evenly across all masters in Raptor mode.

Returns:

  • Generator[str]: Generator yielding master UIDs in round-robin fashion.

Raises:

  • RuntimeError: If Raptor mode is not enabled or no masters are available.

Example

selector = backend.select_master()
master_uid = next(selector)  # Get next master for task assignment

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def select_master(self):
    """Create a generator for load balancing task submission across masters.

    Provides a round-robin generator that cycles through available master
    UIDs to distribute tasks evenly across all masters in Raptor mode.

    Returns:
        Generator[str]: Generator yielding master UIDs in round-robin fashion.

    Raises:
        RuntimeError: If Raptor mode is not enabled or no masters are available.

    Example:
        ::
            selector = backend.select_master()
            master_uid = next(selector)  # Get next master for task assignment
    """
    if not self.raptor_mode or not self.masters:
        raise RuntimeError('Raptor mode is not enabled or no masters available')

    current_master = 0
    masters_uids = [m.uid for m in self.masters]

    while True:
        yield masters_uids[current_master]
        current_master = (current_master + 1) % len(self.masters)
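The round-robin pattern above can be exercised on its own. This is a minimal standalone sketch; `_FakeMaster` is a hypothetical stand-in for a Raptor master task (only a `.uid` attribute is assumed), not part of the library:

```python
class _FakeMaster:
    """Hypothetical stand-in for a Raptor master task; only .uid is assumed."""
    def __init__(self, uid):
        self.uid = uid

def round_robin(masters):
    """Yield master UIDs cyclically, mirroring select_master()."""
    if not masters:
        raise RuntimeError('Raptor mode is not enabled or no masters available')
    uids = [m.uid for m in masters]
    current = 0
    while True:
        yield uids[current]
        current = (current + 1) % len(uids)

masters = [_FakeMaster(f'flow.master.{n:06d}') for n in range(3)]
selector = round_robin(masters)
picks = [next(selector) for _ in range(5)]
# picks wraps around after the third master: 0, 1, 2, 0, 1
```

Because the UID list is captured once, masters added after the generator is created are not seen by it, which matches the backend's setup order (masters are submitted before the selector is used).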

register_callback

register_callback(func)

Register a callback function for task state changes.

Sets up a callback mechanism that handles task state transitions, with special handling for service tasks that require additional readiness confirmation.

Parameters:

  • func (Callable, required): Callback function that will be invoked on task state changes. Should accept parameters: (task, state, service_callback=None).
Note
  • Service tasks in AGENT_EXECUTING state get special service_ready_callback
  • All other tasks use the standard callback mechanism
  • The callback is registered with the underlying task manager
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def register_callback(self, func):
    """Register a callback function for task state changes.

    Sets up a callback mechanism that handles task state transitions,
    with special handling for service tasks that require additional
    readiness confirmation.

    Args:
        func (Callable): Callback function that will be invoked on task state changes.
            Should accept parameters: (task, state, service_callback=None).

    Note:
        - Service tasks in AGENT_EXECUTING state get special service_ready_callback
        - All other tasks use the standard callback mechanism
        - The callback is registered with the underlying task manager
    """

    self._callback_func = func

    def backend_callback(task, state):
        service_callback = None
        # Attach backend-specific done_callback for service tasks
        if task.mode == rp.TASK_SERVICE and state == rp.AGENT_EXECUTING:
            service_callback = service_ready_callback

        # Forward to workflow manager's standard callback
        func(task, state, service_callback=service_callback)

    self.task_manager.register_callback(backend_callback)
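The wrapper-callback dispatch can be sketched without Radical Pilot. The string constants and `_FakeTask` class below are illustrative stand-ins for `rp.TASK_SERVICE`, `rp.AGENT_EXECUTING`, and real task objects:

```python
# String stand-ins for rp.TASK_SERVICE and rp.AGENT_EXECUTING (illustrative values).
TASK_SERVICE = 'task.service'
AGENT_EXECUTING = 'AGENT_EXECUTING'

class _FakeTask:
    """Minimal task object exposing only the .mode attribute used here."""
    def __init__(self, mode):
        self.mode = mode

def service_ready_callback(task):
    """Placeholder for the backend's service-readiness hook."""

events = []

def workflow_callback(task, state, service_callback=None):
    """Stand-in for the workflow manager's registered callback."""
    events.append((task.mode, state, service_callback is not None))

def backend_callback(task, state):
    service_callback = None
    # Service tasks entering AGENT_EXECUTING get the readiness hook attached
    if task.mode == TASK_SERVICE and state == AGENT_EXECUTING:
        service_callback = service_ready_callback
    workflow_callback(task, state, service_callback=service_callback)

backend_callback(_FakeTask(TASK_SERVICE), AGENT_EXECUTING)
backend_callback(_FakeTask('task.executable'), 'DONE')
```

Only the service task in the executing state receives a non-None `service_callback`; every other combination forwards `None`, exactly as in the backend's wrapper above.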

build_task

build_task(uid, task_desc, task_backend_specific_kwargs) -> TaskDescription

Build a RadicalPilot task description from workflow task parameters.

Converts a workflow task description into a RadicalPilot TaskDescription, handling different task modes (executable, function, service) and applying appropriate configurations.

Parameters:

  • uid (str, required): Unique identifier for the task.
  • task_desc (Dict, required): Task description containing:
      - executable: Path to executable (for executable tasks)
      - function: Python function (for function tasks)
      - args: Function arguments
      - kwargs: Function keyword arguments
      - is_service: Boolean indicating a service task
  • task_backend_specific_kwargs (Dict, required): RadicalPilot-specific parameters for the task description.

Returns:

  • rp.TaskDescription: Configured RadicalPilot task description, or None if task creation failed.

Note
  • Function tasks require Raptor mode to be enabled
  • Service tasks cannot be Python functions
  • Failed tasks trigger callback with FAILED state
  • Raptor tasks are assigned to masters via load balancing
Example

task_desc = {
    'executable': '/bin/echo',
    'args': ['Hello World'],
    'is_service': False
}
rp_task = backend.build_task('task_001', task_desc, {})

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def build_task(self, uid, task_desc, task_backend_specific_kwargs) -> rp.TaskDescription:
    """Build a RadicalPilot task description from workflow task parameters.

    Converts a workflow task description into a RadicalPilot TaskDescription,
    handling different task modes (executable, function, service) and applying
    appropriate configurations.

    Args:
        uid (str): Unique identifier for the task.
        task_desc (Dict): Task description containing:
            - executable: Path to executable (for executable tasks)
            - function: Python function (for function tasks)
            - args: Function arguments
            - kwargs: Function keyword arguments
            - is_service: Boolean indicating service task
        task_backend_specific_kwargs (Dict): RadicalPilot-specific parameters
            for the task description.

    Returns:
        rp.TaskDescription: Configured RadicalPilot task description, or None
            if task creation failed.

    Note:
        - Function tasks require Raptor mode to be enabled
        - Service tasks cannot be Python functions
        - Failed tasks trigger callback with FAILED state
        - Raptor tasks are assigned to masters via load balancing

    Example:
        ::
            task_desc = {
                'executable': '/bin/echo',
                'args': ['Hello World'],
                'is_service': False
            }
            rp_task = backend.build_task('task_001', task_desc, {})
    """

    is_service = task_desc.get('is_service', False)
    rp_task = rp.TaskDescription(from_dict=task_backend_specific_kwargs)
    rp_task.uid = uid

    if task_desc['executable']:
        rp_task.mode = rp.TASK_SERVICE if is_service else rp.TASK_EXECUTABLE
        rp_task.executable = task_desc['executable']
    elif task_desc['function']:
        if is_service:
            error_msg = 'RadicalExecutionBackend does not support function service tasks'
            rp_task['exception'] = ValueError(error_msg)
            self._callback_func(rp_task, rp.FAILED)
            return

        rp_task.mode = rp.TASK_FUNCTION
        rp_task.function = rp.PythonTask(task_desc['function'],
                                         task_desc['args'],
                                         task_desc['kwargs'])

    if rp_task.mode in [rp.TASK_FUNCTION, rp.TASK_EVAL,
                        rp.TASK_PROC, rp.TASK_METHOD]:
        if not self.raptor_mode:
            error_msg = f'Raptor mode is not enabled, cannot register {rp_task.mode}'
            rp_task['exception'] =  RuntimeError(error_msg)
            self._callback_func(rp_task, rp.FAILED)
            return

        rp_task.raptor_id = next(self.master_selector)

    self.tasks[uid] = rp_task

    return rp_task
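The mode-selection logic in `build_task` can be sketched in isolation. The string constants below are placeholders for the real `rp.TASK_*` constants, and error handling is simplified to plain exceptions rather than the FAILED-callback path used by the backend:

```python
# String stand-ins for rp.TASK_EXECUTABLE / rp.TASK_SERVICE / rp.TASK_FUNCTION.
TASK_EXECUTABLE, TASK_SERVICE, TASK_FUNCTION = 'executable', 'service', 'function'

def pick_mode(task_desc):
    """Simplified mirror of build_task's mode selection."""
    if task_desc.get('executable'):
        # Executable tasks become services when flagged as such
        return TASK_SERVICE if task_desc.get('is_service') else TASK_EXECUTABLE
    if task_desc.get('function'):
        if task_desc.get('is_service'):
            # The real backend reports this via the FAILED callback instead
            raise ValueError('function service tasks are not supported')
        return TASK_FUNCTION
    raise ValueError('task needs an executable or a function')

mode = pick_mode({'executable': '/bin/echo', 'is_service': False})
```

Function-mode tasks additionally require Raptor mode in the real backend, since they are routed to a master via `next(self.master_selector)`.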
link_explicit_data_deps

link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)

Link explicit data dependencies between tasks or from external sources.

Creates data staging entries to establish explicit dependencies where files are transferred or linked from source to destination tasks. Supports both task-to-task dependencies and external file staging.

Parameters:

  • src_task (Optional[Dict], default None): Source task dictionary containing the file. None when staging from an external path.
  • dst_task (Dict, default None): Destination task dictionary that will receive the file. Must contain a 'task_backend_specific_kwargs' key.
  • file_name (Optional[str], default None): Name of the file to stage. Defaults to:
      - src_task UID if staging from a task
      - basename of file_path if staging from an external path
  • file_path (Optional[str], default None): External file path to stage (alternative to task-sourced files).

Returns:

  • Dict: The data dependency dictionary that was added to input staging.

Raises:

  • ValueError: If neither file_name nor file_path is provided, or if src_task is missing when file_path is not specified.

Note
  • External files use TRANSFER action
  • Task-to-task dependencies use LINK action
  • Files are staged to task:/// namespace in destination
  • Input staging list is created if it doesn't exist
Example

# Link output from task1 to task2
backend.link_explicit_data_deps(
    src_task={'uid': 'task1'},
    dst_task={'task_backend_specific_kwargs': {}},
    file_name='output.dat'
)

# Stage external file
backend.link_explicit_data_deps(
    dst_task={'task_backend_specific_kwargs': {}},
    file_path='/path/to/input.txt'
)
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def link_explicit_data_deps(self, src_task=None, dst_task=None, file_name=None, file_path=None):
    """Link explicit data dependencies between tasks or from external sources.

    Creates data staging entries to establish explicit dependencies where
    files are transferred or linked from source to destination tasks.
    Supports both task-to-task dependencies and external file staging.

    Args:
        src_task (Optional[Dict]): Source task dictionary containing the file.
            None when staging from external path.
        dst_task (Dict): Destination task dictionary that will receive the file.
            Must contain 'task_backend_specific_kwargs' key.
        file_name (Optional[str]): Name of the file to stage. Defaults to:
            - src_task UID if staging from task
            - basename of file_path if staging from external path
        file_path (Optional[str]): External file path to stage (alternative
            to task-sourced files).

    Returns:
        Dict: The data dependency dictionary that was added to input staging.

    Raises:
        ValueError: If neither file_name nor file_path is provided, or if
            src_task is missing when file_path is not specified.

    Note:
        - External files use TRANSFER action
        - Task-to-task dependencies use LINK action
        - Files are staged to task:/// namespace in destination
        - Input staging list is created if it doesn't exist

    Example:
        ::
            # Link output from task1 to task2
            backend.link_explicit_data_deps(
                src_task={'uid': 'task1'},
                dst_task={'task_backend_specific_kwargs': {}},
                file_name='output.dat'
            )

            # Stage external file
            backend.link_explicit_data_deps(
                dst_task={'task_backend_specific_kwargs': {}},
                file_path='/path/to/input.txt'
            )
    """
    if not file_name and not file_path:
        raise ValueError('Either file_name or file_path must be provided')

    dst_kwargs = dst_task['task_backend_specific_kwargs']

    # Determine the filename if not provided
    if not file_name:
        if file_path:
            file_name = file_path.split('/')[-1]  # Use basename from path
        elif src_task:
            file_name = src_task['uid']  # Fallback to task UID
        else:
            raise ValueError("Must provide either file_name, file_path, or src_task")

    # Create the appropriate data dependency
    if file_path:
        data_dep = {
            'source': file_path,
            'target': f"task:///{file_name}",
            'action': rp.TRANSFER}
    else:
        if not src_task:
            raise ValueError("src_task must be provided when file_path is not specified")
        data_dep = {
            'source': f"pilot:///{src_task['uid']}/{file_name}",
            'target': f"task:///{file_name}",
            'action': rp.LINK}

    # Add to input staging
    if 'input_staging' not in dst_kwargs:
        dst_kwargs['input_staging'] = [data_dep]
    else:
        dst_kwargs['input_staging'].append(data_dep)

    return data_dep
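The staging-entry construction can be exercised standalone. In this sketch the string values bound to `TRANSFER` and `LINK` are placeholders for the real `rp` constants:

```python
import os

# Placeholder strings for the real rp.TRANSFER / rp.LINK constants.
TRANSFER, LINK = 'Transfer', 'Link'

def make_data_dep(src_uid=None, file_name=None, file_path=None):
    """Mirror the staging-entry construction in link_explicit_data_deps."""
    if not (file_name or file_path or src_uid):
        raise ValueError('need file_name, file_path, or src_uid')
    if not file_name:
        # Fall back to the path basename, then to the source task UID
        file_name = os.path.basename(file_path) if file_path else src_uid
    if file_path:
        # External files are transferred into the task sandbox
        return {'source': file_path,
                'target': f'task:///{file_name}',
                'action': TRANSFER}
    # Task-to-task dependencies are linked from the source task's pilot sandbox
    return {'source': f'pilot:///{src_uid}/{file_name}',
            'target': f'task:///{file_name}',
            'action': LINK}

dep = make_data_dep(src_uid='task1', file_name='output.dat')
ext = make_data_dep(file_path='/path/to/input.txt')
```

The LINK action avoids copying data between sandboxes on the same filesystem, while TRANSFER moves external files into the task sandbox.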
link_implicit_data_deps

link_implicit_data_deps(src_task, dst_task)

Add implicit data dependencies through symbolic links in task sandboxes.

Creates pre-execution commands that establish symbolic links from the source task's sandbox to the destination task's sandbox, simulating implicit data dependencies without explicit file specifications.

Parameters:

  • src_task (Dict, required): Source task dictionary containing a 'uid' key.
  • dst_task (Dict, required): Destination task dictionary with 'task_backend_specific_kwargs'.
required
Note
  • Links all files from source sandbox except the task UID file itself
  • Uses environment variables for source task identification
  • Commands are added to the destination task's pre_exec list
  • Symbolic links are created in the destination task's sandbox
Implementation Details
  1. Sets SRC_TASK_ID environment variable
  2. Sets SRC_TASK_SANDBOX path variable
  3. Creates symbolic links for all files except the task ID file
Example

src_task = {'uid': 'producer_task'}
dst_task = {'task_backend_specific_kwargs': {}}
backend.link_implicit_data_deps(src_task, dst_task)

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def link_implicit_data_deps(self, src_task, dst_task):
    """Add implicit data dependencies through symbolic links in task sandboxes.

    Creates pre-execution commands that establish symbolic links from the
    source task's sandbox to the destination task's sandbox, simulating
    implicit data dependencies without explicit file specifications.

    Args:
        src_task (Dict): Source task dictionary containing 'uid' key.
        dst_task (Dict): Destination task dictionary with 'task_backend_specific_kwargs'.

    Note:
        - Links all files from source sandbox except the task UID file itself
        - Uses environment variables for source task identification
        - Commands are added to the destination task's pre_exec list
        - Symbolic links are created in the destination task's sandbox

    Implementation Details:
        1. Sets SRC_TASK_ID environment variable
        2. Sets SRC_TASK_SANDBOX path variable
        3. Creates symbolic links for all files except the task ID file

    Example:
        ::
            src_task = {'uid': 'producer_task'}
            dst_task = {'task_backend_specific_kwargs': {}}
            backend.link_implicit_data_deps(src_task, dst_task)
    """

    dst_kwargs = dst_task['task_backend_specific_kwargs']
    src_uid = src_task['uid']

    cmd1 = f'export SRC_TASK_ID={src_uid}'
    cmd2 = f'export SRC_TASK_SANDBOX="$RP_PILOT_SANDBOX/$SRC_TASK_ID"'

    cmd3 = '''files=$(cd "$SRC_TASK_SANDBOX" && ls | grep -ve "^$SRC_TASK_ID")
            for f in $files
            do 
                ln -sf "$SRC_TASK_SANDBOX/$f" "$RP_TASK_SANDBOX"
            done'''

    commands = [cmd1, cmd2, cmd3]

    if dst_kwargs.get('pre_exec'):
        dst_kwargs['pre_exec'].extend(commands)
    else:
        dst_kwargs['pre_exec'] = commands
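The pre-exec command generation can be sketched without a pilot. This mirrors the command list built above (the shell loop is condensed onto one line, and the initial `pre_exec` entry is an illustrative placeholder):

```python
def implicit_dep_commands(src_uid):
    """Build the pre_exec commands that link_implicit_data_deps adds to a task."""
    cmd1 = f'export SRC_TASK_ID={src_uid}'
    cmd2 = 'export SRC_TASK_SANDBOX="$RP_PILOT_SANDBOX/$SRC_TASK_ID"'
    # Link every file from the source sandbox except the task-ID file itself
    cmd3 = ('files=$(cd "$SRC_TASK_SANDBOX" && ls | grep -ve "^$SRC_TASK_ID"); '
            'for f in $files; do ln -sf "$SRC_TASK_SANDBOX/$f" "$RP_TASK_SANDBOX"; done')
    return [cmd1, cmd2, cmd3]

dst_kwargs = {'pre_exec': ['module load python']}  # pre-existing entry (illustrative)
commands = implicit_dep_commands('producer_task')

# Extend an existing pre_exec list, or create one if absent (as the backend does)
if dst_kwargs.get('pre_exec'):
    dst_kwargs['pre_exec'].extend(commands)
else:
    dst_kwargs['pre_exec'] = commands
```

Appending rather than overwriting preserves any pre-exec setup (environment modules, virtualenv activation) the task already carries.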

submit_tasks async

submit_tasks(tasks: list)

Submit a list of tasks for execution.

Processes a list of workflow tasks, builds RadicalPilot task descriptions, and submits them to the task manager for execution. Handles task building failures gracefully by skipping invalid tasks.

Parameters:

  • tasks (list, required): List of task dictionaries, each containing:
      - uid: Unique task identifier
      - task_backend_specific_kwargs: RadicalPilot-specific parameters
      - Other task description fields

Returns:

  • The result of task_manager.submit_tasks() with successfully built tasks.

Note
  • Failed task builds are skipped (build_task returns None)
  • Only successfully built tasks are submitted to the task manager
  • Task building includes validation and error handling
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
async def submit_tasks(self, tasks: list):
    """Submit a list of tasks for execution.

    Processes a list of workflow tasks, builds RadicalPilot task descriptions,
    and submits them to the task manager for execution. Handles task building
    failures gracefully by skipping invalid tasks.

    Args:
        tasks (list): List of task dictionaries, each containing:
            - uid: Unique task identifier
            - task_backend_specific_kwargs: RadicalPilot-specific parameters
            - Other task description fields

    Returns:
        The result of task_manager.submit_tasks() with successfully built tasks.

    Note:
        - Failed task builds are skipped (build_task returns None)
        - Only successfully built tasks are submitted to the task manager
        - Task building includes validation and error handling
    """

    _tasks = []
    for task in tasks:
        task_to_submit = self.build_task(task['uid'],
                      task, task['task_backend_specific_kwargs'])

        if not task_to_submit:
            continue

        _tasks.append(task_to_submit)

    return self.task_manager.submit_tasks(_tasks)
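The skip-on-failed-build behavior can be illustrated with a stand-in builder. `build_or_none` below is hypothetical and simply mimics `build_task` returning `None` when a description cannot be built:

```python
def build_or_none(task):
    """Hypothetical stand-in for build_task: returns None when a build fails."""
    if not task.get('valid', True):
        return None  # the real build_task reports failure via callback, then returns None
    return task['uid']

tasks = [{'uid': 't1'}, {'uid': 't2', 'valid': False}, {'uid': 't3'}]

built = []
for task in tasks:
    task_to_submit = build_or_none(task)
    if not task_to_submit:
        continue  # skip tasks whose description could not be built
    built.append(task_to_submit)
```

Only the successfully built descriptions reach the task manager; failed builds have already surfaced through the FAILED callback inside `build_task`.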

cancel_task async

cancel_task(uid: str) -> bool

Cancel a task in the execution backend.

Parameters:

  • uid (str, required): The UID of the task to cancel.

Returns:

  • bool: True if the task was found and cancellation was attempted, False otherwise.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
async def cancel_task(self, uid: str) -> bool:
    """
    Cancel a task in the execution backend.

    Args:
        uid (str): The UID of the task to cancel.

    Returns:
        bool: True if the task was found and cancellation was attempted, False otherwise.
    """
    if uid in self.tasks:
        self.task_manager.cancel_tasks(uid)
        return True
    return False

get_nodelist

get_nodelist()

Get information about allocated compute nodes.

Retrieves the nodelist from the active resource pilot, providing details about the compute nodes allocated for task execution.

Returns:

  • rp.NodeList: NodeList object containing information about allocated nodes. Each node in nodelist.nodes is of type rp.NodeResource. Returns None if the pilot is not in PMGR_ACTIVE state.

Note
  • Only returns nodelist when pilot is in active state
  • Nodelist provides detailed resource information for each node
  • Useful for resource-aware task scheduling and monitoring
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def get_nodelist(self):
    """Get information about allocated compute nodes.

    Retrieves the nodelist from the active resource pilot, providing
    details about the compute nodes allocated for task execution.

    Returns:
        rp.NodeList: NodeList object containing information about allocated
            nodes. Each node in nodelist.nodes is of type rp.NodeResource.
            Returns None if the pilot is not in PMGR_ACTIVE state.

    Note:
        - Only returns nodelist when pilot is in active state
        - Nodelist provides detailed resource information for each node
        - Useful for resource-aware task scheduling and monitoring
    """
    nodelist = None
    if self.resource_pilot.state == rp.PMGR_ACTIVE:
        nodelist = self.resource_pilot.nodelist

    return nodelist
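Callers should therefore treat `None` as "pilot not active yet" rather than "no nodes". A sketch of the same guard with a hypothetical stub pilot (`_StubPilot` and the string node names are assumptions; the real pilot returns an `rp.NodeList` of `rp.NodeResource` objects):

```python
class _StubPilot:
    """Hypothetical pilot exposing the two attributes the guard inspects."""
    state = "PMGR_ACTIVE"
    nodelist = ["node0", "node1"]  # the real pilot returns an rp.NodeList

def get_nodelist(pilot, active_state="PMGR_ACTIVE"):
    # mirrors the guard above: nodes are only known once the pilot is active
    return pilot.nodelist if pilot.state == active_state else None

nodes = get_nodelist(_StubPilot())

pending = _StubPilot()
pending.state = "PMGR_LAUNCHING"   # not active yet
no_nodes = get_nodelist(pending)   # None, not an empty list
```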

state

state()

Retrieve the current state of the resource pilot.

Returns:

Type Description

The current state of the resource pilot.

Note

This method is currently not implemented and serves as a placeholder.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def state(self):
    """Retrieve the current state of the resource pilot.

    Returns:
        The current state of the resource pilot.

    Note:
        This method is currently not implemented and serves as a placeholder.
    """
    raise NotImplementedError

task_state_cb

task_state_cb(task, state)

Callback function for handling task state changes.

Parameters:

Name Type Description Default
task

The task object whose state has changed.

required
state

The new state of the task.

required
Note

This method is currently not implemented and serves as a placeholder for custom task state change handling.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def task_state_cb(self, task, state):
    """Callback function for handling task state changes.

    Args:
        task: The task object whose state has changed.
        state: The new state of the task.

    Note:
        This method is currently not implemented and serves as a placeholder
        for custom task state change handling.
    """
    raise NotImplementedError

shutdown async

shutdown() -> None

Gracefully shutdown the backend and clean up resources.

Closes the RadicalPilot session with data download, ensuring proper cleanup of all resources including pilots, tasks, and session data.

Note
  • Downloads session data before closing
  • Ensures graceful termination of all backend resources
  • Prints confirmation message when shutdown is triggered
Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
async def shutdown(self) -> None:
    """Gracefully shutdown the backend and clean up resources.

    Closes the RadicalPilot session with data download, ensuring proper
    cleanup of all resources including pilots, tasks, and session data.

    Note:
        - Downloads session data before closing
        - Ensures graceful termination of all backend resources
        - Prints confirmation message when shutdown is triggered
    """
    print('Shutdown is triggered, terminating the resources gracefully')
    self.session.close(download=True)

__aenter__ async

__aenter__()

Async context manager entry.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
async def __aenter__(self):
    """Async context manager entry."""
    if not self._initialized:
        await self._async_init()
    return self

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit."""
    await self.shutdown()

create async classmethod

create(resources: Dict, raptor_config: Optional[Dict] = None)

Alternative factory method for creating initialized backend.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
@classmethod
async def create(cls, resources: Dict, raptor_config: Optional[Dict] = None):
    """Alternative factory method for creating initialized backend."""
    backend = cls(resources, raptor_config)
    return await backend

service_ready_callback

service_ready_callback(future, task, state)

Callback function for handling service task readiness.

This callback is specifically designed for service tasks that need to wait for additional information before being considered ready. It runs the wait_info() call in a separate daemon thread to avoid blocking the main execution flow.

Parameters:

Name Type Description Default
future Future

The future object to set the result or exception on.

required
task

The task object that has the wait_info() method.

required
state

The current state of the task (unused in this callback).

required
Note

The wait_info() call is synchronous and potentially blocking, so it's executed in a daemon thread. The future will be set with either the result or an exception based on the outcome.

Source code in doc_env/lib/python3.13/site-packages/radical/asyncflow/backends/execution/radical_pilot.py
def service_ready_callback(future, task, state):
    """Callback function for handling service task readiness.

    This callback is specifically designed for service tasks that need to wait
    for additional information before being considered ready. It runs the
    wait_info() call in a separate daemon thread to avoid blocking the main
    execution flow.

    Args:
        future (Future): The future object to set the result or exception on.
        task: The task object that has the wait_info() method.
        state: The current state of the task (unused in this callback).

    Note:
        The wait_info() call is synchronous and potentially blocking, so it's
        executed in a daemon thread. The future will be set with either the
        result or an exception based on the outcome.
    """
    def wait_and_set():
        try:
            info = task.wait_info()  # synchronous call
            future.set_result(info)
        except Exception as e:
            future.set_exception(e)

    threading.Thread(target=wait_and_set, daemon=True).start()
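The thread-plus-future pattern can be exercised end to end with a mock service task (`_MockServiceTask` and its `wait_info()` return value are illustrative, not the real task API). The caller's flow continues immediately; the result only blocks when `future.result()` is called:

```python
import threading
import time
from concurrent.futures import Future

def service_ready_callback(future, task, state):
    # same shape as the function above: off-load the blocking wait_info()
    def wait_and_set():
        try:
            future.set_result(task.wait_info())
        except Exception as e:
            future.set_exception(e)
    threading.Thread(target=wait_and_set, daemon=True).start()

class _MockServiceTask:
    """Hypothetical service task whose wait_info() blocks briefly."""
    def wait_info(self):
        time.sleep(0.1)  # simulate the synchronous, blocking wait
        return {"endpoint": "tcp://localhost:5555"}

fut = Future()
service_ready_callback(fut, _MockServiceTask(), state=None)
info = fut.result(timeout=5)  # caller stays unblocked until it asks
```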