Backend API Reference

API documentation for RHAPSODY execution backends.

Base Backend Interface

rhapsody.backends.base.BaseExecutionBackend

Bases: ABC

Abstract base class for execution backends that manage task execution and state.

This class defines the interface for execution backends that handle task submission, state management, and dependency linking in a distributed or parallel execution environment.

build_task(task) abstractmethod

Build or prepare a task for execution.

Parameters:

Name Type Description Default
task dict

Dictionary containing task definition, parameters, and metadata required for task construction.

required

cancel_task(uid) abstractmethod async

Cancel a task in the execution backend.

Parameters:

Name Type Description Default
uid str

Task identifier

required

Raises:

Type Description
NotImplementedError

If the backend doesn't support cancellation

get_task_states_map() abstractmethod

Retrieve a mapping of task IDs to their current states.

Returns:

Type Description
Any

A dictionary mapping task identifiers to their current execution states.

link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)

Link explicit data dependencies between tasks or files.

Creates explicit dependency relationships based on specified file names or paths, allowing for more precise control over task execution order.

Parameters:

Name Type Description Default
src_task dict[str, Any] | None

The source task that produces the dependency.

None
dst_task dict[str, Any] | None

The destination task that depends on the source.

None
file_name str | None

Name of the file that represents the dependency.

None
file_path str | None

Full path to the file that represents the dependency.

None

link_implicit_data_deps(src_task, dst_task)

Link implicit data dependencies between two tasks.

Creates a dependency relationship where the destination task depends on data produced by the source task, with the dependency being inferred automatically.

Parameters:

Name Type Description Default
src_task dict[str, Any]

The source task that produces data.

required
dst_task dict[str, Any]

The destination task that depends on the source task's output.

required

register_callback(func) abstractmethod

Register a callback function for task state changes.

Parameters:

Name Type Description Default
func Callable[[dict[str, Any], str], None]

A callable that will be invoked when task states change. The function should accept task and state parameters.

required

shutdown() abstractmethod async

Gracefully shutdown the execution backend.

This method should clean up resources, terminate running tasks if necessary, and prepare the backend for termination.

state() abstractmethod

Get the current state of the execution backend.

Returns:

Type Description
str

A string representing the current state of the backend (e.g., 'running', 'idle', 'shutting_down', 'error').

submit_tasks(tasks) abstractmethod async

Submit a list of tasks for execution.

Parameters:

Name Type Description Default
tasks list[dict]

A list of dictionaries containing task definitions and metadata. Each task dictionary should contain the necessary information for task execution.

required

task_state_cb(task, state) abstractmethod

Callback function invoked when a task's state changes.

Parameters:

Name Type Description Default
task dict

Dictionary containing task information and metadata.

required
state str

The new state of the task (e.g., 'pending', 'running', 'completed', 'failed').

required
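Taken together, the abstract methods above describe a submit/callback/shutdown lifecycle. The following is a minimal sketch of that lifecycle, not RHAPSODY's own code: `SketchBackendBase` is a local stand-in that mirrors only a subset of the documented method names so the example runs without RHAPSODY installed, and `InMemoryBackend`, `double`, and `demo` are hypothetical names introduced for illustration.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Callable


class SketchBackendBase(ABC):
    """Local stand-in mirroring a subset of the documented interface."""

    @abstractmethod
    async def submit_tasks(self, tasks: list[dict]) -> None: ...

    @abstractmethod
    async def cancel_task(self, uid: str) -> bool: ...

    @abstractmethod
    def register_callback(self, func: Callable[[dict[str, Any], str], None]) -> None: ...

    @abstractmethod
    def state(self) -> str: ...

    @abstractmethod
    async def shutdown(self) -> None: ...


class InMemoryBackend(SketchBackendBase):
    """Toy backend: awaits each task's async function in the local event loop."""

    def __init__(self) -> None:
        self._callback: Callable[[dict[str, Any], str], None] = lambda task, state: None
        self._state = "idle"

    def register_callback(self, func):
        self._callback = func

    def state(self) -> str:
        return self._state

    async def submit_tasks(self, tasks):
        self._state = "running"
        for task in tasks:
            self._callback(task, "running")
            try:
                task["result"] = await task["function"](*task.get("args", ()))
                self._callback(task, "completed")
            except Exception:
                self._callback(task, "failed")
        self._state = "idle"

    async def cancel_task(self, uid):
        # The interface allows NotImplementedError when cancellation is unsupported.
        raise NotImplementedError("cancellation is not supported in this sketch")

    async def shutdown(self):
        self._state = "shutting_down"


async def double(x: int) -> int:
    return 2 * x


async def demo() -> list[tuple[str, str]]:
    events: list[tuple[str, str]] = []
    backend = InMemoryBackend()
    # The callback receives (task, state), matching Callable[[dict, str], None].
    backend.register_callback(lambda task, state: events.append((task["uid"], state)))
    await backend.submit_tasks([{"uid": "t1", "function": double, "args": (21,)}])
    await backend.shutdown()
    return events
```

Running `asyncio.run(demo())` collects the state transitions reported through the registered callback. A real backend would report states asynchronously as the underlying runtime changes them rather than inline as this toy does.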

Dask Execution Backend

rhapsody.backends.execution.dask_parallel.DaskExecutionBackend(resources=None)

Bases: BaseExecutionBackend

An async-only Dask execution backend for distributed task execution.

Handles task submission, cancellation, and proper async event loop handling for distributed task execution using Dask. All functions must be async.

Usage

backend = await DaskExecutionBackend(resources)

or

async with DaskExecutionBackend(resources) as backend:
    await backend.submit_tasks(tasks)

Initialize the Dask execution backend (non-async setup only).

Parameters:

Name Type Description Default
resources dict | None

Dictionary of resource requirements for tasks. Contains configuration parameters for the Dask client initialization.

None

__aenter__() async

Async context manager entry.

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

__await__()

Make DaskExecutionBackend awaitable like Dask Client.
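The combination of a synchronous `__init__`, an `__await__` method, and the async context manager methods is a common two-phase construction pattern. The sketch below shows the general technique only; `AwaitableResource` and `_async_init` are hypothetical names, and nothing here is taken from the DaskExecutionBackend implementation.

```python
import asyncio


class AwaitableResource:
    """Hypothetical class showing two-phase (sync, then async) construction."""

    def __init__(self, resources=None):
        # Non-async setup only: store configuration, open nothing yet.
        self.resources = resources or {}
        self.initialized = False
        self.closed = False

    async def _async_init(self):
        # Placeholder for async work such as connecting a client.
        if not self.initialized:
            await asyncio.sleep(0)
            self.initialized = True
        return self

    def __await__(self):
        # `await AwaitableResource(...)` runs the async setup and returns self.
        return self._async_init().__await__()

    async def __aenter__(self):
        return await self._async_init()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Placeholder for async cleanup.
        self.closed = True


async def demo():
    first = await AwaitableResource({"threads": 2})
    async with AwaitableResource() as second:
        inside = second.initialized and not second.closed
    return first.initialized, inside, second.closed
```

Both entry points funnel into the same async initializer, so `await obj` and `async with obj` leave the object in the same ready state.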

build_task(task) async

Build or prepare a task for execution.

Parameters:

Name Type Description Default
task dict

Dictionary containing task definition, parameters, and metadata required for task construction.

required

cancel_all_tasks() async

Cancel all currently running/pending tasks.

Returns:

Type Description
int

Number of tasks that were successfully cancelled

cancel_task(uid) async

Cancel a task by its UID.

Parameters:

Name Type Description Default
uid str

The UID of the task to cancel.

required

Returns:

Type Description
bool

True if the task was found and cancellation was attempted, False otherwise.

create(resources=None) async classmethod

Alternative factory method for creating initialized backend.

Parameters:

Name Type Description Default
resources dict | None

Configuration parameters for Dask client initialization.

None

Returns:

Type Description
DaskExecutionBackend

Fully initialized DaskExecutionBackend instance.

get_task_states_map()

Retrieve a mapping of task IDs to their current states.

Returns:

Name Type Description
StateMapper StateMapper

Object containing the mapping of task states for this backend.

link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)

Handle explicit data dependencies between tasks.

Parameters:

Name Type Description Default
src_task dict[str, Any] | None

The source task that produces the dependency.

None
dst_task dict[str, Any] | None

The destination task that depends on the source.

None
file_name str | None

Name of the file that represents the dependency.

None
file_path str | None

Full path to the file that represents the dependency.

None

link_implicit_data_deps(src_task, dst_task)

Handle implicit data dependencies for a task.

Parameters:

Name Type Description Default
src_task dict[str, Any]

The source task that produces data.

required
dst_task dict[str, Any]

The destination task that depends on the source task's output.

required

register_callback(func)

Register a callback for task state changes.

Parameters:

Name Type Description Default
func Callable

Function to be called when task states change. Should accept task and state parameters.

required

shutdown() async

Shutdown the Dask client and clean up resources.

Closes the Dask client connection, clears task storage, and handles any cleanup exceptions gracefully.

state() async

Get the current state of the Dask execution backend.

Returns:

Type Description
str

Current state of the backend as a string.

submit_tasks(tasks) async

Submit async tasks to Dask cluster.

Processes a list of tasks and submits them to the Dask cluster for execution. Filters out future objects from arguments and validates that all functions are async coroutine functions.

Parameters:

Name Type Description Default
tasks list[dict[str, Any]]

List of task dictionaries containing:
  • uid: Unique task identifier
  • function: Async callable to execute
  • args: Positional arguments
  • kwargs: Keyword arguments
  • executable: Optional executable path (not supported)
  • task_backend_specific_kwargs: Backend-specific parameters

required

Note

Executable tasks are not supported and will result in FAILED state. Only async functions are supported - sync functions will result in FAILED state. Future objects are filtered out from arguments as they are not picklable.
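The rules in the note can be checked before submission. The sketch below is an illustration under stated assumptions: `classify` and `strip_futures` are hypothetical helpers, the `"SUBMIT"` label is invented for the example, and treating `concurrent.futures.Future` as the future type to filter is an assumption, since the text above does not say which future classes the backend removes.

```python
import concurrent.futures
import inspect
from typing import Any


async def add(a: int, b: int) -> int:
    return a + b


def blocking_add(a: int, b: int) -> int:
    return a + b


def strip_futures(args: tuple) -> tuple:
    """Drop future objects from positional arguments (assumed future type)."""
    return tuple(a for a in args if not isinstance(a, concurrent.futures.Future))


def classify(task: dict[str, Any]) -> str:
    """Return 'SUBMIT' or 'FAILED' following the documented rules."""
    if task.get("executable"):
        return "FAILED"  # executable tasks are not supported
    if not inspect.iscoroutinefunction(task.get("function")):
        return "FAILED"  # only async functions are supported
    return "SUBMIT"


tasks = [
    {"uid": "t1", "function": add, "args": (1, 2), "kwargs": {}},
    {"uid": "t2", "function": blocking_add, "args": (1, 2), "kwargs": {}},
    {"uid": "t3", "executable": "/bin/echo", "args": ("hi",), "kwargs": {}},
]
outcome = {task["uid"]: classify(task) for task in tasks}
```

Here only `t1` passes: `t2` wraps a synchronous function and `t3` is an executable task.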

task_state_cb(task, state) async

Callback function invoked when a task's state changes.

Parameters:

Name Type Description Default
task dict

Dictionary containing task information and metadata.

required
state str

The new state of the task.

required

RADICAL-Pilot Execution Backend

rhapsody.backends.execution.radical_pilot.RadicalExecutionBackend(resources, raptor_config=None)

Bases: BaseExecutionBackend

Radical Pilot-based execution backend for large-scale HPC task execution.

The RadicalExecutionBackend manages computing resources and task execution using the Radical Pilot framework. It interfaces with various resource management systems (SLURM, FLUX, etc.) on diverse HPC machines, providing capabilities for session management, task lifecycle control, and resource allocation.

This backend supports both traditional task execution and advanced features like Raptor mode for high-throughput computing scenarios. It handles pilot submission, task management, and provides data dependency linking mechanisms.

Attributes:

Name Type Description
session Session

Primary session for managing task execution context, uniquely identified by a generated ID.

task_manager TaskManager

Manages task lifecycle including submission, tracking, and completion within the session.

pilot_manager PilotManager

Coordinates computing resources (pilots) that are dynamically allocated based on resource requirements.

resource_pilot Pilot

Submitted computing resources configured according to the provided resource specifications.

tasks dict

Dictionary storing task descriptions indexed by UID.

raptor_mode bool

Flag indicating whether Raptor mode is enabled.

masters list

List of master tasks when Raptor mode is enabled.

workers list

List of worker tasks when Raptor mode is enabled.

master_selector callable

Generator for load balancing across masters.

_callback_func Callable

Registered callback function for task events.

Parameters:

Name Type Description Default
resources dict

Resource requirements for the pilot including CPU, GPU, and memory specifications.

required
raptor_config Optional[dict]

Configuration for enabling Raptor mode. Contains master and worker task specifications.

None

Raises:

Type Description
Exception

If session creation, pilot submission, or task manager setup fails.

SystemExit

If KeyboardInterrupt or SystemExit occurs during initialization.

Example

resources = {
    "resource": "local.localhost",
    "runtime": 30,
    "exit_on_error": True,
    "cores": 4
}
backend = await RadicalExecutionBackend(resources)

# With Raptor mode
raptor_config = {
    "masters": [{
        "executable": "/path/to/master",
        "arguments": ["--config", "master.conf"],
        "ranks": 1,
        "workers": [{
            "executable": "/path/to/worker",
            "arguments": ["--mode", "compute"],
            "ranks": 4
        }]
    }]
}
backend = await RadicalExecutionBackend(resources, raptor_config)

Initialize the RadicalExecutionBackend with resources.

Creates a new Radical Pilot session, initializes task and pilot managers, submits pilots based on resource configuration, and optionally enables Raptor mode for high-throughput computing.

Parameters:

Name Type Description Default
resources Dict

Resource configuration for the Radical Pilot session. Must contain valid pilot description parameters such as:
  • resource: Target resource (e.g., "local.localhost")
  • runtime: Maximum runtime in minutes
  • cores: Number of CPU cores
  • gpus: Number of GPUs (optional)

required
raptor_config Optional[Dict]

Configuration for Raptor mode containing:
  • masters: List of master task configurations
  • Each master can have associated workers

Defaults to None (Raptor mode disabled).

None

Raises:

Type Description
Exception

If RadicalPilot backend fails to initialize properly.

SystemExit

If keyboard interrupt or system exit occurs during setup, with session path information for debugging.

Note
  • Automatically registers backend states with the global StateMapper
  • Logs status messages for successful initialization or failures
  • Session UID is generated using radical.utils for uniqueness

__aenter__() async

Async context manager entry.

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

__await__()

Make RadicalExecutionBackend awaitable.

build_task(uid, task_desc, task_backend_specific_kwargs)

Build a RadicalPilot task description from workflow task parameters.

Converts a workflow task description into a RadicalPilot TaskDescription, handling different task modes (executable, function, service) and applying appropriate configurations.

Parameters:

Name Type Description Default
uid str

Unique identifier for the task.

required
task_desc Dict

Task description containing:
  • executable: Path to executable (for executable tasks)
  • function: Python function (for function tasks)
  • args: Function arguments
  • kwargs: Function keyword arguments
  • is_service: Boolean indicating service task

required
task_backend_specific_kwargs Dict

RadicalPilot-specific parameters for the task description.

required

Returns:

Type Description
object | None

rp.TaskDescription: Configured RadicalPilot task description, or None if task creation failed.

Note
  • Function tasks require Raptor mode to be enabled
  • Service tasks cannot be Python functions
  • Failed tasks trigger callback with FAILED state
  • Raptor tasks are assigned to masters via load balancing
Example

task_desc = {
    'executable': '/bin/echo',
    'args': ['Hello World'],
    'is_service': False
}
rp_task = backend.build_task('task_001', task_desc, {})

cancel_task(uid) async

Cancel a task.

Parameters:

Name Type Description Default
uid str

Task UID to cancel.

required

Returns:

Type Description
bool

True if task found and cancellation attempted, False otherwise.

create(resources, raptor_config=None) async classmethod

Create initialized backend.

Parameters:

Name Type Description Default
resources dict

Radical Pilot configuration.

required
raptor_config dict | None

Optional Raptor mode configuration.

None

Returns:

Type Description
RadicalExecutionBackend

Initialized RadicalExecutionBackend instance.

get_nodelist()

Get information about allocated compute nodes.

Retrieves the nodelist from the active resource pilot, providing details about the compute nodes allocated for task execution.

Returns:

Type Description
object | None

rp.NodeList: NodeList object containing information about allocated nodes. Each node in nodelist.nodes is of type rp.NodeResource. Returns None if the pilot is not in PMGR_ACTIVE state.

Note
  • Only returns nodelist when pilot is in active state
  • Nodelist provides detailed resource information for each node
  • Useful for resource-aware task scheduling and monitoring

get_task_states_map()

Get the state mapper for this backend.

Returns:

Name Type Description
StateMapper StateMapper

StateMapper instance configured for RadicalPilot backend with appropriate state mappings (DONE, FAILED, CANCELED, AGENT_EXECUTING).

link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)

Link explicit data dependencies between tasks or from external sources.

Creates data staging entries to establish explicit dependencies where files are transferred or linked from source to destination tasks. Supports both task-to-task dependencies and external file staging.

Parameters:

Name Type Description Default
src_task Optional[Dict]

Source task dictionary containing the file. None when staging from external path.

None
dst_task Dict

Destination task dictionary that will receive the file. Must contain 'task_backend_specific_kwargs' key.

None
file_name Optional[str]

Name of the file to stage. Defaults to:
  • src_task UID if staging from task
  • basename of file_path if staging from external path

None
file_path Optional[str]

External file path to stage (alternative to task-sourced files).

None

Returns:

Name Type Description
Dict dict

The data dependency dictionary that was added to input staging.

Raises:

Type Description
ValueError

If neither file_name nor file_path is provided, or if src_task is missing when file_path is not specified.

Note
  • External files use TRANSFER action
  • Task-to-task dependencies use LINK action
  • Files are staged to task:/// namespace in destination
  • Input staging list is created if it doesn't exist
Example

# Link output from task1 to task2
backend.link_explicit_data_deps(
    src_task={'uid': 'task1'},
    dst_task={'task_backend_specific_kwargs': {}},
    file_name='output.dat'
)

# Stage external file
backend.link_explicit_data_deps(
    dst_task={'task_backend_specific_kwargs': {}},
    file_path='/path/to/input.txt'
)
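The staging rules described for this method (TRANSFER for external files, LINK for task-to-task files, a `task:///` target, and an input staging list created on demand) can be sketched in plain Python. This is an illustration only, not the backend's code: `sketch_explicit_dep` is a hypothetical function, the dictionary keys `source`, `target`, `action`, and `input_staging` are assumptions, the action values are plain strings standing in for the RADICAL-Pilot constants, and the task-to-task source string is a placeholder.

```python
import os
from typing import Any, Optional


def sketch_explicit_dep(
    dst_task: dict[str, Any],
    src_task: Optional[dict[str, Any]] = None,
    file_name: Optional[str] = None,
    file_path: Optional[str] = None,
) -> dict[str, str]:
    """Build one staging entry and append it to the destination task."""
    if not file_name and not file_path:
        raise ValueError("either file_name or file_path must be provided")
    if not file_path and src_task is None:
        raise ValueError("src_task is required when file_path is not given")

    # Only the basename default is modelled; the src_task-UID default is not.
    name = file_name or os.path.basename(file_path)

    if file_path:
        # External file: copied into the destination task's sandbox.
        dep = {"source": file_path, "target": f"task:///{name}", "action": "TRANSFER"}
    else:
        # Task-to-task: linked from the producer (placeholder source location).
        dep = {
            "source": f"{src_task['uid']}/{name}",
            "target": f"task:///{name}",
            "action": "LINK",
        }

    kwargs = dst_task["task_backend_specific_kwargs"]
    # Create the staging list if it does not exist yet, then append.
    kwargs.setdefault("input_staging", []).append(dep)
    return dep
```

Calling it twice on the same destination task accumulates two entries in that task's staging list, one per dependency.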

link_implicit_data_deps(src_task, dst_task)

Add implicit data dependencies through symbolic links in task sandboxes.

Creates pre-execution commands that establish symbolic links from the source task's sandbox to the destination task's sandbox, simulating implicit data dependencies without explicit file specifications.

Parameters:

Name Type Description Default
src_task Dict

Source task dictionary containing 'uid' key.

required
dst_task Dict

Destination task dictionary with 'task_backend_specific_kwargs' for pre_exec commands.

required
Note
  • Links all files from source sandbox except the task UID file itself
  • Uses environment variables for source task identification
  • Commands are added to the destination task's pre_exec list
  • Symbolic links are created in the destination task's sandbox
Implementation Details
  1. Sets SRC_TASK_ID environment variable
  2. Sets SRC_TASK_SANDBOX path variable
  3. Creates symbolic links for all files except the task ID file
Example

src_task = {'uid': 'producer_task'}
dst_task = {'task_backend_specific_kwargs': {}}
backend.link_implicit_data_deps(src_task, dst_task)
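The three implementation steps listed above can be sketched as a function that only builds the shell commands and appends them to `pre_exec`. This is a rough illustration, not the backend's actual commands: `sketch_implicit_dep` is hypothetical, the environment variables `RP_PILOT_SANDBOX` and `RP_TASK_SANDBOX` are assumed to be provided by the runtime, and skipping the file whose name equals the task UID is one reading of "except the task ID file".

```python
import shlex
from typing import Any


def sketch_implicit_dep(src_task: dict[str, Any], dst_task: dict[str, Any]) -> list[str]:
    """Append symlink-creating shell commands to the destination's pre_exec."""
    commands = [
        # Step 1: identify the source task.
        f"export SRC_TASK_ID={shlex.quote(src_task['uid'])}",
        # Step 2: locate its sandbox (assumed layout under the pilot sandbox).
        'export SRC_TASK_SANDBOX="$RP_PILOT_SANDBOX/$SRC_TASK_ID"',
        # Step 3: link every file except the one named after the task UID.
        'for f in "$SRC_TASK_SANDBOX"/*; do '
        '[ "$(basename "$f")" = "$SRC_TASK_ID" ] || ln -sf "$f" "$RP_TASK_SANDBOX"; '
        "done",
    ]
    kwargs = dst_task["task_backend_specific_kwargs"]
    kwargs.setdefault("pre_exec", []).extend(commands)
    return commands
```

The function never runs the commands; it only records them so that the destination task would execute them before its own payload.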

register_callback(func)

Register a callback function for task state changes.

Sets up a callback mechanism that handles task state transitions, with special handling for service tasks that require additional readiness confirmation.

Parameters:

Name Type Description Default
func Callable

Callback function that will be invoked on task state changes. Should accept parameters: (task, state, service_callback=None).

required
Note
  • Service tasks in AGENT_EXECUTING state get special service_ready_callback
  • All other tasks use the standard callback mechanism
  • The callback is registered with the underlying task manager

select_master()

Create a generator for load balancing task submission across masters.

Provides a round-robin generator that cycles through available master UIDs to distribute tasks evenly across all masters in Raptor mode.

Returns:

Type Description
None

Generator[str]: Generator yielding master UIDs in round-robin fashion.

Raises:

Type Description
RuntimeError

If Raptor mode is not enabled or no masters are available.

Example

selector = backend.select_master()
master_uid = next(selector)  # Get next master for task assignment
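Round-robin selection of this kind is commonly built on `itertools.cycle`. The sketch below shows the idea with a hypothetical `round_robin` function and made-up master UIDs; it does not reproduce how the backend stores its masters, and it raises eagerly at call time, which may differ from the backend's behavior.

```python
import itertools
from typing import Iterator, Sequence


def round_robin(master_uids: Sequence[str], raptor_mode: bool = True) -> Iterator[str]:
    """Return an endless iterator over master UIDs in round-robin order."""
    if not raptor_mode or not master_uids:
        raise RuntimeError("Raptor mode is disabled or no masters are available")
    # Copy to a tuple so later changes to the caller's list do not affect the cycle.
    return itertools.cycle(tuple(master_uids))


selector = round_robin(["master.0000", "master.0001"])
assigned = [next(selector) for _ in range(5)]
```

Each call to `next(selector)` returns the next UID and wraps around after the last one, so tasks are spread evenly over the masters.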

setup_raptor_mode(raptor_config)

Set up Raptor mode by configuring and submitting master and worker tasks.

Initializes Raptor mode by creating master tasks and their associated worker tasks based on the provided configuration. Masters coordinate work distribution while workers execute the actual computations.

Parameters:

Name Type Description Default
raptor_config Dict

Configuration dictionary with the following structure:

{
    'masters': [
        {
            'executable': str,   # Path to master executable
            'arguments': list,   # Arguments for master
            'ranks': int,        # Number of CPU processes
            'workers': [         # Worker configurations
                {
                    'executable': str,   # Worker executable path
                    'arguments': list,   # Worker arguments
                    'ranks': int,        # Worker CPU processes
                    'worker_type': str   # Optional worker class
                },
                ...
            ]
        },
        ...
    ]
}

required

Raises:

Type Description
Exception

If task description creation or submission fails.

Note
  • Creates unique UIDs for masters and workers using session namespace
  • Sets up master selector for load balancing across masters
  • Workers default to 'DefaultWorker' class if not specified
  • All master and worker tasks are stored in respective class attributes

shutdown() async

Gracefully shutdown the backend and clean up resources.

Closes the RadicalPilot session with data download, ensuring proper cleanup of all resources including pilots, tasks, and session data.

Note
  • Downloads session data before closing
  • Ensures graceful termination of all backend resources
  • Prints confirmation message when shutdown is triggered

state()

Retrieve resource pilot state.

submit_tasks(tasks) async

Submit a list of tasks for execution.

Processes a list of workflow tasks, builds RadicalPilot task descriptions, and submits them to the task manager for execution. Handles task building failures gracefully by skipping invalid tasks.

Parameters:

Name Type Description Default
tasks list

List of task dictionaries, each containing:
  • uid: Unique task identifier
  • task_backend_specific_kwargs: RadicalPilot-specific parameters
  • Other task description fields

required
Note
  • Failed task builds are skipped (build_task returns None)
  • Only successfully built tasks are submitted to the task manager
  • Task building includes validation and error handling

task_state_cb(task, state)

Handle task state changes.

Backend Discovery

rhapsody.backends.discovery.discover_backends()

Discover which backends are available based on optional dependencies.

Returns:

Type Description
dict[str, bool]

Dictionary mapping backend names to availability status
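Availability checks based on optional dependencies are typically done by probing for importable modules without importing them. The sketch below shows one way to produce a `dict[str, bool]` of this shape; `probe`, `sketch_discover`, and the name-to-module mapping are assumptions for illustration and are not taken from RHAPSODY's implementation.

```python
import importlib.util

# Assumed mapping of backend name to the module it depends on.
ASSUMED_DEPENDENCIES = {
    "dask": "dask",
    "radical_pilot": "radical.pilot",
}


def probe(module_name: str) -> bool:
    """Return True if the module can be located, without importing it fully."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # find_spec raises if a parent package of a dotted name is missing.
        return False


def sketch_discover() -> dict[str, bool]:
    return {name: probe(module) for name, module in ASSUMED_DEPENDENCIES.items()}


availability = sketch_discover()
```

The result depends on what is installed in the current environment, so no particular values should be expected.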

rhapsody.backends.discovery.get_backend(backend_name, *args, **kwargs)

Factory function to create backend instances.

Parameters:

Name Type Description Default
backend_name str

Name of the backend to create

required
*args Any

Positional arguments for backend constructor

()
**kwargs Any

Keyword arguments for backend constructor

{}

Returns:

Type Description
BaseExecutionBackend

Backend instance (may need to be awaited for async backends)

Example

# Dask backend
backend = get_backend('dask', resources={'threads': 4})

# RADICAL-Pilot backend
backend = get_backend('radical_pilot')

Backend Registry

rhapsody.backends.discovery.BackendRegistry

Registry for managing available execution backends.

get_backend_class(backend_name) classmethod

Get backend class by name.

Parameters:

Name Type Description Default
backend_name str

Name of the backend to retrieve

required

Returns:

Type Description
type[BaseExecutionBackend]

Backend class type

Raises:

Type Description
ValueError

If backend is not registered

ImportError

If backend module cannot be imported

list_backends() classmethod

List all registered backend names.

register_backend(name, import_path) classmethod

Register a new backend.

Parameters:

Name Type Description Default
name str

Name of the backend

required
import_path str

Full import path to the backend class

required
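A registry that maps names to import paths and resolves them lazily can be sketched in a few lines. `MiniRegistry` below is a hypothetical stand-in that reuses the three documented method names; the `"package.module.ClassName"` path format and the internal dictionary are assumptions, not details of BackendRegistry.

```python
import importlib


class MiniRegistry:
    """Hypothetical registry resolving 'package.module.ClassName' paths lazily."""

    _backends: dict[str, str] = {}

    @classmethod
    def register_backend(cls, name: str, import_path: str) -> None:
        cls._backends[name] = import_path

    @classmethod
    def list_backends(cls) -> list[str]:
        return list(cls._backends)

    @classmethod
    def get_backend_class(cls, backend_name: str) -> type:
        if backend_name not in cls._backends:
            raise ValueError(f"backend {backend_name!r} is not registered")
        module_path, _, class_name = cls._backends[backend_name].rpartition(".")
        try:
            # The import happens only when the class is requested.
            module = importlib.import_module(module_path)
            return getattr(module, class_name)
        except (ImportError, AttributeError, ValueError) as exc:
            raise ImportError(f"cannot import backend {backend_name!r}") from exc


# Standard-library classes stand in for real backend classes here.
MiniRegistry.register_backend("ordered", "collections.OrderedDict")
MiniRegistry.register_backend("broken", "no_such_pkg_xyz_123.Thing")
resolved = MiniRegistry.get_backend_class("ordered")
```

Deferring the import keeps registration cheap and lets a backend stay registered even when its optional dependency is not installed; the failure surfaces only when that backend is requested.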