configurable rollouts actor #23

Merged · 22 commits · May 29, 2025
4 changes: 2 additions & 2 deletions README.md
@@ -34,8 +34,8 @@ cd pipelinerl
Create the environment with dependencies.
```bash
conda create -n pipeline-rl -y python=3.11
-conda run --no-capture-output -n pipeline-rl pip install torch==2.6.0 --index-url https://download.pytorch.org/whl/cu121
-conda run --no-capture-output -n pipeline-rl pip install -r requirements.txt --no-build-isolation
+conda run --no-capture-output -n pipeline-rl pip install torch==2.6.0
+conda run --no-capture-output -n pipeline-rl pip install -e . --no-build-isolation
```

By default Pipeline-RL will use the file system as the medium for streaming the generated data to the trainer processes. This works on one node, but the files can get quite large. To use Redis instead you will need to install the Redis server in the same conda environment:
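The Redis install command itself is collapsed out of this diff view. As a sketch only, the server can typically be added to the same environment from conda-forge (package name assumed):

```bash
conda install -n pipeline-rl -c conda-forge redis-server -y
```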
8 changes: 8 additions & 0 deletions conf/actor/math.yaml
@@ -0,0 +1,8 @@
log_each_n_secs: 10
llm_max_rollouts: 128
rollout_workers: 1
rollout_policy: pipelinerl.math.rollouts.generate_math_rollout
discount_factor: 1
system_prompt: Please reason step by step, and put your final answer within \boxed{}.
task_template: |-
  {task}
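`rollout_policy` is a dotted import path to the rollout function. A minimal sketch of how such a path can be resolved at runtime; `resolve_policy` is a hypothetical helper, not necessarily the repo's actual loader:

```python
import importlib


def resolve_policy(dotted_path: str):
    # "pipelinerl.math.rollouts.generate_math_rollout" splits into the
    # module to import and the attribute to fetch from it.
    module_name, _, attr = dotted_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


rollout_fn = resolve_policy("pipelinerl.math.rollouts.generate_math_rollout")
```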
109 changes: 109 additions & 0 deletions conf/actor/web.yaml
@@ -0,0 +1,109 @@
log_each_n_secs: 10
llm_max_rollouts: 128
rollout_workers: 1
rollout_policy: pipelinerl.tapeagents_rollouts.generate_rollout

environment:
  _target_: tapeagents.mcp.MCPEnvironment
  config_path: conf/mcp/web.json

llm:
  _target_: tapeagents.llms.LiteLLM
  model_name: o4-mini-2025-04-16
  use_cache: true
  context_size: 200000
  parameters:
    temperature: 1
    max_completion_tokens: 16000

agent:
  _target_: tapeagents.agent.Agent
  name: web_agent
  llms:
    default: ${llm}
  templates:
    system_prompt: |
      You are an expert AI Agent trained to assist users with complex information processing tasks.
      Your role is to understand user queries and respond in a helpful and accurate manner.
      Keep your replies concise and direct. Prioritize clarity and avoid over-elaboration.
      Do not express emotions or opinions about user questions.
    allowed_tools: |
      You have access to the following tools:
      {tools_description}
    thought_format: |
      Important! Respond with plain text, do not include any JSON or code.
      Do not output anything besides what I asked in this message.
    allowed_steps: |
      You have access to the following tools:
      {tools_description}
      You are allowed to produce ONLY steps with the following JSON schemas:
      {allowed_steps}
      Do not reproduce the schema when producing steps; use it as a reference.
    format: >
      Output only a single JSON dict or a single JSON list.
      DO NOT OUTPUT ANYTHING BESIDES THE JSON! DO NOT PLACE ANY COMMENTS INSIDE THE JSON.
      It will break the system that processes the output.

  nodes:
    - _target_: tapeagents.nodes.StandardNode
      name: plan
      system_prompt: ${agent.templates.system_prompt}
      guidance: |
        Write a concise multi-step plan explaining which steps should be performed to find the answer for the given task.
        Be specific about how each step should be performed. Only describe the intended actions here, do not perform them yet.
        Consider that next steps may depend on the results of previous steps, so include conditional branching using "if" statements where needed.
        Start with the title "Plan". Every step should have a short name and description.
        ${agent.templates.thought_format}
      steps_prompt: ${agent.templates.allowed_tools}

    - _target_: tapeagents.nodes.StandardNode
      name: reflect
      system_prompt: ${agent.templates.system_prompt}
      guidance: |
        Observe the current state of the task and produce reflection text strictly following these rules:
        1. Evaluate the action's success and explain its impact on the task and our plan.
        2. If the last action was not successful, describe the errors and possible reasons for failure.
        3. List the next steps to accomplish the current plan step and propose a single next immediate action.
        4. When proposing webpage interactions:
           - Always accept cookies and close popups first before interacting.
           - If the last action was not successful, check if the target element is visible and use scrolling if it is not.
        5. Describe the expected effect of the proposed action.
        ${agent.templates.thought_format}
      steps_prompt: ${agent.templates.allowed_tools}

    - _target_: tapeagents.nodes.StandardNode
      name: act
      system_prompt: ${agent.templates.system_prompt}
      guidance: Produce a single function call for the next step. If the answer is ready, call the FinalStep function.
      steps:
        - tapeagents.steps.ReasoningThought
        - tapeagents.core.FinalStep
      use_known_actions: true
      use_function_calls: true
      next_node: act

    - _target_: tapeagents.nodes.StandardNode
      name: summarize
      system_prompt: ${agent.templates.system_prompt}
      guidance: |
        Summarize the last observation. If it's an image, thoroughly describe it with all details.
        Describe the results of the last action and the observed changes. Discuss their impact on the task and our plan.
        Do not hallucinate or make up any information, only describe what you see in the observation.
        Do not guess or assume action effects, describe only visible changes.
        ${agent.templates.thought_format}
      steps_prompt: ${agent.templates.allowed_tools}
      next_node: reflect

split: validation
batch: 2
retry_unsolved: true

only_tasks: #[] # list of (level, task_num)
- [1, 0]
- [1, 1]
- [1, 2]
- [1, 3]
- [1, 4]
- [1, 5]
- [1, 6]
- [1, 7]
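The `only_tasks` entries are `[level, task_num]` pairs restricting the run to specific tasks of the chosen split. A minimal sketch of the implied filtering, with hypothetical task records rather than this repo's actual loader:

```python
ONLY_TASKS = [(1, n) for n in range(8)]  # the eight level-1 tasks listed above


def filter_tasks(tasks: list[dict], only_tasks: list[tuple[int, int]]) -> list[dict]:
    # By convention an empty list disables filtering and keeps every task.
    if not only_tasks:
        return tasks
    wanted = set(only_tasks)
    return [t for t in tasks if (t["level"], t["task_num"]) in wanted]
```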
10 changes: 1 addition & 9 deletions conf/base.yaml
@@ -2,6 +2,7 @@ defaults:
  - finetune: base
  - rewards: pure_success
  - streams: files
+  - actor: math
  - _self_

finetune:
@@ -37,9 +38,6 @@ finetune:
  max_lag: ${..max_lag}
  weight_update_interval: 1
  pop_old_data: ${..pop_old_data}
-actor:
-  llm_max_rollouts: 128
-  rollout_workers: 1
verifier:
  host: localhost
  port: 7777
@@ -90,11 +88,6 @@ world:

actor_group_port: 9000

-# changed
-system_prompt: Please reason step by step, and put your final answer within \boxed{}.
-task_template: |-
-  {task}

eval_every_n_versions: 78000

# changed
@@ -115,7 +108,6 @@ force_restart: false
pop_old_data: true
max_lag: null
attempts: 8
-discount_factor: 1
train_dataset_names:
  - open_reasoner_zero_57k
  - open_reasoner_zero_extended_72k
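With the actor settings moved into a `conf/actor/` defaults group, the actor configuration becomes swappable per run. A usage sketch, assuming the `pipelinerl.debug` entry point added in this PR:

```bash
python -m pipelinerl.debug actor=web
```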
27 changes: 27 additions & 0 deletions conf/debug.yaml
@@ -0,0 +1,27 @@
defaults:
  - base
  - override streams: redis
  - _self_

finetune:
  seq_length: 5000
  gradient_accumulation_passes: 1024

llm:
  parameters:
    max_tokens: 4096

test_llm:
  parameters:
    max_tokens: 4096

# debug:
#   mode: open_loop

output_dir: results/debug_4gpu_7b/${now:%Y_%m_%d}/${now:start_at_%H_%M_%S}

# model_path: Qwen/Qwen2.5-0.5B

# vllm_config:
#   vllm_kwargs:
#     enforce_eager: ""
23 changes: 23 additions & 0 deletions conf/mcp/web.json
@@ -0,0 +1,23 @@
{
  "mcpServers": {
    "serper-search": {
      "command": "uv",
      "args": ["run", "tapeagents/tools/mcp_servers/web_search.py"],
      "env": {"SERPER_API_KEY": ""}
    },
    "fetch": {
      "command": "uvx",
      "args": [
        "mcp-server-fetch"
      ]
    },
    "python_exec": {
      "command": "npx",
      "args": [
        "-y",
        "@pydantic/mcp-run-python",
        "stdio"
      ]
    }
  }
}
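`SERPER_API_KEY` is deliberately left blank in the checked-in config. One way to inject it from the real environment at load time; this is a sketch, not something the PR itself does:

```python
import json
import os

with open("conf/mcp/web.json") as f:
    mcp_config = json.load(f)

# Fill the placeholder from the environment so the key never needs to be
# committed to the repository.
servers = mcp_config["mcpServers"]
servers["serper-search"]["env"]["SERPER_API_KEY"] = os.environ.get("SERPER_API_KEY", "")
```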
13 changes: 13 additions & 0 deletions pipelinerl/debug.py
@@ -0,0 +1,13 @@
import hydra
from omegaconf import DictConfig

from pipelinerl.launch import main as launch_main


@hydra.main(config_path="../conf/", config_name="debug", version_base="1.3.2")
def main(cfg: DictConfig):
    launch_main(cfg)


if __name__ == "__main__":
    main()
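Because this wraps the launcher in a standard Hydra entry point, config overrides compose on the command line. A usage sketch; the override names are assumed from the configs above:

```bash
python -m pipelinerl.debug actor=math output_dir=results/my_debug_run
```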
17 changes: 17 additions & 0 deletions pipelinerl/debug_rollout.py
@@ -0,0 +1,17 @@
import hydra
from omegaconf import DictConfig

from pipelinerl.tapeagents_rollouts import generate_rollout


@hydra.main(config_path="../conf/", config_name="debug", version_base="1.3.2")
def main(cfg: DictConfig):
    # Placeholders for manual debugging: fill in a real LLM, problem, and
    # session to exercise generate_rollout end to end.
    llm = None
    problem = None
    session = None
    result = generate_rollout(cfg, llm, problem, session)
    print(result)


if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion pipelinerl/entrypoints/verifier.py
@@ -1,7 +1,7 @@
import hydra
from omegaconf import DictConfig

-from pipelinerl.verifier_api import run_verifier
+from pipelinerl.math.verifier_api import run_verifier
from pipelinerl.utils import better_crashing


36 changes: 27 additions & 9 deletions pipelinerl/launch.py
@@ -55,7 +55,7 @@ def run_ref_llm(cfg: DictConfig, preprocessor_llm_idx: int, local_idx: int, gpus
    kwargs = cfg.vllm_config.vllm_kwargs
    if kwargs["num-scheduler-steps"] > 1:
        kwargs["num-scheduler-steps"] = 1
-        logger.warning(f"Set num-scheduler-steps to 1 for reference vLLM")
+        logger.warning("Set num-scheduler-steps to 1 for reference vLLM")
    log_dir = exp_dir / f"ref_vllm_{preprocessor_llm_idx}"
    os.makedirs(log_dir, exist_ok=True)

@@ -81,8 +81,8 @@ def run_ref_llm(cfg: DictConfig, preprocessor_llm_idx: int, local_idx: int, gpus

    gpu_str = ",".join([str(gpu) for gpu in gpus])
    logger.info(f"Running reference LLM with command: {' '.join(cmd)} with gpus: {gpu_str}")
-    log_file_path = os.path.join(log_dir, f"stdout.log")
-    err_file_path = os.path.join(log_dir, f"stderr.log")
+    log_file_path = os.path.join(log_dir, "stdout.log")
+    err_file_path = os.path.join(log_dir, "stderr.log")
    with open(log_file_path, "a") as log_file, open(err_file_path, "a") as err_file:
        yield _popen(
            cmd,
@@ -138,8 +138,9 @@ def run_actor_llm(

    gpu_str = ",".join([str(gpu) for gpu in gpus])
    logger.info(f"Running actor_llm with command: {' '.join(cmd)} on gpus: {gpu_str}")
-    log_file_path = os.path.join(log_dir, f"stdout.log")
-    err_file_path = os.path.join(log_dir, f"stderr.log")
+    save_command(log_dir, cmd)
+    log_file_path = os.path.join(log_dir, "stdout.log")
+    err_file_path = os.path.join(log_dir, "stderr.log")
    with open(log_file_path, "a") as log_file, open(err_file_path, "a") as err_file:
        yield _popen(
            cmd,
@@ -166,6 +167,7 @@ def run_actor(world_map: WorldMap, actor_idx: int, exp_dir: Path):
        f"+me.llm_urls={llm_urls}",
    ]
    logger.info(f"Running actor with command: {' '.join(cmd)}")
+    save_command(exp_dir / "actor", cmd)
    yield _popen(
        cmd,
        env=dict(os.environ),
@@ -186,10 +188,11 @@ def run_verifier(cfg: DictConfig):
        f"hydra.run.dir={cfg.output_dir}/verifier",
    ]
    logger.info(f"Running verifier with command: {' '.join(cmd)}")
+    save_command(Path(cfg.output_dir) / "verifier", cmd)
    log_dir = os.path.join(cfg.output_dir, "verifier")
    os.makedirs(log_dir, exist_ok=True)
-    log_file_path = os.path.join(log_dir, f"stdout.log")
-    err_file_path = os.path.join(log_dir, f"stderr.log")
+    log_file_path = os.path.join(log_dir, "stdout.log")
+    err_file_path = os.path.join(log_dir, "stderr.log")
    with open(log_file_path, "a") as log_file, open(err_file_path, "a") as err_file:
        yield _popen(
            cmd,
@@ -285,6 +288,7 @@ def run_finetune(cfg: DictConfig, world_map: WorldMap, gpus: list[int], exp_dir:
        cmd.append("finetune.send_weight_updates=False")

    logger.info(f"Running finetune with command: {' '.join(cmd)}")
+    save_command(exp_dir / "finetune", cmd)
    env = dict(os.environ)
    env["DS_ENV_FILE"] = str(exp_dir / ".deepspeed_env")
    yield _popen(cmd, env=env)
@@ -307,6 +311,7 @@ def run_preprocess(world_map: WorldMap, preprocessor_idx: int, exp_dir: Path):
        f"+me.llm_urls={llm_urls}",
    ]
    logger.info(f"Running preprocess with command: {' '.join(cmd)}")
+    save_command(exp_dir / "preprocess", cmd)
    yield _popen(
        cmd,
        env=dict(os.environ),
@@ -329,9 +334,22 @@ def run_redis(cfg: DictConfig):
        cfg.streams.save,
    ]
    logger.info(f"Running redis with command: {' '.join(cmd)}")
+    save_command(Path(cfg.output_dir) / "redis", cmd)
    yield _popen(cmd, env=dict(os.environ))


+def save_command(script_dir: Path, cmd):
+    os.makedirs(script_dir, exist_ok=True)
+    script_path = script_dir / "start.sh"
+    with open(script_path, "w") as f:
+        f.write("#!/bin/bash\n")
+        # Properly quote arguments for the shell script
+        quoted_cmd = [f"'{arg}'" if " " in arg or "$" in arg else arg for arg in cmd]
+        f.write(" ".join(quoted_cmd) + "\n")
+    os.chmod(script_path, 0o755)
+    logger.info(f"Saved start script to {script_path}")
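The quoting in `save_command` is a simple heuristic that only wraps arguments containing spaces or `$`. A more robust alternative, shown here as an assumption rather than what the PR ships, is `shlex.join`, which quotes every argument safely for POSIX shells:

```python
import shlex

# shlex.join quotes each argument, so the generated start.sh survives
# spaces, quotes, and shell metacharacters anywhere in the command.
line = shlex.join(["echo", "hello world", "it's", "$HOME"])
print(line)  # echo 'hello world' 'it'"'"'s' '$HOME'
```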


def clean_up(exp_dir, force_restart):
    logger.info("Cleaning up streams directory")
    if os.path.exists(f"{exp_dir}/streams"):
@@ -386,7 +404,7 @@ def gently_stop_all_processes():
            gently_stop_all_processes()
            sys.exit(1)
        # TODO: make the watchdog code below more stable
-        # if (trainer_state is not Noneq
+        # if (trainer_state is not None
        #     and (version := trainer_state.propagated_weight_version is not None)
        #     and version > last_trainer_version):
        #     last_trainer_version = version
@@ -493,7 +511,7 @@ def main(cfg: DictConfig):
        clean_up(exp_dir, cfg.force_restart)
        os.makedirs(config_dir, exist_ok=True)
        OmegaConf.save(cfg, config_dir / "exp_config.yaml")
-        logger.info(f"Orchestrator 0 created the exp folder")
+        logger.info("Orchestrator 0 created the exp folder")
    if cfg.streams.backend == "redis":
        processes.extend(run_redis(cfg))
        redis = connect_to_redis(cfg.streams)