diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index dea4142eb..35f778a37 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -88,6 +88,7 @@ repos:
             --disable=W1113,
             --disable=W0221,
             --disable=R0401,
+            --disable=W0632,
           ]
   - repo: https://github.com/regebro/pyroma
     rev: "4.0"
diff --git a/docs/sphinx_doc/source/agentscope.agents.rst b/docs/sphinx_doc/source/agentscope.agents.rst
index 0b6520701..aa529a12d 100644
--- a/docs/sphinx_doc/source/agentscope.agents.rst
+++ b/docs/sphinx_doc/source/agentscope.agents.rst
@@ -48,11 +48,3 @@ dict_dialog_agent module
     :members:
     :undoc-members:
     :show-inheritance:
-
-rpc_dialog_agent module
--------------------------------
-
-.. automodule:: agentscope.agents.dict_dialog_agent
-    :members:
-    :undoc-members:
-    :show-inheritance:
\ No newline at end of file
diff --git a/docs/sphinx_doc/source/tutorial/201-agent.md b/docs/sphinx_doc/source/tutorial/201-agent.md
index e498f18d1..b7e648eba 100644
--- a/docs/sphinx_doc/source/tutorial/201-agent.md
+++ b/docs/sphinx_doc/source/tutorial/201-agent.md
@@ -66,7 +66,6 @@ Below is a table summarizing the functionality of some of the key agents availab
 | Agent Type | Description | Typical Use Cases |
 | -------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
 | `AgentBase` | Serves as the superclass for all agents, providing essential attributes and methods. | The foundation for building any custom agent. |
-| `RpcAgentBase` | Executes remote procedure calls in distributed mode. | The foundation for building any custom agent in distributed mode. |
 | `DialogAgent` | Manages dialogues by understanding context and generating coherent responses. | Customer service bots, virtual assistants. |
 | `UserAgent` | Interacts with the user to collect input, generating messages that may include URLs or additional specifics based on required keys. | Collecting user input for agents |
 | *More to Come* | AgentScope is continuously expanding its pool with more specialized agents for diverse applications. | |
diff --git a/docs/sphinx_doc/source/tutorial/208-distribute.md b/docs/sphinx_doc/source/tutorial/208-distribute.md
index d96a95d7d..22e0f8d63 100644
--- a/docs/sphinx_doc/source/tutorial/208-distribute.md
+++ b/docs/sphinx_doc/source/tutorial/208-distribute.md
@@ -45,25 +45,25 @@ A-->C
 
 Please follow the steps below to deploy your application distributedly.
 
-### Write your agents
+### Convert your agents
 
-To use distributed functionality, your agent class needs to inherit from `RpcAgentBase`.
-`RpcAgentBase` requires several additional parameters compared to `AgentBase` during initialization.
+`AgentBase` provides the `to_dist` method to convert the agent into a distributed version.
+`to_dist` requires several parameters.
 
 - `host`: the hostname or IP address of the machine where the agent runs, defaults to `localhost`.
 - `port`: the port of this agent's RPC server, defaults to `80`.
 - `launch_server`: whether to launch an RPC server locally, defaults to `True`.
 - `local_mode`: set to `True` if all agents run on the same machine, defaults to `True`.
+- `lazy_launch`: if set to `True`, only launch the server when the agent is called.
 
