
Learners with Parameterization Tutorial

This tutorial demonstrates how to configure and run multiple learning pipelines concurrently using ParallelActiveLearner. You'll learn how to:

  • Set up parallel workflows
  • Configure each learner independently
  • Use per-iteration and adaptive configurations
  • Run learners concurrently with individual stop criteria

Note

This approach applies to both Active and Reinforcement learners (Sequential and Parallel).

Example Overview

This example includes:

  • Learner 0: Adaptive config - increasing labeled data, decreasing noise & learning rate
  • Learner 1: Per-iteration config - specific checkpoints for tuning
  • Learner 2: Static config - constant settings throughout
  • All learners run concurrently and independently

Configuration Modes

🧠 Adaptive Configuration

  • Receives iteration number i
  • Labeled data: 100 + i*50
  • Noise: 0.1 * (0.95^i)
  • Learning rate: 0.01 * (0.9^i)
  • Batch size increases gradually, capped at 64 (see the sketch below)
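
In plain Python, learner 0's adaptive schedule is just a function of the iteration number i. The sketch below (with a hypothetical helper name) simply restates the formulas above; Approach 3 further down shows how the same logic is registered with create_adaptive_schedule.

def learner0_params(i: int) -> dict:
    # Illustrative only: the per-iteration values listed above for learner 0.
    return {
        'n_labeled': 100 + i * 50,           # labeled data grows each iteration
        'noise_level': 0.1 * (0.95 ** i),    # noise decays each iteration
        'learning_rate': 0.01 * (0.9 ** i),  # learning rate decays each iteration
        'batch_size': min(64, 32 + i * 4),   # batch size grows, capped at 64
    }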

๐Ÿ” Per-Iteration Configuration

  • Iteration keys (e.g., 0, 5, 10) set exact checkpoints
  • -1 is the fallback/default config
  • Ideal for curriculum learning or scheduled tuning

Setup

Warning

The entire ROSE API must be used within an async context.
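
If you run the snippets in this tutorial as a standalone script (outside a notebook or async REPL, where top-level await already works), wrap them in an async entry point. A minimal sketch using the standard library:

import asyncio

async def main():
    # Place the engine setup, task definitions, and teach() calls from the
    # sections below here, since they all use `await`.
    ...

if __name__ == '__main__':
    asyncio.run(main())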

1. Imports & Engine

import os
import sys

from rose import TaskConfig
from rose import ParallelLearnerConfig

from rose.al import ParallelActiveLearner
from rose.metrics import MEAN_SQUARED_ERROR_MSE

from radical.asyncflow import WorkflowEngine
from radical.asyncflow import RadicalExecutionBackend

engine = await RadicalExecutionBackend(
    {'runtime': 30,
     'resource': 'local.localhost'}
)
asyncflow = await WorkflowEngine.create(engine)
acl = ParallelActiveLearner(asyncflow)
# Command prefix: the Python interpreter plus the directory holding the tutorial scripts
code_path = f'{sys.executable} {os.getcwd()}'

2. Define Workflow Tasks

@acl.simulation_task
async def simulation(*args, **kwargs):
    n_labeled = kwargs.get("--n_labeled", 100)
    n_features = kwargs.get("--n_features", 2)

    return f"{code_path}/sim.py --n_labeled {n_labeled} --n_features {n_features}"

@acl.training_task
async def training(*args, **kwargs):
    learning_rate = kwargs.get("--learning_rate", 0.1)
    return f'{code_path}/train.py --learning_rate {learning_rate}'

@acl.active_learn_task
async def active_learn(*args, **kwargs):
    return f'{code_path}/active.py'

@acl.as_stop_criterion(metric_name=MEAN_SQUARED_ERROR_MSE, threshold=0.1)
async def check_mse(*args, **kwargs):
    return f'{code_path}/check_mse.py'
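
Each task returns a shell command string; sim.py, train.py, active.py, and check_mse.py are user-provided scripts that parse the flags built above. A minimal, hypothetical sketch of what sim.py might look like (the flag names mirror the kwargs used in simulation(); the toy data generation is only a placeholder):

# sim.py -- hypothetical user script; ROSE only sees the command line
# returned by the simulation() task above.
import argparse

import numpy as np

parser = argparse.ArgumentParser()
parser.add_argument('--n_labeled', type=int, default=100)
parser.add_argument('--n_features', type=int, default=2)
args = parser.parse_args()

# Generate a toy labeled dataset; a real workflow would run its simulation here.
X = np.random.rand(args.n_labeled, args.n_features)
y = X.sum(axis=1)
np.save('labeled_X.npy', X)
np.save('labeled_y.npy', y)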

Configuration Approaches

Approach 1: Static Configuration

results = await acl.teach(
    parallel_learners=2,
    max_iter=10,
    learner_configs=[
        ParallelLearnerConfig(
            simulation=TaskConfig(kwargs={"--n_labeled": "200", "--n_features": 2}),
            training=TaskConfig(kwargs={"--learning_rate": "0.01"})
        ),
        ParallelLearnerConfig(
            simulation=TaskConfig(kwargs={"--n_labeled": "300", "--n_features": 4}),
            training=TaskConfig(kwargs={"--learning_rate": "0.005"})
        )
    ]
)

Approach 2: Per-Iteration Configuration

results = await acl.teach(
    parallel_learners=3,
    max_iter=15,
    learner_configs=[
        ParallelLearnerConfig(
            simulation=TaskConfig(kwargs={"--n_labeled": "200", "--n_features": 2})
        ),
        ParallelLearnerConfig(
            simulation={
                0: TaskConfig(kwargs={"--n_labeled": "100"}),
                5: TaskConfig(kwargs={"--n_labeled": "200"}),
                10: TaskConfig(kwargs={"--n_labeled": "400"}),
                -1: TaskConfig(kwargs={"--n_labeled": "500"})
            },
            training={
                0: TaskConfig(kwargs={"--learning_rate": "0.01"}),
                5: TaskConfig(kwargs={"--learning_rate": "0.005"}),
                -1: TaskConfig(kwargs={"--learning_rate": "0.001"})
            }
        ),
        None  # Default to base task behavior
    ]
)

Per-Iteration Config Keys

Use numeric keys for specific iterations and -1 as a fallback.
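
The same checkpoints can also be expressed with the create_iteration_schedule helper shown in the Quick Reference below; a sketch, assuming the helper accepts the plain-dict schedule form used there:

# Same checkpoints as learner 1's simulation schedule above, built via the
# helper from the Quick Reference section (plain dicts instead of TaskConfig).
sim_schedule = acl.create_iteration_schedule('simulation', {
    0:  {'kwargs': {'--n_labeled': '100'}},
    5:  {'kwargs': {'--n_labeled': '200'}},
    10: {'kwargs': {'--n_labeled': '400'}},
    -1: {'kwargs': {'--n_labeled': '500'}},  # fallback for all other iterations
})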

Approach 3: Adaptive Configuration

adaptive_sim = acl.create_adaptive_schedule('simulation', 
    lambda i: {
        'kwargs': {
            '--n_labeled': str(100 + i * 50),
            '--n_features': 2,
            '--noise_level': str(0.1 * (0.95 ** i))
        }
    })

adaptive_train = acl.create_adaptive_schedule('training',
    lambda i: {
        'kwargs': {
            '--learning_rate': str(0.01 * (0.9 ** i)),
            '--batch_size': str(min(64, 32 + i * 4))
        }
    })

results = await acl.teach(
    parallel_learners=2,
    max_iter=20,
    learner_configs=[
        ParallelLearnerConfig(simulation=adaptive_sim, training=adaptive_train),
        ParallelLearnerConfig(
            simulation=TaskConfig(kwargs={"--n_labeled": "300", "--n_features": 4}),
            training=TaskConfig(kwargs={"--learning_rate": "0.005"})
        )
    ]
)

Full Example: All Approaches Combined

adaptive_sim = acl.create_adaptive_schedule('simulation', 
    lambda i: {
        'kwargs': {
            '--n_labeled': str(100 + i * 50),
            '--n_features': 2
        }
    })

results = await acl.teach(
    parallel_learners=3,
    max_iter=15,
    learner_configs=[
        ParallelLearnerConfig(simulation=adaptive_sim),  # Adaptive
        ParallelLearnerConfig(                           # Per-iteration
            simulation={
                0: TaskConfig(kwargs={"--n_labeled": "150", "--n_features": 3}),
                7: TaskConfig(kwargs={"--n_labeled": "250", "--n_features": 3}),
                -1: TaskConfig(kwargs={"--n_labeled": "400", "--n_features": 3})
            }
        ),
        ParallelLearnerConfig(                           # Static
            simulation=TaskConfig(kwargs={"--n_labeled": "300", "--n_features": 4})
        )
    ]
)

await acl.shutdown()

Execution Details

Concurrent Execution

All learners run in parallel and independently. The workflow completes when all learners either reach max_iter or meet their stop criterion.
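
Once teach() returns, you can inspect the per-learner outcomes. A sketch that assumes results is an iterable with one entry per learner (check the ROSE documentation for the exact return type):

# Hypothetical post-processing; one entry per learner is assumed.
for idx, result in enumerate(results):
    print(f'Learner {idx} finished with: {result}')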

Stop Criteria

Each learner evaluates its own stop condition. One learner stopping does not affect others.

Performance Tip

Match the number of parallel learners to available resources. Overloading can slow down execution.

Quick Reference

create_iteration_schedule(task_name, schedule)

schedule = {
    0: {'kwargs': {'--learning_rate': '0.01'}},
    5: {'kwargs': {'--learning_rate': '0.005'}},
    -1: {'kwargs': {'--learning_rate': '0.001'}}
}
config = acl.create_iteration_schedule('training', schedule)

create_adaptive_schedule(task_name, fn)

def lr_decay(i):
    return {'kwargs': {'--learning_rate': str(0.01 * (0.95 ** i))}}

adaptive_config = acl.create_adaptive_schedule('training', lr_decay)
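
Either helper's return value is then passed as a ParallelLearnerConfig field, as Approach 3 shows for the adaptive case (the same pattern is assumed for iteration schedules):

# Plug the helper's return value into a learner slot (mirrors Approach 3).
config = ParallelLearnerConfig(training=adaptive_config)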

Next Steps

  • 🧪 Try different active learning algorithms per learner
  • 🎯 Use per-iteration configs to design curriculum learning
  • 📊 Run parameter sweeps
  • 🚀 Scale learners to match compute resources