Define your target machine to run on
Import the main ROSE and RADICAL AsyncFlow modules:
from rose.metrics import MEAN_SQUARED_ERROR_MSE
from rose.al.active_learner import SequentialActiveLearner
from radical.asyncflow import WorkflowEngine
from radical.asyncflow import RadicalExecutionBackend
Define your resource engine, as we described in our previous Target Resources step:
engine = await RadicalExecutionBackend({'resource': 'local.localhost'})
asyncflow = await WorkflowEngine.create(engine)
acl = SequentialActiveLearner(asyncflow)
Now that the resource engine is defined, let's define the main active learning (AL) workflow components:
Note
The Task object is based on the Radical.Pilot.TaskDescription, meaning that users can pass to the Task object any args and kwargs that the Radical.Pilot.TaskDescription accepts.
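The task decorators on this page appear both bare (@acl.simulation_task) and with keyword arguments (@acl.simulation_task(as_executable=False)). The sketch below shows one common Python pattern for a decorator that supports both forms and keeps any extra keyword arguments next to the task function. It is a hypothetical illustration only: task_decorator, REGISTRY and the ranks=4 keyword are assumptions, not ROSE internals, and ROSE may forward task-description arguments differently.

```python
import functools

# Hypothetical registry mapping task names to their options (not a ROSE API).
REGISTRY = {}


def task_decorator(func=None, *, as_executable=True, **task_kwargs):
    """Register a task; works as @task_decorator or @task_decorator(...)."""
    def wrap(f):
        REGISTRY[f.__name__] = {
            "func": f,
            "as_executable": as_executable,
            # Extra keyword arguments are stored verbatim, e.g. resource
            # requests that a task description might accept.
            "task_kwargs": task_kwargs,
        }

        @functools.wraps(f)
        async def wrapper(*args, **kwargs):
            return await f(*args, **kwargs)
        return wrapper

    # Bare usage: Python passes the decorated function as `func`.
    if func is not None:
        return wrap(func)
    # Parameterized usage: return the decorator that receives the function.
    return wrap


@task_decorator
async def simulation(*args):
    return 'python3 sim.py'


# `ranks=4` is a hypothetical placeholder for a task-description argument.
@task_decorator(as_executable=False, ranks=4)
async def training(*args):
    return {"weights": [0.1]}
```

Checking callable(func) only distinguishes the two forms because all options are keyword-only, so a positional argument can only be the decorated function.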
# Each executable task returns the shell command to run for that step
@acl.simulation_task
async def simulation(*args):
    return 'python3 sim.py'

@acl.training_task
async def training(*args):
    return 'python3 train.py'

@acl.active_learn_task
async def active_learn(*args):
    return 'python3 active.py'
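In the executable form, each decorated function returns a command string rather than doing the work itself. To make that idea concrete, the rough sketch below awaits such a function to obtain the command and runs it as a local subprocess with asyncio. This is not how ROSE or RADICAL-Pilot launch tasks (they submit work to the configured execution backend). The run_executable_task helper is hypothetical, and echo stands in for python3 sim.py so the sketch runs with any POSIX shell.

```python
import asyncio


async def run_executable_task(task_fn, *args):
    """Hypothetical helper: get a command from task_fn and run it in a shell."""
    command = await task_fn(*args)  # e.g. 'python3 sim.py'
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    # A non-zero exit status is treated as a failed task.
    if proc.returncode != 0:
        raise RuntimeError(f"{command!r} failed: {stderr.decode().strip()}")
    return stdout.decode().strip()


# Stand-in for the simulation task; 'echo' replaces 'python3 sim.py'.
async def simulation(*args):
    return 'echo simulation done'


if __name__ == "__main__":
    print(asyncio.run(run_executable_task(simulation)))
```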
Tip
ROSE also supports defining tasks as Python functions instead of executables (e.g., Python scripts, shell scripts, etc.). To do that, pass the as_executable=False argument to the decorator as follows:
import asyncio
import random

@acl.simulation_task(as_executable=False)
async def run_simulation(*args) -> dict:
    """Simulate a process and return dummy simulation results."""
    await asyncio.sleep(1)  # Simulate async workload
    results = {
        "input": args,
        "output": [random.random() for _ in range(5)]
    }
    return results

@acl.training_task(as_executable=False)
async def run_training(simulation_results: dict) -> dict:
    """Train a dummy model using simulation results."""
    await asyncio.sleep(1)  # Simulate training time
    model = {
        "weights": [sum(simulation_results["output"]) * 0.1],
        "trained_on": simulation_results["input"]
    }
    return model

@acl.active_learn_task(as_executable=False)
async def run_active_learning(model: dict) -> dict:
    """Perform a dummy active learning step with the trained model."""
    await asyncio.sleep(1)  # Simulate active learning
    selected_samples = [random.randint(0, 100) for _ in range(3)]
    return {
        "model_weights": model["weights"],
        "new_samples": selected_samples
    }
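Because these tasks are plain coroutines, you can check how data flows between them locally before handing them to ROSE. The sketch below calls undecorated copies of the three functions in the Sequential Learner's order (simulation, then training, then active learning), feeding each return value into the next step. It assumes each step receives the previous step's result directly; how ROSE passes results between decorated tasks may differ, so treat this only as a local test harness. The one_iteration function is hypothetical.

```python
import asyncio
import random


# Undecorated copies of the tasks above (sleeps shortened for quick runs).
async def run_simulation(*args) -> dict:
    await asyncio.sleep(0.1)
    return {"input": args, "output": [random.random() for _ in range(5)]}


async def run_training(simulation_results: dict) -> dict:
    await asyncio.sleep(0.1)
    return {
        "weights": [sum(simulation_results["output"]) * 0.1],
        "trained_on": simulation_results["input"],
    }


async def run_active_learning(model: dict) -> dict:
    await asyncio.sleep(0.1)
    return {
        "model_weights": model["weights"],
        "new_samples": [random.randint(0, 100) for _ in range(3)],
    }


async def one_iteration(*sim_args):
    """Hypothetical harness: simulation --> training --> active learning."""
    sim = await run_simulation(*sim_args)
    model = await run_training(sim)
    return await run_active_learning(model)


if __name__ == "__main__":
    random.seed(0)  # reproducible dummy numbers
    print(asyncio.run(one_iteration("config-a")))
```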
Optionally, you can specify a metric to monitor that acts as a stop condition: the workflow terminates once the metric reaches the specified threshold:
Tip
Specifying both @acl.as_stop_criterion and max_iter causes ROSE to stop at whichever constraint is satisfied first. Specifying neither raises an error and causes your workflow to fail.
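The interplay between max_iter and a stop criterion can be pictured as a loop that checks both limits on every iteration. The sketch below is a hypothetical model of the rule in the tip above, not ROSE's teach implementation. run_loop and step are illustrative names, and the metric <= threshold comparison is an assumption that fits error metrics such as MSE, where lower is better.

```python
def run_loop(step, max_iter=None, threshold=None):
    """Run step(i) -> metric until max_iter is reached or metric <= threshold.

    Hypothetical model of the stopping rule; returns (iterations, last_metric).
    """
    if max_iter is None and threshold is None:
        # With neither limit the loop could never end, so refuse to start.
        raise ValueError("specify max_iter, a stop criterion, or both")
    i, metric = 0, None
    while max_iter is None or i < max_iter:
        metric = step(i)
        i += 1
        if threshold is not None and metric <= threshold:
            break  # the stop criterion was satisfied first
    return i, metric


if __name__ == "__main__":
    errors = [0.5, 0.3, 0.08, 0.05]  # dummy per-iteration MSE values
    print(run_loop(lambda i: errors[i], max_iter=10, threshold=0.1))
```

With the dummy errors above, the threshold wins after three iterations; with max_iter=2 the iteration budget runs out first.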
Note
In addition to a wide range of standard metrics, ROSE supports custom (user-defined) metrics. For the list of standard metrics and instructions on defining a custom metric, refer to Standard Metrics.
# Define the stop criterion with a metric (MSE in this case):
# the loop stops once the value produced by check_mse.py reaches the 0.1 threshold
@acl.as_stop_criterion(metric_name=MEAN_SQUARED_ERROR_MSE, threshold=0.1)
async def check_mse(*args):
    return 'python3 check_mse.py'
Warning
For any metric function, such as one decorated with @acl.as_stop_criterion, the invoked script (e.g., check_mse.py) must return a numerical value.
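A metric script only needs to compute the metric and emit it as a single number. The sketch below is a hypothetical check_mse.py that computes the mean squared error, MSE = (1/n) * sum((y_true_i - y_pred_i)^2), and prints it. It assumes that printing the value alone to standard output is how the number reaches ROSE; check the Standard Metrics documentation for the exact contract. The hard-coded y_true and y_pred values are placeholders for your real validation data.

```python
"""Hypothetical check_mse.py: print the mean squared error as one number."""


def mean_squared_error(y_true, y_pred):
    """Return (1/n) * sum((y_true_i - y_pred_i) ** 2)."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("inputs must be non-empty and the same length")
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


if __name__ == "__main__":
    # Placeholder data; load your validation targets and predictions instead.
    y_true = [1.0, 2.0, 3.0]
    y_pred = [1.5, 2.0, 2.5]
    # Print only the numeric value, with nothing else on stdout.
    print(mean_squared_error(y_true, y_pred))
```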
Finally, invoke the tasks and register them with the active learner as a workflow.
Note
In the Sequential Learner, tasks are invoked in a predefined order: simulation --> training --> active_learn.
# Start the teaching loop; it stops after max_iter=10 iterations or once the stop criterion is met
await acl.teach(max_iter=10)
# Shut down the workflow engine once the loop has finished
await asyncflow.shutdown()
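The snippets on this page use top-level await, which works in Jupyter notebooks, IPython, and the python -m asyncio REPL, but not in a plain .py script. One common way to run the same workflow as a script is to wrap it in an async def main() and call asyncio.run(), using try/finally so the engine is shut down even if the loop raises. To keep the sketch runnable without ROSE installed, FakeEngine and fake_teach are hypothetical stand-ins for WorkflowEngine and acl.teach. In a real script, replace them with the calls shown above and create the backend, engine, and learner inside main().

```python
import asyncio


class FakeEngine:
    """Hypothetical stand-in for the WorkflowEngine created above."""

    def __init__(self):
        self.is_shut_down = False

    async def shutdown(self):
        self.is_shut_down = True


async def fake_teach(max_iter):
    """Hypothetical stand-in for acl.teach(); just counts iterations."""
    for _ in range(max_iter):
        await asyncio.sleep(0)
    return max_iter


async def main(engine, max_iter=10):
    try:
        # Real script: await acl.teach(max_iter=10)
        return await fake_teach(max_iter)
    finally:
        # Runs on success and on error, so the engine is always shut down.
        await engine.shutdown()


if __name__ == "__main__":
    engine = FakeEngine()
    print(asyncio.run(main(engine)))
```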