-But don't worry, `RpcAgentBase` shares the same interface as `AgentBase`, you only need to implement the `reply` method. You can even copy the `reply` method of an `AgentBase` subclass and use it in `RpcAgentBase`.
+> The `to_dist` method is implemented based on [gRPC](https://grpc.io/). When `launch_server` is set to `True`, it will start a gRPC server process, and the original agent will be transferred to the new process to run.
 
 ### Run in multi-process mode
 
 AgentScope supports deployment in multi-process mode, where each agent is a sub-process of the application's main process, and all agents run on the same machine.
-Its usage is very similar to single-process mode. The only difference lies in the initialization phase.
+The usage is exactly the same as single-process mode, except that you need to call the `to_dist` method after initialization.
 
-Suppose you have classes `A` and `B`, both of which inherit from `RpcAgentBase`.
-You can run the application in multi-process mode by passing in an additional parameter `port`, the other parts are completely the same as the single-process mode.
+Suppose you have classes `A` and `B`, both of which inherit from `AgentBase`.
 
 ```python
 # import packages
@@ -71,13 +71,11 @@ You can run the application in multi-process pa
 a = A(
     name="A",
     ...,
-    port=12001, # port is required, other fields like host, launch_server and local_mode use the default value
-)
+).to_dist()
 b = B(
     name="B",
     ...,
-    port=12002, # port is required
-)
+).to_dist()
 
 x = None
 while x is None or x.content != 'exit':
@@ -93,11 +91,13 @@ AgentScope also supports to run agents on multiple machines. In this case, you n
 # import packages
 
 server_a = RpcAgentServerLauncher(
-    name="A",
-    ...,
+    agent_class=A,
+    agent_kwargs={
+        "name": "A"
+        ...
+    },
     host=ip_a,
     port=12001,
-    local_mode=False,
 )
 server_a.launch()
 server_a.wait_until_terminate()
@@ -110,11 +110,13 @@ Please make sure that the two machines can access each other using the IP addres
 # import packages
 
 server_b = RpcAgentServerLauncher(
-    name="B",
-    ...,
+    agent_class=B,
+    agent_kwargs={
+        "name": "B",
+        ...
+    },
     host=ip_b,
     port=12001,
-    local_mode=False,
 )
 server_b.launch()
 server_b.wait_until_terminate()
@@ -127,14 +129,16 @@ Then, you can run the application's main process on any machine that can access
 a = A(
     name="A",
-    ...,
+    ...
+).to_dist(
     host=ip_a,
     port=12001,
     launch_server=False,
 )
 
 b = B(
     name="B",
-    ...,
+    ...
+).to_dist(
     host=ip_b,
     port=12002,
     launch_server=False,
diff --git a/examples/distributed/configs/debate_agent_configs.json b/examples/distributed/configs/debate_agent_configs.json
index 658c6cce4..dec5af779 100644
--- a/examples/distributed/configs/debate_agent_configs.json
+++ b/examples/distributed/configs/debate_agent_configs.json
@@ -1,20 +1,29 @@
-{
-    "pro": {
-        "name": "Pro",
-        "sys_prompt": "Assume the role of a debater who is arguing in favor of the proposition that AGI (Artificial General Intelligence) can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models are a viable path to AGI. Highlight the advancements in language understanding, adaptability, and scalability of GPT models as key factors in progressing towards AGI.",
-        "model": "gpt-3.5-turbo",
-        "use_memory": true
+[
+    {
+        "class": "DictDialogAgent",
+        "args": {
+            "name": "Pro",
+            "sys_prompt": "Assume the role of a debater who is arguing in favor of the proposition that AGI (Artificial General Intelligence) can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models are a viable path to AGI. Highlight the advancements in language understanding, adaptability, and scalability of GPT models as key factors in progressing towards AGI.",
+            "model": "gpt-3.5-turbo",
+            "use_memory": true
+        }
     },
-    "con": {
-        "name": "Con",
-        "sys_prompt": "Assume the role of a debater who is arguing against the proposition that AGI can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models, while impressive, are insufficient for reaching AGI. Discuss the limitations of GPT models such as lack of understanding, consciousness, ethical reasoning, and general problem-solving abilities that are essential for true AGI.",
-        "model": "gpt-3.5-turbo",
-        "use_memory": true
+    {
+        "class": "DictDialogAgent",
+        "args": {
+            "name": "Con",
+            "sys_prompt": "Assume the role of a debater who is arguing against the proposition that AGI can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models, while impressive, are insufficient for reaching AGI. Discuss the limitations of GPT models such as lack of understanding, consciousness, ethical reasoning, and general problem-solving abilities that are essential for true AGI.",
+            "model": "gpt-3.5-turbo",
+            "use_memory": true
+        }
     },
-    "judge": {
-        "name": "Judge",
-        "sys_prompt": "Assume the role of an impartial judge in a debate where the affirmative side argues that AGI can be achieved using the GPT model framework, and the negative side contests this. Listen to both sides' arguments and provide an analytical judgment on which side presented a more compelling and reasonable case. Consider the strength of the evidence, the persuasiveness of the reasoning, and the overall coherence of the arguments presented by each side.",
-        "model": "gpt-3.5-turbo",
-        "use_memory": true
+    {
+        "class": "DictDialogAgent",
+        "args": {
+            "name": "Judge",
+            "sys_prompt": "Assume the role of an impartial judge in a debate where the affirmative side argues that AGI can be achieved using the GPT model framework, and the negative side contests this. Listen to both sides' arguments and provide an analytical judgment on which side presented a more compelling and reasonable case. Consider the strength of the evidence, the persuasiveness of the reasoning, and the overall coherence of the arguments presented by each side.",
+            "model": "gpt-3.5-turbo",
+            "use_memory": true
+        }
     }
-}
\ No newline at end of file
+]
\ No newline at end of file
diff --git a/examples/distributed/distributed_debate.py b/examples/distributed/distributed_debate.py
index f07e620fa..669b05304 100644
--- a/examples/distributed/distributed_debate.py
+++ b/examples/distributed/distributed_debate.py
@@ -6,7 +6,7 @@
 import agentscope
 from agentscope.msghub import msghub
-from agentscope.agents.rpc_dialog_agent import RpcDialogAgent
+from agentscope.agents.dialog_agent import DialogAgent
 from agentscope.agents.rpc_agent import RpcAgentServerLauncher
 from agentscope.message import Msg
 from agentscope.utils.logging_utils import logger
 
@@ -65,15 +65,19 @@ def setup_server(parsed_args: argparse.Namespace) -> None:
         encoding="utf-8",
     ) as f:
         configs = json.load(f)
+    configs = {
+        "pro": configs[0]["args"],
+        "con": configs[1]["args"],
+        "judge": configs[2]["args"],
+    }
     config = configs[parsed_args.role]
     host = getattr(parsed_args, f"{parsed_args.role}_host")
     port = getattr(parsed_args, f"{parsed_args.role}_port")
     server_launcher = RpcAgentServerLauncher(
+        agent_class=DialogAgent,
+        agent_kwargs=config,
         host=host,
         port=port,
-        local_mode=False,
-        agent_class=RpcDialogAgent,
-        **config,
     )
     server_launcher.launch()
     server_launcher.wait_until_terminate()
@@ -81,23 +85,21 @@ def setup_server(parsed_args: argparse.Namespace) -> None:
 
 def run_main_process(parsed_args: argparse.Namespace) -> None:
     """Setup the main debate competition process"""
-    agentscope.init(
+    pro_agent, con_agent, judge_agent = agentscope.init(
         model_configs="configs/model_configs.json",
+        agent_configs="configs/debate_agent_configs.json",
     )
-    pro_agent = RpcDialogAgent(
-        name="Pro",
+    pro_agent = pro_agent.to_dist(
         host=parsed_args.pro_host,
         port=parsed_args.pro_port,
         launch_server=False,
     )
-    con_agent = RpcDialogAgent(
-        name="Con",
+    con_agent = con_agent.to_dist(
         host=parsed_args.con_host,
         port=parsed_args.con_port,
         launch_server=False,
     )
-    judge_agent = RpcDialogAgent(
-        name="Judge",
+    judge_agent = judge_agent.to_dist(
         host=parsed_args.judge_host,
         port=parsed_args.judge_port,
         launch_server=False,
@@ -108,13 +110,13 @@ def run_main_process(parsed_args: argparse.Namespace) -> None:
     with msghub(participants=participants, announcement=hint):
         for _ in range(3):
             pro_resp = pro_agent(x)
-            logger.chat(pro_resp.update_value())
+            logger.chat(pro_resp)
             con_resp = con_agent(pro_resp)
-            logger.chat(con_resp.update_value())
+            logger.chat(con_resp)
             x = judge_agent(con_resp)
-            logger.chat(x.update_value())
+            logger.chat(x)
     x = judge_agent(x)
-    logger.chat(x.update_value())
+    logger.chat(x)
 
 
 if __name__ == "__main__":
diff --git a/examples/distributed/distributed_dialog.py b/examples/distributed/distributed_dialog.py
index 83e7b7fd1..0a6f30e51 100644
--- a/examples/distributed/distributed_dialog.py
+++ b/examples/distributed/distributed_dialog.py
@@ -2,12 +2,11 @@
 """ An example of distributed dialog """
 import argparse
-import time
 
 from loguru import logger
 
 import agentscope
 from agentscope.agents.user_agent import UserAgent
-from agentscope.agents.rpc_dialog_agent import RpcDialogAgent
+from agentscope.agents.dialog_agent import DialogAgent
 from agentscope.agents.rpc_agent import RpcAgentServerLauncher
 
 
@@ -38,14 +37,15 @@ def setup_assistant_server(assistant_host: str, assistant_port: int) -> None:
         model_configs="configs/model_configs.json",
     )
assistant_server_launcher = RpcAgentServerLauncher( - name="Assitant", - agent_class=RpcDialogAgent, + agent_class=DialogAgent, + agent_kwargs={ + "name": "Assitant", + "sys_prompt": "You are a helpful assistant.", + "model": "gpt-3.5-turbo", + "use_memory": True, + }, host=assistant_host, port=assistant_port, - sys_prompt="You are a helpful assistant.", - model="gpt-3.5-turbo", - use_memory=True, - local_mode=False, ) assistant_server_launcher.launch() assistant_server_launcher.wait_until_terminate() @@ -56,8 +56,12 @@ def run_main_process(assistant_host: str, assistant_port: int) -> None: agentscope.init( model_configs="configs/model_configs.json", ) - assistant_agent = RpcDialogAgent( + assistant_agent = DialogAgent( name="Assistant", + sys_prompt="You are a helpful assistant.", + model="gpt-3.5-turbo", + use_memory=True, + ).to_dist( host=assistant_host, port=assistant_port, launch_server=False, @@ -73,8 +77,7 @@ def run_main_process(assistant_host: str, assistant_port: int) -> None: msg = user_agent() while not msg.content.endswith("exit"): msg = assistant_agent(msg) - logger.chat(msg.update_value()) - time.sleep(0.5) + logger.chat(msg) msg = user_agent(msg) diff --git a/notebook/distributed_debate.ipynb b/notebook/distributed_debate.ipynb index 08ded268f..10a532515 100644 --- a/notebook/distributed_debate.ipynb +++ b/notebook/distributed_debate.ipynb @@ -84,31 +84,28 @@ "outputs": [], "source": [ "import agentscope\n", - "from agentscope.agents.rpc_dialog_agent import RpcDialogAgent\n", + "from agentscope.agents.dialog_agent import DialogAgent\n", "\n", "agentscope.init(model_configs=model_configs)\n", "\n", - "pro_agent = RpcDialogAgent(\n", + "pro_agent = DialogAgent(\n", " name=\"Pro\",\n", - " port=12001,\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of a debater who is arguing in favor of the proposition that AGI (Artificial General Intelligence) can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models are a viable path to AGI. Highlight the advancements in language understanding, adaptability, and scalability of GPT models as key factors in progressing towards AGI.\",\n", - ")\n", - "con_agent = RpcDialogAgent(\n", + ").to_dist()\n", + "con_agent = DialogAgent(\n", " name=\"Con\",\n", - " port=12002,\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of a debater who is arguing against the proposition that AGI can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models, while impressive, are insufficient for reaching AGI. Discuss the limitations of GPT models such as lack of understanding, consciousness, ethical reasoning, and general problem-solving abilities that are essential for true AGI.\",\n", - ")\n", - "judge_agent = RpcDialogAgent(\n", + ").to_dist()\n", + "judge_agent = DialogAgent(\n", " name=\"Judge\",\n", - " port=12003,\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of an impartial judge in a debate where the affirmative side argues that AGI can be achieved using the GPT model framework, and the negative side contests this. Listen to both sides' arguments and provide an analytical judgment on which side presented a more compelling and reasonable case. 
Consider the strength of the evidence, the persuasiveness of the reasoning, and the overall coherence of the arguments presented by each side.\"\n", - ")" + ").to_dist()" ] }, { @@ -153,13 +150,13 @@ "with msghub(participants=participants, announcement=hint):\n", " for _ in range(3):\n", " pro_resp = pro_agent(x)\n", - " logger.chat(pro_resp.update_value())\n", + " logger.chat(pro_resp)\n", " con_resp = con_agent(pro_resp)\n", - " logger.chat(con_resp.update_value())\n", + " logger.chat(con_resp)\n", " x = judge_agent(con_resp)\n", - " logger.chat(x.update_value())\n", + " logger.chat(x)\n", " x = judge_agent(x)\n", - " logger.chat(x.update_value())\n" + " logger.chat(x)\n" ] }, { diff --git a/notebook/distributed_dialog.ipynb b/notebook/distributed_dialog.ipynb index 83a443311..07a6425fe 100644 --- a/notebook/distributed_dialog.ipynb +++ b/notebook/distributed_dialog.ipynb @@ -80,19 +80,18 @@ "source": [ "import agentscope\n", "from agentscope.agents.user_agent import UserAgent\n", - "from agentscope.agents.rpc_dialog_agent import RpcDialogAgent\n", + "from agentscope.agents.dialog_agent import DialogAgent\n", "\n", "agentscope.init(\n", " model_configs=model_configs\n", ")\n", "\n", - "assistant_agent = RpcDialogAgent(\n", + "assistant_agent = DialogAgent(\n", " name=\"Assistant\",\n", - " port=12010,\n", " sys_prompt=\"You are a helpful assistant.\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", - ")\n", + ").to_dist()\n", "user_agent = UserAgent(\n", " name=\"User\",\n", ")" @@ -119,8 +118,7 @@ "msg = user_agent()\n", "while not msg.content.endswith(\"exit\"):\n", " msg = assistant_agent(msg)\n", - " logger.chat(msg.update_value())\n", - " time.sleep(0.5)\n", + " logger.chat(msg)\n", " msg = user_agent(msg)" ] } diff --git a/src/agentscope/_init.py b/src/agentscope/_init.py index 6f75004b5..af8b0b597 100644 --- a/src/agentscope/_init.py +++ b/src/agentscope/_init.py @@ -4,7 +4,6 @@ import os import shutil from typing import Optional, Union, Sequence - from agentscope import agents from .agents import AgentBase from ._runtime import Runtime @@ -15,7 +14,7 @@ from .constants import _DEFAULT_DIR from .constants import _DEFAULT_LOG_LEVEL - +# init setting _INIT_SETTINGS = {} @@ -60,19 +59,25 @@ def init( cover the required arguments to initialize a specific agent object, otherwise the default values will be used. 
""" - - # TODO: add support to set quota for monitor - - # Load model configs if needed - if model_configs is not None: - read_model_configs(model_configs) - - # Init the runtime - Runtime.project = project - Runtime.name = name - - # Init file manager - file_manager.init(save_dir, save_api_invoke) + init_process( + model_configs=model_configs, + project=project, + name=name, + save_dir=save_dir, + save_api_invoke=save_api_invoke, + save_log=save_log, + logger_level=logger_level, + ) + + # save init settings for subprocess + _INIT_SETTINGS["model_configs"] = model_configs + _INIT_SETTINGS["project"] = project + _INIT_SETTINGS["name"] = name + _INIT_SETTINGS["runtime_id"] = Runtime.runtime_id + _INIT_SETTINGS["save_dir"] = save_dir + _INIT_SETTINGS["save_api_invoke"] = save_api_invoke + _INIT_SETTINGS["save_log"] = save_log + _INIT_SETTINGS["logger_level"] = logger_level # Save code if needed if save_code: @@ -83,13 +88,6 @@ def init( file_abs = os.path.join(cur_dir, filename) shutil.copy(file_abs, str(file_manager.dir_code)) - # Set logger and level - dir_log = str(file_manager.dir_log) if save_log else None - setup_logger(dir_log, logger_level) - - # Set monitor - _ = MonitorFactory.get_monitor(db_path=file_manager.path_db) - # Load config and init agent by configs if agent_configs is not None: if isinstance(agent_configs, str): @@ -108,16 +106,58 @@ def init( agent = agent_cls(**agent_args) agent_objs.append(agent) return agent_objs + return [] - # save init settings globally, will be used to init child process - _INIT_SETTINGS["model_configs"] = model_configs - _INIT_SETTINGS["project"] = project - _INIT_SETTINGS["name"] = name - _INIT_SETTINGS["save_dir"] = save_dir - _INIT_SETTINGS["save_log"] = save_log - _INIT_SETTINGS["save_code"] = save_code - _INIT_SETTINGS["save_api_invoke"] = save_api_invoke - _INIT_SETTINGS["logger_level"] = logger_level - _INIT_SETTINGS["agent_configs"] = agent_configs - return [] +def init_process( + model_configs: Optional[Union[dict, str, list]] = None, + project: Optional[str] = None, + name: Optional[str] = None, + runtime_id: Optional[str] = None, + save_dir: str = _DEFAULT_DIR, + save_api_invoke: bool = False, + save_log: bool = False, + logger_level: LOG_LEVEL = _DEFAULT_LOG_LEVEL, +) -> None: + """A entry to initialize the package in a process. + + Args: + project (`Optional[str]`, defaults to `None`): + The project name, which is used to identify the project. + name (`Optional[str]`, defaults to `None`): + The name for runtime, which is used to identify this runtime. + runtime_id (`Optional[str]`, defaults to `None`): + The id for runtime, which is used to identify this runtime. + save_dir (`str`, defaults to `./runs`): + The directory to save logs, files, codes, and api invocations. + If `dir` is `None`, when saving logs, files, codes, and api + invocations, the default directory `./runs` will be created. + save_api_invoke (`bool`, defaults to `False`): + Whether to save api invocations locally, including model and web + search invocation. + model_configs (`Optional[Sequence]`, defaults to `None`): + A sequence of pre-init model configs. + save_log (`bool`, defaults to `False`): + Whether to save logs locally. + logger_level (`LOG_LEVEL`, defaults to `"INFO"`): + The logging level of logger. 
+ """ + # Load model configs if needed + if model_configs is not None: + read_model_configs(model_configs) + + # Init the runtime + Runtime.project = project + Runtime.name = name + if runtime_id is not None: + Runtime.runtime_id = runtime_id + + # Init file manager + file_manager.init(save_dir, save_api_invoke) + + # Init monitor + _ = MonitorFactory.get_monitor(db_path=file_manager.path_db) + + # Init logger + dir_log = str(file_manager.dir_log) if save_log else None + setup_logger(dir_log, logger_level) diff --git a/src/agentscope/agents/__init__.py b/src/agentscope/agents/__init__.py index 14723727c..fd125d17b 100644 --- a/src/agentscope/agents/__init__.py +++ b/src/agentscope/agents/__init__.py @@ -2,7 +2,6 @@ """ Import all agent related modules in the package. """ from .agent import AgentBase from .operator import Operator -from .rpc_agent import RpcAgentBase from .dialog_agent import DialogAgent from .dict_dialog_agent import DictDialogAgent from .user_agent import UserAgent @@ -11,7 +10,6 @@ __all__ = [ "AgentBase", "Operator", - "RpcAgentBase", "DialogAgent", "DictDialogAgent", "UserAgent", diff --git a/src/agentscope/agents/agent.py b/src/agentscope/agents/agent.py index 6dee7c4a1..18cca85b0 100644 --- a/src/agentscope/agents/agent.py +++ b/src/agentscope/agents/agent.py @@ -2,20 +2,29 @@ """ Base class for Agent """ from __future__ import annotations +from abc import ABCMeta from typing import Optional from typing import Sequence from typing import Union from typing import Any from typing import Callable - from loguru import logger -from .operator import Operator -from ..models import load_model_by_name -from ..memory import TemporaryMemory +from agentscope.agents.operator import Operator +from agentscope.models import load_model_by_name +from agentscope.memory import TemporaryMemory + + +class _RecordInitSettingMeta(ABCMeta): + """A wrapper to record the init args into `_init_settings` field.""" + def __call__(cls, *args: tuple, **kwargs: dict) -> Any: + instance = super().__call__(*args, **kwargs) + instance._init_settings = {"args": args, "kwargs": kwargs} + return instance -class AgentBase(Operator): + +class AgentBase(Operator, metaclass=_RecordInitSettingMeta): """Base class for all agents. All agents should inherit from this class and implement the `reply` @@ -172,3 +181,51 @@ def _broadcast_to_audience(self, x: dict) -> None: """Broadcast the input to all audiences.""" for agent in self._audience: agent.observe(x) + + def to_dist( + self, + host: str = "localhost", + port: int = None, + max_pool_size: int = 100, + max_timeout_seconds: int = 1800, + launch_server: bool = True, + local_mode: bool = True, + lazy_launch: bool = True, + ) -> AgentBase: + """Convert current agent instance into a distributed version. + + Args: + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. + port (`int`, defaults to `None`): + Port of the rpc agent server. + max_pool_size (`int`, defaults to `100`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + local_mode (`bool`, defaults to `True`): + Whether the started rpc server only listens to local + requests. + lazy_launch (`bool`, defaults to `True`): + Only launch the server when the agent is called. 
+ + Returns: + `AgentBase`: the wrapped agent instance with distributed + functionality + """ + from .rpc_agent import RpcAgent + + if issubclass(self.__class__, RpcAgent): + return self + return RpcAgent( + agent_class=self.__class__, + agent_configs=self._init_settings, + name=self.name, + host=host, + port=port, + max_pool_size=max_pool_size, + max_timeout_seconds=max_timeout_seconds, + launch_server=launch_server, + local_mode=local_mode, + lazy_launch=lazy_launch, + ) diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index a2aeff030..76988a541 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -1,21 +1,24 @@ # -*- coding: utf-8 -*- """ Base class for Rpc Agent """ -from multiprocessing import Process -from multiprocessing import Event +from multiprocessing import ( + Process, + Event, + Pipe, +) from multiprocessing.synchronize import Event as EventClass import socket import threading import time import json -from queue import Queue from typing import Any -from typing import Callable from typing import Optional from typing import Union from typing import Type from typing import Sequence +from queue import Queue from concurrent import futures +from loguru import logger try: import grpc @@ -29,15 +32,14 @@ except ImportError: ExpiringDict = None -from agentscope._init import _INIT_SETTINGS -from agentscope._init import init +from agentscope._init import init_process, _INIT_SETTINGS from agentscope.agents.agent import AgentBase -from agentscope.message import MessageBase -from agentscope.message import Msg -from agentscope.message import PlaceholderMessage -from agentscope.message import deserialize -from agentscope.message import serialize -from agentscope.utils.logging_utils import logger +from agentscope.message import ( + Msg, + PlaceholderMessage, + deserialize, + serialize, +) from agentscope.rpc import ( RpcAgentClient, RpcMsg, @@ -64,266 +66,90 @@ def inner(rpc_agent, msg): # type: ignore [no-untyped-def] return inner -class RpcAgentBase(AgentBase, RpcAgentServicer): - """Abstract service of RpcAgent, also act as AgentBase. - - Note: - Please implement reply method based on the functionality of your - agent. - """ +class RpcAgent(AgentBase): + """A wrapper to extend an AgentBase into a gRPC Client.""" def __init__( self, name: str, - config: Optional[dict] = None, - sys_prompt: Optional[str] = None, - model: Optional[Union[Callable[..., Any], str]] = None, - use_memory: bool = True, - memory_config: Optional[dict] = None, + agent_class: Type[AgentBase], + agent_configs: dict, host: str = "localhost", - port: int = 80, + port: int = None, max_pool_size: int = 100, max_timeout_seconds: int = 1800, launch_server: bool = True, local_mode: bool = True, lazy_launch: bool = True, - is_servicer: bool = False, ) -> None: - """Init a RpcAgentBase instance. + """Initialize a RpcAgent instance. Args: - name (`str`): - The name of the agent. - config (`Optional[dict]`): - The configuration of the agent, if provided, the agent will - be initialized from the config rather than the other - parameters. - sys_prompt (`Optional[str]`): - The system prompt of the agent, which can be passed by args - or hard-coded in the agent. - model (`Optional[Union[Callable[..., Any], str]]`, defaults to - None): - The callable model object or the model name, which is used to - load model from configuration. - use_memory (`bool`, defaults to `True`): - Whether the agent has memory. 
- memory_config (`Optional[dict]`): - The config of memory. - host (`str`, defaults to "localhost"): + agent_class (`Type[AgentBase]`, defaults to `None`): + The AgentBase subclass encapsulated by this wrapper. + agent_configs (`dict`): The args used to initialize the + agent_class. + name (`str`): Name of the agent. + host (`str`, defaults to `"localhost"`): Hostname of the rpc agent server. - port (`int`, defaults to `80`): + port (`int`, defaults to `None`): Port of the rpc agent server. max_pool_size (`int`, defaults to `100`): - The max number of task results that the server can - accommodate. Note that the oldest result will be deleted - after exceeding the pool size. + Max number of task results that the server can accommodate. max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. Note that expired results will be - deleted. - launch_server (`bool`, defaults to `True`): - Launch a rpc server locally. + Timeout for task results. local_mode (`bool`, defaults to `True`): - The started server only listens to local requests. + Whether the started rpc server only listens to local + requests. lazy_launch (`bool`, defaults to `True`): Only launch the server when the agent is called. - is_servicer (`bool`, defaults to `False`): - Used as a servicer of rpc server. """ - super().__init__( - name, - config, - sys_prompt, - model, - use_memory, - memory_config, - ) + super().__init__(name=name) self.host = host self.port = port - self.is_servicer = is_servicer - - # prohibit servicer object from launching a new server - assert not (is_servicer and launch_server) - # launch_server is True only in the main process + self.server_launcher = None + self.client = None if launch_server: self.server_launcher = RpcAgentServerLauncher( - name=name, - config=config, - sys_prompt=sys_prompt, - model=model, - use_memory=use_memory, - memory_config=memory_config, - agent_class=self.__class__, + agent_class=agent_class, + agent_args=agent_configs["args"], + agent_kwargs=agent_configs["kwargs"], host=host, port=port, max_pool_size=max_pool_size, max_timeout_seconds=max_timeout_seconds, local_mode=local_mode, ) - self.port = self.server_launcher.port - self.client = None if not lazy_launch: - self.server_launcher.launch() - self.client = RpcAgentClient(host=self.host, port=self.port) + self._launch_server() else: - self.server_launcher = None - # is_servicer is True only in the rpc server process - if is_servicer: - self.result_pool = ExpiringDict( - max_len=max_pool_size, - max_age_seconds=max_timeout_seconds, - ) - self.task_queue = Queue() - self.worker_thread = threading.Thread(target=self.process_tasks) - self.worker_thread.start() - self.task_id_lock = threading.Lock() - self.task_id_counter = 0 - - # connect to an existing rpc server - if not launch_server and not is_servicer: self.client = RpcAgentClient(host=self.host, port=self.port) - def get_task_id(self) -> int: - """Get the auto-increment task id.""" - with self.task_id_lock: - self.task_id_counter += 1 - return self.task_id_counter - - def call_func(self, request: RpcMsg, _: ServicerContext) -> RpcMsg: - if hasattr(self, request.target_func): - return getattr(self, request.target_func)(request) - else: - logger.error(f"Unsupported method {request.target_func}") - return RpcMsg( - value=Msg( - name=self.name, - content=f"Unsupported method {request.target_func}", - ).serialize(), - ) - - @rpc_servicer_method - def _call(self, request: RpcMsg) -> RpcMsg: - """Call function of RpcAgentService - - Args: - request (`RpcMsg`): - 
Message containing input parameters or input parameter - placeholders. - - Returns: - `RpcMsg`: A serialized Msg instance with attributes name, host, - port and task_id - """ - if request.value: - msg = deserialize(request.value) - else: - msg = None - task_id = self.get_task_id() - self.task_queue.put((task_id, msg)) - return RpcMsg( - value=Msg( - name=self.name, - content=None, - host=self.host, - port=self.port, - task_id=task_id, - ).serialize(), - ) - - @rpc_servicer_method - def _get(self, request: RpcMsg) -> RpcMsg: - """Get function of RpcAgentService - - Args: - request (`RpcMsg`): - Identifier of message, with json format:: - - { - 'task_id': int - } - - Returns: - `RpcMsg`: Concrete values of the specific message (or part of it). - """ - # todo: add format specification of request - msg = json.loads(request.value) - # todo: implement the waiting in a more elegant way, add timeout - while True: - result = self.result_pool.get(msg["task_id"], None) - if result is not None: - return RpcMsg(value=result.serialize()) - time.sleep(0.1) - - @rpc_servicer_method - def _observe(self, request: RpcMsg) -> RpcMsg: - """Observe function of RpcAgentService - - Args: - request (`RpcMsg`): - The serialized input to be observed. - - Returns: - `RpcMsg`: Empty RpcMsg. - """ - msgs = deserialize(request.value) - for msg in msgs: - if isinstance(msg, PlaceholderMessage): - msg.update_value() - self.memory.add(msgs) - return RpcMsg() - - def process_tasks(self) -> None: - """Task processing thread.""" - while True: - task_id, task_msg = self.task_queue.get() - result = self.reply(task_msg) - self.result_pool[task_id] = result + def _launch_server(self) -> None: + """Launch a rpc server and update the port and the client""" + self.server_launcher.launch() + self.port = self.server_launcher.port + self.client = RpcAgentClient(host=self.host, port=self.port) def reply(self, x: dict = None) -> dict: - """Reply function used in the rpc agent server process.""" - raise NotImplementedError( - f"Agent [{type(self).__name__}] is missing the required " - f'"reply" function.', + if self.client is None: + self._launch_server() + res_msg = self.client.call_func( + func_name="_call", + value=x.serialize() if x is not None else "", + ) + return PlaceholderMessage( + **deserialize(res_msg), # type: ignore [arg-type] ) def observe(self, x: Union[dict, Sequence[dict]]) -> None: - """Observe the input, store it in memory and don't response to it. - - Args: - x (`Union[dict, Sequence[dict]]`): - The input to be observed. 
- """ if self.client is None: - self.server_launcher.launch() - self.client = RpcAgentClient(host=self.host, port=self.port) + self._launch_server() self.client.call_func( func_name="_observe", value=serialize(x), # type: ignore [arg-type] ) - def __call__(self, *args: Any, **kwargs: Any) -> dict: - """Call function used in the main process.""" - if args is not None and len(args) > 0: - x = args[0] - elif kwargs is not None and len(kwargs) > 0: - x = kwargs["x"] - else: - x = None - if x is not None: - assert isinstance(x, MessageBase) - if self.client is None: - self.server_launcher.launch() - self.client = RpcAgentClient(host=self.host, port=self.port) - res_msg = self.client.call_func( - func_name="_call", - value=x.serialize() if x is not None else "", - ) - res = PlaceholderMessage( - **deserialize(res_msg), # type: ignore [arg-type] - ) - if self._audience is not None: - self._broadcast_to_audience(res) - return res - def stop(self) -> None: """Stop the RpcAgent and the launched rpc server.""" if self.server_launcher is not None: @@ -335,103 +161,161 @@ def __del__(self) -> None: def setup_rcp_agent_server( + agent_class: Type[AgentBase], + agent_args: tuple, + agent_kwargs: dict, + host: str, port: int, - servicer_class: Type[RpcAgentServicer], + init_settings: dict = None, start_event: EventClass = None, stop_event: EventClass = None, - max_workers: int = 4, + pipe: int = None, local_mode: bool = True, - init_settings: dict = None, - kwargs: dict = None, + max_pool_size: int = 100, + max_timeout_seconds: int = 1800, + max_workers: int = 4, ) -> None: """Setup gRPC server rpc agent. Args: + agent_class (`Type[AgentBase]`): + A subclass of AgentBase. + agent_args (`tuple`): The args tuple used to initialize the + agent_class. + agent_kwargs (`dict`): The args dict used to initialize the + agent_class. + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. port (`int`): The socket port monitored by grpc server. - servicer_class (`Type[RpcAgentServicer]`): - An implementation of RpcAgentBaseService. + init_settings (`dict`, defaults to `None`): + Init settings for agentscope.init. start_event (`EventClass`, defaults to `None`): An Event instance used to determine whether the child process has been started. stop_event (`EventClass`, defaults to `None`): The stop Event instance used to determine whether the child process has been stopped. - max_workers (`int`, defaults to `4`): - max worker number of grpc server. + pipe (`int`, defaults to `None`): + A pipe instance used to pass the actual port of the server. local_mode (`bool`, defaults to `None`): Only listen to local requests. - init_settings (`dict`, defaults to `None`): - Init settings. + max_pool_size (`int`, defaults to `100`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + max_workers (`int`, defaults to `4`): + max worker number of grpc server. 
""" if init_settings is not None: - init(**init_settings) - server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) - servicer = servicer_class(**kwargs) - add_RpcAgentServicer_to_server(servicer, server) - if local_mode: - server.add_insecure_port(f"localhost:{port}") - else: - server.add_insecure_port(f"0.0.0.0:{port}") - server.start() + init_process(**init_settings) + servicer = RpcServerSideWrapper( + agent_class(*agent_args, **agent_kwargs), + host=host, + port=port, + max_pool_size=max_pool_size, + max_timeout_seconds=max_timeout_seconds, + ) + while True: + try: + port = check_port(port) + servicer.port = port + logger.info( + f"Starting rpc server [{agent_class.__name__}] at port" + f" [{port}]...", + ) + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=max_workers), + ) + add_RpcAgentServicer_to_server(servicer, server) + if local_mode: + server.add_insecure_port(f"localhost:{port}") + else: + server.add_insecure_port(f"0.0.0.0:{port}") + server.start() + break + except OSError: + logger.warning( + f"Failed to start rpc server at port [{port}]" + f"try another port", + ) logger.info( - f"rpc server [{servicer_class.__name__}] at port [{port}] started " + f"rpc server [{agent_class.__name__}] at port [{port}] started " "successfully", ) + pipe.send(port) start_event.set() stop_event.wait() logger.info( - f"Stopping rpc server [{servicer_class.__name__}] at port [{port}]", + f"Stopping rpc server [{agent_class.__name__}] at port [{port}]", ) server.stop(0) logger.info( - f"rpc server [{servicer_class.__name__}] at port [{port}] stopped " + f"rpc server [{agent_class.__name__}] at port [{port}] stopped " "successfully", ) +def find_available_port() -> int: + """Get an unoccupied socket port number.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + +def check_port(port: Optional[int] = None) -> int: + """Check if the port is available. + + Args: + port (`int`): + the port number being checked. + + Returns: + `int`: the port number that passed the check. If the port is found + to be occupied, an available port number will be automatically + returned. + """ + if port is None: + new_port = find_available_port() + logger.warning( + "gRpc server port is not provided, automatically select " + f"[{new_port}] as the port number.", + ) + return new_port + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + if s.connect_ex(("localhost", port)) == 0: + new_port = find_available_port() + logger.warning( + f"Port [{port}] is occupied, use [{new_port}] instead", + ) + return new_port + return port + + class RpcAgentServerLauncher: """Launcher of rpc agent server.""" def __init__( self, - name: str, - config: Optional[dict] = None, - sys_prompt: Optional[str] = None, - model: Optional[Union[Callable[..., Any], str]] = None, - use_memory: bool = True, - memory_config: Optional[dict] = None, - agent_class: Type[RpcAgentBase] = None, + agent_class: Type[AgentBase] = None, + agent_args: tuple = (), + agent_kwargs: dict = None, host: str = "localhost", port: int = None, max_pool_size: int = 100, max_timeout_seconds: int = 1800, - local_mode: bool = True, - **kwargs: Any, + local_mode: bool = False, ) -> None: """Init a rpc agent server launcher. Args: - name (`str`): - The name of the agent. - config (`Optional[dict]`): - The configuration of the agent, if provided, the agent will - be initialized from the config rather than the other - parameters. 
- sys_prompt (`Optional[str]`): - The system prompt of the agent, which can be passed by args - or hard-coded in the agent. - model (`Optional[Union[Callable[..., Any], str]]`, defaults to - None): - The callable model object or the model name, which is used to - load model from configuration. - use_memory (`bool`, defaults to `True`): - Whether the agent has memory. - memory_config (`Optional[dict]`): - The config of memory. - agent_class (`Type[RpcAgentBase]`, defaults to `None`): - The RpcAgentBase class used in rpc agent server as a servicer. + agent_class (`Type[AgentBase]`, defaults to `None`): + The AgentBase subclass encapsulated by this wrapper. + agent_args (`tuple`): The args tuple used to initialize the + agent_class. + agent_kwargs (`dict`): The args dict used to initialize the + agent_class. host (`str`, defaults to `"localhost"`): Hostname of the rpc agent server. port (`int`, defaults to `None`): @@ -440,98 +324,52 @@ def __init__( Max number of task results that the server can accommodate. max_timeout_seconds (`int`, defaults to `1800`): Timeout for task results. - local_mode (`bool`, defaults to `True`): + local_mode (`bool`, defaults to `False`): Whether the started rpc server only listens to local requests. """ - self.name = name - self.config = config - self.sys_prompt = sys_prompt - self.model = model - self.use_memory = use_memory - self.memory_config = memory_config self.agent_class = agent_class + self.agent_args = agent_args + self.agent_kwargs = agent_kwargs self.host = host - self.port = self.check_port(port) + self.port = check_port(port) self.max_pool_size = max_pool_size self.max_timeout_seconds = max_timeout_seconds self.local_model = local_mode self.server = None self.stop_event = None - self.kwargs = kwargs - - def find_available_port(self) -> int: - """Get an unoccupied socket port number.""" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("", 0)) - return s.getsockname()[1] - - def check_port(self, port: int) -> int: - """Check if the port is available. - - Args: - port (`int`): - the port number being checked. - - Returns: - `int`: the port number that passed the check. If the port is found - to be occupied, an available port number will be automatically - returned. 
- """ - if port is None: - new_port = self.find_available_port() - logger.warning( - "gRpc server port is not provided, automatically select " - f"[{new_port}] as the port number.", - ) - return new_port - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - if s.connect_ex(("localhost", port)) == 0: - new_port = self.find_available_port() - logger.warning( - f"Port [{port}] is occupied, use [{new_port}] instead", - ) - return new_port - return port + self.parent_con = None def launch(self) -> None: """launch a local rpc agent server.""" self.stop_event = Event() - logger.info( - f"Starting rpc server [{self.agent_class.__name__}] at port" - f" [{self.port}]...", - ) + self.parent_con, child_con = Pipe() start_event = Event() server_process = Process( target=setup_rcp_agent_server, kwargs={ + "agent_class": self.agent_class, + "agent_args": self.agent_args, + "agent_kwargs": self.agent_kwargs, + "host": self.host, "port": self.port, - "servicer_class": self.agent_class, + "init_settings": _INIT_SETTINGS, "start_event": start_event, "stop_event": self.stop_event, + "pipe": child_con, + "max_pool_size": self.max_pool_size, + "max_timeout_seconds": self.max_timeout_seconds, "local_mode": self.local_model, - "init_settings": _INIT_SETTINGS, - "kwargs": { - "name": self.name, - "config": self.config, - "sys_prompt": self.sys_prompt, - "model": self.model, - "use_memory": self.use_memory, - "memory_config": self.memory_config, - "host": self.host, - "port": self.port, - "max_pool_size": self.max_pool_size, - "max_timeout_seconds": self.max_timeout_seconds, - "launch_server": False, - "lazy_launch": False, - "is_servicer": True, - **self.kwargs, - }, }, ) server_process.start() + self.port = self.parent_con.recv() start_event.wait() self.server = server_process + logger.info( + f"Launch [{self.agent_class.__name__}] server at " + f"[{self.host}:{self.port}] success", + ) def wait_until_terminate(self) -> None: """Wait for server process""" @@ -553,3 +391,141 @@ def shutdown(self) -> None: f" [{self.port}] is killed.", ) self.server = None + + +class RpcServerSideWrapper(RpcAgentServicer): + """A wrapper to extend an AgentBase into a gRPC Servicer.""" + + def __init__( + self, + agent_instance: AgentBase, + host: str = "localhost", + port: int = None, + max_pool_size: int = 100, + max_timeout_seconds: int = 1800, + ): + """Init the service side wrapper. + + Args: + agent_instance (`AgentBase`): an instance of `AgentBase`. + host (`str`, defaults to "localhost"): + Hostname of the rpc agent server. + port (`int`, defaults to `None`): + Port of the rpc agent server. + max_pool_size (`int`, defaults to `100`): + The max number of task results that the server can + accommodate. Note that the oldest result will be deleted + after exceeding the pool size. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. Note that expired results will be + deleted. 
+ """ + self.host = host + self.port = port + self.result_pool = ExpiringDict( + max_len=max_pool_size, + max_age_seconds=max_timeout_seconds, + ) + self.task_queue = Queue() + self.worker_thread = threading.Thread(target=self.process_tasks) + self.worker_thread.start() + self.task_id_lock = threading.Lock() + self.task_id_counter = 0 + self.agent = agent_instance + + def get_task_id(self) -> int: + """Get the auto-increment task id.""" + with self.task_id_lock: + self.task_id_counter += 1 + return self.task_id_counter + + def call_func(self, request: RpcMsg, _: ServicerContext) -> RpcMsg: + """Call the specific servicer function.""" + if hasattr(self, request.target_func): + return getattr(self, request.target_func)(request) + else: + logger.error(f"Unsupported method {request.target_func}") + return RpcMsg( + value=Msg( + name=self.agent.name, + content=f"Unsupported method {request.target_func}", + ).serialize(), + ) + + def _call(self, request: RpcMsg) -> RpcMsg: + """Call function of RpcAgentService + + Args: + request (`RpcMsg`): + Message containing input parameters or input parameter + placeholders. + + Returns: + `RpcMsg`: A serialized Msg instance with attributes name, host, + port and task_id + """ + if request.value: + msg = deserialize(request.value) + else: + msg = None + task_id = self.get_task_id() + self.task_queue.put((task_id, msg)) + return RpcMsg( + value=Msg( + name=self.agent.name, + content=None, + host=self.host, + port=self.port, + task_id=task_id, + ).serialize(), + ) + + def _get(self, request: RpcMsg) -> RpcMsg: + """Get function of RpcAgentService + + Args: + request (`RpcMsg`): + Identifier of message, with json format:: + + { + 'task_id': int + } + + Returns: + `RpcMsg`: Concrete values of the specific message (or part of it). + """ + # todo: add format specification of request + msg = json.loads(request.value) + # todo: implement the waiting in a more elegant way, add timeout + while True: + result = self.result_pool.get(msg["task_id"], None) + if result is not None: + return RpcMsg(value=result.serialize()) + time.sleep(0.1) + + def _observe(self, request: RpcMsg) -> RpcMsg: + """Observe function of RpcAgentService + + Args: + request (`RpcMsg`): + The serialized input to be observed. + + Returns: + `RpcMsg`: Empty RpcMsg. 
+ """ + msgs = deserialize(request.value) + for msg in msgs: + if isinstance(msg, PlaceholderMessage): + msg.update_value() + self.agent.observe(msgs) + return RpcMsg() + + def process_tasks(self) -> None: + """Task processing thread.""" + while True: + task_id, task_msg = self.task_queue.get() + # TODO: optimize this and avoid blocking + if isinstance(task_msg, PlaceholderMessage): + task_msg.update_value() + result = self.agent.reply(task_msg) + self.result_pool[task_id] = result diff --git a/src/agentscope/agents/rpc_dialog_agent.py b/src/agentscope/agents/rpc_dialog_agent.py deleted file mode 100644 index 6f81c15ba..000000000 --- a/src/agentscope/agents/rpc_dialog_agent.py +++ /dev/null @@ -1,109 +0,0 @@ -# -*- coding: utf-8 -*- -""" A rpc version of dialog agent.""" -import json -from typing import Any -from typing import Callable -from typing import Optional -from typing import Union -from loguru import logger - -from agentscope.message import Msg -from agentscope.agents.rpc_agent import RpcAgentBase -from agentscope.prompt import PromptEngine -from agentscope.prompt import PromptType - - -class RpcDialogAgent(RpcAgentBase): - """DialogAgent class""" - - def __init__( - self, - name: str, - config: Optional[dict] = None, - sys_prompt: Optional[str] = None, - model: Optional[Union[Callable[..., Any], str]] = None, - use_memory: bool = True, - memory_config: Optional[dict] = None, - host: str = "localhost", - port: int = 80, - max_pool_size: int = 100, - max_timeout_seconds: int = 1800, - launch_server: bool = True, - local_mode: bool = True, - lazy_launch: bool = True, - is_servicer: bool = False, - ) -> None: - super().__init__( - name=name, - config=config, - sys_prompt=sys_prompt, - model=model, - use_memory=use_memory, - memory_config=memory_config, - host=host, - port=port, - max_pool_size=max_pool_size, - max_timeout_seconds=max_timeout_seconds, - launch_server=launch_server, - local_mode=local_mode, - lazy_launch=lazy_launch, - is_servicer=is_servicer, - ) - # init prompt engine - if is_servicer: - self.engine = PromptEngine(self.model, prompt_type=PromptType.LIST) - - def reply(self, x: dict = None) -> dict: - """Reply function of the agent. Processes the input data, - generates a prompt using the current dialogue memory and system - prompt, and invokes the language model to produce a response. The - response is then formatted and added to the dialogue memory. - - Args: - x (`dict`, defaults to `None`): - A dictionary representing the user's input to the agent. - This input is added to the dialogue memory if provided. - Returns: - dict: A dictionary representing the message generated by the agent - in response to the user's input. It contains at least a - 'speak' key with the textual response and may include other - keys such as 'agreement' if provided by the language model. - Raises: - `json.decoder.JSONDecodeError`: - If the response from the language model is not valid JSON, - it defaults to treating the response as plain text. 
- """ - # record the input if needed - if x is not None: - self.memory.add(x) - - # prepare prompt - prompt = self.engine.join( - self.sys_prompt, - self.memory.get_memory(), - ) - - # call llm - response = self.model( - prompt, - parse_func=json.loads, - fault_handler=lambda x: {"speak": x}, - ) - - logger.debug(json.dumps(response, indent=4)) - - if isinstance(response, dict) and "speak" in response: - msg = Msg( - name=self.name, - content=response.get("speak", None) or response, - **response, - ) - else: - msg = Msg(self.name, response) - - logger.chat(msg) - - # record to memory - self.memory.add(msg) - - return msg diff --git a/src/agentscope/agents/user_agent.py b/src/agentscope/agents/user_agent.py index ac2339166..e8de167ac 100644 --- a/src/agentscope/agents/user_agent.py +++ b/src/agentscope/agents/user_agent.py @@ -46,15 +46,15 @@ def reply( x (`dict`, defaults to `None`): A dictionary containing initial data to be added to memory. Defaults to None. - required_keys (`Optional[Union[list[str], str]]`, defaults to - None): + required_keys \ + (`Optional[Union[list[str], str]]`, defaults to `None`): Strings that requires user to input, which will be used as the key of the returned dict. Defaults to None. Returns: - dict: A dictionary representing the message object that contains - the user's input and any additional details. This is also - stored in the object's memory. + `dict`: A dictionary representing the message object that contains + the user's input and any additional details. This is also + stored in the object's memory. """ if x is not None: self.memory.add(x) diff --git a/src/agentscope/constants.py b/src/agentscope/constants.py index 5cc9fea61..2b557d75a 100644 --- a/src/agentscope/constants.py +++ b/src/agentscope/constants.py @@ -4,7 +4,7 @@ from enum import IntEnum PACKAGE_NAME = "agentscope" -MSG_TOKEN = f"<{PACKAGE_NAME}_msg>" +MSG_TOKEN = f"[{PACKAGE_NAME}_msg]" # default values diff --git a/src/agentscope/message.py b/src/agentscope/message.py index 8aa675f68..ae0d84672 100644 --- a/src/agentscope/message.py +++ b/src/agentscope/message.py @@ -180,14 +180,18 @@ def serialize(self) -> str: class PlaceholderMessage(MessageBase): """A placeholder for the return message of RpcAgent.""" + PLACEHOLDER_ATTRS = { + "_host", + "_port", + "_client", + "_task_id", + "_is_placeholder", + } + LOCAL_ATTRS = { - "host", - "port", - "client", - "task_id", - "is_placeholder", "name", "timestamp", + *PLACEHOLDER_ATTRS, } def __init__( @@ -232,14 +236,17 @@ def __init__( timestamp=timestamp, **kwargs, ) - # todo: avoid attribute name duplication - self.host = host - self.port = port - - self.client = RpcAgentClient(self.host, self.port) - self.task_id = task_id # placeholder indicates whether the real message is still in rpc server - self.is_placeholder = True + self._is_placeholder = True + self._host = host + self._port = port + self._client = RpcAgentClient(host, port) + self._task_id = task_id + + def __is_local(self, key: Any) -> bool: + return ( + key in PlaceholderMessage.LOCAL_ATTRS or not self._is_placeholder + ) def __getattr__(self, __name: str) -> Any: """Get attribute value from PlaceholderMessage. Get value from rpc @@ -248,53 +255,57 @@ def __getattr__(self, __name: str) -> Any: Args: __name (`str`): Attribute name. 
- """ - if ( - __name not in PlaceholderMessage.LOCAL_ATTRS - and self.is_placeholder - ): + if not self.__is_local(__name): self.update_value() return MessageBase.__getattr__(self, __name) + def __getitem__(self, __key: Any) -> Any: + """Get item value from PlaceholderMessage. Get value from rpc + agent server if necessary. + + Args: + __key (`Any`): + Item name. + """ + if not self.__is_local(__key): + self.update_value() + return MessageBase.__getitem__(self, __key) + def to_str(self) -> str: - if self.is_placeholder: - return f"{self.name}: [message from {self.host}:{self.port}]" - else: - return f"{self.name}: {self.content}" + return f"{self.name}: {self.content}" def update_value(self) -> MessageBase: """Get attribute values from rpc agent server immediately""" - if self.is_placeholder: + if self._is_placeholder: # retrieve real message from rpc agent server - result = self.client.call_func( + result = self._client.call_func( func_name="_get", - value=json.dumps({"task_id": self.task_id}), + value=json.dumps({"task_id": self._task_id}), ) self.update(deserialize(result)) # the actual value has been updated, not a placeholder any more - self.is_placeholder = False + self._is_placeholder = False return self def serialize(self) -> str: - if self.is_placeholder: + if self._is_placeholder: return json.dumps( { "__type": "PlaceholderMessage", "name": self.name, "content": None, "timestamp": self.timestamp, - "host": self.host, - "port": self.port, - "task_id": self.task_id, + "host": self._host, + "port": self._port, + "task_id": self._task_id, }, ) else: states = { k: v for k, v in self.items() - if k - not in ["host", "port", "client", "task_id", "is_placeholder"] + if k not in PlaceholderMessage.PLACEHOLDER_ATTRS } states["__type"] = "Msg" return json.dumps(states) diff --git a/src/agentscope/models/__init__.py b/src/agentscope/models/__init__.py index 4633d86a4..205f2607f 100644 --- a/src/agentscope/models/__init__.py +++ b/src/agentscope/models/__init__.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ Import modules in models package.""" import json -from typing import Union +from typing import Union, Sequence from loguru import logger diff --git a/src/agentscope/rpc/rpc_agent_client.py b/src/agentscope/rpc/rpc_agent_client.py index e1f5f3937..768a81515 100644 --- a/src/agentscope/rpc/rpc_agent_client.py +++ b/src/agentscope/rpc/rpc_agent_client.py @@ -30,8 +30,6 @@ def __init__(self, host: str, port: int) -> None: """ self.host = host self.port = port - self.channel = grpc.insecure_channel(f"{self.host}:{self.port}") - self.stub = RpcAgentStub(self.channel) def call_func(self, func_name: str, value: str = None) -> str: """Call the specific function of rpc server. @@ -43,7 +41,9 @@ def call_func(self, func_name: str, value: str = None) -> str: Returns: str: serialized return data. """ - result_msg = self.stub.call_func( - RpcMsg(value=value, target_func=func_name), - ) - return result_msg.value + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + result_msg = stub.call_func( + RpcMsg(value=value, target_func=func_name), + ) + return result_msg.value diff --git a/src/agentscope/utils/logging_utils.py b/src/agentscope/utils/logging_utils.py index 2bb82a562..47813b898 100644 --- a/src/agentscope/utils/logging_utils.py +++ b/src/agentscope/utils/logging_utils.py @@ -174,15 +174,15 @@ def setup_logger( `"DEBUG"`, `"INFO"`, `"SUCCESS"`, `"WARNING"`, `"ERROR"`, `"CRITICAL"`. 
""" - # redirect stderr to record errors in logging - sys.stderr = _Stream() - # avoid reinit in subprocess if hasattr(logger, "chat"): return + # redirect stderr to record errors in logging + sys.stderr = _Stream() + # add chat function for logger - logger.level("CHAT", no=20, color="") + logger.level("CHAT", no=21, color="") logger.chat = _chat # set logging level @@ -193,11 +193,10 @@ def setup_logger( if path_log is not None: if not os.path.exists(path_log): os.makedirs(path_log) - - path_log_file = os.path.join(path_log, "file_{time}.log") + path_log_file = os.path.join(path_log, "all.log") path_log_file_only_chat = os.path.join( path_log, - "file_{time}.log.chat", + "chat.log", ) # save all logging into file diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index 2ffaac43e..be8279657 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -4,18 +4,21 @@ """ import unittest import time +import shutil +from loguru import logger +import agentscope from agentscope.agents import AgentBase -from agentscope.agents import RpcAgentBase from agentscope.agents.rpc_agent import RpcAgentServerLauncher from agentscope.message import Msg from agentscope.message import PlaceholderMessage from agentscope.message import deserialize from agentscope.msghub import msghub from agentscope.pipelines import sequentialpipeline +from agentscope.utils import MonitorFactory, QuotaExceededError -class DemoRpcAgent(RpcAgentBase): +class DemoRpcAgent(AgentBase): """A demo Rpc agent for test usage.""" def __init__(self, **kwargs) -> None: # type: ignore [no-untyped-def] @@ -30,7 +33,7 @@ def reply(self, x: dict = None) -> dict: return x -class DemoRpcAgentAdd(RpcAgentBase): +class DemoRpcAgentAdd(AgentBase): """A demo Rpc agent for test usage""" def reply(self, x: dict = None) -> dict: @@ -50,7 +53,7 @@ def reply(self, x: dict = None) -> dict: return x -class DemoRpcAgentWithMemory(RpcAgentBase): +class DemoRpcAgentWithMemory(AgentBase): """A demo Rpc agent that count its memory""" def reply(self, x: dict = None) -> dict: @@ -62,15 +65,57 @@ def reply(self, x: dict = None) -> dict: return msg +class DemoRpcAgentWithMonitor(AgentBase): + """A demo Rpc agent that use monitor""" + + def reply(self, x: dict = None) -> dict: + monitor = MonitorFactory.get_monitor() + try: + monitor.update({"msg_num": 1}) + except QuotaExceededError: + x.content["quota_exceeded"] = True + logger.chat( + { + "name": self.name, + "content": "quota_exceeded", + }, + ) + return x + x.content["msg_num"] = monitor.get_value("msg_num") + logger.chat( + { + "name": self.name, + "content": f"msg_num {x.content['msg_num']}", + }, + ) + time.sleep(0.2) + return x + + class BasicRpcAgentTest(unittest.TestCase): "Test cases for Rpc Agent" + def setUp(self) -> None: + """Init for Rpc Agent Test""" + agentscope.init( + project="test", + name="rpc_agent", + save_dir="./test_runs", + save_log=True, + ) + + def tearDown(self) -> None: + MonitorFactory._instance = None # pylint: disable=W0212 + logger.remove() + shutil.rmtree("./test_runs") + def test_single_rpc_agent_server(self) -> None: """test setup a single rpc agent""" host = "localhost" port = 12001 agent_a = DemoRpcAgent( name="a", + ).to_dist( host=host, port=port, ) @@ -79,19 +124,30 @@ def test_single_rpc_agent_server(self) -> None: result = agent_a(msg) # get name without waiting for the server self.assertEqual(result.name, "a") + self.assertEqual(result["name"], "a") js_placeholder_result = result.serialize() - self.assertTrue(result.is_placeholder) + 
self.assertTrue(result._is_placeholder) # pylint: disable=W0212 placeholder_result = deserialize(js_placeholder_result) self.assertTrue(isinstance(placeholder_result, PlaceholderMessage)) self.assertEqual(placeholder_result.name, "a") - self.assertTrue(placeholder_result.is_placeholder) + self.assertEqual( + placeholder_result["name"], # type: ignore [call-overload] + "a", + ) + self.assertTrue( + placeholder_result._is_placeholder, # pylint: disable=W0212 + ) # wait to get content self.assertEqual(result.content, msg.content) - self.assertFalse(result.is_placeholder) + self.assertFalse(result._is_placeholder) # pylint: disable=W0212 self.assertEqual(result.id, 0) - self.assertTrue(placeholder_result.is_placeholder) + self.assertTrue( + placeholder_result._is_placeholder, # pylint: disable=W0212 + ) self.assertEqual(placeholder_result.content, msg.content) - self.assertFalse(placeholder_result.is_placeholder) + self.assertFalse( + placeholder_result._is_placeholder, # pylint: disable=W0212 + ) self.assertEqual(placeholder_result.id, 0) # check msg js_msg_result = result.serialize() @@ -106,15 +162,19 @@ def test_single_rpc_agent_server(self) -> None: def test_connect_to_an_existing_rpc_server(self) -> None: """test connecting to an existing server""" launcher = RpcAgentServerLauncher( - name="a", - host="127.0.0.1", # choose port automatically - local_mode=False, agent_class=DemoRpcAgent, + agent_kwargs={ + "name": "a", + }, + local_mode=False, + host="127.0.0.1", + port=12010, ) launcher.launch() agent_a = DemoRpcAgent( name="a", + ).to_dist( host="127.0.0.1", port=launcher.port, launch_server=False, @@ -125,6 +185,17 @@ def test_connect_to_an_existing_rpc_server(self) -> None: self.assertEqual(result.name, "a") # waiting for server self.assertEqual(result.content, msg.content) + # test dict usage + msg = Msg(name="System", content={"text": "hi world"}) + result = agent_a(msg) + # get name without waiting for the server + self.assertEqual(result["name"], "a") + # waiting for server + self.assertEqual(result["content"], msg.content) + # test to_str + msg = Msg(name="System", content={"text": "test"}) + result = agent_a(msg) + self.assertEqual(result.to_str(), "a: {'text': 'test'}") launcher.shutdown() def test_multi_rpc_agent(self) -> None: @@ -135,18 +206,21 @@ def test_multi_rpc_agent(self) -> None: port3 = 12003 agent_a = DemoRpcAgentAdd( name="a", + ).to_dist( host=host, port=port1, lazy_launch=False, ) agent_b = DemoRpcAgentAdd( name="b", + ).to_dist( host=host, port=port2, lazy_launch=False, ) agent_c = DemoRpcAgentAdd( name="c", + ).to_dist( host=host, port=port3, lazy_launch=False, @@ -192,6 +266,7 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: # rpc agent a agent_a = DemoRpcAgentAdd( name="a", + ).to_dist( host=host, port=port1, lazy_launch=False, @@ -203,6 +278,7 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: # rpc agent c agent_c = DemoRpcAgentAdd( name="c", + ).to_dist( host=host, port=port2, lazy_launch=False, @@ -217,24 +293,15 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: def test_msghub_compatibility(self) -> None: """test compatibility with msghub""" - port1 = 12001 - port2 = 12002 - port3 = 12003 agent_a = DemoRpcAgentWithMemory( name="a", - lazy_launch=False, - port=port1, - ) + ).to_dist() agent_b = DemoRpcAgentWithMemory( name="b", - lazy_launch=False, - port=port2, - ) + ).to_dist() agent_c = DemoRpcAgentWithMemory( name="c", - lazy_launch=False, - port=port3, - ) + ).to_dist() participants = [agent_a, agent_b, agent_c] 
         annonuncement_msgs = [
             Msg(name="System", content="Announcement 1"),
@@ -258,3 +325,40 @@ def test_msghub_compatibility(self) -> None:
         self.assertEqual(x_c.content["mem_size"], 7)
         x_c = sequentialpipeline(participants, x_c)
         self.assertEqual(x_c.content["mem_size"], 10)
+
+    def test_standalone_multiprocess_init(self) -> None:
+        """test compatibility with agentscope.init"""
+        monitor = MonitorFactory.get_monitor()
+        monitor.register("msg_num", quota=10)
+        host = "localhost"
+        # ports for the two rpc servers
+        port1 = 12001
+        port2 = 12002
+        # rpc agent a
+        agent_a = DemoRpcAgentWithMonitor(
+            name="a",
+        ).to_dist(
+            host=host,
+            port=port1,
+            lazy_launch=False,
+        )
+        # rpc agent b
+        agent_b = DemoRpcAgentWithMonitor(
+            name="b",
+        ).to_dist(
+            host=host,
+            port=port2,
+            lazy_launch=False,
+        )
+        msg = Msg(name="System", content={"msg_num": 0})
+        j = 0
+        for _ in range(5):
+            msg = agent_a(msg)
+            self.assertEqual(msg["content"]["msg_num"], j + 1)
+            msg = agent_b(msg)
+            self.assertEqual(msg["content"]["msg_num"], j + 2)
+            j += 2
+        msg = agent_a(msg)
+        self.assertTrue(msg["content"]["quota_exceeded"])
+        msg = agent_b(msg)
+        self.assertTrue(msg["content"]["quota_exceeded"])