Skip to content

rhapsody.backends.execution.concurrent

Concurrent distributed execution backend for parallel and distributed computing.

This module provides a backend that executes tasks on local or single node HPC resources.

ConcurrentExecutionBackend

ConcurrentExecutionBackend(executor: Executor = None, name: str = 'concurrent')

Bases: BaseBackend

Simple async-only concurrent execution backend.

Source code in src/rhapsody/backends/execution/concurrent.py
def __init__(self, executor: Executor = None, name: str = "concurrent"):
    super().__init__(name=name)

    self.logger = _get_logger()

    if not executor:
        executor = ThreadPoolExecutor()
        self.logger.info("No executor was provided. Falling back to default ThreadPoolExecutor")

    if not isinstance(executor, Executor):
        err = "Executor must be ThreadPoolExecutor or ProcessPoolExecutor"
        raise TypeError(err)

    if isinstance(executor, ProcessPoolExecutor) and cloudpickle is None:
        raise ImportError(
            "ProcessPoolExecutor requires 'cloudpickle'. "
            "Install it with: pip install cloudpickle"
        )

    self.executor = executor
    self.tasks: dict[str, asyncio.Task] = {}
    self._callback_func: Callable = lambda t, s: None
    self._initialized = False
    self._backend_state = BackendMainStates.INITIALIZED

name property

name: str

Name of the backend.

__await__

__await__()

Make backend awaitable.

Source code in src/rhapsody/backends/execution/concurrent.py
def __await__(self):
    """Make backend awaitable."""
    return self._async_init().__await__()

submit_tasks async

submit_tasks(tasks: list[dict[str, Any]]) -> list[Task]

Submit tasks for execution.

Source code in src/rhapsody/backends/execution/concurrent.py
async def submit_tasks(self, tasks: list[dict[str, Any]]) -> list[asyncio.Task]:
    """Submit tasks for execution."""
    # Set backend state to RUNNING when tasks are submitted
    if self._backend_state != BackendMainStates.RUNNING:
        self._backend_state = BackendMainStates.RUNNING
        self.logger.debug(f"Backend state set to: {self._backend_state.value}")

    submitted_tasks = []

    for task in tasks:
        future = asyncio.create_task(self._handle_task(task))
        submitted_tasks.append(future)

        self.tasks[task["uid"]] = task
        self.tasks[task["uid"]]["future"] = future

    return submitted_tasks

cancel_task async

cancel_task(uid: str) -> bool

Cancel a task by its UID.

Parameters:

  • uid (str) –

    The UID of the task to cancel.

Returns:

  • bool ( bool ) –

    True if the task was found and cancellation was attempted, False otherwise.

Source code in src/rhapsody/backends/execution/concurrent.py
async def cancel_task(self, uid: str) -> bool:
    """Cancel a task by its UID.

    Args:
        uid (str): The UID of the task to cancel.

    Returns:
        bool: True if the task was found and cancellation was attempted,
              False otherwise.
    """
    if uid in self.tasks:
        task = self.tasks[uid]
        future = task["future"]
        if future and future.cancel():
            # Set state on the task object itself before callback
            self._callback_func(task, "CANCELED")
            return True
    return False

cancel_all_tasks async

cancel_all_tasks() -> int

Cancel all running tasks.

Source code in src/rhapsody/backends/execution/concurrent.py
async def cancel_all_tasks(self) -> int:
    """Cancel all running tasks."""
    cancelled_count = 0
    for task in self.tasks.values():
        future = task["future"]
        future.cancel()
        cancelled_count += 1
    self.tasks.clear()
    return cancelled_count

shutdown async

shutdown() -> None

Shutdown the executor.

Source code in src/rhapsody/backends/execution/concurrent.py
async def shutdown(self) -> None:
    """Shutdown the executor."""
    # Set backend state to SHUTDOWN
    self._backend_state = BackendMainStates.SHUTDOWN
    self.logger.debug(f"Backend state set to: {self._backend_state.value}")

    await self.cancel_all_tasks()
    self.executor.shutdown(wait=True)
    self.logger.info("Concurrent execution backend shutdown complete")

state async

state() -> str

Get backend state.

Returns:

  • str ( str ) –

    Current backend state (INITIALIZED, RUNNING, SHUTDOWN)

Source code in src/rhapsody/backends/execution/concurrent.py
async def state(self) -> str:
    """Get backend state.

    Returns:
        str: Current backend state (INITIALIZED, RUNNING, SHUTDOWN)
    """
    return self._backend_state.value

__aenter__ async

__aenter__()

Async context manager entry.

Source code in src/rhapsody/backends/execution/concurrent.py
async def __aenter__(self):
    """Async context manager entry."""
    if not self._initialized:
        await self._async_init()
    return self

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit.

Source code in src/rhapsody/backends/execution/concurrent.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit."""
    await self.shutdown()

create async classmethod

create(executor: Executor) -> ConcurrentExecutionBackend

Alternative factory method for creating initialized backend.

Parameters:

  • executor (Executor) –

    A concurrent.Executor instance (ThreadPoolExecutor or ProcessPoolExecutor).

Returns:

Source code in src/rhapsody/backends/execution/concurrent.py
@classmethod
async def create(cls, executor: Executor) -> "ConcurrentExecutionBackend":
    """Alternative factory method for creating initialized backend.

    Args:
        executor: A concurrent.Executor instance (ThreadPoolExecutor
                  or ProcessPoolExecutor).

    Returns:
        Fully initialized ConcurrentExecutionBackend instance.
    """
    backend = cls(executor)
    return await backend

register_callback

register_callback(func: Callable[[dict[str, Any], str], None]) -> None

Register a callback function for task state changes.

This chains the user's callback with the internal callback used by wait_tasks(). Both callbacks will be invoked on every state change.

Parameters:

  • func (Callable[[dict[str, Any], str], None]) –

    A callable that will be invoked when task states change. The function should accept task and state parameters.

Source code in src/rhapsody/backends/base.py
def register_callback(self, func: Callable[[dict[str, Any], str], None]) -> None:
    """Register a callback function for task state changes.

    This chains the user's callback with the internal callback used by wait_tasks().
    Both callbacks will be invoked on every state change.

    Args:
        func: A callable that will be invoked when task states change.
            The function should accept task and state parameters.
    """
    self._callback_func = func