Backend API Reference
API documentation for RHAPSODY execution backends.
Base Backend Interface
rhapsody.backends.base.BaseExecutionBackend
Bases: ABC
Abstract base class for execution backends that manage task execution and state.
This class defines the interface for execution backends that handle task submission, state management, and dependency linking in a distributed or parallel execution environment.
build_task(task)
abstractmethod
Build or prepare a task for execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `dict` | Dictionary containing task definition, parameters, and metadata required for task construction. | required |
cancel_task(uid)
abstractmethod
async
Cancel a task in the execution backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `uid` | `str` | Task identifier. | required |
Raises:
| Type | Description |
|---|---|
| `NotImplementedError` | If the backend doesn't support cancellation. |
get_task_states_map()
abstractmethod
Retrieve a mapping of task IDs to their current states.
Returns:
| Type | Description |
|---|---|
| `Any` | A dictionary mapping task identifiers to their current execution states. |
link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)
abstractmethod
Link explicit data dependencies between tasks or files.
Creates explicit dependency relationships based on specified file names or paths, allowing for more precise control over task execution order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `src_task` | `dict[str, Any] \| None` | The source task that produces the dependency. | `None` |
| `dst_task` | `dict[str, Any] \| None` | The destination task that depends on the source. | `None` |
| `file_name` | `str \| None` | Name of the file that represents the dependency. | `None` |
| `file_path` | `str \| None` | Full path to the file that represents the dependency. | `None` |
link_implicit_data_deps(src_task, dst_task)
abstractmethod
Link implicit data dependencies between two tasks.
Creates a dependency relationship where the destination task depends on data produced by the source task, with the dependency being inferred automatically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `src_task` | `dict[str, Any]` | The source task that produces data. | required |
| `dst_task` | `dict[str, Any]` | The destination task that depends on the source task's output. | required |
register_callback(func)
abstractmethod
Register a callback function for task state changes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[[dict[str, Any], str], None]` | A callable that will be invoked when task states change. The function should accept task and state parameters. | required |
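A minimal sketch of a callback that fits the documented `(task, state)` signature. The `uid` key, the `history` dictionary, and the sample UID are illustrative assumptions, not part of the RHAPSODY API.

```python
from typing import Any

# Hypothetical in-memory log of state transitions, keyed by task UID.
history: dict[str, list[str]] = {}


def on_state_change(task: dict[str, Any], state: str) -> None:
    """Record every state a task passes through."""
    history.setdefault(task["uid"], []).append(state)


# A backend would invoke the callback like this on each transition.
on_state_change({"uid": "task.000001"}, "running")
on_state_change({"uid": "task.000001"}, "completed")
```

With a concrete backend instance, such a function would be passed as `backend.register_callback(on_state_change)`.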
shutdown()
abstractmethod
async
Gracefully shutdown the execution backend.
This method should clean up resources, terminate running tasks if necessary, and prepare the backend for termination.
state()
abstractmethod
Get the current state of the execution backend.
Returns:
| Type | Description |
|---|---|
| `str` | A string representing the current state of the backend (e.g., 'running', 'idle', 'shutting_down', 'error'). |
submit_tasks(tasks)
abstractmethod
async
Submit a list of tasks for execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tasks` | `list[dict]` | A list of dictionaries containing task definitions and metadata. Each task dictionary should contain the necessary information for task execution. | required |
task_state_cb(task, state)
abstractmethod
Callback function invoked when a task's state changes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `dict` | Dictionary containing task information and metadata. | required |
| `state` | `str` | The new state of the task (e.g., 'pending', 'running', 'completed', 'failed'). | required |
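The interface above can be illustrated with a toy implementation. This is only a sketch: `BackendInterface` is a local stand-in that mirrors a subset of the documented abstract methods (it is not the real `BaseExecutionBackend`), and `InlineBackend`, the task keys `function`, `args`, `kwargs`, and `result`, and the state strings are assumptions chosen for the example.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Callable


class BackendInterface(ABC):
    """Local stand-in mirroring part of the documented interface."""

    @abstractmethod
    async def submit_tasks(self, tasks: list[dict]) -> None: ...

    @abstractmethod
    async def cancel_task(self, uid: str) -> None: ...

    @abstractmethod
    def register_callback(self, func: Callable[[dict[str, Any], str], None]) -> None: ...

    @abstractmethod
    def state(self) -> str: ...

    @abstractmethod
    async def shutdown(self) -> None: ...


class InlineBackend(BackendInterface):
    """Toy backend that awaits each task's coroutine function in the current loop."""

    def __init__(self) -> None:
        self._callback: Callable[[dict[str, Any], str], None] = lambda task, state: None
        self._state = "idle"

    def register_callback(self, func):
        self._callback = func

    def state(self):
        return self._state

    async def submit_tasks(self, tasks):
        self._state = "running"
        for task in tasks:
            self._callback(task, "running")
            try:
                task["result"] = await task["function"](
                    *task.get("args", ()), **task.get("kwargs", {})
                )
                self._callback(task, "completed")
            except Exception as exc:
                task["exception"] = exc
                self._callback(task, "failed")
        self._state = "idle"

    async def cancel_task(self, uid):
        # Tasks run to completion inline, so there is nothing to cancel.
        raise NotImplementedError("InlineBackend does not support cancellation")

    async def shutdown(self):
        self._state = "shutting_down"


async def add(a, b):
    return a + b


async def main():
    events = []
    backend = InlineBackend()
    backend.register_callback(lambda task, state: events.append((task["uid"], state)))
    task = {"uid": "t1", "function": add, "args": (1, 2)}
    await backend.submit_tasks([task])
    await backend.shutdown()
    return events, task["result"], backend.state()


events, result, final_state = asyncio.run(main())
```

The callback sees each transition in order, and `cancel_task` raises `NotImplementedError`, which is the behavior the interface documents for backends without cancellation support.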
Dask Execution Backend
rhapsody.backends.execution.dask_parallel.DaskExecutionBackend(resources=None)
Bases: BaseExecutionBackend
An async-only Dask execution backend for distributed task execution.
Handles task submission, cancellation, and proper async event loop handling for distributed task execution using Dask. All functions must be async.
Usage

    backend = await DaskExecutionBackend(resources)

    # or
    async with DaskExecutionBackend(resources) as backend:
        await backend.submit_tasks(tasks)

Initialize the Dask execution backend (non-async setup only).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `resources` | `dict \| None` | Dictionary of resource requirements for tasks. Contains configuration parameters for the Dask client initialization. | `None` |
__aenter__()
async
Async context manager entry.
__aexit__(exc_type, exc_val, exc_tb)
async
Async context manager exit.
__await__()
Make DaskExecutionBackend awaitable like Dask Client.
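The following sketch shows, in general terms, how one class can support both `await Obj(...)` and `async with Obj(...)`. It is not the backend's actual implementation: `AwaitableResource`, `_async_init`, and the `n_workers` key are hypothetical names used only for illustration.

```python
import asyncio


class AwaitableResource:
    """Hypothetical class illustrating the await / async-with pattern."""

    def __init__(self, resources=None):
        # Non-async setup only: store configuration, open nothing yet.
        self.resources = resources or {}
        self.initialized = False
        self.closed = False

    async def _async_init(self):
        # Async setup (for example, connecting a client) would happen here.
        if not self.initialized:
            await asyncio.sleep(0)
            self.initialized = True
        return self

    def __await__(self):
        # `await AwaitableResource(...)` runs the async setup and returns the instance.
        return self._async_init().__await__()

    async def __aenter__(self):
        return await self._async_init()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.closed = True
        return False  # do not suppress exceptions


async def demo():
    first = await AwaitableResource({"n_workers": 2})
    async with AwaitableResource() as second:
        inside = (second.initialized, second.closed)
    return first.initialized, inside, second.closed


result = asyncio.run(demo())
```

Keeping `__init__` synchronous and deferring I/O to a coroutine is what lets both entry points share one initialization path.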
build_task(task)
async
Build or prepare a task for execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `dict` | Dictionary containing task definition, parameters, and metadata required for task construction. | required |
cancel_all_tasks()
async
Cancel all currently running/pending tasks.
Returns:
| Type | Description |
|---|---|
| `int` | Number of tasks that were successfully cancelled. |
cancel_task(uid)
async
Cancel a task by its UID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `uid` | `str` | The UID of the task to cancel. | required |
Returns:
| Type | Description |
|---|---|
| `bool` | True if the task was found and cancellation was attempted, False otherwise. |
create(resources=None)
async
classmethod
Alternative factory method for creating an initialized backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `resources` | `dict \| None` | Configuration parameters for Dask client initialization. | `None` |
Returns:
| Type | Description |
|---|---|
| `DaskExecutionBackend` | Fully initialized DaskExecutionBackend instance. |
get_task_states_map()
Retrieve a mapping of task IDs to their current states.
Returns:
| Type | Description |
|---|---|
| `StateMapper` | Object containing the mapping of task states for this backend. |
link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)
Handle explicit data dependencies between tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `src_task` | `dict[str, Any] \| None` | The source task that produces the dependency. | `None` |
| `dst_task` | `dict[str, Any] \| None` | The destination task that depends on the source. | `None` |
| `file_name` | `str \| None` | Name of the file that represents the dependency. | `None` |
| `file_path` | `str \| None` | Full path to the file that represents the dependency. | `None` |
link_implicit_data_deps(src_task, dst_task)
Handle implicit data dependencies for a task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `src_task` | `dict[str, Any]` | The source task that produces data. | required |
| `dst_task` | `dict[str, Any]` | The destination task that depends on the source task's output. | required |
register_callback(func)
Register a callback for task state changes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable` | Function to be called when task states change. Should accept task and state parameters. | required |
shutdown()
async
Shutdown the Dask client and clean up resources.
Closes the Dask client connection, clears task storage, and handles any cleanup exceptions gracefully.
state()
async
Get the current state of the Dask execution backend.
Returns:
| Type | Description |
|---|---|
| `str` | Current state of the backend as a string. |
submit_tasks(tasks)
async
Submit async tasks to the Dask cluster.
Processes a list of tasks and submits them to the Dask cluster for execution. Filters out future objects from arguments and validates that all functions are async coroutine functions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tasks` | `list[dict[str, Any]]` | List of task dictionaries containing: `uid` (unique task identifier), `function` (async callable to execute), `args` (positional arguments), `kwargs` (keyword arguments), `executable` (optional executable path, not supported), `task_backend_specific_kwargs` (backend-specific parameters). | required |
Note
Executable tasks are not supported and will result in FAILED state. Only async functions are supported - sync functions will result in FAILED state. Future objects are filtered out from arguments as they are not picklable.
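The rules in the note can be sketched as a small pre-submission check. This is an illustration under assumptions, not the backend's code: `check_task` and the `"READY"` label are hypothetical, and `concurrent.futures.Future` stands in for whatever future type the backend actually filters.

```python
import inspect
from concurrent.futures import Future


def check_task(task: dict) -> tuple[str, tuple]:
    """Return a (state, args) pair; 'READY' is a hypothetical placeholder state."""
    if task.get("executable"):
        return "FAILED", ()  # executable tasks are not supported
    if not inspect.iscoroutinefunction(task.get("function")):
        return "FAILED", ()  # only async functions are accepted
    # Drop future objects from the positional arguments.
    args = tuple(a for a in task.get("args", ()) if not isinstance(a, Future))
    return "READY", args


async def double(x):
    return 2 * x


def sync_double(x):
    return 2 * x


ok = check_task({"uid": "t1", "function": double, "args": (Future(), 21)})
bad_sync = check_task({"uid": "t2", "function": sync_double, "args": (21,)})
bad_exec = check_task({"uid": "t3", "executable": "/bin/echo"})
```

Only the first task passes: the future is removed from its arguments, while the sync function and the executable task are both marked as failed.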
task_state_cb(task, state)
async
Callback function invoked when a task's state changes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `dict` | Dictionary containing task information and metadata. | required |
| `state` | `str` | The new state of the task. | required |
RADICAL-Pilot Execution Backend
rhapsody.backends.execution.radical_pilot.RadicalExecutionBackend(resources, raptor_config=None)
Bases: BaseExecutionBackend
Radical Pilot-based execution backend for large-scale HPC task execution.
The RadicalExecutionBackend manages computing resources and task execution using the Radical Pilot framework. It interfaces with various resource management systems (SLURM, FLUX, etc.) on diverse HPC machines, providing capabilities for session management, task lifecycle control, and resource allocation.
This backend supports both traditional task execution and advanced features like Raptor mode for high-throughput computing scenarios. It handles pilot submission, task management, and provides data dependency linking mechanisms.
Attributes:
| Name | Type | Description |
|---|---|---|
| `session` | `Session` | Primary session for managing task execution context, uniquely identified by a generated ID. |
| `task_manager` | `TaskManager` | Manages task lifecycle including submission, tracking, and completion within the session. |
| `pilot_manager` | `PilotManager` | Coordinates computing resources (pilots) that are dynamically allocated based on resource requirements. |
| `resource_pilot` | `Pilot` | Submitted computing resources configured according to the provided resource specifications. |
| `tasks` | `dict` | Dictionary storing task descriptions indexed by UID. |
| `raptor_mode` | `bool` | Flag indicating whether Raptor mode is enabled. |
| `masters` | `list` | List of master tasks when Raptor mode is enabled. |
| `workers` | `list` | List of worker tasks when Raptor mode is enabled. |
| `master_selector` | `callable` | Generator for load balancing across masters. |
| `_callback_func` | `Callable` | Registered callback function for task events. |
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `resources` | `dict` | Resource requirements for the pilot including CPU, GPU, and memory specifications. | required |
| `raptor_config` | `Optional[dict]` | Configuration for enabling Raptor mode. Contains master and worker task specifications. | `None` |
Raises:
| Type | Description |
|---|---|
| `Exception` | If session creation, pilot submission, or task manager setup fails. |
| `SystemExit` | If KeyboardInterrupt or SystemExit occurs during initialization. |
Example

    resources = {
        "resource": "local.localhost",
        "runtime": 30,
        "exit_on_error": True,
        "cores": 4
    }
    backend = await RadicalExecutionBackend(resources)

    # With Raptor mode
    raptor_config = {
        "masters": [{
            "executable": "/path/to/master",
            "arguments": ["--config", "master.conf"],
            "ranks": 1,
            "workers": [{
                "executable": "/path/to/worker",
                "arguments": ["--mode", "compute"],
                "ranks": 4
            }]
        }]
    }
    backend = await RadicalExecutionBackend(resources, raptor_config)

Initialize the RadicalExecutionBackend with resources.
Creates a new Radical Pilot session, initializes task and pilot managers, submits pilots based on resource configuration, and optionally enables Raptor mode for high-throughput computing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `resources` | `Dict` | Resource configuration for the Radical Pilot session. Must contain valid pilot description parameters such as: `resource` (target resource, e.g., "local.localhost"), `runtime` (maximum runtime in minutes), `cores` (number of CPU cores), `gpus` (number of GPUs, optional). | required |
| `raptor_config` | `Optional[Dict]` | Configuration for Raptor mode containing `masters`, a list of master task configurations; each master can have associated workers. Defaults to None (Raptor mode disabled). | `None` |
Raises:
| Type | Description |
|---|---|
| `Exception` | If RadicalPilot backend fails to initialize properly. |
| `SystemExit` | If keyboard interrupt or system exit occurs during setup, with session path information for debugging. |
Note
- Automatically registers backend states with the global StateMapper
- Logs status messages for successful initialization or failures
- Session UID is generated using radical.utils for uniqueness
__aenter__()
async
Async context manager entry.
__aexit__(exc_type, exc_val, exc_tb)
async
Async context manager exit.
__await__()
Make RadicalExecutionBackend awaitable.
build_task(uid, task_desc, task_backend_specific_kwargs)
Build a RadicalPilot task description from workflow task parameters.
Converts a workflow task description into a RadicalPilot TaskDescription, handling different task modes (executable, function, service) and applying appropriate configurations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `uid` | `str` | Unique identifier for the task. | required |
| `task_desc` | `Dict` | Task description containing: `executable` (path to executable, for executable tasks), `function` (Python function, for function tasks), `args` (function arguments), `kwargs` (function keyword arguments), `is_service` (boolean indicating service task). | required |
| `task_backend_specific_kwargs` | `Dict` | RadicalPilot-specific parameters for the task description. | required |
Returns:
| Type | Description |
|---|---|
| `object \| None` | rp.TaskDescription: Configured RadicalPilot task description, or None if task creation failed. |
Note
- Function tasks require Raptor mode to be enabled
- Service tasks cannot be Python functions
- Failed tasks trigger callback with FAILED state
- Raptor tasks are assigned to masters via load balancing
Example

    task_desc = {
        'executable': '/bin/echo',
        'args': ['Hello World'],
        'is_service': False
    }
    rp_task = backend.build_task('task_001', task_desc, {})

cancel_task(uid)
async
Cancel a task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `uid` | `str` | Task UID to cancel. | required |
Returns:
| Type | Description |
|---|---|
| `bool` | True if the task was found and cancellation was attempted, False otherwise. |
create(resources, raptor_config=None)
async
classmethod
Create an initialized backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `resources` | `dict` | Radical Pilot configuration. | required |
| `raptor_config` | `dict \| None` | Optional Raptor mode configuration. | `None` |
Returns:
| Type | Description |
|---|---|
| `RadicalExecutionBackend` | Initialized RadicalExecutionBackend instance. |
get_nodelist()
Get information about allocated compute nodes.
Retrieves the nodelist from the active resource pilot, providing details about the compute nodes allocated for task execution.
Returns:
| Type | Description |
|---|---|
| `object \| None` | rp.NodeList: NodeList object containing information about allocated nodes. Each node in nodelist.nodes is of type rp.NodeResource. Returns None if the pilot is not in PMGR_ACTIVE state. |
Note
- Only returns nodelist when pilot is in active state
- Nodelist provides detailed resource information for each node
- Useful for resource-aware task scheduling and monitoring
get_task_states_map()
Get the state mapper for this backend.
Returns:
| Type | Description |
|---|---|
| `StateMapper` | StateMapper instance configured for RadicalPilot backend with appropriate state mappings (DONE, FAILED, CANCELED, AGENT_EXECUTING). |
link_explicit_data_deps(src_task=None, dst_task=None, file_name=None, file_path=None)
Link explicit data dependencies between tasks or from external sources.
Creates data staging entries to establish explicit dependencies where files are transferred or linked from source to destination tasks. Supports both task-to-task dependencies and external file staging.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `src_task` | `Optional[Dict]` | Source task dictionary containing the file. None when staging from external path. | `None` |
| `dst_task` | `Dict` | Destination task dictionary that will receive the file. Must contain 'task_backend_specific_kwargs' key. | `None` |
| `file_name` | `Optional[str]` | Name of the file to stage. Defaults to the src_task UID if staging from a task, or to the basename of file_path if staging from an external path. | `None` |
| `file_path` | `Optional[str]` | External file path to stage (alternative to task-sourced files). | `None` |
Returns:
| Type | Description |
|---|---|
| `dict` | The data dependency dictionary that was added to input staging. |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If neither file_name nor file_path is provided, or if src_task is missing when file_path is not specified. |
Note
- External files use TRANSFER action
- Task-to-task dependencies use LINK action
- Files are staged to task:/// namespace in destination
- Input staging list is created if it doesn't exist
Example

    # Link output from task1 to task2
    backend.link_explicit_data_deps(
        src_task={'uid': 'task1'},
        dst_task={'task_backend_specific_kwargs': {}},
        file_name='output.dat'
    )

    # Stage external file
    backend.link_explicit_data_deps(
        dst_task={'task_backend_specific_kwargs': {}},
        file_path='/path/to/input.txt'
    )

link_implicit_data_deps(src_task, dst_task)
Add implicit data dependencies through symbolic links in task sandboxes.
Creates pre-execution commands that establish symbolic links from the source task's sandbox to the destination task's sandbox, simulating implicit data dependencies without explicit file specifications.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `src_task` | `Dict` | Source task dictionary containing 'uid' key. | required |
| `dst_task` | `Dict` | Destination task dictionary with 'task_backend_specific_kwargs' for pre_exec commands. | required |
Note
- Links all files from source sandbox except the task UID file itself
- Uses environment variables for source task identification
- Commands are added to the destination task's pre_exec list
- Symbolic links are created in the destination task's sandbox
Implementation Details
- Sets SRC_TASK_ID environment variable
- Sets SRC_TASK_SANDBOX path variable
- Creates symbolic links for all files except the task ID file
Example

    src_task = {'uid': 'producer_task'}
    dst_task = {'task_backend_specific_kwargs': {}}
    backend.link_implicit_data_deps(src_task, dst_task)

register_callback(func)
Register a callback function for task state changes.
Sets up a callback mechanism that handles task state transitions, with special handling for service tasks that require additional readiness confirmation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable` | Callback function that will be invoked on task state changes. Should accept parameters: (task, state, service_callback=None). | required |
Note
- Service tasks in AGENT_EXECUTING state get special service_ready_callback
- All other tasks use the standard callback mechanism
- The callback is registered with the underlying task manager
select_master()
Create a generator for load balancing task submission across masters.
Provides a round-robin generator that cycles through available master UIDs to distribute tasks evenly across all masters in Raptor mode.
Returns:
| Type | Description |
|---|---|
| `None` | Generator[str]: Generator yielding master UIDs in round-robin fashion. |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If Raptor mode is not enabled or no masters are available. |
Example

    selector = backend.select_master()
    master_uid = next(selector)  # Get next master for task assignment

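Round-robin selection of this kind can be sketched with `itertools.cycle`. The standalone function and the master UIDs below are hypothetical; the backend's own method takes no arguments and reads its masters from internal state.

```python
import itertools
from typing import Iterator


def round_robin(master_uids: list[str]) -> Iterator[str]:
    """Yield master UIDs forever, in order, wrapping around at the end."""
    if not master_uids:
        # Raised eagerly, because this is a plain function rather than a generator.
        raise RuntimeError("no masters available")
    return itertools.cycle(master_uids)


selector = round_robin(["master.0000", "master.0001"])
picks = [next(selector) for _ in range(5)]
```

With two masters, five consecutive picks alternate between them and start over after the second.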
setup_raptor_mode(raptor_config)
Set up Raptor mode by configuring and submitting master and worker tasks.
Initializes Raptor mode by creating master tasks and their associated worker tasks based on the provided configuration. Masters coordinate work distribution while workers execute the actual computations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `raptor_config` | `Dict` | Configuration dictionary with the structure shown after this table. | required |

    {
        'masters': [
            {
                'executable': str,   # Path to master executable
                'arguments': list,   # Arguments for master
                'ranks': int,        # Number of CPU processes
                'workers': [         # Worker configurations
                    {
                        'executable': str,   # Worker executable path
                        'arguments': list,   # Worker arguments
                        'ranks': int,        # Worker CPU processes
                        'worker_type': str   # Optional worker class
                    },
                    ...
                ]
            },
            ...
        ]
    }

Raises:
| Type | Description |
|---|---|
| `Exception` | If task description creation or submission fails. |
Note
- Creates unique UIDs for masters and workers using session namespace
- Sets up master selector for load balancing across masters
- Workers default to 'DefaultWorker' class if not specified
- All master and worker tasks are stored in respective class attributes
shutdown()
async
Gracefully shutdown the backend and clean up resources.
Closes the RadicalPilot session with data download, ensuring proper cleanup of all resources including pilots, tasks, and session data.
Note
- Downloads session data before closing
- Ensures graceful termination of all backend resources
- Prints confirmation message when shutdown is triggered
state()
Retrieve resource pilot state.
submit_tasks(tasks)
async
Submit a list of tasks for execution.
Processes a list of workflow tasks, builds RadicalPilot task descriptions, and submits them to the task manager for execution. Handles task building failures gracefully by skipping invalid tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tasks` | `list` | List of task dictionaries, each containing: `uid` (unique task identifier), `task_backend_specific_kwargs` (RadicalPilot-specific parameters), and other task description fields. | required |
Note
- Failed task builds are skipped (build_task returns None)
- Only successfully built tasks are submitted to the task manager
- Task building includes validation and error handling
task_state_cb(task, state)
Handle task state changes.
Backend Discovery
rhapsody.backends.discovery.discover_backends()
Discover which backends are available based on optional dependencies.
Returns:
| Type | Description |
|---|---|
| `dict[str, bool]` | Dictionary mapping backend names to availability status. |
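One common way to probe optional dependencies without importing them fully is `importlib.util.find_spec`. The sketch below assumes that approach; the backend names and module paths in `OPTIONAL_DEPENDENCIES` are guesses for illustration, and the real function may detect availability differently.

```python
import importlib.util

# Hypothetical mapping of backend name to the module it depends on.
OPTIONAL_DEPENDENCIES = {
    "dask": "dask.distributed",
    "radical_pilot": "radical.pilot",
}


def is_importable(module_name: str) -> bool:
    """Return True if the module can be located on the current Python path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Raised when a parent package is missing or a module has no spec.
        return False


def discover(dependencies: dict[str, str] = OPTIONAL_DEPENDENCIES) -> dict[str, bool]:
    """Map each backend name to whether its dependency is available."""
    return {name: is_importable(module) for name, module in dependencies.items()}


availability = discover()
```

The result has one boolean per backend name, which matches the documented `dict[str, bool]` return type.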
rhapsody.backends.discovery.get_backend(backend_name, *args, **kwargs)
Factory function to create backend instances.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `backend_name` | `str` | Name of the backend to create. | required |
| `*args` | `Any` | Positional arguments for backend constructor. | `()` |
| `**kwargs` | `Any` | Keyword arguments for backend constructor. | `{}` |
Returns:
| Type | Description |
|---|---|
| `BaseExecutionBackend` | Backend instance (may need to be awaited for async backends). |
Backend Registry
rhapsody.backends.discovery.BackendRegistry
Registry for managing available execution backends.
get_backend_class(backend_name)
classmethod
Get backend class by name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `backend_name` | `str` | Name of the backend to retrieve. | required |
Returns:
| Type | Description |
|---|---|
| `type[BaseExecutionBackend]` | Backend class type. |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If backend is not registered. |
| `ImportError` | If backend module cannot be imported. |
list_backends()
classmethod
List all registered backend names.
register_backend(name, import_path)
classmethod
Register a new backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the backend. | required |
| `import_path` | `str` | Full import path to the backend class. | required |
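A registry that stores import paths and resolves them lazily might look like the sketch below. `Registry` is a hypothetical stand-in written for this example, not the actual `BackendRegistry`; it registers a standard-library class so the snippet runs without any backend installed.

```python
import importlib


class Registry:
    """Hypothetical lazy-import registry keyed by backend name."""

    _backends: dict[str, str] = {}

    @classmethod
    def register_backend(cls, name: str, import_path: str) -> None:
        cls._backends[name] = import_path

    @classmethod
    def list_backends(cls) -> list[str]:
        return list(cls._backends)

    @classmethod
    def get_backend_class(cls, backend_name: str) -> type:
        if backend_name not in cls._backends:
            raise ValueError(f"Backend '{backend_name}' is not registered")
        # Split "package.module.ClassName" into module path and class name.
        module_path, _, class_name = cls._backends[backend_name].rpartition(".")
        module = importlib.import_module(module_path)  # ImportError if unavailable
        return getattr(module, class_name)


# A standard-library class stands in for a real backend class here.
Registry.register_backend("ordered", "collections.OrderedDict")
resolved = Registry.get_backend_class("ordered")
```

Storing only the import path means a backend's optional dependencies are imported when the class is requested, not when it is registered.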