# Changelog

## [0.2.0] - 2026-04-04

### Added
- `DragonExecutionBackendV3`: migrated to the Dragon `batch.py` streaming pipeline. Tasks submitted via `session.submit_tasks()` are dispatched individually by a continuously running background thread — there is no compile or start step.
- `DragonExecutionBackendV3`: accepts two new constructor parameters, `num_nodes` (total nodes) and `pool_nodes` (nodes per worker pool), forwarded directly to `Batch()`.
- `DragonExecutionBackendV3.fence()`: new method that delegates to `batch.fence()`, allowing callers to wait for all in-flight tasks submitted by this client to complete.
- `DragonExecutionBackendV3.create_ddict()`: new helper that creates a Dragon `DDict` instance directly from the backend.
- `DragonExecutionBackendV3._deliver_batch()`: replaces `_deliver_result`/`_deliver_failure`. All tasks that complete within a single monitor sweep are collected into a list and delivered to the asyncio event loop in one `call_soon_threadsafe` call instead of one per task — reducing cross-thread wakeups from O(tasks) to O(sweeps).
- `DragonExecutionBackendV3`: result monitoring reads directly from `Batch.results_ddict` (the same distributed dict Dragon workers write to) instead of going through the `Task` object. The membership check and value read are now a single `try/except KeyError` operation, eliminating the redundant `__contains__` network round-trip (was two DDict RTTs per ready task, now one).
- `DragonExecutionBackendV3`: the monitor thread now starts lazily on the first `submit_tasks()` call instead of at `_async_init` time, eliminating the idle-spin phase while tasks are being built and registered.
- `DragonExecutionBackendV3._cancelled_tasks`: converted from `list` to `set` for O(1) membership checks and automatic deduplication. Cancelled UIDs are now removed from the set once the monitor loop processes the task, preventing unbounded growth.
- `DragonExecutionBackendV3._task_registry`: entries are now removed atomically (via `dict.pop`) on result or failure delivery, eliminating unbounded registry growth for long-running sessions.
- `ConcurrentExecutionBackend`: regular (synchronous) functions are now executed correctly in both `ThreadPoolExecutor` and `ProcessPoolExecutor`. Previously, all function tasks were dispatched via `asyncio.run(func(...))`, which raised `ValueError` for non-coroutine callables. The executor now detects `asyncio.iscoroutinefunction` and calls sync functions directly.
- `Session`/`TaskStateManager`: task futures now propagate exceptions on failure. Previously, all futures were resolved with `set_result(task)` regardless of outcome. Failures now resolve as:
  - Function task raises → original exception propagated via `fut.set_exception(exc)`.
  - Executable task returns a non-zero exit code → `fut.set_exception(TaskExecutionError(uid, stderr, exit_code))`.
  - Successful tasks → `fut.set_result(task)` (unchanged).
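The sync/async executor fix described above can be illustrated outside RHAPSODY; a minimal sketch of the detection logic, using only the standard library (`run_callable` and the two helper functions are illustrative names):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_callable(func, *args):
    """Dispatch a task callable the way the fixed executor does:
    coroutine functions go through the event loop, plain functions
    are called directly (wrapping a plain function's return value
    in asyncio.run would raise ValueError)."""
    if asyncio.iscoroutinefunction(func):
        return asyncio.run(func(*args))
    return func(*args)


def sync_add(a, b):
    return a + b


async def async_add(a, b):
    return a + b


with ThreadPoolExecutor() as pool:
    results = [pool.submit(run_callable, f, 2, 3).result()
               for f in (sync_add, async_add)]
# results == [5, 5]
```

Both callables now take the correct path; before the fix, `sync_add` would have been passed to `asyncio.run`, which rejects non-coroutine arguments.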
- `Session.wait_tasks`: replaced a bare `except Exception` (which silently swallowed all errors) with `asyncio.gather(*futures, return_exceptions=True)`. Task failures no longer propagate through `wait_tasks`; callers inspect `task.state`/`task.exception`/`task.stderr` directly.
- API documentation: new Wait Modes reference in `docs/api/index.md` with a comparison table of `wait_tasks`, `asyncio.gather(*futures)`, and `await future` / `await task`, including exception types and when to use each.
- API documentation: new Callable Task API section documenting sync and async function support, executor behaviour, and result access fields.
- Getting-started guide: new Wait Modes and Sync and Async Callable Tasks sections added to `docs/getting-started/advanced-usage.md`, with runnable code examples.
- `ConcurrentExecutionBackend`: executable tasks now honour a per-task working directory via `task.cwd` (task-level) or `task_backend_specific_kwargs={"cwd": "..."}` (overrides task-level); passed as `cwd=` to `asyncio.create_subprocess_exec`/`asyncio.create_subprocess_shell`.
- `DaskExecutionBackend`: executable tasks now honour `cwd` from `task_backend_specific_kwargs` (overrides task-level `task.cwd`); forwarded to `_run_executable`, which passes it as `cwd=` to `subprocess.run`.
- `DragonExecutionBackendV3`: documented that the per-task working directory must be set via `task_backend_specific_kwargs={"process_template": {"cwd": "..."}}` (or `"process_templates"` for MPI jobs); this is the only supported mechanism in V3.
- `ConcurrentExecutionBackend`: executable tasks now honour `env` from `task_backend_specific_kwargs={"env": {...}}`; passed as `env=` to `asyncio.create_subprocess_exec`/`asyncio.create_subprocess_shell`. `None` (the default) inherits the parent process environment.
- Unit tests for `cwd` and `env` behaviour across `ConcurrentExecutionBackend`, `DaskExecutionBackend`, and `DragonExecutionBackendV3`.
- `DaskExecutionBackend`: now supports all three `ComputeTask` types — sync functions, async functions, and executable tasks (via subprocess inside a Dask worker).
- Cluster-agnostic design: pass `cluster=` or `client=` to target any Dask-compatible cluster (SLURM, Kubernetes, LocalCUDACluster, etc.) without changing task code.
- Resource scheduling via `task_backend_specific_kwargs={"resources": {"GPU": 1}}` routes tasks to workers advertising matching Dask resources.
- `shell=True` for executable tasks via `task_backend_specific_kwargs={"shell": True}`.
- `DaskExecutionBackend._check_resources_satisfiable()`: pre-submit check that immediately fails tasks with unsatisfiable resource constraints instead of letting them hang indefinitely. Function tasks set `task.exception`; executable tasks set `task.stderr` and `task.exit_code = 1`.
- Integration tests for the Dask backend: end-to-end sync/async/executable task execution, cluster injection (cluster and client), and resource-constraint failure behaviour.
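The `cwd`/`env` forwarding described above maps directly onto the standard asyncio subprocess API; a minimal self-contained sketch (the `run_exec` helper and `DEMO_VAR` are illustrative, not part of the library):

```python
import asyncio
import os
import sys
import tempfile


async def run_exec(cmd, *, cwd=None, env=None):
    """Launch an executable the way the backend forwards cwd/env:
    both are passed verbatim to create_subprocess_exec; env=None
    inherits the parent process environment."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        cwd=cwd,
        env=env,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, _ = await proc.communicate()
    return proc.returncode, out.decode().strip()


workdir = tempfile.mkdtemp()

# cwd= controls where the child process runs
code, out = asyncio.run(run_exec(
    [sys.executable, "-c", "import os; print(os.getcwd())"],
    cwd=workdir,
))

# env= replaces the *whole* environment, so merge os.environ when the
# child still needs PATH etc.; passing None would inherit everything
_, demo = asyncio.run(run_exec(
    [sys.executable, "-c", "import os; print(os.environ.get('DEMO_VAR', ''))"],
    env={**os.environ, "DEMO_VAR": "1"},
))
```

Note the design point baked into the sketch: an explicit `env=` dict replaces the inherited environment entirely, which is why a backend default of `None` (inherit) is the safe choice.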
### Performance
- `DragonExecutionBackendV3._monitor_loop`: eliminated the redundant `Batch.results_ddict.__contains__` check — each ready task previously required two distributed DDict network round-trips (membership test + value read); a single `try/except KeyError` read is now used, saving ~570µs per task on HPC interconnects (~5.7s for 10K tasks).
- `DragonExecutionBackendV3._monitor_loop`: the monitor thread now starts lazily at the first `submit_tasks()` call instead of at backend initialisation, eliminating ~1.2s of idle spinning while tasks are being built.
- `DragonExecutionBackendV3._deliver_batch`: cross-thread wakeups reduced from O(tasks) to O(sweeps) — all completions found in a single sweep are batched into one `call_soon_threadsafe` call (~0.78s saved for 10K tasks).
- `DragonExecutionBackendV3`: removed per-task `logger.debug` calls from the monitor hot path, eliminating 10K f-string allocations per run (29ms at 10K tasks, scales linearly).
- `TaskStateManager`: removed the `_task_states` shadow dict — task state is now read directly from `task["state"]` (single source of truth), eliminating one dict write per task completion.
- `TaskStateManager._update_task_impl`: `_task_futures` entries are now removed via `dict.pop` on resolution, preventing unbounded memory growth at scale.
- `Session`: removed history recording (`task["history"]` timestamps) — two `time.time()` syscalls and two dict writes per task eliminated.
- `Session.get_statistics`: method removed entirely; it depended on history timestamps and iterated all tasks on every call.
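The single-round-trip read pattern above generalises to any network-backed mapping; a minimal sketch against a plain dict standing in for the distributed `results_ddict` (`poll_once` is an illustrative name, not the library's API):

```python
results_ddict = {"task-1": "ok"}  # stands in for the distributed DDict


def poll_once(ddict, uid):
    """Single-lookup read: the membership test and the value read
    collapse into one access — one round-trip on a network-backed
    mapping instead of two (`uid in ddict` followed by `ddict[uid]`)."""
    try:
        return ddict[uid]
    except KeyError:
        return None  # not ready yet; picked up on the next sweep


ready = poll_once(results_ddict, "task-1")    # → "ok"
pending = poll_once(results_ddict, "task-9")  # → None
```

On a local dict the saving is negligible, but when every `__contains__` and `__getitem__` is a network RTT, halving the lookups halves the per-task monitoring latency.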
Measured on 2 nodes / 128 workers (HPC), 10K function tasks:
| | Run 1 | Run 2 | Run 3 | Run 4 |
|---|---|---|---|---|
| Dragon batch only | 9.82s | 10.92s | 9.99s | 10.08s |
| RHAPSODY + Dragon batch | 10.50s | 12.17s | 9.92s | 11.18s |
RHAPSODY overhead reduced from ~7.8s (before) to ≤1.3s (after) — within normal run-to-run variance of the Dragon batch itself.
### Fixed
- `ConcurrentExecutionBackend._run_in_thread`: calling a regular (sync) function no longer raises `ValueError: a coroutine was expected`.
- `ConcurrentExecutionBackend._run_in_process`: same fix — sync functions are called directly after cloudpickle deserialization.
- `TaskStateManager._update_task_impl`: futures for failed tasks are now resolved with `set_exception` instead of `set_result`, so `await future` and `await asyncio.gather(*futures)` correctly raise on task failure.
- `TaskStateManager.get_wait_future`: same fix applied to the race-condition path (task already completed before `get_wait_future` was called).
- `Session.wait_tasks`: removed `except Exception: logger.error(...)`, which was silently eating all non-timeout exceptions, including programming errors.
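The future-resolution semantics fixed above can be reproduced with plain asyncio futures; a minimal sketch (the `TaskExecutionError` class here is an illustrative stand-in, not the library's own):

```python
import asyncio


class TaskExecutionError(RuntimeError):
    """Illustrative stand-in for the library's failure exception."""


async def main():
    loop = asyncio.get_running_loop()
    ok, bad = loop.create_future(), loop.create_future()

    # success path: set_result; failure path: set_exception (the fix)
    ok.set_result({"uid": "t1", "state": "DONE"})
    bad.set_exception(TaskExecutionError("t2 failed, exit_code=1"))

    # awaiting a failed future re-raises the stored exception ...
    try:
        await bad
    except TaskExecutionError as exc:
        failure = str(exc)

    # ... while gather(return_exceptions=True) collects results and
    # exceptions side by side instead of raising (the wait_tasks fix)
    mixed = await asyncio.gather(ok, bad, return_exceptions=True)
    return failure, mixed


failure, mixed = asyncio.run(main())
```

With `return_exceptions=True`, the exception instance appears in the result list in position, so a caller can sweep all tasks and then inspect per-task state without one failure aborting the wait.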
### Changed
- `ComputeTask`: renamed the `working_directory` parameter and dict key to `cwd`, distinguishing the task-level working directory from the backend-level one (used by Dragon V1/V2 via `resources["working_dir"]`). Update call sites to `ComputeTask(cwd="/path")` and `task["cwd"]`.
- `DragonExecutionBackendV3`: removed the silently-ignored `working_directory` parameter from `__init__` and `create()` — it was never stored or used (unlike V1/V2, which honour `resources["working_dir"]`). Use `task_backend_specific_kwargs={"process_template": {"cwd": "..."}}` instead.
- `DaskExecutionBackend`: resource constraints are now specified exclusively via `task_backend_specific_kwargs={"resources": {...}}` (previously via `gpu=`/`cpu_threads=` on `ComputeTask`).
- `DaskExecutionBackend._build_dask_resources()`: reads from `task_backend_specific_kwargs["resources"]` instead of the task-level `gpu`/`cpu_threads` fields.
- `DaskExecutionBackend._submit_executable()`: `shell=` now reads from `task_backend_specific_kwargs` for consistency.
- Optimized Dragon backend tests by using module-scoped fixtures, reusing a single backend instance per Dragon version (V1, V2, V3) across all tests instead of creating and destroying one per test.
### Breaking Changes
- `DragonExecutionBackendV3`: removed the `num_workers`, `disable_background_batching`, and `disable_batch_submission` constructor parameters. These were incompatible with the new streaming pipeline — the Dragon batch always runs in streaming mode, and worker counts are controlled via `num_nodes`/`pool_nodes`. Migration:
  - `DragonExecutionBackendV3(num_workers=16)` → `DragonExecutionBackendV3(num_nodes=4, pool_nodes=2)` (or simply omit both to let Dragon decide)
  - `DragonExecutionBackendV3(disable_batch_submission=True)` → remove (streaming is now always on)
  - `DragonExecutionBackendV3(disable_background_batching=True)` → remove
- `ComputeTask` and `AITask`: removed the `ranks`, `memory`, `gpu`, `cpu_threads`, and `environment` parameters. These fields were never consumed by any execution backend — they were silently ignored, creating a misleading API. Resource requirements are now backend-specific and must be passed via `task_backend_specific_kwargs`. Migration:
  - `ComputeTask(executable=..., gpu=2)` → `ComputeTask(executable=..., task_backend_specific_kwargs={"resources": {"GPU": 2}})` (Dask) or `task_backend_specific_kwargs={"process_template": {"gpu": 2}}` (Dragon V3)
  - `ComputeTask(executable=..., environment={"K": "V"})` → `ComputeTask(executable=..., task_backend_specific_kwargs={"env": {"K": "V"}})` (Concurrent/Dask)
- `ComputeTask`: removed `cwd` and `shell` as named parameters. All execution context is now specified exclusively via `task_backend_specific_kwargs` for consistency. Migration:
  - `ComputeTask(executable=..., cwd="/path")` → `ComputeTask(executable=..., task_backend_specific_kwargs={"cwd": "/path"})`
  - `ComputeTask(executable=..., shell=True)` → `ComputeTask(executable=..., task_backend_specific_kwargs={"shell": True})`
### Fixed
- `_run_executable`: replaced `stdout=subprocess.PIPE, stderr=subprocess.PIPE` with `capture_output=True` (ruff UP022).
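The two forms are equivalent; a minimal sketch of the replacement (the command is illustrative):

```python
import subprocess
import sys

# capture_output=True is shorthand for stdout=subprocess.PIPE,
# stderr=subprocess.PIPE — the rewrite ruff's UP022 rule performs.
proc = subprocess.run(
    [sys.executable, "-c", "print('hello from the task')"],
    capture_output=True,
    text=True,
)
# proc.stdout == "hello from the task\n", proc.returncode == 0
```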
## [0.1.2] - 2026-02-16

### Fixed
- Fixed project URLs in `pyproject.toml` to point to the correct GitHub repository.
- Removed the Python 3.8 classifier (the package requires Python >= 3.9).
## [0.1.1] - 2026-02-16

### Fixed
- Fixed `ImportError` when installing `rhapsody-py` without optional backends (Dragon, Dask, RADICAL-Pilot). Optional backend imports in `backends/__init__.py` are now wrapped in `try/except`.
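The guarded-import pattern behind this fix is standard; a minimal sketch (the module name is illustrative, not one of the real backends):

```python
# Guarded optional import: the package remains importable even when an
# optional dependency is missing; consumers check for None before use.
try:
    import some_optional_backend  # illustrative name, not a real module
except ImportError:
    some_optional_backend = None

HAS_OPTIONAL_BACKEND = some_optional_backend is not None
```

The top-level package then exposes the backend only when the import succeeded, so `pip install rhapsody-py` with no extras still yields a working import.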
## [0.1.0] - 2025-02-16

### Added
- Initial PyPI release as `rhapsody-py` (import name: `rhapsody`).
- Execution backends: Concurrent (built-in), Dask, RADICAL-Pilot, Dragon (V1, V2, V3).
- Inference backend: Dragon-VLLM for LLM serving on HPC.
- Task API: `ComputeTask` for executables/functions, `AITask` for LLM inference.
- Session API for task submission, monitoring, and lifecycle management.
- Backend discovery and registry system.
- Removed the ThreadPoolExecutor wait approach (`_wait_executor`), streamlining concurrency management.
- Added the `disable_batch_submission` parameter to `DragonExecutionBackendV3` for a configurable task submission strategy.
- Introduced polling-based batch monitoring with `_monitor_loop` and `_process_batch_results`.