# Resource Specification Guide

RHAPSODY uses a per-backend model for resource requirements.
Resource requirements are expressed through `task_backend_specific_kwargs`, a dictionary forwarded verbatim to the executing backend when the task is created and submitted.
This design keeps the task API thin and makes resource semantics explicit per backend.
## Quick Reference

| Backend | Resource key | cwd | env | shell |
|---|---|---|---|---|
| `ConcurrentExecutionBackend` | not supported | `{"cwd": "..."}` | `{"env": {...}}` | `{"shell": True}` |
| `DaskExecutionBackend` | `{"resources": {"GPU": 1}}` | `{"cwd": "..."}` | `{"env": {...}}` | `{"shell": True}` |
| `DragonExecutionBackendV3` | `{"process_template": {"policy": ...}}` | `{"process_template": {"cwd": "..."}}` | `{"process_template": {"env": {...}}}` | not applicable |

Each cell shows the dictionary passed as `task_backend_specific_kwargs={...}`.
## Concurrent Backend

`ConcurrentExecutionBackend` runs tasks in a local `ThreadPoolExecutor` or `ProcessPoolExecutor`.
There is no slot or resource scheduler; all processes share the same host.

Supported `task_backend_specific_kwargs`:
| Key | Type | Default | Description |
|---|---|---|---|
| `cwd` | `str` | `None` (inherits process cwd) | Working directory for the subprocess |
| `env` | `dict` | `None` (inherits parent environment) | Environment variables |
| `shell` | `bool` | `False` | Execute via shell (`create_subprocess_shell`) |
```python
from rhapsody.api import ComputeTask

# Working directory
task = ComputeTask(
    executable="/bin/pwd",
    task_backend_specific_kwargs={"cwd": "/data"},
)

# Custom environment
task = ComputeTask(
    executable="/bin/printenv",
    task_backend_specific_kwargs={
        "env": {"MY_VAR": "hello", "CUDA_VISIBLE_DEVICES": "0"},
    },
)

# Shell execution
task = ComputeTask(
    executable="echo $HOSTNAME && date",
    task_backend_specific_kwargs={"shell": True, "cwd": "/tmp"},
)
```
**CPU / GPU pinning.** The Concurrent backend has no resource scheduler. To pin threads or GPUs, do it inside your task function: for example, call `torch.cuda.set_device(gpu_id)` at the start of your callable, or use `os.sched_setaffinity` for CPU pinning.
## Dask Backend

`DaskExecutionBackend` submits tasks to a Dask cluster. Resource scheduling uses Dask's native resource system.

Supported `task_backend_specific_kwargs`:
| Key | Type | Default | Description |
|---|---|---|---|
| `resources` | `dict` | `{}` | Dask resource constraints forwarded to `client.submit(..., resources=...)` |
| `cwd` | `str` | `None` | Working directory for executable tasks |
| `env` | `dict` | `None` | Environment variables for executable tasks |
| `shell` | `bool` | `False` | Execute executable via shell |
`cwd` and `env` apply to executable tasks only (they are passed to `subprocess.run` inside the Dask worker). For function tasks, set the working directory or environment from within the function itself.
```python
from rhapsody.api import ComputeTask

# GPU resource constraint — worker must be started with --resources "GPU=1"
task = ComputeTask(
    function=my_gpu_fn,
    args=(data,),
    task_backend_specific_kwargs={
        "resources": {"GPU": 1},
    },
)

# Multi-resource constraint
task = ComputeTask(
    function=large_model_fn,
    args=(data,),
    task_backend_specific_kwargs={
        "resources": {"GPU": 2, "memory": 32e9},
    },
)

# Executable with cwd, env, and shell
task = ComputeTask(
    executable="./simulate.sh",
    arguments=["--config", "run.yaml"],
    task_backend_specific_kwargs={
        "cwd": "/scratch/run-001",
        "env": {"OMP_NUM_THREADS": "4"},
        "shell": True,
    },
)
```
Workers must be started advertising the resources you request:

```bash
# Start a worker with GPU and custom memory resources
dask worker tcp://scheduler:8786 --resources "GPU=1,memory=32e9"
```
**Unsatisfiable resources.** If no worker can satisfy the requested resources, the task fails immediately at submit time with a `TaskValidationError`-style message instead of hanging indefinitely; the check is performed by `DaskExecutionBackend._check_resources_satisfiable()`.
### Cluster injection

The `resources=` init parameter configures `LocalCluster`. For any other cluster, pass it via `cluster=` or `client=`; the task code is unchanged:

```python
from dask_jobqueue import SLURMCluster

from rhapsody.backends import DaskExecutionBackend

cluster = SLURMCluster(cores=4, memory="8GB", walltime="01:00:00")
cluster.scale(jobs=4)

backend = await DaskExecutionBackend(cluster=cluster)
```
## Dragon Backend (V3)

`DragonExecutionBackendV3` submits tasks to the Dragon runtime. All per-task resource and placement settings flow through Dragon's `ProcessTemplate`.

There is no backend-level `working_directory` in V3. Every process configuration must be set per-task via `task_backend_specific_kwargs`.

Supported `task_backend_specific_kwargs`:
| Key | Type | Description |
|---|---|---|
| `process_template` | `dict` | `ProcessTemplate` kwargs for a single-process task |
| `process_templates` | `list[tuple[int, dict]]` | List of `(n_procs, template_dict)` for multi-rank jobs |
`ProcessTemplate` kwargs (passed inside `process_template` or each entry of `process_templates`):

| Parameter | Type | Description |
|---|---|---|
| `cwd` | `str` | Working directory for the process (default: `"."`) |
| `env` | `dict` | Environment variables |
| `stdout` | `int` | stdout handling (`PIPE`, `STDOUT`, `None`) |
| `stderr` | `int` | stderr handling (`PIPE`, `STDOUT`, `None`) |
| `policy` | `Policy` | Placement and GPU/NUMA affinity |
| `options` | `ProcessOptions` | Dragon process options |
**Excluded parameters.** `target`, `args`, and `kwargs` are managed by RHAPSODY internally via `ComputeTask.executable`, `ComputeTask.arguments`, and `ComputeTask.function`. Do not set them inside `process_template`.
### Single-process task

```python
from rhapsody.api import ComputeTask

# Executable with cwd and env
task = ComputeTask(
    executable="/bin/bash",
    arguments=["-c", "echo $HOSTNAME"],
    task_backend_specific_kwargs={
        "process_template": {
            "cwd": "/scratch/run-001",
            "env": {"OMP_NUM_THREADS": "8"},
        }
    },
)

# Function task with cwd
task = ComputeTask(
    function=my_simulation,
    args=(config,),
    task_backend_specific_kwargs={
        "process_template": {"cwd": "/scratch/run-001"},
    },
)
```
### Multi-rank (parallel) job

Use `process_templates` (plural) to launch a task as a parallel job. Each entry is `(n_procs, template_dict)`:

```python
# 2 groups × 2 processes = 4 processes total
task = ComputeTask(
    executable="/bin/bash",
    arguments=["-c", "echo $HOSTNAME"],
    task_backend_specific_kwargs={
        "process_templates": [
            (2, {"cwd": "/data", "env": {"RANK": "0"}}),
            (2, {"cwd": "/data", "env": {"RANK": "2"}}),
        ]
    },
)
```
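For larger jobs, building the `(n_procs, template_dict)` list programmatically keeps the task definition readable. `uniform_templates` is a hypothetical helper, not part of RHAPSODY; it reproduces the per-group `RANK` numbering used above:

```python
def uniform_templates(n_groups, procs_per_group, base):
    """Hypothetical helper: build a process_templates list of
    (n_procs, template_dict) entries, one per group, stamping each
    group's starting rank into its env."""
    return [
        (
            procs_per_group,
            {**base, "env": {**base.get("env", {}), "RANK": str(g * procs_per_group)}},
        )
        for g in range(n_groups)
    ]
```

With it, the example above could be written as `task_backend_specific_kwargs={"process_templates": uniform_templates(2, 2, {"cwd": "/data"})}`.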
### GPU affinity with Policy

Use Dragon's `Policy` to pin processes to specific GPUs across nodes:

```python
from dragon.infrastructure.policy import Policy
from dragon.native.machine import Node, System

from rhapsody.api import ComputeTask


def make_gpu_policies(nprocs: int):
    """Round-robin GPU assignment across all Dragon nodes."""
    all_gpus = [
        (node.hostname, gpu_id)
        for huid in System().nodes
        for node in [Node(huid)]
        for gpu_id in node.gpus
    ]
    return [
        Policy(
            placement=Policy.Placement.HOST_NAME,
            host_name=all_gpus[i % len(all_gpus)][0],
            gpu_affinity=[all_gpus[i % len(all_gpus)][1]],
        )
        for i in range(nprocs)
    ]


policies = make_gpu_policies(nprocs=4)

# Single process pinned to GPU 0
task = ComputeTask(
    function=gpu_work,
    task_backend_specific_kwargs={
        "process_template": {"policy": policies[0]},
    },
)

# Parallel job: 2 groups of 2 processes each, each group on its own GPU
task = ComputeTask(
    function=gpu_work,
    task_backend_specific_kwargs={
        "process_templates": [
            (2, {"policy": policies[0]}),
            (2, {"policy": policies[1]}),
        ]
    },
)
```