Skip to content

Core API

This section documents the core RHAPSODY APIs and functions.

Main Functions

rhapsody.get_backend(backend_name, *args, **kwargs)

Factory function to create backend instances.

Parameters:

Name Type Description Default
backend_name str

Name of the backend to create

required
*args Any

Positional arguments for backend constructor

()
**kwargs Any

Keyword arguments for backend constructor

{}

Returns:

Type Description
BaseExecutionBackend

Backend instance (may need to be awaited for async backends)

Example

Dask backend

backend = get_backend('dask', resources={'threads': 4})

RADICAL-Pilot backend

backend = get_backend('radical_pilot')

Source code in src/rhapsody/backends/discovery.py
def get_backend(backend_name: str, *args: Any, **kwargs: Any) -> BaseExecutionBackend:
    """Instantiate the backend registered under ``backend_name``.

    The backend class is looked up through ``BackendRegistry`` and then
    called with the remaining arguments forwarded unchanged. Exceptions
    raised by the lookup or by the backend constructor propagate to the
    caller.

    Args:
        backend_name: Registered name of the backend to instantiate
        *args: Positional arguments forwarded to the backend constructor
        **kwargs: Keyword arguments forwarded to the backend constructor

    Returns:
        The object produced by calling the backend class (may need to be
        awaited for async backends)

    Example:
        # Dask backend
        backend = get_backend('dask', resources={'threads': 4})

        # RADICAL-Pilot backend
        backend = get_backend('radical_pilot')
    """
    constructor = BackendRegistry.get_backend_class(backend_name)
    instance = constructor(*args, **kwargs)
    return instance

rhapsody.discover_backends()

Discover which backends are available based on optional dependencies.

Returns:

Type Description
dict[str, bool]

Dictionary mapping backend names to availability status

Source code in src/rhapsody/backends/discovery.py
def discover_backends() -> dict[str, bool]:
    """Probe every registered backend and report whether it is usable.

    Each backend class is resolved and then instantiated once with minimal
    arguments. Only an ``ImportError`` marks a backend as unavailable; any
    other exception is printed as a warning and the backend is still
    reported as available.

    Returns:
        Dictionary mapping backend names to availability status
    """
    availability = {}

    for backend_name in BackendRegistry.list_backends():
        try:
            # Resolving the class triggers the import of the backend module.
            backend_class = BackendRegistry.get_backend_class(backend_name)

            # Minimal instantiation to surface missing runtime dependencies.
            if backend_name != "radical_pilot":
                # Other backends like dask
                backend_class()  # type: ignore[call-arg]
            else:
                # Radical pilot requires resources parameter
                backend_class(  # type: ignore[call-arg]
                    {
                        "resource": "local.localhost",
                        "runtime": 1,
                        "cores": 1,
                    }
                )
        except ImportError:
            # Import error means dependencies are not available
            availability[backend_name] = False
        except Exception as e:
            # Deliberately permissive: non-import failures may be mere
            # configuration issues, so the backend still counts as available.
            # TODO: Investigate specific exceptions and handle them appropriately
            print(f"Warning: Backend {backend_name} had non-import error: {e}")
            availability[backend_name] = True
        else:
            availability[backend_name] = True

    return availability

Backend Registry

rhapsody.BackendRegistry

Registry for managing available execution backends.

get_backend_class(backend_name) classmethod

Get backend class by name.

Parameters:

Name Type Description Default
backend_name str

Name of the backend to retrieve

required

Returns:

Type Description
type[BaseExecutionBackend]

Backend class type

Raises:

Type Description
ValueError

If backend is not registered

ImportError

If backend module cannot be imported

Source code in src/rhapsody/backends/discovery.py
@classmethod
def get_backend_class(cls, backend_name: str) -> type[BaseExecutionBackend]:
    """Resolve a registered backend name to its class object.

    The registry maps each name to a dotted import path; the part before the
    last dot is imported as a module and the part after it is read from that
    module as an attribute.

    Args:
        backend_name: Name of the backend to retrieve

    Returns:
        Backend class type

    Raises:
        ValueError: If backend is not registered
        ImportError: If backend module cannot be imported
    """
    registry = cls._backends
    if backend_name not in registry:
        available = list(registry)
        raise ValueError(f"Backend '{backend_name}' not found. Available: {available}")

    # Split "package.module.ClassName" into module path and class name.
    module_name, class_name = registry[backend_name].rsplit(".", 1)

    try:
        return getattr(importlib.import_module(module_name), class_name)
    except ImportError as e:
        # Re-raise with the backend name so the failing backend is identifiable.
        raise ImportError(f"Failed to import backend '{backend_name}': {e}") from e

list_backends() classmethod

List all registered backend names.

Source code in src/rhapsody/backends/discovery.py
@classmethod
def list_backends(cls) -> list[str]:
    """Return the names of all registered backends as a new list."""
    return [*cls._backends]

register_backend(name, import_path) classmethod

Register a new backend.

Parameters:

Name Type Description Default
name str

Name of the backend

required
import_path str

Full import path to the backend class

required
Source code in src/rhapsody/backends/discovery.py
@classmethod
def register_backend(cls, name: str, import_path: str) -> None:
    """Add a backend to the registry, replacing any entry with the same name.

    Args:
        name: Name under which the backend is registered
        import_path: Full dotted import path to the backend class
    """
    registry = cls._backends
    registry[name] = import_path
Parameters:

  • name (str): Backend name identifier
  • import_path (str): Full dotted import path to the backend class, for example "my_package.backends.MyBackend"

Example:

import rhapsody
from rhapsody.backends.base import BaseBackend


class MyBackend(BaseBackend):
    """Placeholder custom backend used to illustrate registration."""

    # Implementation here
    pass


# The registry stores a dotted import path (a string), not the class object;
# the path is resolved with importlib when the backend is requested.
rhapsody.BackendRegistry.register_backend(
    "my_backend", f"{MyBackend.__module__}.{MyBackend.__name__}"
)
backend = rhapsody.get_backend("my_backend")

list_backends()

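List the names of all registered backends. A registered backend is not necessarily usable: its optional dependencies may be missing. Use discover_backends() to check availability.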

def list_backends() -> List[str]

Returns:

  • List of backend names

Example:

backends = rhapsody.list_backends()
print(f"Available backends: {backends}")
# Example output: Available backends: ['concurrent', 'dask', 'radical_pilot']

Exception Classes

RhapsodyError

Base exception class for all RHAPSODY errors.

class RhapsodyError(Exception):
    """Root of the RHAPSODY exception hierarchy.

    Attributes:
        message: Error description, also passed to ``Exception``
        details: Additional error context; an empty dict when none is given
    """

    def __init__(self, message: str, details: Optional[Dict] = None):
        super().__init__(message)
        self.message = message
        # A missing or empty ``details`` is replaced by a fresh dict.
        self.details = details if details else {}

Attributes:

  • message (str): Error description
  • details (dict): Additional error context

BackendError

Exception raised for backend-specific errors.

class BackendError(RhapsodyError):
    """Error raised by an execution backend.

    Attributes:
        backend_type: Backend that caused the error
    """

    def __init__(self, message: str, backend_type: str, details: Optional[Dict] = None):
        super().__init__(message, details)
        self.backend_type = backend_type

Attributes:

  • backend_type (str): Backend that caused the error
  • Inherits from RhapsodyError

TaskError

Exception raised for task execution failures.

class TaskError(RhapsodyError):
    """Error raised when a task fails to execute.

    Attributes:
        task_uid: Unique identifier of the failed task
        exit_code: Process exit code, or ``None`` when not provided
    """

    def __init__(
        self,
        message: str,
        task_uid: str,
        exit_code: Optional[int] = None,
        details: Optional[Dict] = None,
    ):
        super().__init__(message, details)
        self.task_uid = task_uid
        self.exit_code = exit_code

Attributes:

  • task_uid (str): Unique identifier of failed task
  • exit_code (int, optional): Process exit code
  • Inherits from RhapsodyError

ConfigurationError

Exception raised for configuration-related errors.

class ConfigurationError(RhapsodyError):
    """Error raised for configuration problems.

    Attributes:
        config_key: Configuration key that caused the error, or ``None``
            when not provided
    """

    def __init__(
        self,
        message: str,
        config_key: Optional[str] = None,
        details: Optional[Dict] = None,
    ):
        super().__init__(message, details)
        self.config_key = config_key

Attributes:

  • config_key (str, optional): Configuration key that caused error
  • Inherits from RhapsodyError

Usage Examples

Error Handling

import asyncio
import rhapsody
from rhapsody.exceptions import (
    BackendError,
    ConfigurationError,
    RhapsodyError,
    TaskError,
)


async def robust_workflow():
    """Submit one failing task and show how each RHAPSODY error is handled.

    The handlers are ordered from most specific to most general: the three
    RhapsodyError subclasses first, then RhapsodyError itself, then any other
    exception.
    """
    try:
        backend = rhapsody.get_backend("concurrent")

        tasks = [
            {
                "uid": "test_task",
                "executable": "false"  # Always fails
            }
        ]

        await backend.submit_tasks(tasks)
        await backend.wait()

    except TaskError as e:
        print(f"Task {e.task_uid} failed with exit code {e.exit_code}")
        print(f"Error details: {e.details}")

    except BackendError as e:
        print(f"Backend '{e.backend_type}' error: {e.message}")

    except ConfigurationError as e:
        print(f"Configuration error in '{e.config_key}': {e.message}")

    except RhapsodyError as e:
        # Catch-all for RHAPSODY errors not matched by a subclass handler above.
        print(f"RHAPSODY error: {e.message}")

    except Exception as e:
        print(f"Unexpected error: {e}")


asyncio.run(robust_workflow())

Backend Discovery

import rhapsody

# List the registered backends and try to instantiate each one
print("Available backends:")
for backend_name in rhapsody.list_backends():
    try:
        # Instantiated with no constructor arguments: a backend whose
        # constructor requires arguments raises here and is reported as
        # "Not available" even if its dependencies are installed.
        backend = rhapsody.get_backend(backend_name)
        print(f"  {backend_name}: {backend.__class__.__name__}")
    except Exception as e:
        # Any failure while creating the backend is reported, not re-raised.
        print(f"  {backend_name}: Not available ({e})")

Custom Backend Registration

from typing import Any, Dict, List, Optional

import rhapsody
from rhapsody.backends.base import BaseBackend


class LoggingBackend(BaseBackend):
    """Backend that logs all operations."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Every task dict passed to submit_tasks(), in submission order.
        self.submitted_tasks = []

    async def submit_tasks(self, tasks: List[Dict[str, Any]]) -> None:
        """Log the batch size and remember the submitted tasks."""
        print(f"LoggingBackend: Submitting {len(tasks)} tasks")
        self.submitted_tasks.extend(tasks)
        # Actual submission logic here

    async def wait(self, timeout: Optional[float] = None) -> None:
        """Log that the caller is waiting for task completion."""
        print("LoggingBackend: Waiting for tasks to complete")
        # Actual wait logic here

    async def terminate(self) -> None:
        """Log that the backend is shutting down."""
        print("LoggingBackend: Terminating")
        # Cleanup logic here


# Register the custom backend. The registry stores a dotted import path
# (a string), not the class object; the path is resolved with importlib
# when the backend is requested.
rhapsody.BackendRegistry.register_backend(
    "logging", f"{LoggingBackend.__module__}.{LoggingBackend.__name__}"
)

# Use the custom backend
backend = rhapsody.get_backend("logging")

Type Definitions

Common type aliases used throughout RHAPSODY:

from typing import Dict, List, Any, Optional, Callable, Union

# Task definition type: a dict with string keys and arbitrary values
# (the examples in this document use keys such as "uid" and "executable")
TaskDict = Dict[str, Any]

# Collection of tasks: a list of task definition dicts
TaskList = List[TaskDict]

# Callback function signature: called with a task dict and a string, returns
# nothing. NOTE(review): the meaning of the string argument is not shown
# here (presumably the task state) — confirm.
CallbackFunction = Callable[[TaskDict, str], None]

# Resource specification: string keys mapped to int, str, or float values
ResourceDict = Dict[str, Union[int, str, float]]

# Configuration dictionary: string keys mapped to arbitrary values
ConfigDict = Dict[str, Any]

Constants

Common constants used in RHAPSODY:

# Task states
class TaskState:
    """String constants naming task states; each value equals its own name."""

    NEW = "NEW"
    WAITING = "WAITING"
    READY = "READY"
    EXECUTING = "EXECUTING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    RETRYING = "RETRYING"
    CANCELED = "CANCELED"

# Backend types
class BackendType:
    """String constants for the backend names used with get_backend()."""

    CONCURRENT = "concurrent"
    DASK = "dask"
    RADICAL_PILOT = "radical_pilot"

# Default values
class Defaults:
    """Default settings, grouped as class-level constants."""

    # Backend selected by default: the "concurrent" backend
    BACKEND_TYPE = BackendType.CONCURRENT
    MAX_WORKERS = 4
    # NOTE(review): 0 presumably means failed tasks are not retried — confirm
    RETRY_ATTEMPTS = 0
    # NOTE(review): presumably seconds between retries — confirm unit
    RETRY_DELAY = 1.0
    # NOTE(review): None presumably means "no timeout" — confirm
    TIMEOUT = None

For more detailed API documentation, see the individual backend and component references.