Skip to content

Commit

Permalink
joblib parallel processes working
Browse files Browse the repository at this point in the history
  • Loading branch information
ollmer committed Jan 9, 2025
1 parent baba708 commit d827d23
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
15 changes: 11 additions & 4 deletions conf/gaia_openai.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,21 @@ defaults:
- llm: gpt4o_mini
- _self_

exp_name: gpt4o_mini_val_gym1
exp_name: gpt4o_mini_val_joblib1

exp_path: outputs/gaia/runs/${exp_name}
split: validation
batch: 32
batch: 8

level: 1
task: 0
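only_tasks: # list of [level, task_num] pairs (task_num is the 0-based index within its level); when non-empty, exactly these tasks are run and the already-solved check is skipped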
- [2, 0]
- [2, 1]
- [2, 2]
- [2, 3]
- [2, 4]
- [2, 5]
- [2, 6]
- [2, 7]

agent:
plain_code: false
Expand Down
30 changes: 20 additions & 10 deletions examples/gaia_agent/scripts/evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,23 @@ def main(cfg: DictConfig) -> None:
the separate files per level. If needed, continue solving the unsolved tasks in the
next run.
"""
code_sandbox = maybe_get_code_sandbox(cfg.exp_path)
tasks = load_dataset(cfg.split)
dt = time.perf_counter()
n_workers = cfg.batch or 0
n_workers = cfg.batch or 1
tapes_dir = os.path.join(cfg.exp_path, "tapes")
tasks = [
delayed(task_worker)(cfg, level, task_num, code_sandbox)
for level, level_tasks in tasks.items()
for task_num, _ in enumerate(level_tasks)
if not task_already_solved(task_num, level, tapes_dir)
]
if cfg.only_tasks:
tasks = cfg.only_tasks
else:
tasks = [
(level, task_num)
for level, level_tasks in tasks.items()
for task_num, _ in enumerate(level_tasks)
if not task_already_solved(task_num, level, tapes_dir)
]
logger.info(f"Evaluate {len(tasks)} unsolved tasks using {n_workers} workers")
Parallel(n_jobs=n_workers, prefer="processes")(tasks)
Parallel(n_jobs=n_workers, prefer="processes")(
[delayed(task_worker)(cfg, level, task_num) for level, task_num in tasks]
)
dt = time.perf_counter() - dt
logger.info(f"Done, elapsed time: {dt:.2f} sec")

Expand All @@ -72,7 +76,7 @@ def task_already_solved(i: int, level: int, tapes_dir: str) -> bool:
return os.path.exists(tape_path) and result not in ["", None, "None"]


def task_worker(cfg: DictConfig, level: int, task_num: int, code_sandbox: ContainerExecutor | None):
def task_worker(cfg: DictConfig, level: int, task_num: int):
tasks = load_dataset(cfg.split)
task = tasks[level][task_num]
llm: TrainableLLM = instantiate(cfg.llm)
Expand All @@ -86,13 +90,19 @@ def task_worker(cfg: DictConfig, level: int, task_num: int, code_sandbox: Contai
tapes_dir = os.path.join(cfg.exp_path, "tapes")
images_dir = os.path.join(cfg.exp_path, "images")
task_name = f"l{level}_task{task_num:03d}"
code_sandbox = maybe_get_code_sandbox(cfg.exp_path)
env = get_env(cfg.exp_path, code_sandbox=code_sandbox, **cfg.env)

for tape in solve_task(task, agent, env, level):
save_json_tape(tape, tapes_dir, task_name)
save_tape_images(tape, images_dir)
logger.info(f"Task {task_name} solved, saved to {tapes_dir}")
env.close()
if code_sandbox:
try:
code_sandbox.stop()
except Exception:
pass


if __name__ == "__main__":
Expand Down

0 comments on commit d827d23

Please sign in to comment.