Integrations

RHAPSODY provides seamless integrations with several high-performance computing and AI infrastructure tools. This guide covers the available integrations and how to use them effectively.

Dragon-VLLM Inference Backend

The Dragon-VLLM integration provides a high-performance inference backend that combines Dragon's distributed computing capabilities with vLLM's efficient LLM serving.

Overview

The DragonVllmInferenceBackend offers:

  • Request Batching: Automatically accumulates individual requests into efficient batches
  • Server Mode: Optional HTTP server with OpenAI-compatible API endpoints
  • Engine Mode: Direct Python API for programmatic access
  • Async Operations: Non-blocking inference with asyncio integration
  • Multi-Node Support: Scale inference across multiple GPU nodes

Installation

Install RHAPSODY with Dragon-VLLM support:

pip install "rhapsody-py[vllm-dragon]"
pip install git+https://github.com/radical-cybertools/vllm-dragonhpc.git@main

Prerequisites

The Dragon-VLLM backend requires:

  • Dragon runtime (dragonhpc)
  • Dragon-VLLM package (dragon-vllm)
  • Python >= 3.10
  • GPU nodes with CUDA support

Basic Usage

Here's a complete example of using the Dragon-VLLM backend for AI inference:

import asyncio
import multiprocessing as mp

from rhapsody import Session
from rhapsody.api import AITask, ComputeTask
from rhapsody.backends import DragonExecutionBackendV3, DragonVllmInferenceBackend

async def main():
    # Set Dragon as the multiprocessing start method
    mp.set_start_method("dragon")

    # Initialize execution backend for compute tasks
    execution_backend = await DragonExecutionBackendV3(num_workers=4)

    # Initialize inference backend for AI tasks
    inference_backend = DragonVllmInferenceBackend(
        config_file="config.yaml",
        model_name="Qwen2.5-0.5B-Instruct",
        num_nodes=1,
        num_gpus=1,
        tp_size=1,
        port=8001,
        offset=0,
    )

    # Initialize the inference service
    await inference_backend.initialize()

    # Create a session with both backends
    session = Session([execution_backend, inference_backend])

    # Define mixed workload: AI and compute tasks
    tasks = [
        AITask(
            prompt="What is the capital of France?",
            backend=inference_backend.name
        ),
        AITask(
            prompt=["Tell me a joke", "What is 2+2?"],
            backend=inference_backend.name
        ),
        ComputeTask(
            executable="/usr/bin/echo",
            arguments=["Hello from Dragon!"],
            backend=execution_backend.name,
        ),
    ]

    # Submit all tasks at once
    await session.submit_tasks(tasks)

    # Wait for results
    results = await asyncio.gather(*tasks)

    # Process results
    for i, task in enumerate(results):
        if "prompt" in task:
            # AITask: use .response for model output
            print(f"Task {i + 1} [AI]: {task.response}")
        else:
            # ComputeTask: use .return_value for function/executable output
            print(f"Task {i + 1} [Compute]: {task.return_value}")

    await session.close()

if __name__ == "__main__":
    asyncio.run(main())

Running with Dragon

Scripts using Dragon backends must be launched with the dragon command:

dragon -m my_script.py

Configuration File

The DragonVllmInferenceBackend requires a YAML configuration file (config.yaml) that defines model settings, hardware allocation, batching behavior, and optional guardrails. A sample configuration is maintained in the vllm-dragonhpc repository:

# Download the sample config
wget https://raw.githubusercontent.com/radical-cybertools/vllm-dragonhpc/main/config.sample -O config.yaml

Edit config.yaml to match your environment — at minimum, set model_name to your target HuggingFace model and hf_token to your access token. The sample includes sections for hardware allocation, LLM parameters (precision, token limits, sampling), input batching, guardrails, and dynamic inference worker scaling. See the config.sample for all available options and their defaults.
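
As a quick sanity check before launching, the snippet below loads config.yaml and verifies that the two required fields are present. This is a minimal sketch that assumes PyYAML is installed; the only keys it relies on are model_name and hf_token, as described above.

import yaml  # PyYAML

def check_config(path: str = "config.yaml") -> dict:
    """Load the vLLM config and confirm the minimum required fields are set."""
    with open(path) as f:
        config = yaml.safe_load(f)

    # model_name and hf_token are the minimum fields described above;
    # all other keys come from config.sample and keep their defaults.
    for key in ("model_name", "hf_token"):
        if not config.get(key):
            raise ValueError(f"'{key}' must be set in {path}")
    return config

config = check_config("config.yaml")
print(f"Using model: {config['model_name']}")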

Configuration Options

The DragonVllmInferenceBackend supports the following parameters:

Parameter          Type   Default    Description
config_file        str    Required   Path to vLLM configuration YAML file
model_name         str    Required   HuggingFace model name or local path
num_nodes          int    1          Number of nodes for inference
num_gpus           int    1          Number of GPUs per node
tp_size            int    1          Tensor parallelism size
port               int    8000       HTTP server port (if use_service=True)
offset             int    0          Node offset for multi-service deployments
use_service        bool   True       Enable HTTP server with OpenAI-compatible API
max_batch_size     int    1024       Maximum requests per batch
max_batch_wait_ms  int    500        Maximum time (in milliseconds) to wait for batch accumulation
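
For example, an engine-mode backend that spans several GPUs and uses the batching parameters might be constructed as below. This is an illustrative sketch: the parameter values are placeholders, not recommendations.

inference_backend = DragonVllmInferenceBackend(
    config_file="config.yaml",
    model_name="Qwen2.5-0.5B-Instruct",
    num_nodes=2,            # spread inference across two nodes
    num_gpus=4,             # four GPUs per node
    tp_size=2,              # shard the model across two GPUs
    use_service=False,      # engine mode: direct Python API, no HTTP server
    max_batch_size=2048,    # allow larger batches for throughput
    max_batch_wait_ms=250,  # flush a partial batch after at most 250 ms
)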

Server Mode vs Engine Mode

Server Mode (HTTP API)

When use_service=True, the backend starts an HTTP server with OpenAI-compatible endpoints:

inference_backend = DragonVllmInferenceBackend(
    config_file="config.yaml",
    model_name="llama-3-8b",
    use_service=True,
    port=8000
)

await inference_backend.initialize()

# Access via HTTP
# GET  http://<hostname>:8000/health
# POST http://<hostname>:8000/generate
# POST http://<hostname>:8000/v1/chat/completions  (OpenAI-compatible)
# GET  http://<hostname>:8000/v1/models
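
Once the server is up, any HTTP client can talk to these endpoints. The sketch below uses the requests library (an assumption, not a RHAPSODY dependency); the hostname, model name, and response parsing follow the standard OpenAI chat-completions format that the server advertises compatibility with.

import requests

base_url = "http://localhost:8000"  # replace with the actual inference node hostname

# Check that the service is up
print(requests.get(f"{base_url}/health").status_code)

# Send a chat request through the OpenAI-compatible endpoint
response = requests.post(
    f"{base_url}/v1/chat/completions",
    json={
        "model": "llama-3-8b",
        "messages": [{"role": "user", "content": "What is the capital of France?"}],
    },
    timeout=300,
)
print(response.json()["choices"][0]["message"]["content"])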

Engine Mode (Direct API)

When use_service=False, use the Python API directly:

inference_backend = DragonVllmInferenceBackend(
    config_file="config.yaml",
    model_name="llama-3-8b",
    use_service=False
)

await inference_backend.initialize()

# Direct inference
results = await inference_backend.generate(
    prompts=["What is AI?", "Explain machine learning"],
    timeout=300
)

Batching Strategy

The backend automatically batches requests for optimal throughput:

  1. Accumulates requests for up to max_batch_wait_ms milliseconds
  2. Processes immediately when batch reaches max_batch_size
  3. Submits combined batch to vLLM pipeline
  4. Distributes responses back to individual requests

This is significantly more efficient than processing requests individually, especially for high-throughput workloads.
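
To benefit from batching, submit many requests concurrently instead of awaiting them one at a time. The helper below is a sketch that reuses the Session and AITask pattern from the Basic Usage example; the prompts are placeholders.

import asyncio

from rhapsody.api import AITask

async def run_batch(session, inference_backend, prompts):
    """Submit all prompts at once so the backend can pack them into batches."""
    tasks = [AITask(prompt=p, backend=inference_backend.name) for p in prompts]
    await session.submit_tasks(tasks)
    return await asyncio.gather(*tasks)

# Inside main() from the Basic Usage example:
# results = await run_batch(session, inference_backend,
#                           [f"Summarize document {i}" for i in range(500)])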

Performance Tuning

  • Increase max_batch_size for higher throughput, at the cost of more memory (see the sketch below)
  • Reduce max_batch_wait_ms for lower latency
  • Use tp_size > 1 for large models that don't fit on a single GPU
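
The trade-off between the two batching knobs can be expressed directly in the constructor, as in the sketch below; the values are illustrative only.

# Throughput-oriented: larger batches, longer accumulation window
bulk_backend = DragonVllmInferenceBackend(
    config_file="config.yaml",
    model_name="llama-3-8b",
    max_batch_size=4096,
    max_batch_wait_ms=1000,
)

# Latency-oriented: flush small batches quickly
interactive_backend = DragonVllmInferenceBackend(
    config_file="config.yaml",
    model_name="llama-3-8b",
    max_batch_size=64,
    max_batch_wait_ms=50,
)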

RADICAL AsyncFlow Integration

RHAPSODY integrates seamlessly with RADICAL AsyncFlow, a high-performance workflow engine for dynamic, asynchronous task graphs.

Overview

RADICAL AsyncFlow provides:

  • Dynamic Task Graphs: Create workflows with dependencies at runtime
  • Async/Await Syntax: Natural Python async programming model
  • Decorator-Based API: Simple function-to-task conversion
  • Backend Flexibility: Use any RHAPSODY backend as execution engine

Installation

Install RHAPSODY with AsyncFlow support:

pip install radical.asyncflow
pip install "rhapsody-py[dragon]"  # or your preferred backend

Basic Usage

Here's a complete workflow example using AsyncFlow with RHAPSODY's Dragon backend:

import asyncio
import multiprocessing as mp

from radical.asyncflow import WorkflowEngine
from rhapsody.backends import DragonExecutionBackendV3

async def main():
    # Set Dragon as the multiprocessing start method
    mp.set_start_method("dragon")

    # Initialize RHAPSODY backend
    backend = await DragonExecutionBackendV3()

    # Create AsyncFlow workflow engine with RHAPSODY backend
    flow = await WorkflowEngine.create(backend=backend)

    # Define tasks using decorators
    @flow.function_task
    async def task1(*args):
        """Data generation task"""
        print("Task 1: Generating data")
        data = list(range(1000))
        return sum(data)

    @flow.function_task
    async def task2(*args):
        """Data processing task"""
        input_data = args[0]
        print(f"Task 2: Processing data, input sum: {input_data}")
        return [x for x in range(1000) if x % 2 == 0]

    @flow.function_task
    async def task3(*args):
        """Data aggregation task"""
        sum_data, even_numbers = args
        print(f"Task 3: Aggregating results")
        return {
            "total_sum": sum_data,
            "even_count": len(even_numbers)
        }

    # Define workflow with dependencies
    async def run_workflow(wf_id):
        print(f"Starting workflow {wf_id}")

        # Create task graph: task3 depends on task1 and task2
        # task2 depends on task1
        t1 = task1()
        t2 = task2(t1)  # task2 waits for task1
        t3 = task3(t1, t2)  # task3 waits for both task1 and task2

        result = await t3  # Await final task
        print(f"Workflow {wf_id} completed, result: {result}")
        return result

    # Run multiple workflows concurrently
    results = await asyncio.gather(*[run_workflow(i) for i in range(10)])

    print(f"Completed {len(results)} workflows")

    # Shutdown the workflow engine
    await flow.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Running with Dragon

When using Dragon backend with AsyncFlow, launch with the dragon command:

dragon -m workflow.py

Key Features

1. Automatic Dependency Management

AsyncFlow automatically tracks dependencies between tasks based on function arguments:

@flow.function_task
async def step1():
    return "data"

@flow.function_task
async def step2(input_data):
    return f"processed_{input_data}"

# AsyncFlow automatically creates dependency: step2 waits for step1
result1 = step1()
result2 = step2(result1)
await result2

2. Concurrent Workflow Execution

Run multiple independent workflows in parallel:

# Each workflow has its own task graph
workflows = [run_workflow(i) for i in range(1000)]

# Execute all workflows concurrently
results = await asyncio.gather(*workflows)

3. Backend Interoperability

AsyncFlow works with any RHAPSODY backend:

# Local execution
from rhapsody.backends import ConcurrentExecutionBackend
backend = await ConcurrentExecutionBackend()

# Dask cluster
from rhapsody.backends import DaskExecutionBackend
backend = await DaskExecutionBackend()

# Dragon HPC
from rhapsody.backends import DragonExecutionBackendV3
backend = await DragonExecutionBackendV3(num_workers=2048)

# Create workflow with chosen backend
flow = await WorkflowEngine.create(backend=backend)

Performance Considerations

Scaling Guidelines

  • Use Dragon backend for HPC-scale workflows (1000+ concurrent tasks)
  • Use Dask backend for distributed cluster computing
  • Use Concurrent backend for local development and testing

Task Granularity

  • AsyncFlow excels at dynamic, fine-grained task graphs
  • For coarse-grained tasks, consider using RHAPSODY's Session API directly (see the sketch below)
  • All RHAPSODY API capabilities remain available through AsyncFlow for launching both workloads and workflows
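
For a coarse-grained workload, the Session API from the examples above can be used on its own. A minimal sketch, assuming the same Dragon backend and ComputeTask interface shown earlier:

import asyncio
import multiprocessing as mp

from rhapsody import Session
from rhapsody.api import ComputeTask
from rhapsody.backends import DragonExecutionBackendV3

async def main():
    # Set Dragon as the multiprocessing start method
    mp.set_start_method("dragon")

    backend = await DragonExecutionBackendV3(num_workers=8)
    session = Session([backend])

    # A handful of long-running executables: coarse-grained work that does
    # not need a dynamic task graph
    tasks = [
        ComputeTask(
            executable="/usr/bin/sleep",
            arguments=["30"],
            backend=backend.name,
        )
        for _ in range(8)
    ]

    await session.submit_tasks(tasks)
    await asyncio.gather(*tasks)
    await session.close()

if __name__ == "__main__":
    asyncio.run(main())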

Dual API Usage

It is strongly recommended not to combine the RHAPSODY API with the AsyncFlow API in the same program, as doing so can block the asyncio event loop.

For more information on specific backends, see the Advanced Usage guide.