Skip to content

Commit

Permalink
run gaia tasks in separate processes with joblib
Browse files (browse the repository at this point in the history)
  • Loading branch information
ollmer committed Jan 8, 2025
1 parent 7fccda1 commit baba708
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 34 deletions.
60 changes: 27 additions & 33 deletions examples/gaia_agent/scripts/evaluate.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -5,13 +5,13 @@

import hydra
from hydra.utils import instantiate
from joblib import Parallel, delayed
from omegaconf import DictConfig

from tapeagents.config import is_debug_mode
from tapeagents.io import save_json_tape, save_tape_images
from tapeagents.llms import TrainableLLM
from tapeagents.parallel_processing import choose_processor
from tapeagents.tools.container_executor import maybe_get_code_sandbox
from tapeagents.tools.container_executor import ContainerExecutor, maybe_get_code_sandbox

from ..agent import GaiaAgent
from ..environment import get_env
Expand All @@ -34,33 +34,19 @@ def main(cfg: DictConfig) -> None:
the separate files per level. If needed continue solving the unsolved tasks in the
next run.
"""
os.environ["TAPEAGENTS_SQLITE_DB"] = os.path.join(cfg.exp_path, "tapedata.sqlite")
llm: TrainableLLM = instantiate(cfg.llm)
code_sandbox = maybe_get_code_sandbox(cfg.exp_path)
agent = GaiaAgent.create(llm, **cfg.agent)
tasks = load_dataset(cfg.split)
tapes_dir = os.path.join(cfg.exp_path, "tapes")
validate_config(cfg, llm, tapes_dir)
images_dir = os.path.join(cfg.exp_path, "images")
os.makedirs(images_dir, exist_ok=True)

dt = time.perf_counter()
n_workers = cfg.batch or 0
processor = choose_processor(n_workers)
args = [
(agent, llm, cfg.env, code_sandbox, task, cfg.exp_path, i, level)
tapes_dir = os.path.join(cfg.exp_path, "tapes")
tasks = [
delayed(task_worker)(cfg, level, task_num, code_sandbox)
for level, level_tasks in tasks.items()
for i, task in enumerate(level_tasks)
if not task_already_solved(i, level, tapes_dir)
for task_num, _ in enumerate(level_tasks)
if not task_already_solved(task_num, level, tapes_dir)
]
if cfg.get("n_tasks"):
args = args[: cfg.n_tasks] # run only the first n_tasks
logger.info(f"Evaluate {len(args)} unsolved tasks using {n_workers} workers")
for tape_ready in processor(args, task_worker):
if isinstance(tape_ready, Exception):
raise tape_ready
if is_debug_mode():
break
logger.info(f"Evaluate {len(tasks)} unsolved tasks using {n_workers} workers")
Parallel(n_jobs=n_workers, prefer="processes")(tasks)
dt = time.perf_counter() - dt
logger.info(f"Done, elapsed time: {dt:.2f} sec")

Expand All @@ -86,19 +72,27 @@ def task_already_solved(i: int, level: int, tapes_dir: str) -> bool:
return os.path.exists(tape_path) and result not in ["", None, "None"]


def task_worker(args: tuple) -> int:
agent, llm, cfg_env, code_sandbox, task, exp_path, i, level = args
tapes_dir = os.path.join(exp_path, "tapes")
images_dir = os.path.join(exp_path, "images")
tape_name = f"l{level}_task{i:03d}"
env = get_env(exp_path, code_sandbox=code_sandbox, **cfg_env)
def task_worker(cfg: DictConfig, level: int, task_num: int, code_sandbox: ContainerExecutor | None):
tasks = load_dataset(cfg.split)
task = tasks[level][task_num]
llm: TrainableLLM = instantiate(cfg.llm)
agent = GaiaAgent.create(llm, **cfg.agent)
os.environ["TAPEAGENTS_SQLITE_DB"] = os.path.join(cfg.exp_path, "tapedata.sqlite")
tapes_dir = os.path.join(cfg.exp_path, "tapes")
validate_config(cfg, llm, tapes_dir)
images_dir = os.path.join(cfg.exp_path, "images")
os.makedirs(images_dir, exist_ok=True)

tapes_dir = os.path.join(cfg.exp_path, "tapes")
images_dir = os.path.join(cfg.exp_path, "images")
task_name = f"l{level}_task{task_num:03d}"
env = get_env(cfg.exp_path, code_sandbox=code_sandbox, **cfg.env)

for tape in solve_task(task, agent, env, level):
save_json_tape(tape, tapes_dir, tape_name)
save_tape_images(tape, images_dir)
logger.info(f"Task {tape_name} solved, saved to {tapes_dir}")
save_json_tape(tape, tapes_dir, task_name)
save_tape_images(tape, images_dir)
logger.info(f"Task {task_name} solved, saved to {tapes_dir}")
env.close()
return 1


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number | Diff line number | Diff line change
Expand Up @@ -5,6 +5,7 @@ gradio==5.5.0
hydra-core==1.3.2
ipykernel==6.29.5
ipywidgets==8.1.5
joblib==1.3.2
jsonref==1.1.0
fastapi==0.115.5
langchain-community==0.0.38
Expand Down
2 changes: 1 addition & 1 deletion tapeagents/tools/container_executor.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -426,7 +426,7 @@ def silence_pip(code: str, lang: str) -> str:
return "\n".join(lines)


def maybe_get_code_sandbox(exp_path: str):
def maybe_get_code_sandbox(exp_path: str) -> ContainerExecutor | None:
code_path = os.path.join(exp_path, "code")
os.makedirs(code_path, exist_ok=True)
try:
Expand Down

0 comments on commit baba708

Please sign in to comment.