diff --git a/docs/sphinx_doc/source/tutorial/201-agent.md b/docs/sphinx_doc/source/tutorial/201-agent.md index e498f18d1..b7e648eba 100644 --- a/docs/sphinx_doc/source/tutorial/201-agent.md +++ b/docs/sphinx_doc/source/tutorial/201-agent.md @@ -66,7 +66,6 @@ Below is a table summarizing the functionality of some of the key agents availab | Agent Type | Description | Typical Use Cases | | -------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | | `AgentBase` | Serves as the superclass for all agents, providing essential attributes and methods. | The foundation for building any custom agent. | -| `RpcAgentBase` | Executes remote procedure calls in distributed mode. | The foundation for building any custom agent in distributed mode. | | `DialogAgent` | Manages dialogues by understanding context and generating coherent responses. | Customer service bots, virtual assistants. | | `UserAgent` | Interacts with the user to collect input, generating messages that may include URLs or additional specifics based on required keys. | Collecting user input for agents | | *More to Come* | AgentScope is continuously expanding its pool with more specialized agents for diverse applications. | | diff --git a/docs/sphinx_doc/source/tutorial/208-distribute.md b/docs/sphinx_doc/source/tutorial/208-distribute.md index afe885f9a..4a9d36101 100644 --- a/docs/sphinx_doc/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/source/tutorial/208-distribute.md @@ -45,24 +45,23 @@ A-->C Please follow the steps below to deploy your application distributedly. -### Write your agents +### Convert your agents -To use distributed functionality, your agent class needs to inherit from `RpcAgentBase`. -`RpcAgentBase` requires several additional parameters compared to `AgentBase` during initialization. +`AgentBase` provided the `to_dist` method to convert the agent into a distributed version. +`to_dist` requires several parameters. - `host`: the hostname or IP address of the machine where the agent runs, defaults to `localhost`. - `port`: the port of this agent's RPC server, defaults to `80`. - `launch_server`: whether to launch an RPC server locally, defaults to `True`. - `local_mode`: set to `True` if all agents run on the same machine, defaults to `True`. - -But don't worry, `RpcAgentBase` shares the same interface as `AgentBase`, you only need to implement the `reply` method. You can even copy the `reply` method of an `AgentBase` subclass and use it in `RpcAgentBase`. +- `lazy_launch`: if set to `True`, only launch the server when the agent is called. ### Run in multi-process mode AgentScope supports deployment in multi-process mode, where each agent is a sub-process of the application's main process, and all agents run on the same machine. -The usage is exactly the same as single process mode, and you only need to ensure that the agents used are instances of `RpcAgentBase` subclasses. +The usage is exactly the same as single process mode, and you only need to call the `to_dist` method after initialization. -Suppose you have classes `A` and `B`, both of which inherit from `RpcAgentBase`. +Suppose you have classes `A` and `B`, both of which inherit from `AgentBase`. ```python # import packages @@ -70,11 +69,11 @@ Suppose you have classes `A` and `B`, both of which inherit from `RpcAgentBase`. a = A( name="A", ..., -) +).to_dist() b = B( name="B", ..., -) +).to_dist() x = None while x is None or x.content != 'exit': @@ -90,8 +89,11 @@ AgentScope also supports to run agents on multiple machines. In this case, you n # import packages server_a = RpcAgentServerLauncher( - name="A", - ..., + agent_class=A, + agent_kwargs={ + "name": "A" + ... + }, host=ip_a, port=12001, local_mode=False, @@ -107,8 +109,11 @@ Please make sure that the two machines can access each other using the IP addres # import packages server_b = RpcAgentServerLauncher( - name="B", - ..., + agent_class=B, + agent_kwargs={ + "name": "B", + ... + }, host=ip_b, port=12001, local_mode=False, @@ -124,14 +129,16 @@ Then, you can run the application's main process on any machine that can access a = A( name="A", - ..., + ... +).to_dist( host=ip_a, port=12001, launch_server=False, ) b = B( name="B", - ..., + ... +).to_dist( host=ip_b, port=12002, launch_server=False, diff --git a/examples/distributed/distributed_debate.py b/examples/distributed/distributed_debate.py index 987d38ec3..300b2f537 100644 --- a/examples/distributed/distributed_debate.py +++ b/examples/distributed/distributed_debate.py @@ -69,11 +69,11 @@ def setup_server(parsed_args: argparse.Namespace) -> None: host = getattr(parsed_args, f"{parsed_args.role}_host") port = getattr(parsed_args, f"{parsed_args.role}_port") server_launcher = RpcAgentServerLauncher( + agent_class=DialogAgent, + agent_kwargs=config, host=host, port=port, local_mode=False, - agent_class=DialogAgent, - **config, ) server_launcher.launch() server_launcher.wait_until_terminate() @@ -86,21 +86,21 @@ def run_main_process(parsed_args: argparse.Namespace) -> None: ) pro_agent = DialogAgent( name="Pro", - ).to_distributed( + ).to_dist( host=parsed_args.pro_host, port=parsed_args.pro_port, launch_server=False, ) con_agent = DialogAgent( name="Con", - ).to_distributed( + ).to_dist( host=parsed_args.con_host, port=parsed_args.con_port, launch_server=False, ) judge_agent = DialogAgent( name="Judge", - ).to_distributed( + ).to_dist( host=parsed_args.judge_host, port=parsed_args.judge_port, launch_server=False, diff --git a/examples/distributed/distributed_dialog.py b/examples/distributed/distributed_dialog.py index 915f82e29..ce81b67be 100644 --- a/examples/distributed/distributed_dialog.py +++ b/examples/distributed/distributed_dialog.py @@ -2,7 +2,6 @@ """ An example of distributed dialog """ import argparse -import time from loguru import logger import agentscope @@ -41,13 +40,14 @@ def setup_assistant_server(assistant_host: str, assistant_port: int) -> None: agent_class=DialogAgent, agent_kwargs={ "name": "Assitant", - "host": assistant_host, - "port": assistant_port, "sys_prompt": "You are a helpful assistant.", "model": "gpt-3.5-turbo", "use_memory": True, "local_mode": False, }, + host=assistant_host, + port=assistant_port, + local_mode=False, ) assistant_server_launcher.launch() assistant_server_launcher.wait_until_terminate() @@ -60,7 +60,7 @@ def run_main_process(assistant_host: str, assistant_port: int) -> None: ) assistant_agent = DialogAgent( name="Assistant", - ).to_distributed( + ).to_dist( host=assistant_host, port=assistant_port, launch_server=False, @@ -77,7 +77,6 @@ def run_main_process(assistant_host: str, assistant_port: int) -> None: while not msg.content.endswith("exit"): msg = assistant_agent(msg) logger.chat(msg) - time.sleep(0.5) msg = user_agent(msg) diff --git a/notebook/distributed_debate.ipynb b/notebook/distributed_debate.ipynb index 09bc590ee..10a532515 100644 --- a/notebook/distributed_debate.ipynb +++ b/notebook/distributed_debate.ipynb @@ -93,19 +93,19 @@ " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of a debater who is arguing in favor of the proposition that AGI (Artificial General Intelligence) can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models are a viable path to AGI. Highlight the advancements in language understanding, adaptability, and scalability of GPT models as key factors in progressing towards AGI.\",\n", - ").to_distributed()\n", + ").to_dist()\n", "con_agent = DialogAgent(\n", " name=\"Con\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of a debater who is arguing against the proposition that AGI can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models, while impressive, are insufficient for reaching AGI. Discuss the limitations of GPT models such as lack of understanding, consciousness, ethical reasoning, and general problem-solving abilities that are essential for true AGI.\",\n", - ").to_distributed()\n", + ").to_dist()\n", "judge_agent = DialogAgent(\n", " name=\"Judge\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of an impartial judge in a debate where the affirmative side argues that AGI can be achieved using the GPT model framework, and the negative side contests this. Listen to both sides' arguments and provide an analytical judgment on which side presented a more compelling and reasonable case. Consider the strength of the evidence, the persuasiveness of the reasoning, and the overall coherence of the arguments presented by each side.\"\n", - ").to_distributed()" + ").to_dist()" ] }, { diff --git a/notebook/distributed_dialog.ipynb b/notebook/distributed_dialog.ipynb index 46b1e3f78..07a6425fe 100644 --- a/notebook/distributed_dialog.ipynb +++ b/notebook/distributed_dialog.ipynb @@ -91,7 +91,7 @@ " sys_prompt=\"You are a helpful assistant.\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", - ").to_distributed()\n", + ").to_dist()\n", "user_agent = UserAgent(\n", " name=\"User\",\n", ")" @@ -119,7 +119,6 @@ "while not msg.content.endswith(\"exit\"):\n", " msg = assistant_agent(msg)\n", " logger.chat(msg)\n", - " time.sleep(0.5)\n", " msg = user_agent(msg)" ] } diff --git a/src/agentscope/agents/agent.py b/src/agentscope/agents/agent.py index 29e0b3d5f..463650f83 100644 --- a/src/agentscope/agents/agent.py +++ b/src/agentscope/agents/agent.py @@ -182,7 +182,7 @@ def _broadcast_to_audience(self, x: dict) -> None: for agent in self._audience: agent.observe(x) - def to_distributed( + def to_dist( self, host: str = "localhost", port: int = None, @@ -192,7 +192,27 @@ def to_distributed( local_mode: bool = True, lazy_launch: bool = True, ) -> AgentBase: - """Convert current agent instance into a distributed version""" + """Convert current agent instance into a distributed version. + + Args: + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. + port (`int`, defaults to `None`): + Port of the rpc agent server. + max_pool_size (`int`, defaults to `100`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + local_mode (`bool`, defaults to `True`): + Whether the started rpc server only listens to local + requests. + lazy_launch (`bool`, defaults to `True`): + Only launch the server when the agent is called. + + Returns: + `AgentBase`: the wrapped agent instance with distributed + functionality + """ from .rpc_agent import RpcAgent if issubclass(self.__class__, RpcAgent): diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index 124e46471..746fe6303 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -71,9 +71,9 @@ class RpcAgent(AgentBase): def __init__( self, + name: str, agent_class: Type[AgentBase], agent_configs: dict, - name: str, host: str = "localhost", port: int = None, max_pool_size: int = 100, @@ -82,6 +82,28 @@ def __init__( local_mode: bool = True, lazy_launch: bool = True, ) -> None: + """Initialize a RpcAgent instance. + + Args: + agent_class (`Type[AgentBase]`, defaults to `None`): + The AgentBase subclass encapsulated by this wrapper. + agent_configs (`dict`): The args used to initialize the + agent_class. + name (`str`): Name of the agent. + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. + port (`int`, defaults to `None`): + Port of the rpc agent server. + max_pool_size (`int`, defaults to `100`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + local_mode (`bool`, defaults to `True`): + Whether the started rpc server only listens to local + requests. + lazy_launch (`bool`, defaults to `True`): + Only launch the server when the agent is called. + """ super().__init__(name=name) self.host = host self.port = port @@ -156,10 +178,18 @@ def setup_rcp_agent_server( """Setup gRPC server rpc agent. Args: + agent_class (`Type[AgentBase]`): + A subclass of AgentBase. + agent_args (`tuple`): The args tuple used to initialize the + agent_class. + agent_kwargs (`dict`): The args dict used to initialize the + agent_class. + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. port (`int`): The socket port monitored by grpc server. - servicer_class (`Type[RpcAgentServicer]`): - An implementation of RpcAgentBaseService. + init_settings (`dict`, defaults to `None`): + Init settings for agentscope.init. start_event (`EventClass`, defaults to `None`): An Event instance used to determine whether the child process has been started. @@ -168,12 +198,14 @@ def setup_rcp_agent_server( process has been stopped. pipe (`int`, defaults to `None`): A pipe instance used to pass the actual port of the server. - max_workers (`int`, defaults to `4`): - max worker number of grpc server. local_mode (`bool`, defaults to `None`): Only listen to local requests. - init_settings (`dict`, defaults to `None`): - Init settings. + max_pool_size (`int`, defaults to `100`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + max_workers (`int`, defaults to `4`): + max worker number of grpc server. """ if init_settings is not None: @@ -274,30 +306,16 @@ def __init__( max_pool_size: int = 100, max_timeout_seconds: int = 1800, local_mode: bool = True, - **kwargs: Any, ) -> None: """Init a rpc agent server launcher. Args: - name (`str`): - The name of the agent. - config (`Optional[dict]`): - The configuration of the agent, if provided, the agent will - be initialized from the config rather than the other - parameters. - sys_prompt (`Optional[str]`): - The system prompt of the agent, which can be passed by args - or hard-coded in the agent. - model (`Optional[Union[Callable[..., Any], str]]`, defaults to - None): - The callable model object or the model name, which is used to - load model from configuration. - use_memory (`bool`, defaults to `True`): - Whether the agent has memory. - memory_config (`Optional[dict]`): - The config of memory. - agent_class (`Type[RpcAgentBase]`, defaults to `None`): - The RpcAgentBase class used in rpc agent server as a servicer. + agent_class (`Type[AgentBase]`, defaults to `None`): + The AgentBase subclass encapsulated by this wrapper. + agent_args (`tuple`): The args tuple used to initialize the + agent_class. + agent_kwargs (`dict`): The args dict used to initialize the + agent_class. host (`str`, defaults to `"localhost"`): Hostname of the rpc agent server. port (`int`, defaults to `None`): @@ -310,7 +328,6 @@ def __init__( Whether the started rpc server only listens to local requests. """ - # TODO: update docstring self.agent_class = agent_class self.agent_args = agent_args self.agent_kwargs = agent_kwargs @@ -322,7 +339,6 @@ def __init__( self.server = None self.stop_event = None self.parent_con = None - self.kwargs = kwargs def launch(self) -> None: """launch a local rpc agent server.""" @@ -420,8 +436,7 @@ def get_task_id(self) -> int: return self.task_id_counter def call_func(self, request: RpcMsg, _: ServicerContext) -> RpcMsg: - """Call the specific servicer function. - """ + """Call the specific servicer function.""" if hasattr(self, request.target_func): return getattr(self, request.target_func)(request) else: diff --git a/src/agentscope/agents/user_agent.py b/src/agentscope/agents/user_agent.py index ac2339166..e8de167ac 100644 --- a/src/agentscope/agents/user_agent.py +++ b/src/agentscope/agents/user_agent.py @@ -46,15 +46,15 @@ def reply( x (`dict`, defaults to `None`): A dictionary containing initial data to be added to memory. Defaults to None. - required_keys (`Optional[Union[list[str], str]]`, defaults to - None): + required_keys \ + (`Optional[Union[list[str], str]]`, defaults to `None`): Strings that requires user to input, which will be used as the key of the returned dict. Defaults to None. Returns: - dict: A dictionary representing the message object that contains - the user's input and any additional details. This is also - stored in the object's memory. + `dict`: A dictionary representing the message object that contains + the user's input and any additional details. This is also + stored in the object's memory. """ if x is not None: self.memory.add(x) diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index 5068c9bf5..be8279657 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -115,7 +115,7 @@ def test_single_rpc_agent_server(self) -> None: port = 12001 agent_a = DemoRpcAgent( name="a", - ).to_distributed( + ).to_dist( host=host, port=port, ) @@ -169,11 +169,12 @@ def test_connect_to_an_existing_rpc_server(self) -> None: }, local_mode=False, host="127.0.0.1", + port=12010, ) launcher.launch() agent_a = DemoRpcAgent( name="a", - ).to_distributed( + ).to_dist( host="127.0.0.1", port=launcher.port, launch_server=False, @@ -205,21 +206,21 @@ def test_multi_rpc_agent(self) -> None: port3 = 12003 agent_a = DemoRpcAgentAdd( name="a", - ).to_distributed( + ).to_dist( host=host, port=port1, lazy_launch=False, ) agent_b = DemoRpcAgentAdd( name="b", - ).to_distributed( + ).to_dist( host=host, port=port2, lazy_launch=False, ) agent_c = DemoRpcAgentAdd( name="c", - ).to_distributed( + ).to_dist( host=host, port=port3, lazy_launch=False, @@ -265,7 +266,7 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: # rpc agent a agent_a = DemoRpcAgentAdd( name="a", - ).to_distributed( + ).to_dist( host=host, port=port1, lazy_launch=False, @@ -277,7 +278,7 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: # rpc agent c agent_c = DemoRpcAgentAdd( name="c", - ).to_distributed( + ).to_dist( host=host, port=port2, lazy_launch=False, @@ -294,13 +295,13 @@ def test_msghub_compatibility(self) -> None: """test compatibility with msghub""" agent_a = DemoRpcAgentWithMemory( name="a", - ).to_distributed() + ).to_dist() agent_b = DemoRpcAgentWithMemory( name="b", - ).to_distributed() + ).to_dist() agent_c = DemoRpcAgentWithMemory( name="c", - ).to_distributed() + ).to_dist() participants = [agent_a, agent_b, agent_c] annonuncement_msgs = [ Msg(name="System", content="Announcement 1"), @@ -336,7 +337,7 @@ def test_standalone_multiprocess_init(self) -> None: # rpc agent a agent_a = DemoRpcAgentWithMonitor( name="a", - ).to_distributed( + ).to_dist( host=host, port=port1, lazy_launch=False, @@ -344,7 +345,7 @@ def test_standalone_multiprocess_init(self) -> None: # local agent b agent_b = DemoRpcAgentWithMonitor( name="b", - ).to_distributed( + ).to_dist( host=host, port=port2, lazy_launch=False,