From 010dc67a77eff44369e71677da4f45f641acacba Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Mon, 22 Jan 2024 20:43:56 +0800 Subject: [PATCH 01/15] rename attrs in placeholder message --- src/agentscope/message.py | 50 ++++++++++++++++++++------------------- tests/rpc_agent_test.py | 16 +++++++++---- 2 files changed, 37 insertions(+), 29 deletions(-) diff --git a/src/agentscope/message.py b/src/agentscope/message.py index 8aa675f68..c0f0e0911 100644 --- a/src/agentscope/message.py +++ b/src/agentscope/message.py @@ -180,14 +180,18 @@ def serialize(self) -> str: class PlaceholderMessage(MessageBase): """A placeholder for the return message of RpcAgent.""" + PLACEHOLDER_ATTRS = { + "_host", + "_port", + "_client", + "_task_id", + "_is_placeholder", + } + LOCAL_ATTRS = { - "host", - "port", - "client", - "task_id", - "is_placeholder", "name", "timestamp", + *PLACEHOLDER_ATTRS, } def __init__( @@ -232,14 +236,13 @@ def __init__( timestamp=timestamp, **kwargs, ) - # todo: avoid attribute name duplication - self.host = host - self.port = port + self._host = host + self._port = port - self.client = RpcAgentClient(self.host, self.port) - self.task_id = task_id + self._client = RpcAgentClient(host, port) + self._task_id = task_id # placeholder indicates whether the real message is still in rpc server - self.is_placeholder = True + self._is_placeholder = True def __getattr__(self, __name: str) -> Any: """Get attribute value from PlaceholderMessage. 
Get value from rpc @@ -252,49 +255,48 @@ def __getattr__(self, __name: str) -> Any: """ if ( __name not in PlaceholderMessage.LOCAL_ATTRS - and self.is_placeholder + and self._is_placeholder ): self.update_value() return MessageBase.__getattr__(self, __name) def to_str(self) -> str: - if self.is_placeholder: - return f"{self.name}: [message from {self.host}:{self.port}]" + if self._is_placeholder: + return f"{self.name}: [message from {self._host}:{self._port}]" else: return f"{self.name}: {self.content}" def update_value(self) -> MessageBase: """Get attribute values from rpc agent server immediately""" - if self.is_placeholder: + if self._is_placeholder: # retrieve real message from rpc agent server - result = self.client.call_func( + result = self._client.call_func( func_name="_get", - value=json.dumps({"task_id": self.task_id}), + value=json.dumps({"task_id": self._task_id}), ) self.update(deserialize(result)) # the actual value has been updated, not a placeholder any more - self.is_placeholder = False + self._is_placeholder = False return self def serialize(self) -> str: - if self.is_placeholder: + if self._is_placeholder: return json.dumps( { "__type": "PlaceholderMessage", "name": self.name, "content": None, "timestamp": self.timestamp, - "host": self.host, - "port": self.port, - "task_id": self.task_id, + "host": self._host, + "port": self._port, + "task_id": self._task_id, }, ) else: states = { k: v for k, v in self.items() - if k - not in ["host", "port", "client", "task_id", "is_placeholder"] + if k not in PlaceholderMessage.PLACEHOLDER_ATTRS } states["__type"] = "Msg" return json.dumps(states) diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index 2ffaac43e..8319ba91d 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -80,18 +80,24 @@ def test_single_rpc_agent_server(self) -> None: # get name without waiting for the server self.assertEqual(result.name, "a") js_placeholder_result = result.serialize() - 
self.assertTrue(result.is_placeholder) + self.assertTrue(result._is_placeholder) # pylint: disable=W0212 placeholder_result = deserialize(js_placeholder_result) self.assertTrue(isinstance(placeholder_result, PlaceholderMessage)) self.assertEqual(placeholder_result.name, "a") - self.assertTrue(placeholder_result.is_placeholder) + self.assertTrue( + placeholder_result._is_placeholder, # pylint: disable=W0212 + ) # wait to get content self.assertEqual(result.content, msg.content) - self.assertFalse(result.is_placeholder) + self.assertFalse(result._is_placeholder) # pylint: disable=W0212 self.assertEqual(result.id, 0) - self.assertTrue(placeholder_result.is_placeholder) + self.assertTrue( + placeholder_result._is_placeholder, # pylint: disable=W0212 + ) self.assertEqual(placeholder_result.content, msg.content) - self.assertFalse(placeholder_result.is_placeholder) + self.assertFalse( + placeholder_result._is_placeholder, # pylint: disable=W0212 + ) self.assertEqual(placeholder_result.id, 0) # check msg js_msg_result = result.serialize() From c4a86dcc1c0465eb893cb8b4402560e1f4fab23a Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Mon, 22 Jan 2024 20:59:05 +0800 Subject: [PATCH 02/15] update placeholder message before reply --- src/agentscope/agents/rpc_agent.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index a2aeff030..7adb8eb44 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -275,6 +275,9 @@ def process_tasks(self) -> None: """Task processing thread.""" while True: task_id, task_msg = self.task_queue.get() + # TODO: optimize this and avoid blocking + if isinstance(task_msg, PlaceholderMessage): + task_msg.update_value() result = self.reply(task_msg) self.result_pool[task_id] = result From 4d6e036dad050b5fd1e4186c495342dde97f8f66 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Tue, 23 Jan 2024 11:49:35 +0800 Subject: [PATCH 03/15] 
refactor placeholder message --- examples/distributed/distributed_debate.py | 8 +++--- examples/distributed/distributed_dialog.py | 6 ++-- notebook/distributed_debate.ipynb | 10 +++---- notebook/distributed_dialog.ipynb | 2 +- src/agentscope/message.py | 32 ++++++++++++++-------- tests/rpc_agent_test.py | 13 +++++++++ 6 files changed, 46 insertions(+), 25 deletions(-) diff --git a/examples/distributed/distributed_debate.py b/examples/distributed/distributed_debate.py index f07e620fa..4fcc90102 100644 --- a/examples/distributed/distributed_debate.py +++ b/examples/distributed/distributed_debate.py @@ -108,13 +108,13 @@ def run_main_process(parsed_args: argparse.Namespace) -> None: with msghub(participants=participants, announcement=hint): for _ in range(3): pro_resp = pro_agent(x) - logger.chat(pro_resp.update_value()) + logger.chat(pro_resp) con_resp = con_agent(pro_resp) - logger.chat(con_resp.update_value()) + logger.chat(con_resp) x = judge_agent(con_resp) - logger.chat(x.update_value()) + logger.chat(x) x = judge_agent(x) - logger.chat(x.update_value()) + logger.chat(x) if __name__ == "__main__": diff --git a/examples/distributed/distributed_dialog.py b/examples/distributed/distributed_dialog.py index 83e7b7fd1..e9b67b58b 100644 --- a/examples/distributed/distributed_dialog.py +++ b/examples/distributed/distributed_dialog.py @@ -35,7 +35,7 @@ def parse_args() -> argparse.Namespace: def setup_assistant_server(assistant_host: str, assistant_port: int) -> None: """Set up assistant rpc server""" agentscope.init( - model_configs="configs/model_configs.json", + model_configs="configs/model_configs_pxc.json", ) assistant_server_launcher = RpcAgentServerLauncher( name="Assitant", @@ -54,7 +54,7 @@ def setup_assistant_server(assistant_host: str, assistant_port: int) -> None: def run_main_process(assistant_host: str, assistant_port: int) -> None: """Run dialog main process""" agentscope.init( - model_configs="configs/model_configs.json", + 
model_configs="configs/model_configs_pxc.json", ) assistant_agent = RpcDialogAgent( name="Assistant", @@ -73,7 +73,7 @@ def run_main_process(assistant_host: str, assistant_port: int) -> None: msg = user_agent() while not msg.content.endswith("exit"): msg = assistant_agent(msg) - logger.chat(msg.update_value()) + logger.chat(msg) time.sleep(0.5) msg = user_agent(msg) diff --git a/notebook/distributed_debate.ipynb b/notebook/distributed_debate.ipynb index 08ded268f..a97b8d1b4 100644 --- a/notebook/distributed_debate.ipynb +++ b/notebook/distributed_debate.ipynb @@ -153,13 +153,13 @@ "with msghub(participants=participants, announcement=hint):\n", " for _ in range(3):\n", " pro_resp = pro_agent(x)\n", - " logger.chat(pro_resp.update_value())\n", + " logger.chat(pro_resp)\n", " con_resp = con_agent(pro_resp)\n", - " logger.chat(con_resp.update_value())\n", + " logger.chat(con_resp)\n", " x = judge_agent(con_resp)\n", - " logger.chat(x.update_value())\n", + " logger.chat(x)\n", " x = judge_agent(x)\n", - " logger.chat(x.update_value())\n" + " logger.chat(x)\n" ] }, { @@ -187,7 +187,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.9" + "version": "3.1.0" } }, "nbformat": 4, diff --git a/notebook/distributed_dialog.ipynb b/notebook/distributed_dialog.ipynb index 83a443311..778ab37d4 100644 --- a/notebook/distributed_dialog.ipynb +++ b/notebook/distributed_dialog.ipynb @@ -119,7 +119,7 @@ "msg = user_agent()\n", "while not msg.content.endswith(\"exit\"):\n", " msg = assistant_agent(msg)\n", - " logger.chat(msg.update_value())\n", + " logger.chat(msg)\n", " time.sleep(0.5)\n", " msg = user_agent(msg)" ] diff --git a/src/agentscope/message.py b/src/agentscope/message.py index c0f0e0911..69e06ed7e 100644 --- a/src/agentscope/message.py +++ b/src/agentscope/message.py @@ -236,13 +236,16 @@ def __init__( timestamp=timestamp, **kwargs, ) + # placeholder indicates whether the real message is still in rpc server + 
self._is_placeholder = True self._host = host self._port = port - self._client = RpcAgentClient(host, port) self._task_id = task_id - # placeholder indicates whether the real message is still in rpc server - self._is_placeholder = True + + def __is_local(self, key: Any) -> bool: + return key in PlaceholderMessage.LOCAL_ATTRS \ + or not self._is_placeholder def __getattr__(self, __name: str) -> Any: """Get attribute value from PlaceholderMessage. Get value from rpc @@ -251,20 +254,25 @@ def __getattr__(self, __name: str) -> Any: Args: __name (`str`): Attribute name. - """ - if ( - __name not in PlaceholderMessage.LOCAL_ATTRS - and self._is_placeholder - ): + if not self.__is_local(__name): self.update_value() return MessageBase.__getattr__(self, __name) + def __getitem__(self, __key: Any) -> Any: + """Get item value from PlaceholderMessage. Get value from rpc + agent server if necessary. + + Args: + __key (`Any`): + Item name. + """ + if not self.__is_local(__key): + self.update_value() + return MessageBase.__getitem__(self, __key) + def to_str(self) -> str: - if self._is_placeholder: - return f"{self.name}: [message from {self._host}:{self._port}]" - else: - return f"{self.name}: {self.content}" + return f"{self.name}: {self.content}" def update_value(self) -> MessageBase: """Get attribute values from rpc agent server immediately""" diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index 8319ba91d..ac951ba6e 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -79,11 +79,13 @@ def test_single_rpc_agent_server(self) -> None: result = agent_a(msg) # get name without waiting for the server self.assertEqual(result.name, "a") + self.assertEqual(result["name"], "a") js_placeholder_result = result.serialize() self.assertTrue(result._is_placeholder) # pylint: disable=W0212 placeholder_result = deserialize(js_placeholder_result) self.assertTrue(isinstance(placeholder_result, PlaceholderMessage)) self.assertEqual(placeholder_result.name, "a") + 
self.assertEqual(placeholder_result["name"], "a") self.assertTrue( placeholder_result._is_placeholder, # pylint: disable=W0212 ) @@ -131,6 +133,17 @@ def test_connect_to_an_existing_rpc_server(self) -> None: self.assertEqual(result.name, "a") # waiting for server self.assertEqual(result.content, msg.content) + # test dict usage + msg = Msg(name="System", content={"text": "hi world"}) + result = agent_a(msg) + # get name without waiting for the server + self.assertEqual(result["name"], "a") + # waiting for server + self.assertEqual(result["content"], msg.content) + # test to_str + msg = Msg(name="System", content={"text": "test"}) + result = agent_a(msg) + self.assertEqual(result.to_str(), "a: {'text': 'test'}") launcher.shutdown() def test_multi_rpc_agent(self) -> None: From 6c61af9e4d97e4f7bbd6f13f3dd55edf34536a2f Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Tue, 23 Jan 2024 18:13:30 +0800 Subject: [PATCH 04/15] refactor init method of rpc agent and add tests --- examples/distributed/distributed_dialog.py | 4 +- src/agentscope/_init.py | 106 ++++++++++++++------- src/agentscope/agents/rpc_agent.py | 10 +- src/agentscope/constants.py | 2 +- src/agentscope/message.py | 5 +- src/agentscope/models/__init__.py | 2 +- src/agentscope/utils/logging_utils.py | 13 ++- tests/rpc_agent_test.py | 85 ++++++++++++++++- 8 files changed, 174 insertions(+), 53 deletions(-) diff --git a/examples/distributed/distributed_dialog.py b/examples/distributed/distributed_dialog.py index e9b67b58b..ed1b62368 100644 --- a/examples/distributed/distributed_dialog.py +++ b/examples/distributed/distributed_dialog.py @@ -35,7 +35,7 @@ def parse_args() -> argparse.Namespace: def setup_assistant_server(assistant_host: str, assistant_port: int) -> None: """Set up assistant rpc server""" agentscope.init( - model_configs="configs/model_configs_pxc.json", + model_configs="configs/model_configs.json", ) assistant_server_launcher = RpcAgentServerLauncher( name="Assitant", @@ -54,7 +54,7 @@ def 
setup_assistant_server(assistant_host: str, assistant_port: int) -> None: def run_main_process(assistant_host: str, assistant_port: int) -> None: """Run dialog main process""" agentscope.init( - model_configs="configs/model_configs_pxc.json", + model_configs="configs/model_configs.json", ) assistant_agent = RpcDialogAgent( name="Assistant", diff --git a/src/agentscope/_init.py b/src/agentscope/_init.py index 6f75004b5..af8b0b597 100644 --- a/src/agentscope/_init.py +++ b/src/agentscope/_init.py @@ -4,7 +4,6 @@ import os import shutil from typing import Optional, Union, Sequence - from agentscope import agents from .agents import AgentBase from ._runtime import Runtime @@ -15,7 +14,7 @@ from .constants import _DEFAULT_DIR from .constants import _DEFAULT_LOG_LEVEL - +# init setting _INIT_SETTINGS = {} @@ -60,19 +59,25 @@ def init( cover the required arguments to initialize a specific agent object, otherwise the default values will be used. """ - - # TODO: add support to set quota for monitor - - # Load model configs if needed - if model_configs is not None: - read_model_configs(model_configs) - - # Init the runtime - Runtime.project = project - Runtime.name = name - - # Init file manager - file_manager.init(save_dir, save_api_invoke) + init_process( + model_configs=model_configs, + project=project, + name=name, + save_dir=save_dir, + save_api_invoke=save_api_invoke, + save_log=save_log, + logger_level=logger_level, + ) + + # save init settings for subprocess + _INIT_SETTINGS["model_configs"] = model_configs + _INIT_SETTINGS["project"] = project + _INIT_SETTINGS["name"] = name + _INIT_SETTINGS["runtime_id"] = Runtime.runtime_id + _INIT_SETTINGS["save_dir"] = save_dir + _INIT_SETTINGS["save_api_invoke"] = save_api_invoke + _INIT_SETTINGS["save_log"] = save_log + _INIT_SETTINGS["logger_level"] = logger_level # Save code if needed if save_code: @@ -83,13 +88,6 @@ def init( file_abs = os.path.join(cur_dir, filename) shutil.copy(file_abs, str(file_manager.dir_code)) - # 
Set logger and level - dir_log = str(file_manager.dir_log) if save_log else None - setup_logger(dir_log, logger_level) - - # Set monitor - _ = MonitorFactory.get_monitor(db_path=file_manager.path_db) - # Load config and init agent by configs if agent_configs is not None: if isinstance(agent_configs, str): @@ -108,16 +106,58 @@ def init( agent = agent_cls(**agent_args) agent_objs.append(agent) return agent_objs + return [] - # save init settings globally, will be used to init child process - _INIT_SETTINGS["model_configs"] = model_configs - _INIT_SETTINGS["project"] = project - _INIT_SETTINGS["name"] = name - _INIT_SETTINGS["save_dir"] = save_dir - _INIT_SETTINGS["save_log"] = save_log - _INIT_SETTINGS["save_code"] = save_code - _INIT_SETTINGS["save_api_invoke"] = save_api_invoke - _INIT_SETTINGS["logger_level"] = logger_level - _INIT_SETTINGS["agent_configs"] = agent_configs - return [] +def init_process( + model_configs: Optional[Union[dict, str, list]] = None, + project: Optional[str] = None, + name: Optional[str] = None, + runtime_id: Optional[str] = None, + save_dir: str = _DEFAULT_DIR, + save_api_invoke: bool = False, + save_log: bool = False, + logger_level: LOG_LEVEL = _DEFAULT_LOG_LEVEL, +) -> None: + """A entry to initialize the package in a process. + + Args: + project (`Optional[str]`, defaults to `None`): + The project name, which is used to identify the project. + name (`Optional[str]`, defaults to `None`): + The name for runtime, which is used to identify this runtime. + runtime_id (`Optional[str]`, defaults to `None`): + The id for runtime, which is used to identify this runtime. + save_dir (`str`, defaults to `./runs`): + The directory to save logs, files, codes, and api invocations. + If `dir` is `None`, when saving logs, files, codes, and api + invocations, the default directory `./runs` will be created. + save_api_invoke (`bool`, defaults to `False`): + Whether to save api invocations locally, including model and web + search invocation. 
+ model_configs (`Optional[Sequence]`, defaults to `None`): + A sequence of pre-init model configs. + save_log (`bool`, defaults to `False`): + Whether to save logs locally. + logger_level (`LOG_LEVEL`, defaults to `"INFO"`): + The logging level of logger. + """ + # Load model configs if needed + if model_configs is not None: + read_model_configs(model_configs) + + # Init the runtime + Runtime.project = project + Runtime.name = name + if runtime_id is not None: + Runtime.runtime_id = runtime_id + + # Init file manager + file_manager.init(save_dir, save_api_invoke) + + # Init monitor + _ = MonitorFactory.get_monitor(db_path=file_manager.path_db) + + # Init logger + dir_log = str(file_manager.dir_log) if save_log else None + setup_logger(dir_log, logger_level) diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index 7adb8eb44..b7dd62d63 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -16,6 +16,7 @@ from typing import Type from typing import Sequence from concurrent import futures +from loguru import logger try: import grpc @@ -29,15 +30,13 @@ except ImportError: ExpiringDict = None -from agentscope._init import _INIT_SETTINGS -from agentscope._init import init +from agentscope._init import init_process, _INIT_SETTINGS from agentscope.agents.agent import AgentBase from agentscope.message import MessageBase from agentscope.message import Msg from agentscope.message import PlaceholderMessage from agentscope.message import deserialize from agentscope.message import serialize -from agentscope.utils.logging_utils import logger from agentscope.rpc import ( RpcAgentClient, RpcMsg, @@ -144,6 +143,7 @@ def __init__( # prohibit servicer object from launching a new server assert not (is_servicer and launch_server) # launch_server is True only in the main process + self.server_launcher = None if launch_server: self.server_launcher = RpcAgentServerLauncher( name=name, @@ -164,8 +164,6 @@ def __init__( if 
not lazy_launch: self.server_launcher.launch() self.client = RpcAgentClient(host=self.host, port=self.port) - else: - self.server_launcher = None # is_servicer is True only in the rpc server process if is_servicer: self.result_pool = ExpiringDict( @@ -369,7 +367,7 @@ def setup_rcp_agent_server( """ if init_settings is not None: - init(**init_settings) + init_process(**init_settings) server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) servicer = servicer_class(**kwargs) add_RpcAgentServicer_to_server(servicer, server) diff --git a/src/agentscope/constants.py b/src/agentscope/constants.py index 5cc9fea61..2b557d75a 100644 --- a/src/agentscope/constants.py +++ b/src/agentscope/constants.py @@ -4,7 +4,7 @@ from enum import IntEnum PACKAGE_NAME = "agentscope" -MSG_TOKEN = f"<{PACKAGE_NAME}_msg>" +MSG_TOKEN = f"[{PACKAGE_NAME}_msg]" # default values diff --git a/src/agentscope/message.py b/src/agentscope/message.py index 69e06ed7e..ae0d84672 100644 --- a/src/agentscope/message.py +++ b/src/agentscope/message.py @@ -244,8 +244,9 @@ def __init__( self._task_id = task_id def __is_local(self, key: Any) -> bool: - return key in PlaceholderMessage.LOCAL_ATTRS \ - or not self._is_placeholder + return ( + key in PlaceholderMessage.LOCAL_ATTRS or not self._is_placeholder + ) def __getattr__(self, __name: str) -> Any: """Get attribute value from PlaceholderMessage. 
Get value from rpc diff --git a/src/agentscope/models/__init__.py b/src/agentscope/models/__init__.py index 4633d86a4..205f2607f 100644 --- a/src/agentscope/models/__init__.py +++ b/src/agentscope/models/__init__.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ Import modules in models package.""" import json -from typing import Union +from typing import Union, Sequence from loguru import logger diff --git a/src/agentscope/utils/logging_utils.py b/src/agentscope/utils/logging_utils.py index 2bb82a562..47813b898 100644 --- a/src/agentscope/utils/logging_utils.py +++ b/src/agentscope/utils/logging_utils.py @@ -174,15 +174,15 @@ def setup_logger( `"DEBUG"`, `"INFO"`, `"SUCCESS"`, `"WARNING"`, `"ERROR"`, `"CRITICAL"`. """ - # redirect stderr to record errors in logging - sys.stderr = _Stream() - # avoid reinit in subprocess if hasattr(logger, "chat"): return + # redirect stderr to record errors in logging + sys.stderr = _Stream() + # add chat function for logger - logger.level("CHAT", no=20, color="") + logger.level("CHAT", no=21, color="") logger.chat = _chat # set logging level @@ -193,11 +193,10 @@ def setup_logger( if path_log is not None: if not os.path.exists(path_log): os.makedirs(path_log) - - path_log_file = os.path.join(path_log, "file_{time}.log") + path_log_file = os.path.join(path_log, "all.log") path_log_file_only_chat = os.path.join( path_log, - "file_{time}.log.chat", + "chat.log", ) # save all logging into file diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index ac951ba6e..ce23609df 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -4,7 +4,10 @@ """ import unittest import time +import shutil +from loguru import logger +import agentscope from agentscope.agents import AgentBase from agentscope.agents import RpcAgentBase from agentscope.agents.rpc_agent import RpcAgentServerLauncher @@ -13,6 +16,7 @@ from agentscope.message import deserialize from agentscope.msghub import msghub from agentscope.pipelines import 
sequentialpipeline +from agentscope.utils import MonitorFactory, QuotaExceededError class DemoRpcAgent(RpcAgentBase): @@ -62,9 +66,50 @@ def reply(self, x: dict = None) -> dict: return msg +class DemoRpcAgentWithMonitor(RpcAgentBase): + """A demo Rpc agent that use monitor""" + + def reply(self, x: dict = None) -> dict: + monitor = MonitorFactory.get_monitor() + try: + monitor.update({"msg_num": 1}) + except QuotaExceededError: + x.content["quota_exceeded"] = True + logger.chat( + { + "name": self.name, + "content": "quota_exceeded", + }, + ) + return x + x.content["msg_num"] = monitor.get_value("msg_num") + logger.chat( + { + "name": self.name, + "content": f"msg_num {x.content['msg_num']}", + }, + ) + time.sleep(0.2) + return x + + class BasicRpcAgentTest(unittest.TestCase): "Test cases for Rpc Agent" + def setUp(self) -> None: + """Init for Rpc Agent Test""" + agentscope.init( + project="test", + name="rpc_agent", + save_dir="./test_runs", + save_log=True, + ) + + def tearDown(self) -> None: + MonitorFactory._instance = None # pylint: disable=W0212 + logger.remove() + shutil.rmtree("./test_runs") + def test_single_rpc_agent_server(self) -> None: """test setup a single rpc agent""" host = "localhost" @@ -85,7 +130,10 @@ def test_single_rpc_agent_server(self) -> None: placeholder_result = deserialize(js_placeholder_result) self.assertTrue(isinstance(placeholder_result, PlaceholderMessage)) self.assertEqual(placeholder_result.name, "a") - self.assertEqual(placeholder_result["name"], "a") + self.assertEqual( + placeholder_result["name"], # type: ignore [call-overload] + "a", + ) self.assertTrue( placeholder_result._is_placeholder, # pylint: disable=W0212 ) @@ -277,3 +325,38 @@ def test_msghub_compatibility(self) -> None: self.assertEqual(x_c.content["mem_size"], 7) x_c = sequentialpipeline(participants, x_c) self.assertEqual(x_c.content["mem_size"], 10) + + def test_standalone_multiprocess_init(self) -> None: + """test compatibility with agentscope.init""" + monitor 
= MonitorFactory.get_monitor() + monitor.register("msg_num", quota=10) + host = "localhost" + # automatically + port1 = 12001 + port2 = 12002 + # rpc agent a + agent_a = DemoRpcAgentWithMonitor( + name="a", + host=host, + port=port1, + lazy_launch=False, + ) + # local agent b + agent_b = DemoRpcAgentWithMonitor( + name="b", + host=host, + port=port2, + lazy_launch=False, + ) + msg = Msg(name="System", content={"msg_num": 0}) + j = 0 + for _ in range(5): + msg = agent_a(msg) + self.assertEqual(msg["content"]["msg_num"], j + 1) + msg = agent_b(msg) + self.assertEqual(msg["content"]["msg_num"], j + 2) + j += 2 + msg = agent_a(msg) + self.assertTrue(msg["content"]["quota_exceeded"]) + msg = agent_b(msg) + self.assertTrue(msg["content"]["quota_exceeded"]) From 5a18b81acc892a20145a69159b9f80bcabf5fde7 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Wed, 24 Jan 2024 15:49:28 +0800 Subject: [PATCH 05/15] standalone mutliprocess mode without ports --- src/agentscope/agents/rpc_agent.py | 138 +++++++++++++++++------------ tests/rpc_agent_test.py | 9 -- 2 files changed, 82 insertions(+), 65 deletions(-) diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index b7dd62d63..615d74962 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -4,6 +4,7 @@ from multiprocessing import Process from multiprocessing import Event from multiprocessing.synchronize import Event as EventClass +from multiprocessing import Pipe import socket import threading import time @@ -80,7 +81,7 @@ def __init__( use_memory: bool = True, memory_config: Optional[dict] = None, host: str = "localhost", - port: int = 80, + port: int = 12000, max_pool_size: int = 100, max_timeout_seconds: int = 1800, launch_server: bool = True, @@ -110,7 +111,7 @@ def __init__( The config of memory. host (`str`, defaults to "localhost"): Hostname of the rpc agent server. 
- port (`int`, defaults to `80`): + port (`int`, defaults to `None`): Port of the rpc agent server. max_pool_size (`int`, defaults to `100`): The max number of task results that the server can @@ -159,11 +160,9 @@ def __init__( max_timeout_seconds=max_timeout_seconds, local_mode=local_mode, ) - self.port = self.server_launcher.port self.client = None if not lazy_launch: - self.server_launcher.launch() - self.client = RpcAgentClient(host=self.host, port=self.port) + self._launch_server() # is_servicer is True only in the rpc server process if is_servicer: self.result_pool = ExpiringDict( @@ -180,6 +179,13 @@ def __init__( if not launch_server and not is_servicer: self.client = RpcAgentClient(host=self.host, port=self.port) + def _launch_server(self) -> None: + """Launch a rpc server and update the port and the client + """ + self.server_launcher.launch() + self.port = self.server_launcher.port + self.client = RpcAgentClient(host=self.host, port=self.port) + def get_task_id(self) -> int: """Get the auto-increment task id.""" with self.task_id_lock: @@ -294,8 +300,7 @@ def observe(self, x: Union[dict, Sequence[dict]]) -> None: The input to be observed. 
""" if self.client is None: - self.server_launcher.launch() - self.client = RpcAgentClient(host=self.host, port=self.port) + self._launch_server() self.client.call_func( func_name="_observe", value=serialize(x), # type: ignore [arg-type] @@ -312,8 +317,7 @@ def __call__(self, *args: Any, **kwargs: Any) -> dict: if x is not None: assert isinstance(x, MessageBase) if self.client is None: - self.server_launcher.launch() - self.client = RpcAgentClient(host=self.host, port=self.port) + self._launch_server() res_msg = self.client.call_func( func_name="_call", value=x.serialize() if x is not None else "", @@ -340,6 +344,7 @@ def setup_rcp_agent_server( servicer_class: Type[RpcAgentServicer], start_event: EventClass = None, stop_event: EventClass = None, + pipe: int = None, max_workers: int = 4, local_mode: bool = True, init_settings: dict = None, @@ -358,6 +363,8 @@ def setup_rcp_agent_server( stop_event (`EventClass`, defaults to `None`): The stop Event instance used to determine whether the child process has been stopped. + pipe (`int`, defaults to `None`): + A pipe instance used to pass the actual port of the server. max_workers (`int`, defaults to `4`): max worker number of grpc server. 
local_mode (`bool`, defaults to `None`): @@ -368,18 +375,35 @@ def setup_rcp_agent_server( if init_settings is not None: init_process(**init_settings) - server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) - servicer = servicer_class(**kwargs) - add_RpcAgentServicer_to_server(servicer, server) - if local_mode: - server.add_insecure_port(f"localhost:{port}") - else: - server.add_insecure_port(f"0.0.0.0:{port}") - server.start() + while True: + try: + port = check_port(port) + logger.info( + f"Starting rpc server [{servicer_class.__name__}] at port" + f" [{port}]...", + ) + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=max_workers), + ) + kwargs['port'] = port + servicer = servicer_class(**kwargs) + add_RpcAgentServicer_to_server(servicer, server) + if local_mode: + server.add_insecure_port(f"localhost:{port}") + else: + server.add_insecure_port(f"0.0.0.0:{port}") + server.start() + break + except OSError: + logger.warning( + f"Failed to start rpc server at port [{port}], " + f"try another port", + ) logger.info( f"rpc server [{servicer_class.__name__}] at port [{port}] started " "successfully", ) + pipe.send(port) start_event.set() stop_event.wait() logger.info( @@ -392,6 +416,42 @@ def setup_rcp_agent_server( ) +def find_available_port() -> int: + """Get an unoccupied socket port number.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + +def check_port(port: Optional[int] = None) -> int: + """Check if the port is available. + + Args: + port (`int`): + the port number being checked. + + Returns: + `int`: the port number that passed the check. If the port is found + to be occupied, an available port number will be automatically + returned. 
+ """ + if port is None: + new_port = find_available_port() + logger.warning( + "gRpc server port is not provided, automatically select " + f"[{new_port}] as the port number.", + ) + return new_port + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + if s.connect_ex(("localhost", port)) == 0: + new_port = find_available_port() + logger.warning( + f"Port [{port}] is occupied, use [{new_port}] instead", + ) + return new_port + return port + + class RpcAgentServerLauncher: """Launcher of rpc agent server.""" @@ -453,55 +513,19 @@ def __init__( self.memory_config = memory_config self.agent_class = agent_class self.host = host - self.port = self.check_port(port) + self.port = check_port(port) self.max_pool_size = max_pool_size self.max_timeout_seconds = max_timeout_seconds self.local_model = local_mode self.server = None self.stop_event = None + self.parent_con = None self.kwargs = kwargs - def find_available_port(self) -> int: - """Get an unoccupied socket port number.""" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("", 0)) - return s.getsockname()[1] - - def check_port(self, port: int) -> int: - """Check if the port is available. - - Args: - port (`int`): - the port number being checked. - - Returns: - `int`: the port number that passed the check. If the port is found - to be occupied, an available port number will be automatically - returned. 
- """ - if port is None: - new_port = self.find_available_port() - logger.warning( - "gRpc server port is not provided, automatically select " - f"[{new_port}] as the port number.", - ) - return new_port - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - if s.connect_ex(("localhost", port)) == 0: - new_port = self.find_available_port() - logger.warning( - f"Port [{port}] is occupied, use [{new_port}] instead", - ) - return new_port - return port - def launch(self) -> None: """launch a local rpc agent server.""" self.stop_event = Event() - logger.info( - f"Starting rpc server [{self.agent_class.__name__}] at port" - f" [{self.port}]...", - ) + self.parent_con, child_con = Pipe() start_event = Event() server_process = Process( target=setup_rcp_agent_server, @@ -510,6 +534,7 @@ def launch(self) -> None: "servicer_class": self.agent_class, "start_event": start_event, "stop_event": self.stop_event, + "pipe": child_con, "local_mode": self.local_model, "init_settings": _INIT_SETTINGS, "kwargs": { @@ -531,6 +556,7 @@ def launch(self) -> None: }, ) server_process.start() + self.port = self.parent_con.recv() start_event.wait() self.server = server_process diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index ce23609df..6c2e2978d 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -284,23 +284,14 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: def test_msghub_compatibility(self) -> None: """test compatibility with msghub""" - port1 = 12001 - port2 = 12002 - port3 = 12003 agent_a = DemoRpcAgentWithMemory( name="a", - lazy_launch=False, - port=port1, ) agent_b = DemoRpcAgentWithMemory( name="b", - lazy_launch=False, - port=port2, ) agent_c = DemoRpcAgentWithMemory( name="c", - lazy_launch=False, - port=port3, ) participants = [agent_a, agent_b, agent_c] annonuncement_msgs = [ From a80eec59422b3546e1a0dbd59705cc8fb9c6a988 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Wed, 24 Jan 2024 16:00:41 +0800 Subject: [PATCH 
06/15] update distributed examples --- docs/sphinx_doc/source/tutorial/208-distribute.md | 5 +---- notebook/distributed_debate.ipynb | 5 +---- notebook/distributed_dialog.ipynb | 3 +-- src/agentscope/agents/rpc_agent.py | 2 +- 4 files changed, 4 insertions(+), 11 deletions(-) diff --git a/docs/sphinx_doc/source/tutorial/208-distribute.md b/docs/sphinx_doc/source/tutorial/208-distribute.md index d96a95d7d..afe885f9a 100644 --- a/docs/sphinx_doc/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/source/tutorial/208-distribute.md @@ -60,10 +60,9 @@ But don't worry, `RpcAgentBase` shares the same interface as `AgentBase`, you on ### Run in multi-process mode AgentScope supports deployment in multi-process mode, where each agent is a sub-process of the application's main process, and all agents run on the same machine. -Its usage is very similar to single-process mode. The only difference lies in the initialization phase. +The usage is exactly the same as single process mode, and you only need to ensure that the agents used are instances of `RpcAgentBase` subclasses. Suppose you have classes `A` and `B`, both of which inherit from `RpcAgentBase`. -You can run the application in multi-process mode by passing in an additional parameter `port`, the other parts are completely the same as the single-process mode.
```python # import packages @@ -71,12 +70,10 @@ You can run the application in multi-process mode by passing in an additional pa a = A( name="A", ..., - port=12001, # port is required, other fields like host, launch_server and local_mode use the default value ) b = B( name="B", ..., - port=12002, # port is required ) x = None diff --git a/notebook/distributed_debate.ipynb b/notebook/distributed_debate.ipynb index a97b8d1b4..1113053f1 100644 --- a/notebook/distributed_debate.ipynb +++ b/notebook/distributed_debate.ipynb @@ -90,21 +90,18 @@ "\n", "pro_agent = RpcDialogAgent(\n", " name=\"Pro\",\n", - " port=12001,\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of a debater who is arguing in favor of the proposition that AGI (Artificial General Intelligence) can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models are a viable path to AGI. Highlight the advancements in language understanding, adaptability, and scalability of GPT models as key factors in progressing towards AGI.\",\n", ")\n", "con_agent = RpcDialogAgent(\n", " name=\"Con\",\n", - " port=12002,\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of a debater who is arguing against the proposition that AGI can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models, while impressive, are insufficient for reaching AGI. 
Discuss the limitations of GPT models such as lack of understanding, consciousness, ethical reasoning, and general problem-solving abilities that are essential for true AGI.\",\n", ")\n", "judge_agent = RpcDialogAgent(\n", " name=\"Judge\",\n", - " port=12003,\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of an impartial judge in a debate where the affirmative side argues that AGI can be achieved using the GPT model framework, and the negative side contests this. Listen to both sides' arguments and provide an analytical judgment on which side presented a more compelling and reasonable case. Consider the strength of the evidence, the persuasiveness of the reasoning, and the overall coherence of the arguments presented by each side.\"\n", @@ -187,7 +184,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.1.0" + "version": "3.10.9" } }, "nbformat": 4, diff --git a/notebook/distributed_dialog.ipynb b/notebook/distributed_dialog.ipynb index 778ab37d4..5ec248465 100644 --- a/notebook/distributed_dialog.ipynb +++ b/notebook/distributed_dialog.ipynb @@ -88,7 +88,6 @@ "\n", "assistant_agent = RpcDialogAgent(\n", " name=\"Assistant\",\n", - " port=12010,\n", " sys_prompt=\"You are a helpful assistant.\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", @@ -141,7 +140,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.9" + "version": "3.1.0" } }, "nbformat": 4, diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index 615d74962..4ba826c69 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -81,7 +81,7 @@ def __init__( use_memory: bool = True, memory_config: Optional[dict] = None, host: str = "localhost", - port: int = 12000, + port: int = None, max_pool_size: int = 100, max_timeout_seconds: int = 1800, launch_server: bool = True, From 
7c3100dd69f9b906f4ef5ab760763863d6ec54bd Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Thu, 25 Jan 2024 14:23:53 +0800 Subject: [PATCH 07/15] refactor RpcAgent --- docs/sphinx_doc/source/agentscope.agents.rst | 8 - examples/distributed/distributed_debate.py | 13 +- examples/distributed/distributed_dialog.py | 23 +- notebook/distributed_debate.ipynb | 14 +- notebook/distributed_dialog.ipynb | 8 +- src/agentscope/agents/__init__.py | 2 - src/agentscope/agents/agent.py | 47 +- src/agentscope/agents/rpc_agent.py | 478 ++++++++----------- src/agentscope/agents/rpc_dialog_agent.py | 109 ----- tests/rpc_agent_test.py | 32 +- 10 files changed, 299 insertions(+), 435 deletions(-) delete mode 100644 src/agentscope/agents/rpc_dialog_agent.py diff --git a/docs/sphinx_doc/source/agentscope.agents.rst b/docs/sphinx_doc/source/agentscope.agents.rst index 0b6520701..aa529a12d 100644 --- a/docs/sphinx_doc/source/agentscope.agents.rst +++ b/docs/sphinx_doc/source/agentscope.agents.rst @@ -48,11 +48,3 @@ dict_dialog_agent module :members: :undoc-members: :show-inheritance: - -rpc_dialog_agent module -------------------------------- - -.. 
automodule:: agentscope.agents.dict_dialog_agent - :members: - :undoc-members: - :show-inheritance: \ No newline at end of file diff --git a/examples/distributed/distributed_debate.py b/examples/distributed/distributed_debate.py index 4fcc90102..987d38ec3 100644 --- a/examples/distributed/distributed_debate.py +++ b/examples/distributed/distributed_debate.py @@ -6,7 +6,7 @@ import agentscope from agentscope.msghub import msghub -from agentscope.agents.rpc_dialog_agent import RpcDialogAgent +from agentscope.agents.dialog_agent import DialogAgent from agentscope.agents.rpc_agent import RpcAgentServerLauncher from agentscope.message import Msg from agentscope.utils.logging_utils import logger @@ -72,7 +72,7 @@ def setup_server(parsed_args: argparse.Namespace) -> None: host=host, port=port, local_mode=False, - agent_class=RpcDialogAgent, + agent_class=DialogAgent, **config, ) server_launcher.launch() @@ -84,20 +84,23 @@ def run_main_process(parsed_args: argparse.Namespace) -> None: agentscope.init( model_configs="configs/model_configs.json", ) - pro_agent = RpcDialogAgent( + pro_agent = DialogAgent( name="Pro", + ).to_distributed( host=parsed_args.pro_host, port=parsed_args.pro_port, launch_server=False, ) - con_agent = RpcDialogAgent( + con_agent = DialogAgent( name="Con", + ).to_distributed( host=parsed_args.con_host, port=parsed_args.con_port, launch_server=False, ) - judge_agent = RpcDialogAgent( + judge_agent = DialogAgent( name="Judge", + ).to_distributed( host=parsed_args.judge_host, port=parsed_args.judge_port, launch_server=False, diff --git a/examples/distributed/distributed_dialog.py b/examples/distributed/distributed_dialog.py index ed1b62368..915f82e29 100644 --- a/examples/distributed/distributed_dialog.py +++ b/examples/distributed/distributed_dialog.py @@ -7,7 +7,7 @@ import agentscope from agentscope.agents.user_agent import UserAgent -from agentscope.agents.rpc_dialog_agent import RpcDialogAgent +from agentscope.agents.dialog_agent import DialogAgent 
from agentscope.agents.rpc_agent import RpcAgentServerLauncher @@ -38,14 +38,16 @@ def setup_assistant_server(assistant_host: str, assistant_port: int) -> None: model_configs="configs/model_configs.json", ) assistant_server_launcher = RpcAgentServerLauncher( - name="Assitant", - agent_class=RpcDialogAgent, - host=assistant_host, - port=assistant_port, - sys_prompt="You are a helpful assistant.", - model="gpt-3.5-turbo", - use_memory=True, - local_mode=False, + agent_class=DialogAgent, + agent_kwargs={ + "name": "Assitant", + "host": assistant_host, + "port": assistant_port, + "sys_prompt": "You are a helpful assistant.", + "model": "gpt-3.5-turbo", + "use_memory": True, + "local_mode": False, + }, ) assistant_server_launcher.launch() assistant_server_launcher.wait_until_terminate() @@ -56,8 +58,9 @@ def run_main_process(assistant_host: str, assistant_port: int) -> None: agentscope.init( model_configs="configs/model_configs.json", ) - assistant_agent = RpcDialogAgent( + assistant_agent = DialogAgent( name="Assistant", + ).to_distributed( host=assistant_host, port=assistant_port, launch_server=False, diff --git a/notebook/distributed_debate.ipynb b/notebook/distributed_debate.ipynb index 1113053f1..09bc590ee 100644 --- a/notebook/distributed_debate.ipynb +++ b/notebook/distributed_debate.ipynb @@ -84,28 +84,28 @@ "outputs": [], "source": [ "import agentscope\n", - "from agentscope.agents.rpc_dialog_agent import RpcDialogAgent\n", + "from agentscope.agents.dialog_agent import DialogAgent\n", "\n", "agentscope.init(model_configs=model_configs)\n", "\n", - "pro_agent = RpcDialogAgent(\n", + "pro_agent = DialogAgent(\n", " name=\"Pro\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of a debater who is arguing in favor of the proposition that AGI (Artificial General Intelligence) can be achieved using the GPT model framework. 
Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models are a viable path to AGI. Highlight the advancements in language understanding, adaptability, and scalability of GPT models as key factors in progressing towards AGI.\",\n", - ")\n", - "con_agent = RpcDialogAgent(\n", + ").to_distributed()\n", + "con_agent = DialogAgent(\n", " name=\"Con\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of a debater who is arguing against the proposition that AGI can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models, while impressive, are insufficient for reaching AGI. Discuss the limitations of GPT models such as lack of understanding, consciousness, ethical reasoning, and general problem-solving abilities that are essential for true AGI.\",\n", - ")\n", - "judge_agent = RpcDialogAgent(\n", + ").to_distributed()\n", + "judge_agent = DialogAgent(\n", " name=\"Judge\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of an impartial judge in a debate where the affirmative side argues that AGI can be achieved using the GPT model framework, and the negative side contests this. Listen to both sides' arguments and provide an analytical judgment on which side presented a more compelling and reasonable case. 
Consider the strength of the evidence, the persuasiveness of the reasoning, and the overall coherence of the arguments presented by each side.\"\n", - ")" + ").to_distributed()" ] }, { diff --git a/notebook/distributed_dialog.ipynb b/notebook/distributed_dialog.ipynb index 5ec248465..46b1e3f78 100644 --- a/notebook/distributed_dialog.ipynb +++ b/notebook/distributed_dialog.ipynb @@ -80,18 +80,18 @@ "source": [ "import agentscope\n", "from agentscope.agents.user_agent import UserAgent\n", - "from agentscope.agents.rpc_dialog_agent import RpcDialogAgent\n", + "from agentscope.agents.dialog_agent import DialogAgent\n", "\n", "agentscope.init(\n", " model_configs=model_configs\n", ")\n", "\n", - "assistant_agent = RpcDialogAgent(\n", + "assistant_agent = DialogAgent(\n", " name=\"Assistant\",\n", " sys_prompt=\"You are a helpful assistant.\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", - ")\n", + ").to_distributed()\n", "user_agent = UserAgent(\n", " name=\"User\",\n", ")" @@ -140,7 +140,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.1.0" + "version": "3.10.9" } }, "nbformat": 4, diff --git a/src/agentscope/agents/__init__.py b/src/agentscope/agents/__init__.py index 14723727c..fd125d17b 100644 --- a/src/agentscope/agents/__init__.py +++ b/src/agentscope/agents/__init__.py @@ -2,7 +2,6 @@ """ Import all agent related modules in the package. 
""" from .agent import AgentBase from .operator import Operator -from .rpc_agent import RpcAgentBase from .dialog_agent import DialogAgent from .dict_dialog_agent import DictDialogAgent from .user_agent import UserAgent @@ -11,7 +10,6 @@ __all__ = [ "AgentBase", "Operator", - "RpcAgentBase", "DialogAgent", "DictDialogAgent", "UserAgent", diff --git a/src/agentscope/agents/agent.py b/src/agentscope/agents/agent.py index 6dee7c4a1..29e0b3d5f 100644 --- a/src/agentscope/agents/agent.py +++ b/src/agentscope/agents/agent.py @@ -2,20 +2,29 @@ """ Base class for Agent """ from __future__ import annotations +from abc import ABCMeta from typing import Optional from typing import Sequence from typing import Union from typing import Any from typing import Callable - from loguru import logger -from .operator import Operator -from ..models import load_model_by_name -from ..memory import TemporaryMemory +from agentscope.agents.operator import Operator +from agentscope.models import load_model_by_name +from agentscope.memory import TemporaryMemory + + +class _RecordInitSettingMeta(ABCMeta): + """A wrapper to record the init args into `init_settings` field.""" + + def __call__(cls, *args: tuple, **kwargs: dict) -> Any: + instance = super().__call__(*args, **kwargs) + instance.init_settings = {"args": args, "kwargs": kwargs} + return instance -class AgentBase(Operator): +class AgentBase(Operator, metaclass=_RecordInitSettingMeta): """Base class for all agents. 
All agents should inherit from this class and implement the `reply` @@ -172,3 +181,31 @@ def _broadcast_to_audience(self, x: dict) -> None: """Broadcast the input to all audiences.""" for agent in self._audience: agent.observe(x) + + def to_distributed( + self, + host: str = "localhost", + port: int = None, + max_pool_size: int = 100, + max_timeout_seconds: int = 1800, + launch_server: bool = True, + local_mode: bool = True, + lazy_launch: bool = True, + ) -> AgentBase: + """Convert current agent instance into a distributed version""" + from .rpc_agent import RpcAgent + + if issubclass(self.__class__, RpcAgent): + return self + return RpcAgent( + agent_class=self.__class__, + agent_configs=self.init_settings, + name=self.name, + host=host, + port=port, + max_pool_size=max_pool_size, + max_timeout_seconds=max_timeout_seconds, + launch_server=launch_server, + local_mode=local_mode, + lazy_launch=lazy_launch, + ) diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index 4ba826c69..124e46471 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -1,21 +1,22 @@ # -*- coding: utf-8 -*- """ Base class for Rpc Agent """ -from multiprocessing import Process -from multiprocessing import Event +from multiprocessing import ( + Process, + Event, + Pipe, +) from multiprocessing.synchronize import Event as EventClass -from multiprocessing import Pipe import socket import threading import time import json -from queue import Queue from typing import Any -from typing import Callable from typing import Optional from typing import Union from typing import Type from typing import Sequence +from queue import Queue from concurrent import futures from loguru import logger @@ -33,11 +34,12 @@ from agentscope._init import init_process, _INIT_SETTINGS from agentscope.agents.agent import AgentBase -from agentscope.message import MessageBase -from agentscope.message import Msg -from agentscope.message import 
PlaceholderMessage -from agentscope.message import deserialize -from agentscope.message import serialize +from agentscope.message import ( + Msg, + PlaceholderMessage, + deserialize, + serialize, +) from agentscope.rpc import ( RpcAgentClient, RpcMsg, @@ -64,22 +66,14 @@ def inner(rpc_agent, msg): # type: ignore [no-untyped-def] return inner -class RpcAgentBase(AgentBase, RpcAgentServicer): - """Abstract service of RpcAgent, also act as AgentBase. - - Note: - Please implement reply method based on the functionality of your - agent. - """ +class RpcAgent(AgentBase): + """A wrapper to extend an AgentBase into a gRPC Client.""" def __init__( self, + agent_class: Type[AgentBase], + agent_configs: dict, name: str, - config: Optional[dict] = None, - sys_prompt: Optional[str] = None, - model: Optional[Union[Callable[..., Any], str]] = None, - use_memory: bool = True, - memory_config: Optional[dict] = None, host: str = "localhost", port: int = None, max_pool_size: int = 100, @@ -87,218 +81,46 @@ def __init__( launch_server: bool = True, local_mode: bool = True, lazy_launch: bool = True, - is_servicer: bool = False, ) -> None: - """Init a RpcAgentBase instance. - - Args: - name (`str`): - The name of the agent. - config (`Optional[dict]`): - The configuration of the agent, if provided, the agent will - be initialized from the config rather than the other - parameters. - sys_prompt (`Optional[str]`): - The system prompt of the agent, which can be passed by args - or hard-coded in the agent. - model (`Optional[Union[Callable[..., Any], str]]`, defaults to - None): - The callable model object or the model name, which is used to - load model from configuration. - use_memory (`bool`, defaults to `True`): - Whether the agent has memory. - memory_config (`Optional[dict]`): - The config of memory. - host (`str`, defaults to "localhost"): - Hostname of the rpc agent server. - port (`int`, defaults to `None`): - Port of the rpc agent server. 
- max_pool_size (`int`, defaults to `100`): - The max number of task results that the server can - accommodate. Note that the oldest result will be deleted - after exceeding the pool size. - max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. Note that expired results will be - deleted. - launch_server (`bool`, defaults to `True`): - Launch a rpc server locally. - local_mode (`bool`, defaults to `True`): - The started server only listens to local requests. - lazy_launch (`bool`, defaults to `True`): - Only launch the server when the agent is called. - is_servicer (`bool`, defaults to `False`): - Used as a servicer of rpc server. - """ - super().__init__( - name, - config, - sys_prompt, - model, - use_memory, - memory_config, - ) + super().__init__(name=name) self.host = host self.port = port - self.is_servicer = is_servicer - - # prohibit servicer object from launching a new server - assert not (is_servicer and launch_server) - # launch_server is True only in the main process self.server_launcher = None + self.client = None if launch_server: self.server_launcher = RpcAgentServerLauncher( - name=name, - config=config, - sys_prompt=sys_prompt, - model=model, - use_memory=use_memory, - memory_config=memory_config, - agent_class=self.__class__, + agent_class=agent_class, + agent_args=agent_configs["args"], + agent_kwargs=agent_configs["kwargs"], host=host, port=port, max_pool_size=max_pool_size, max_timeout_seconds=max_timeout_seconds, local_mode=local_mode, ) - self.client = None if not lazy_launch: self._launch_server() - # is_servicer is True only in the rpc server process - if is_servicer: - self.result_pool = ExpiringDict( - max_len=max_pool_size, - max_age_seconds=max_timeout_seconds, - ) - self.task_queue = Queue() - self.worker_thread = threading.Thread(target=self.process_tasks) - self.worker_thread.start() - self.task_id_lock = threading.Lock() - self.task_id_counter = 0 - - # connect to an existing rpc server - if not launch_server 
and not is_servicer: + else: self.client = RpcAgentClient(host=self.host, port=self.port) def _launch_server(self) -> None: - """Launch a rpc server and update the port and the client - """ + """Launch a rpc server and update the port and the client""" self.server_launcher.launch() self.port = self.server_launcher.port self.client = RpcAgentClient(host=self.host, port=self.port) - def get_task_id(self) -> int: - """Get the auto-increment task id.""" - with self.task_id_lock: - self.task_id_counter += 1 - return self.task_id_counter - - def call_func(self, request: RpcMsg, _: ServicerContext) -> RpcMsg: - if hasattr(self, request.target_func): - return getattr(self, request.target_func)(request) - else: - logger.error(f"Unsupported method {request.target_func}") - return RpcMsg( - value=Msg( - name=self.name, - content=f"Unsupported method {request.target_func}", - ).serialize(), - ) - - @rpc_servicer_method - def _call(self, request: RpcMsg) -> RpcMsg: - """Call function of RpcAgentService - - Args: - request (`RpcMsg`): - Message containing input parameters or input parameter - placeholders. - - Returns: - `RpcMsg`: A serialized Msg instance with attributes name, host, - port and task_id - """ - if request.value: - msg = deserialize(request.value) - else: - msg = None - task_id = self.get_task_id() - self.task_queue.put((task_id, msg)) - return RpcMsg( - value=Msg( - name=self.name, - content=None, - host=self.host, - port=self.port, - task_id=task_id, - ).serialize(), - ) - - @rpc_servicer_method - def _get(self, request: RpcMsg) -> RpcMsg: - """Get function of RpcAgentService - - Args: - request (`RpcMsg`): - Identifier of message, with json format:: - - { - 'task_id': int - } - - Returns: - `RpcMsg`: Concrete values of the specific message (or part of it). 
- """ - # todo: add format specification of request - msg = json.loads(request.value) - # todo: implement the waiting in a more elegant way, add timeout - while True: - result = self.result_pool.get(msg["task_id"], None) - if result is not None: - return RpcMsg(value=result.serialize()) - time.sleep(0.1) - - @rpc_servicer_method - def _observe(self, request: RpcMsg) -> RpcMsg: - """Observe function of RpcAgentService - - Args: - request (`RpcMsg`): - The serialized input to be observed. - - Returns: - `RpcMsg`: Empty RpcMsg. - """ - msgs = deserialize(request.value) - for msg in msgs: - if isinstance(msg, PlaceholderMessage): - msg.update_value() - self.memory.add(msgs) - return RpcMsg() - - def process_tasks(self) -> None: - """Task processing thread.""" - while True: - task_id, task_msg = self.task_queue.get() - # TODO: optimize this and avoid blocking - if isinstance(task_msg, PlaceholderMessage): - task_msg.update_value() - result = self.reply(task_msg) - self.result_pool[task_id] = result - def reply(self, x: dict = None) -> dict: - """Reply function used in the rpc agent server process.""" - raise NotImplementedError( - f"Agent [{type(self).__name__}] is missing the required " - f'"reply" function.', + if self.client is None: + self._launch_server() + res_msg = self.client.call_func( + func_name="_call", + value=x.serialize() if x is not None else "", + ) + return PlaceholderMessage( + **deserialize(res_msg), # type: ignore [arg-type] ) def observe(self, x: Union[dict, Sequence[dict]]) -> None: - """Observe the input, store it in memory and don't response to it. - - Args: - x (`Union[dict, Sequence[dict]]`): - The input to be observed. 
- """ if self.client is None: self._launch_server() self.client.call_func( @@ -306,29 +128,6 @@ def observe(self, x: Union[dict, Sequence[dict]]) -> None: value=serialize(x), # type: ignore [arg-type] ) - def __call__(self, *args: Any, **kwargs: Any) -> dict: - """Call function used in the main process.""" - if args is not None and len(args) > 0: - x = args[0] - elif kwargs is not None and len(kwargs) > 0: - x = kwargs["x"] - else: - x = None - if x is not None: - assert isinstance(x, MessageBase) - if self.client is None: - self._launch_server() - res_msg = self.client.call_func( - func_name="_call", - value=x.serialize() if x is not None else "", - ) - res = PlaceholderMessage( - **deserialize(res_msg), # type: ignore [arg-type] - ) - if self._audience is not None: - self._broadcast_to_audience(res) - return res - def stop(self) -> None: """Stop the RpcAgent and the launched rpc server.""" if self.server_launcher is not None: @@ -340,15 +139,19 @@ def __del__(self) -> None: def setup_rcp_agent_server( + agent_class: Type[AgentBase], + agent_args: tuple, + agent_kwargs: dict, + host: str, port: int, - servicer_class: Type[RpcAgentServicer], + init_settings: dict = None, start_event: EventClass = None, stop_event: EventClass = None, pipe: int = None, - max_workers: int = 4, local_mode: bool = True, - init_settings: dict = None, - kwargs: dict = None, + max_pool_size: int = 100, + max_timeout_seconds: int = 1800, + max_workers: int = 4, ) -> None: """Setup gRPC server rpc agent. 
@@ -375,18 +178,24 @@ def setup_rcp_agent_server( if init_settings is not None: init_process(**init_settings) + servicer = RpcServerSideWrapper( + agent_class(*agent_args, **agent_kwargs), + host=host, + port=port, + max_pool_size=max_pool_size, + max_timeout_seconds=max_timeout_seconds, + ) while True: try: port = check_port(port) + servicer.port = port logger.info( - f"Starting rpc server [{servicer_class.__name__}] at port" + f"Starting rpc server [{agent_class.__name__}] at port" f" [{port}]...", ) server = grpc.server( futures.ThreadPoolExecutor(max_workers=max_workers), ) - kwargs['port'] = port - servicer = servicer_class(**kwargs) add_RpcAgentServicer_to_server(servicer, server) if local_mode: server.add_insecure_port(f"localhost:{port}") @@ -396,22 +205,22 @@ def setup_rcp_agent_server( break except OSError: logger.warning( - f"Failed to start rpc server at port [{port}], " + f"Failed to start rpc server at port [{port}]" f"try another port", ) logger.info( - f"rpc server [{servicer_class.__name__}] at port [{port}] started " + f"rpc server [{agent_class.__name__}] at port [{port}] started " "successfully", ) pipe.send(port) start_event.set() stop_event.wait() logger.info( - f"Stopping rpc server [{servicer_class.__name__}] at port [{port}]", + f"Stopping rpc server [{agent_class.__name__}] at port [{port}]", ) server.stop(0) logger.info( - f"rpc server [{servicer_class.__name__}] at port [{port}] stopped " + f"rpc server [{agent_class.__name__}] at port [{port}] stopped " "successfully", ) @@ -457,13 +266,9 @@ class RpcAgentServerLauncher: def __init__( self, - name: str, - config: Optional[dict] = None, - sys_prompt: Optional[str] = None, - model: Optional[Union[Callable[..., Any], str]] = None, - use_memory: bool = True, - memory_config: Optional[dict] = None, - agent_class: Type[RpcAgentBase] = None, + agent_class: Type[AgentBase] = None, + agent_args: tuple = (), + agent_kwargs: dict = None, host: str = "localhost", port: int = None, max_pool_size: 
int = 100, @@ -505,13 +310,10 @@ def __init__( Whether the started rpc server only listens to local requests. """ - self.name = name - self.config = config - self.sys_prompt = sys_prompt - self.model = model - self.use_memory = use_memory - self.memory_config = memory_config + # TODO: update docstring self.agent_class = agent_class + self.agent_args = agent_args + self.agent_kwargs = agent_kwargs self.host = host self.port = check_port(port) self.max_pool_size = max_pool_size @@ -530,29 +332,18 @@ def launch(self) -> None: server_process = Process( target=setup_rcp_agent_server, kwargs={ + "agent_class": self.agent_class, + "agent_args": self.agent_args, + "agent_kwargs": self.agent_kwargs, + "host": self.host, "port": self.port, - "servicer_class": self.agent_class, + "init_settings": _INIT_SETTINGS, "start_event": start_event, "stop_event": self.stop_event, "pipe": child_con, + "max_pool_size": self.max_pool_size, + "max_timeout_seconds": self.max_timeout_seconds, "local_mode": self.local_model, - "init_settings": _INIT_SETTINGS, - "kwargs": { - "name": self.name, - "config": self.config, - "sys_prompt": self.sys_prompt, - "model": self.model, - "use_memory": self.use_memory, - "memory_config": self.memory_config, - "host": self.host, - "port": self.port, - "max_pool_size": self.max_pool_size, - "max_timeout_seconds": self.max_timeout_seconds, - "launch_server": False, - "lazy_launch": False, - "is_servicer": True, - **self.kwargs, - }, }, ) server_process.start() @@ -580,3 +371,142 @@ def shutdown(self) -> None: f" [{self.port}] is killed.", ) self.server = None + + +class RpcServerSideWrapper(RpcAgentServicer): + """A wrapper to extend an AgentBase into a gRPC Servicer.""" + + def __init__( + self, + agent_instance: AgentBase, + host: str = "localhost", + port: int = None, + max_pool_size: int = 100, + max_timeout_seconds: int = 1800, + ): + """Init the service side wrapper. + + Args: + agent_instance (`AgentBase`): an instance of `AgentBase`. 
+ host (`str`, defaults to "localhost"): + Hostname of the rpc agent server. + port (`int`, defaults to `None`): + Port of the rpc agent server. + max_pool_size (`int`, defaults to `100`): + The max number of task results that the server can + accommodate. Note that the oldest result will be deleted + after exceeding the pool size. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. Note that expired results will be + deleted. + """ + self.host = host + self.port = port + self.result_pool = ExpiringDict( + max_len=max_pool_size, + max_age_seconds=max_timeout_seconds, + ) + self.task_queue = Queue() + self.worker_thread = threading.Thread(target=self.process_tasks) + self.worker_thread.start() + self.task_id_lock = threading.Lock() + self.task_id_counter = 0 + self.agent = agent_instance + + def get_task_id(self) -> int: + """Get the auto-increment task id.""" + with self.task_id_lock: + self.task_id_counter += 1 + return self.task_id_counter + + def call_func(self, request: RpcMsg, _: ServicerContext) -> RpcMsg: + """Call the specific servicer function. + """ + if hasattr(self, request.target_func): + return getattr(self, request.target_func)(request) + else: + logger.error(f"Unsupported method {request.target_func}") + return RpcMsg( + value=Msg( + name=self.agent.name, + content=f"Unsupported method {request.target_func}", + ).serialize(), + ) + + def _call(self, request: RpcMsg) -> RpcMsg: + """Call function of RpcAgentService + + Args: + request (`RpcMsg`): + Message containing input parameters or input parameter + placeholders. 
+ + Returns: + `RpcMsg`: A serialized Msg instance with attributes name, host, + port and task_id + """ + if request.value: + msg = deserialize(request.value) + else: + msg = None + task_id = self.get_task_id() + self.task_queue.put((task_id, msg)) + return RpcMsg( + value=Msg( + name=self.agent.name, + content=None, + host=self.host, + port=self.port, + task_id=task_id, + ).serialize(), + ) + + def _get(self, request: RpcMsg) -> RpcMsg: + """Get function of RpcAgentService + + Args: + request (`RpcMsg`): + Identifier of message, with json format:: + + { + 'task_id': int + } + + Returns: + `RpcMsg`: Concrete values of the specific message (or part of it). + """ + # todo: add format specification of request + msg = json.loads(request.value) + # todo: implement the waiting in a more elegant way, add timeout + while True: + result = self.result_pool.get(msg["task_id"], None) + if result is not None: + return RpcMsg(value=result.serialize()) + time.sleep(0.1) + + def _observe(self, request: RpcMsg) -> RpcMsg: + """Observe function of RpcAgentService + + Args: + request (`RpcMsg`): + The serialized input to be observed. + + Returns: + `RpcMsg`: Empty RpcMsg. 
+ """ + msgs = deserialize(request.value) + for msg in msgs: + if isinstance(msg, PlaceholderMessage): + msg.update_value() + self.agent.observe(msgs) + return RpcMsg() + + def process_tasks(self) -> None: + """Task processing thread.""" + while True: + task_id, task_msg = self.task_queue.get() + # TODO: optimize this and avoid blocking + if isinstance(task_msg, PlaceholderMessage): + task_msg.update_value() + result = self.agent.reply(task_msg) + self.result_pool[task_id] = result diff --git a/src/agentscope/agents/rpc_dialog_agent.py b/src/agentscope/agents/rpc_dialog_agent.py deleted file mode 100644 index 6f81c15ba..000000000 --- a/src/agentscope/agents/rpc_dialog_agent.py +++ /dev/null @@ -1,109 +0,0 @@ -# -*- coding: utf-8 -*- -""" A rpc version of dialog agent.""" -import json -from typing import Any -from typing import Callable -from typing import Optional -from typing import Union -from loguru import logger - -from agentscope.message import Msg -from agentscope.agents.rpc_agent import RpcAgentBase -from agentscope.prompt import PromptEngine -from agentscope.prompt import PromptType - - -class RpcDialogAgent(RpcAgentBase): - """DialogAgent class""" - - def __init__( - self, - name: str, - config: Optional[dict] = None, - sys_prompt: Optional[str] = None, - model: Optional[Union[Callable[..., Any], str]] = None, - use_memory: bool = True, - memory_config: Optional[dict] = None, - host: str = "localhost", - port: int = 80, - max_pool_size: int = 100, - max_timeout_seconds: int = 1800, - launch_server: bool = True, - local_mode: bool = True, - lazy_launch: bool = True, - is_servicer: bool = False, - ) -> None: - super().__init__( - name=name, - config=config, - sys_prompt=sys_prompt, - model=model, - use_memory=use_memory, - memory_config=memory_config, - host=host, - port=port, - max_pool_size=max_pool_size, - max_timeout_seconds=max_timeout_seconds, - launch_server=launch_server, - local_mode=local_mode, - lazy_launch=lazy_launch, - is_servicer=is_servicer, 
- ) - # init prompt engine - if is_servicer: - self.engine = PromptEngine(self.model, prompt_type=PromptType.LIST) - - def reply(self, x: dict = None) -> dict: - """Reply function of the agent. Processes the input data, - generates a prompt using the current dialogue memory and system - prompt, and invokes the language model to produce a response. The - response is then formatted and added to the dialogue memory. - - Args: - x (`dict`, defaults to `None`): - A dictionary representing the user's input to the agent. - This input is added to the dialogue memory if provided. - Returns: - dict: A dictionary representing the message generated by the agent - in response to the user's input. It contains at least a - 'speak' key with the textual response and may include other - keys such as 'agreement' if provided by the language model. - Raises: - `json.decoder.JSONDecodeError`: - If the response from the language model is not valid JSON, - it defaults to treating the response as plain text. - """ - # record the input if needed - if x is not None: - self.memory.add(x) - - # prepare prompt - prompt = self.engine.join( - self.sys_prompt, - self.memory.get_memory(), - ) - - # call llm - response = self.model( - prompt, - parse_func=json.loads, - fault_handler=lambda x: {"speak": x}, - ) - - logger.debug(json.dumps(response, indent=4)) - - if isinstance(response, dict) and "speak" in response: - msg = Msg( - name=self.name, - content=response.get("speak", None) or response, - **response, - ) - else: - msg = Msg(self.name, response) - - logger.chat(msg) - - # record to memory - self.memory.add(msg) - - return msg diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index 6c2e2978d..5068c9bf5 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -9,7 +9,6 @@ import agentscope from agentscope.agents import AgentBase -from agentscope.agents import RpcAgentBase from agentscope.agents.rpc_agent import RpcAgentServerLauncher from agentscope.message import 
Msg from agentscope.message import PlaceholderMessage @@ -19,7 +18,7 @@ from agentscope.utils import MonitorFactory, QuotaExceededError -class DemoRpcAgent(RpcAgentBase): +class DemoRpcAgent(AgentBase): """A demo Rpc agent for test usage.""" def __init__(self, **kwargs) -> None: # type: ignore [no-untyped-def] @@ -34,7 +33,7 @@ def reply(self, x: dict = None) -> dict: return x -class DemoRpcAgentAdd(RpcAgentBase): +class DemoRpcAgentAdd(AgentBase): """A demo Rpc agent for test usage""" def reply(self, x: dict = None) -> dict: @@ -54,7 +53,7 @@ def reply(self, x: dict = None) -> dict: return x -class DemoRpcAgentWithMemory(RpcAgentBase): +class DemoRpcAgentWithMemory(AgentBase): """A demo Rpc agent that count its memory""" def reply(self, x: dict = None) -> dict: @@ -66,7 +65,7 @@ def reply(self, x: dict = None) -> dict: return msg -class DemoRpcAgentWithMonitor(RpcAgentBase): +class DemoRpcAgentWithMonitor(AgentBase): """A demo Rpc agent that use monitor""" def reply(self, x: dict = None) -> dict: @@ -116,6 +115,7 @@ def test_single_rpc_agent_server(self) -> None: port = 12001 agent_a = DemoRpcAgent( name="a", + ).to_distributed( host=host, port=port, ) @@ -162,15 +162,18 @@ def test_single_rpc_agent_server(self) -> None: def test_connect_to_an_existing_rpc_server(self) -> None: """test connecting to an existing server""" launcher = RpcAgentServerLauncher( - name="a", - host="127.0.0.1", # choose port automatically - local_mode=False, agent_class=DemoRpcAgent, + agent_kwargs={ + "name": "a", + }, + local_mode=False, + host="127.0.0.1", ) launcher.launch() agent_a = DemoRpcAgent( name="a", + ).to_distributed( host="127.0.0.1", port=launcher.port, launch_server=False, @@ -202,18 +205,21 @@ def test_multi_rpc_agent(self) -> None: port3 = 12003 agent_a = DemoRpcAgentAdd( name="a", + ).to_distributed( host=host, port=port1, lazy_launch=False, ) agent_b = DemoRpcAgentAdd( name="b", + ).to_distributed( host=host, port=port2, lazy_launch=False, ) agent_c = DemoRpcAgentAdd( 
name="c", + ).to_distributed( host=host, port=port3, lazy_launch=False, @@ -259,6 +265,7 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: # rpc agent a agent_a = DemoRpcAgentAdd( name="a", + ).to_distributed( host=host, port=port1, lazy_launch=False, @@ -270,6 +277,7 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: # rpc agent c agent_c = DemoRpcAgentAdd( name="c", + ).to_distributed( host=host, port=port2, lazy_launch=False, @@ -286,13 +294,13 @@ def test_msghub_compatibility(self) -> None: """test compatibility with msghub""" agent_a = DemoRpcAgentWithMemory( name="a", - ) + ).to_distributed() agent_b = DemoRpcAgentWithMemory( name="b", - ) + ).to_distributed() agent_c = DemoRpcAgentWithMemory( name="c", - ) + ).to_distributed() participants = [agent_a, agent_b, agent_c] annonuncement_msgs = [ Msg(name="System", content="Announcement 1"), @@ -328,6 +336,7 @@ def test_standalone_multiprocess_init(self) -> None: # rpc agent a agent_a = DemoRpcAgentWithMonitor( name="a", + ).to_distributed( host=host, port=port1, lazy_launch=False, @@ -335,6 +344,7 @@ def test_standalone_multiprocess_init(self) -> None: # local agent b agent_b = DemoRpcAgentWithMonitor( name="b", + ).to_distributed( host=host, port=port2, lazy_launch=False, From c768864fcfab097c2fedbe5ee0d978e892b54ed1 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Thu, 25 Jan 2024 15:22:21 +0800 Subject: [PATCH 08/15] update docstring and tutorial of rpc agent --- docs/sphinx_doc/source/tutorial/201-agent.md | 1 - .../source/tutorial/208-distribute.md | 37 +++++---- examples/distributed/distributed_debate.py | 10 +-- examples/distributed/distributed_dialog.py | 9 +-- notebook/distributed_debate.ipynb | 6 +- notebook/distributed_dialog.ipynb | 3 +- src/agentscope/agents/agent.py | 24 +++++- src/agentscope/agents/rpc_agent.py | 77 +++++++++++-------- src/agentscope/agents/user_agent.py | 10 +-- tests/rpc_agent_test.py | 25 +++--- 10 files changed, 121 insertions(+), 81 deletions(-) diff --git 
a/docs/sphinx_doc/source/tutorial/201-agent.md b/docs/sphinx_doc/source/tutorial/201-agent.md index e498f18d1..b7e648eba 100644 --- a/docs/sphinx_doc/source/tutorial/201-agent.md +++ b/docs/sphinx_doc/source/tutorial/201-agent.md @@ -66,7 +66,6 @@ Below is a table summarizing the functionality of some of the key agents availab | Agent Type | Description | Typical Use Cases | | -------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | | `AgentBase` | Serves as the superclass for all agents, providing essential attributes and methods. | The foundation for building any custom agent. | -| `RpcAgentBase` | Executes remote procedure calls in distributed mode. | The foundation for building any custom agent in distributed mode. | | `DialogAgent` | Manages dialogues by understanding context and generating coherent responses. | Customer service bots, virtual assistants. | | `UserAgent` | Interacts with the user to collect input, generating messages that may include URLs or additional specifics based on required keys. | Collecting user input for agents | | *More to Come* | AgentScope is continuously expanding its pool with more specialized agents for diverse applications. | | diff --git a/docs/sphinx_doc/source/tutorial/208-distribute.md b/docs/sphinx_doc/source/tutorial/208-distribute.md index afe885f9a..4a9d36101 100644 --- a/docs/sphinx_doc/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/source/tutorial/208-distribute.md @@ -45,24 +45,23 @@ A-->C Please follow the steps below to deploy your application distributedly. -### Write your agents +### Convert your agents -To use distributed functionality, your agent class needs to inherit from `RpcAgentBase`. -`RpcAgentBase` requires several additional parameters compared to `AgentBase` during initialization. +`AgentBase` provided the `to_dist` method to convert the agent into a distributed version. +`to_dist` requires several parameters. 
- `host`: the hostname or IP address of the machine where the agent runs, defaults to `localhost`. - `port`: the port of this agent's RPC server, defaults to `80`. - `launch_server`: whether to launch an RPC server locally, defaults to `True`. - `local_mode`: set to `True` if all agents run on the same machine, defaults to `True`. - -But don't worry, `RpcAgentBase` shares the same interface as `AgentBase`, you only need to implement the `reply` method. You can even copy the `reply` method of an `AgentBase` subclass and use it in `RpcAgentBase`. +- `lazy_launch`: if set to `True`, only launch the server when the agent is called. ### Run in multi-process mode AgentScope supports deployment in multi-process mode, where each agent is a sub-process of the application's main process, and all agents run on the same machine. -The usage is exactly the same as single process mode, and you only need to ensure that the agents used are instances of `RpcAgentBase` subclasses. +The usage is exactly the same as single process mode, and you only need to call the `to_dist` method after initialization. -Suppose you have classes `A` and `B`, both of which inherit from `RpcAgentBase`. +Suppose you have classes `A` and `B`, both of which inherit from `AgentBase`. ```python # import packages @@ -70,11 +69,11 @@ Suppose you have classes `A` and `B`, both of which inherit from `RpcAgentBase`. a = A( name="A", ..., -) +).to_dist() b = B( name="B", ..., -) +).to_dist() x = None while x is None or x.content != 'exit': @@ -90,8 +89,11 @@ AgentScope also supports to run agents on multiple machines. In this case, you n # import packages server_a = RpcAgentServerLauncher( - name="A", - ..., + agent_class=A, + agent_kwargs={ + "name": "A" + ... 
+ }, host=ip_a, port=12001, local_mode=False, @@ -107,8 +109,11 @@ Please make sure that the two machines can access each other using the IP addres # import packages server_b = RpcAgentServerLauncher( - name="B", - ..., + agent_class=B, + agent_kwargs={ + "name": "B", + ... + }, host=ip_b, port=12001, local_mode=False, @@ -124,14 +129,16 @@ Then, you can run the application's main process on any machine that can access a = A( name="A", - ..., + ... +).to_dist( host=ip_a, port=12001, launch_server=False, ) b = B( name="B", - ..., + ... +).to_dist( host=ip_b, port=12002, launch_server=False, diff --git a/examples/distributed/distributed_debate.py b/examples/distributed/distributed_debate.py index 987d38ec3..300b2f537 100644 --- a/examples/distributed/distributed_debate.py +++ b/examples/distributed/distributed_debate.py @@ -69,11 +69,11 @@ def setup_server(parsed_args: argparse.Namespace) -> None: host = getattr(parsed_args, f"{parsed_args.role}_host") port = getattr(parsed_args, f"{parsed_args.role}_port") server_launcher = RpcAgentServerLauncher( + agent_class=DialogAgent, + agent_kwargs=config, host=host, port=port, local_mode=False, - agent_class=DialogAgent, - **config, ) server_launcher.launch() server_launcher.wait_until_terminate() @@ -86,21 +86,21 @@ def run_main_process(parsed_args: argparse.Namespace) -> None: ) pro_agent = DialogAgent( name="Pro", - ).to_distributed( + ).to_dist( host=parsed_args.pro_host, port=parsed_args.pro_port, launch_server=False, ) con_agent = DialogAgent( name="Con", - ).to_distributed( + ).to_dist( host=parsed_args.con_host, port=parsed_args.con_port, launch_server=False, ) judge_agent = DialogAgent( name="Judge", - ).to_distributed( + ).to_dist( host=parsed_args.judge_host, port=parsed_args.judge_port, launch_server=False, diff --git a/examples/distributed/distributed_dialog.py b/examples/distributed/distributed_dialog.py index 915f82e29..ce81b67be 100644 --- a/examples/distributed/distributed_dialog.py +++ 
b/examples/distributed/distributed_dialog.py @@ -2,7 +2,6 @@ """ An example of distributed dialog """ import argparse -import time from loguru import logger import agentscope @@ -41,13 +40,14 @@ def setup_assistant_server(assistant_host: str, assistant_port: int) -> None: agent_class=DialogAgent, agent_kwargs={ "name": "Assitant", - "host": assistant_host, - "port": assistant_port, "sys_prompt": "You are a helpful assistant.", "model": "gpt-3.5-turbo", "use_memory": True, "local_mode": False, }, + host=assistant_host, + port=assistant_port, + local_mode=False, ) assistant_server_launcher.launch() assistant_server_launcher.wait_until_terminate() @@ -60,7 +60,7 @@ def run_main_process(assistant_host: str, assistant_port: int) -> None: ) assistant_agent = DialogAgent( name="Assistant", - ).to_distributed( + ).to_dist( host=assistant_host, port=assistant_port, launch_server=False, @@ -77,7 +77,6 @@ def run_main_process(assistant_host: str, assistant_port: int) -> None: while not msg.content.endswith("exit"): msg = assistant_agent(msg) logger.chat(msg) - time.sleep(0.5) msg = user_agent(msg) diff --git a/notebook/distributed_debate.ipynb b/notebook/distributed_debate.ipynb index 09bc590ee..10a532515 100644 --- a/notebook/distributed_debate.ipynb +++ b/notebook/distributed_debate.ipynb @@ -93,19 +93,19 @@ " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of a debater who is arguing in favor of the proposition that AGI (Artificial General Intelligence) can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models are a viable path to AGI. 
Highlight the advancements in language understanding, adaptability, and scalability of GPT models as key factors in progressing towards AGI.\",\n", - ").to_distributed()\n", + ").to_dist()\n", "con_agent = DialogAgent(\n", " name=\"Con\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of a debater who is arguing against the proposition that AGI can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models, while impressive, are insufficient for reaching AGI. Discuss the limitations of GPT models such as lack of understanding, consciousness, ethical reasoning, and general problem-solving abilities that are essential for true AGI.\",\n", - ").to_distributed()\n", + ").to_dist()\n", "judge_agent = DialogAgent(\n", " name=\"Judge\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", " sys_prompt=\"Assume the role of an impartial judge in a debate where the affirmative side argues that AGI can be achieved using the GPT model framework, and the negative side contests this. Listen to both sides' arguments and provide an analytical judgment on which side presented a more compelling and reasonable case. 
Consider the strength of the evidence, the persuasiveness of the reasoning, and the overall coherence of the arguments presented by each side.\"\n", - ").to_distributed()" + ").to_dist()" ] }, { diff --git a/notebook/distributed_dialog.ipynb b/notebook/distributed_dialog.ipynb index 46b1e3f78..07a6425fe 100644 --- a/notebook/distributed_dialog.ipynb +++ b/notebook/distributed_dialog.ipynb @@ -91,7 +91,7 @@ " sys_prompt=\"You are a helpful assistant.\",\n", " model=\"gpt-3.5-turbo\",\n", " use_memory=True,\n", - ").to_distributed()\n", + ").to_dist()\n", "user_agent = UserAgent(\n", " name=\"User\",\n", ")" @@ -119,7 +119,6 @@ "while not msg.content.endswith(\"exit\"):\n", " msg = assistant_agent(msg)\n", " logger.chat(msg)\n", - " time.sleep(0.5)\n", " msg = user_agent(msg)" ] } diff --git a/src/agentscope/agents/agent.py b/src/agentscope/agents/agent.py index 29e0b3d5f..463650f83 100644 --- a/src/agentscope/agents/agent.py +++ b/src/agentscope/agents/agent.py @@ -182,7 +182,7 @@ def _broadcast_to_audience(self, x: dict) -> None: for agent in self._audience: agent.observe(x) - def to_distributed( + def to_dist( self, host: str = "localhost", port: int = None, @@ -192,7 +192,27 @@ def to_distributed( local_mode: bool = True, lazy_launch: bool = True, ) -> AgentBase: - """Convert current agent instance into a distributed version""" + """Convert current agent instance into a distributed version. + + Args: + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. + port (`int`, defaults to `None`): + Port of the rpc agent server. + max_pool_size (`int`, defaults to `100`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + local_mode (`bool`, defaults to `True`): + Whether the started rpc server only listens to local + requests. + lazy_launch (`bool`, defaults to `True`): + Only launch the server when the agent is called. 
+ + Returns: + `AgentBase`: the wrapped agent instance with distributed + functionality + """ from .rpc_agent import RpcAgent if issubclass(self.__class__, RpcAgent): diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index 124e46471..746fe6303 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -71,9 +71,9 @@ class RpcAgent(AgentBase): def __init__( self, + name: str, agent_class: Type[AgentBase], agent_configs: dict, - name: str, host: str = "localhost", port: int = None, max_pool_size: int = 100, @@ -82,6 +82,28 @@ def __init__( local_mode: bool = True, lazy_launch: bool = True, ) -> None: + """Initialize a RpcAgent instance. + + Args: + agent_class (`Type[AgentBase]`, defaults to `None`): + The AgentBase subclass encapsulated by this wrapper. + agent_configs (`dict`): The args used to initialize the + agent_class. + name (`str`): Name of the agent. + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. + port (`int`, defaults to `None`): + Port of the rpc agent server. + max_pool_size (`int`, defaults to `100`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + local_mode (`bool`, defaults to `True`): + Whether the started rpc server only listens to local + requests. + lazy_launch (`bool`, defaults to `True`): + Only launch the server when the agent is called. + """ super().__init__(name=name) self.host = host self.port = port @@ -156,10 +178,18 @@ def setup_rcp_agent_server( """Setup gRPC server rpc agent. Args: + agent_class (`Type[AgentBase]`): + A subclass of AgentBase. + agent_args (`tuple`): The args tuple used to initialize the + agent_class. + agent_kwargs (`dict`): The args dict used to initialize the + agent_class. + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. port (`int`): The socket port monitored by grpc server. 
- servicer_class (`Type[RpcAgentServicer]`): - An implementation of RpcAgentBaseService. + init_settings (`dict`, defaults to `None`): + Init settings for agentscope.init. start_event (`EventClass`, defaults to `None`): An Event instance used to determine whether the child process has been started. @@ -168,12 +198,14 @@ def setup_rcp_agent_server( process has been stopped. pipe (`int`, defaults to `None`): A pipe instance used to pass the actual port of the server. - max_workers (`int`, defaults to `4`): - max worker number of grpc server. local_mode (`bool`, defaults to `None`): Only listen to local requests. - init_settings (`dict`, defaults to `None`): - Init settings. + max_pool_size (`int`, defaults to `100`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + max_workers (`int`, defaults to `4`): + max worker number of grpc server. """ if init_settings is not None: @@ -274,30 +306,16 @@ def __init__( max_pool_size: int = 100, max_timeout_seconds: int = 1800, local_mode: bool = True, - **kwargs: Any, ) -> None: """Init a rpc agent server launcher. Args: - name (`str`): - The name of the agent. - config (`Optional[dict]`): - The configuration of the agent, if provided, the agent will - be initialized from the config rather than the other - parameters. - sys_prompt (`Optional[str]`): - The system prompt of the agent, which can be passed by args - or hard-coded in the agent. - model (`Optional[Union[Callable[..., Any], str]]`, defaults to - None): - The callable model object or the model name, which is used to - load model from configuration. - use_memory (`bool`, defaults to `True`): - Whether the agent has memory. - memory_config (`Optional[dict]`): - The config of memory. - agent_class (`Type[RpcAgentBase]`, defaults to `None`): - The RpcAgentBase class used in rpc agent server as a servicer. 
+ agent_class (`Type[AgentBase]`, defaults to `None`): + The AgentBase subclass encapsulated by this wrapper. + agent_args (`tuple`): The args tuple used to initialize the + agent_class. + agent_kwargs (`dict`): The args dict used to initialize the + agent_class. host (`str`, defaults to `"localhost"`): Hostname of the rpc agent server. port (`int`, defaults to `None`): @@ -310,7 +328,6 @@ def __init__( Whether the started rpc server only listens to local requests. """ - # TODO: update docstring self.agent_class = agent_class self.agent_args = agent_args self.agent_kwargs = agent_kwargs @@ -322,7 +339,6 @@ def __init__( self.server = None self.stop_event = None self.parent_con = None - self.kwargs = kwargs def launch(self) -> None: """launch a local rpc agent server.""" @@ -420,8 +436,7 @@ def get_task_id(self) -> int: return self.task_id_counter def call_func(self, request: RpcMsg, _: ServicerContext) -> RpcMsg: - """Call the specific servicer function. - """ + """Call the specific servicer function.""" if hasattr(self, request.target_func): return getattr(self, request.target_func)(request) else: diff --git a/src/agentscope/agents/user_agent.py b/src/agentscope/agents/user_agent.py index ac2339166..e8de167ac 100644 --- a/src/agentscope/agents/user_agent.py +++ b/src/agentscope/agents/user_agent.py @@ -46,15 +46,15 @@ def reply( x (`dict`, defaults to `None`): A dictionary containing initial data to be added to memory. Defaults to None. - required_keys (`Optional[Union[list[str], str]]`, defaults to - None): + required_keys \ + (`Optional[Union[list[str], str]]`, defaults to `None`): Strings that requires user to input, which will be used as the key of the returned dict. Defaults to None. Returns: - dict: A dictionary representing the message object that contains - the user's input and any additional details. This is also - stored in the object's memory. 
+ `dict`: A dictionary representing the message object that contains + the user's input and any additional details. This is also + stored in the object's memory. """ if x is not None: self.memory.add(x) diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index 5068c9bf5..be8279657 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -115,7 +115,7 @@ def test_single_rpc_agent_server(self) -> None: port = 12001 agent_a = DemoRpcAgent( name="a", - ).to_distributed( + ).to_dist( host=host, port=port, ) @@ -169,11 +169,12 @@ def test_connect_to_an_existing_rpc_server(self) -> None: }, local_mode=False, host="127.0.0.1", + port=12010, ) launcher.launch() agent_a = DemoRpcAgent( name="a", - ).to_distributed( + ).to_dist( host="127.0.0.1", port=launcher.port, launch_server=False, @@ -205,21 +206,21 @@ def test_multi_rpc_agent(self) -> None: port3 = 12003 agent_a = DemoRpcAgentAdd( name="a", - ).to_distributed( + ).to_dist( host=host, port=port1, lazy_launch=False, ) agent_b = DemoRpcAgentAdd( name="b", - ).to_distributed( + ).to_dist( host=host, port=port2, lazy_launch=False, ) agent_c = DemoRpcAgentAdd( name="c", - ).to_distributed( + ).to_dist( host=host, port=port3, lazy_launch=False, @@ -265,7 +266,7 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: # rpc agent a agent_a = DemoRpcAgentAdd( name="a", - ).to_distributed( + ).to_dist( host=host, port=port1, lazy_launch=False, @@ -277,7 +278,7 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: # rpc agent c agent_c = DemoRpcAgentAdd( name="c", - ).to_distributed( + ).to_dist( host=host, port=port2, lazy_launch=False, @@ -294,13 +295,13 @@ def test_msghub_compatibility(self) -> None: """test compatibility with msghub""" agent_a = DemoRpcAgentWithMemory( name="a", - ).to_distributed() + ).to_dist() agent_b = DemoRpcAgentWithMemory( name="b", - ).to_distributed() + ).to_dist() agent_c = DemoRpcAgentWithMemory( name="c", - ).to_distributed() + ).to_dist() participants = [agent_a, 
agent_b, agent_c] annonuncement_msgs = [ Msg(name="System", content="Announcement 1"), @@ -336,7 +337,7 @@ def test_standalone_multiprocess_init(self) -> None: # rpc agent a agent_a = DemoRpcAgentWithMonitor( name="a", - ).to_distributed( + ).to_dist( host=host, port=port1, lazy_launch=False, @@ -344,7 +345,7 @@ def test_standalone_multiprocess_init(self) -> None: # local agent b agent_b = DemoRpcAgentWithMonitor( name="b", - ).to_distributed( + ).to_dist( host=host, port=port2, lazy_launch=False, From 7db302b54feb3dddebaab89171a54b2b46005990 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Thu, 25 Jan 2024 15:50:39 +0800 Subject: [PATCH 09/15] update distributed examples --- examples/distributed/distributed_debate.py | 52 ++++++++++++---------- examples/distributed/distributed_dialog.py | 4 +- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/examples/distributed/distributed_debate.py b/examples/distributed/distributed_debate.py index 300b2f537..90a0256ac 100644 --- a/examples/distributed/distributed_debate.py +++ b/examples/distributed/distributed_debate.py @@ -57,7 +57,7 @@ def parse_args() -> argparse.Namespace: def setup_server(parsed_args: argparse.Namespace) -> None: """Setup rpc server for participant agent""" agentscope.init( - model_configs="configs/model_configs.json", + model_configs="configs/model_configs_pxc.json", ) with open( "configs/debate_agent_configs.json", @@ -82,29 +82,35 @@ def setup_server(parsed_args: argparse.Namespace) -> None: def run_main_process(parsed_args: argparse.Namespace) -> None: """Setup the main debate competition process""" agentscope.init( - model_configs="configs/model_configs.json", - ) - pro_agent = DialogAgent( - name="Pro", - ).to_dist( - host=parsed_args.pro_host, - port=parsed_args.pro_port, - launch_server=False, - ) - con_agent = DialogAgent( - name="Con", - ).to_dist( - host=parsed_args.con_host, - port=parsed_args.con_port, - launch_server=False, - ) - judge_agent = DialogAgent( - name="Judge", 
- ).to_dist( - host=parsed_args.judge_host, - port=parsed_args.judge_port, - launch_server=False, + model_configs="configs/model_configs_pxc.json", ) + with open( + "configs/debate_agent_configs.json", + "r", + encoding="utf-8", + ) as f: + configs = json.load(f) + pro_agent = DialogAgent( + **configs["pro"], + ).to_dist( + host=parsed_args.pro_host, + port=parsed_args.pro_port, + launch_server=False, + ) + con_agent = DialogAgent( + **configs["con"], + ).to_dist( + host=parsed_args.con_host, + port=parsed_args.con_port, + launch_server=False, + ) + judge_agent = DialogAgent( + **configs["judge"], + ).to_dist( + host=parsed_args.judge_host, + port=parsed_args.judge_port, + launch_server=False, + ) participants = [pro_agent, con_agent, judge_agent] hint = Msg(name="System", content=ANNOUNCEMENT) x = None diff --git a/examples/distributed/distributed_dialog.py b/examples/distributed/distributed_dialog.py index ce81b67be..d170862ca 100644 --- a/examples/distributed/distributed_dialog.py +++ b/examples/distributed/distributed_dialog.py @@ -43,7 +43,6 @@ def setup_assistant_server(assistant_host: str, assistant_port: int) -> None: "sys_prompt": "You are a helpful assistant.", "model": "gpt-3.5-turbo", "use_memory": True, - "local_mode": False, }, host=assistant_host, port=assistant_port, @@ -60,6 +59,9 @@ def run_main_process(assistant_host: str, assistant_port: int) -> None: ) assistant_agent = DialogAgent( name="Assistant", + sys_prompt="You are a helpful assistant.", + model="gpt-3.5-turbo", + use_memory=True, ).to_dist( host=assistant_host, port=assistant_port, From 5cebdcf2e31ea4986f58ba9d0c0733b3f8c27922 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Thu, 25 Jan 2024 15:56:08 +0800 Subject: [PATCH 10/15] clean code --- examples/distributed/distributed_debate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/distributed/distributed_debate.py b/examples/distributed/distributed_debate.py index 90a0256ac..a3c9c77a5 100644 --- 
a/examples/distributed/distributed_debate.py +++ b/examples/distributed/distributed_debate.py @@ -57,7 +57,7 @@ def parse_args() -> argparse.Namespace: def setup_server(parsed_args: argparse.Namespace) -> None: """Setup rpc server for participant agent""" agentscope.init( - model_configs="configs/model_configs_pxc.json", + model_configs="configs/model_configs.json", ) with open( "configs/debate_agent_configs.json", @@ -82,7 +82,7 @@ def setup_server(parsed_args: argparse.Namespace) -> None: def run_main_process(parsed_args: argparse.Namespace) -> None: """Setup the main debate competition process""" agentscope.init( - model_configs="configs/model_configs_pxc.json", + model_configs="configs/model_configs.json", ) with open( "configs/debate_agent_configs.json", From 4716b20d6ba8aac1a4eef9bbc739580462bc2994 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Thu, 25 Jan 2024 16:06:17 +0800 Subject: [PATCH 11/15] update default local_mode value of ServerLauncher --- docs/sphinx_doc/source/tutorial/208-distribute.md | 2 -- examples/distributed/distributed_debate.py | 1 - examples/distributed/distributed_dialog.py | 1 - src/agentscope/agents/rpc_agent.py | 4 ++-- 4 files changed, 2 insertions(+), 6 deletions(-) diff --git a/docs/sphinx_doc/source/tutorial/208-distribute.md b/docs/sphinx_doc/source/tutorial/208-distribute.md index 4a9d36101..9f73100d1 100644 --- a/docs/sphinx_doc/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/source/tutorial/208-distribute.md @@ -96,7 +96,6 @@ server_a = RpcAgentServerLauncher( }, host=ip_a, port=12001, - local_mode=False, ) server_a.launch() server_a.wait_until_terminate() @@ -116,7 +115,6 @@ server_b = RpcAgentServerLauncher( }, host=ip_b, port=12001, - local_mode=False, ) server_b.launch() server_b.wait_until_terminate() diff --git a/examples/distributed/distributed_debate.py b/examples/distributed/distributed_debate.py index a3c9c77a5..b0ccb4a45 100644 --- a/examples/distributed/distributed_debate.py +++ 
b/examples/distributed/distributed_debate.py @@ -73,7 +73,6 @@ def setup_server(parsed_args: argparse.Namespace) -> None: agent_kwargs=config, host=host, port=port, - local_mode=False, ) server_launcher.launch() server_launcher.wait_until_terminate() diff --git a/examples/distributed/distributed_dialog.py b/examples/distributed/distributed_dialog.py index d170862ca..0a6f30e51 100644 --- a/examples/distributed/distributed_dialog.py +++ b/examples/distributed/distributed_dialog.py @@ -46,7 +46,6 @@ def setup_assistant_server(assistant_host: str, assistant_port: int) -> None: }, host=assistant_host, port=assistant_port, - local_mode=False, ) assistant_server_launcher.launch() assistant_server_launcher.wait_until_terminate() diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index 746fe6303..044832315 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -305,7 +305,7 @@ def __init__( port: int = None, max_pool_size: int = 100, max_timeout_seconds: int = 1800, - local_mode: bool = True, + local_mode: bool = False, ) -> None: """Init a rpc agent server launcher. @@ -324,7 +324,7 @@ def __init__( Max number of task results that the server can accommodate. max_timeout_seconds (`int`, defaults to `1800`): Timeout for task results. - local_mode (`bool`, defaults to `True`): + local_mode (`bool`, defaults to `False`): Whether the started rpc server only listens to local requests. 
""" From 644ecce3ee0d8ad86d38cf65012e1b0af94fffaf Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Thu, 25 Jan 2024 16:32:18 +0800 Subject: [PATCH 12/15] update launcher --- src/agentscope/agents/rpc_agent.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index 044832315..76988a541 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -366,6 +366,10 @@ def launch(self) -> None: self.port = self.parent_con.recv() start_event.wait() self.server = server_process + logger.info( + f"Launch [{self.agent_class.__name__}] server at " + f"[{self.host}:{self.port}] success", + ) def wait_until_terminate(self) -> None: """Wait for server process""" From 4da84ceb4de2d86db66c554a33e24bd365ed6ee7 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Thu, 25 Jan 2024 16:46:43 +0800 Subject: [PATCH 13/15] update rpc agent client --- src/agentscope/rpc/rpc_agent_client.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/agentscope/rpc/rpc_agent_client.py b/src/agentscope/rpc/rpc_agent_client.py index e1f5f3937..768a81515 100644 --- a/src/agentscope/rpc/rpc_agent_client.py +++ b/src/agentscope/rpc/rpc_agent_client.py @@ -30,8 +30,6 @@ def __init__(self, host: str, port: int) -> None: """ self.host = host self.port = port - self.channel = grpc.insecure_channel(f"{self.host}:{self.port}") - self.stub = RpcAgentStub(self.channel) def call_func(self, func_name: str, value: str = None) -> str: """Call the specific function of rpc server. @@ -43,7 +41,9 @@ def call_func(self, func_name: str, value: str = None) -> str: Returns: str: serialized return data. 
""" - result_msg = self.stub.call_func( - RpcMsg(value=value, target_func=func_name), - ) - return result_msg.value + with grpc.insecure_channel(f"{self.host}:{self.port}") as channel: + stub = RpcAgentStub(channel) + result_msg = stub.call_func( + RpcMsg(value=value, target_func=func_name), + ) + return result_msg.value From 68ecafdcd3965ed3f20319471ec001afe0a8002f Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Thu, 1 Feb 2024 17:19:35 +0800 Subject: [PATCH 14/15] opt docs and examples of distributed modules --- .../source/tutorial/208-distribute.md | 2 + .../configs/debate_agent_configs.json | 43 +++++++++------ examples/distributed/distributed_debate.py | 53 +++++++++---------- src/agentscope/agents/agent.py | 6 +-- 4 files changed, 56 insertions(+), 48 deletions(-) diff --git a/docs/sphinx_doc/source/tutorial/208-distribute.md b/docs/sphinx_doc/source/tutorial/208-distribute.md index 9f73100d1..22e0f8d63 100644 --- a/docs/sphinx_doc/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/source/tutorial/208-distribute.md @@ -56,6 +56,8 @@ Please follow the steps below to deploy your application distributedly. - `local_mode`: set to `True` if all agents run on the same machine, defaults to `True`. - `lazy_launch`: if set to `True`, only launch the server when the agent is called. +> The `to_dist` method is implemented based on [gRPC](https://grpc.io/). When 'launch_server' is set to `True`, it will start a gRPC server process, and the original agent will be transferred to the new process to run. + ### Run in multi-process mode AgentScope supports deployment in multi-process mode, where each agent is a sub-process of the application's main process, and all agents run on the same machine. 
diff --git a/examples/distributed/configs/debate_agent_configs.json b/examples/distributed/configs/debate_agent_configs.json index 658c6cce4..dec5af779 100644 --- a/examples/distributed/configs/debate_agent_configs.json +++ b/examples/distributed/configs/debate_agent_configs.json @@ -1,20 +1,29 @@ -{ - "pro": { - "name": "Pro", - "sys_prompt": "Assume the role of a debater who is arguing in favor of the proposition that AGI (Artificial General Intelligence) can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models are a viable path to AGI. Highlight the advancements in language understanding, adaptability, and scalability of GPT models as key factors in progressing towards AGI.", - "model": "gpt-3.5-turbo", - "use_memory": true +[ + { + "class": "DictDialogAgent", + "args": { + "name": "Pro", + "sys_prompt": "Assume the role of a debater who is arguing in favor of the proposition that AGI (Artificial General Intelligence) can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models are a viable path to AGI. Highlight the advancements in language understanding, adaptability, and scalability of GPT models as key factors in progressing towards AGI.", + "model": "gpt-3.5-turbo", + "use_memory": true + } }, - "con": { - "name": "Con", - "sys_prompt": "Assume the role of a debater who is arguing against the proposition that AGI can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models, while impressive, are insufficient for reaching AGI. 
Discuss the limitations of GPT models such as lack of understanding, consciousness, ethical reasoning, and general problem-solving abilities that are essential for true AGI.", - "model": "gpt-3.5-turbo", - "use_memory": true + { + "class": "DictDialogAgent", + "args": { + "name": "Con", + "sys_prompt": "Assume the role of a debater who is arguing against the proposition that AGI can be achieved using the GPT model framework. Construct a coherent and persuasive argument, including scientific, technological, and theoretical evidence, to support the statement that GPT models, while impressive, are insufficient for reaching AGI. Discuss the limitations of GPT models such as lack of understanding, consciousness, ethical reasoning, and general problem-solving abilities that are essential for true AGI.", + "model": "gpt-3.5-turbo", + "use_memory": true + } }, - "judge": { - "name": "Judge", - "sys_prompt": "Assume the role of an impartial judge in a debate where the affirmative side argues that AGI can be achieved using the GPT model framework, and the negative side contests this. Listen to both sides' arguments and provide an analytical judgment on which side presented a more compelling and reasonable case. Consider the strength of the evidence, the persuasiveness of the reasoning, and the overall coherence of the arguments presented by each side.", - "model": "gpt-3.5-turbo", - "use_memory": true + { + "class": "DictDialogAgent", + "args": { + "name": "Judge", + "sys_prompt": "Assume the role of an impartial judge in a debate where the affirmative side argues that AGI can be achieved using the GPT model framework, and the negative side contests this. Listen to both sides' arguments and provide an analytical judgment on which side presented a more compelling and reasonable case. 
Consider the strength of the evidence, the persuasiveness of the reasoning, and the overall coherence of the arguments presented by each side.", + "model": "gpt-3.5-turbo", + "use_memory": true + } } -} \ No newline at end of file +] \ No newline at end of file diff --git a/examples/distributed/distributed_debate.py b/examples/distributed/distributed_debate.py index b0ccb4a45..7144e1ad6 100644 --- a/examples/distributed/distributed_debate.py +++ b/examples/distributed/distributed_debate.py @@ -65,6 +65,11 @@ def setup_server(parsed_args: argparse.Namespace) -> None: encoding="utf-8", ) as f: configs = json.load(f) + configs = { + "pro": configs[0]["args"], + "con": configs[1]["args"], + "judge": configs[2]["args"], + } config = configs[parsed_args.role] host = getattr(parsed_args, f"{parsed_args.role}_host") port = getattr(parsed_args, f"{parsed_args.role}_port") @@ -80,36 +85,28 @@ def setup_server(parsed_args: argparse.Namespace) -> None: def run_main_process(parsed_args: argparse.Namespace) -> None: """Setup the main debate competition process""" - agentscope.init( + agents = agentscope.init( model_configs="configs/model_configs.json", + agent_configs="configs/debate_agent_configs.json", + ) + pro_agent = agents[0] + con_agent = agents[1] + judge_agent = agents[2] + pro_agent = pro_agent.to_dist( + host=parsed_args.pro_host, + port=parsed_args.pro_port, + launch_server=False, + ) + con_agent = con_agent.to_dist( + host=parsed_args.con_host, + port=parsed_args.con_port, + launch_server=False, + ) + judge_agent = judge_agent.to_dist( + host=parsed_args.judge_host, + port=parsed_args.judge_port, + launch_server=False, ) - with open( - "configs/debate_agent_configs.json", - "r", - encoding="utf-8", - ) as f: - configs = json.load(f) - pro_agent = DialogAgent( - **configs["pro"], - ).to_dist( - host=parsed_args.pro_host, - port=parsed_args.pro_port, - launch_server=False, - ) - con_agent = DialogAgent( - **configs["con"], - ).to_dist( - host=parsed_args.con_host, - 
port=parsed_args.con_port, - launch_server=False, - ) - judge_agent = DialogAgent( - **configs["judge"], - ).to_dist( - host=parsed_args.judge_host, - port=parsed_args.judge_port, - launch_server=False, - ) participants = [pro_agent, con_agent, judge_agent] hint = Msg(name="System", content=ANNOUNCEMENT) x = None diff --git a/src/agentscope/agents/agent.py b/src/agentscope/agents/agent.py index 463650f83..18cca85b0 100644 --- a/src/agentscope/agents/agent.py +++ b/src/agentscope/agents/agent.py @@ -16,11 +16,11 @@ class _RecordInitSettingMeta(ABCMeta): - """A wrapper to record the init args into `init_settings` field.""" + """A wrapper to record the init args into `_init_settings` field.""" def __call__(cls, *args: tuple, **kwargs: dict) -> Any: instance = super().__call__(*args, **kwargs) - instance.init_settings = {"args": args, "kwargs": kwargs} + instance._init_settings = {"args": args, "kwargs": kwargs} return instance @@ -219,7 +219,7 @@ def to_dist( return self return RpcAgent( agent_class=self.__class__, - agent_configs=self.init_settings, + agent_configs=self._init_settings, name=self.name, host=host, port=port, From 573dedcf6692724c58a16cd35ef045b8261868ed Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Thu, 1 Feb 2024 17:42:29 +0800 Subject: [PATCH 15/15] opt distributed example --- .pre-commit-config.yaml | 1 + examples/distributed/distributed_debate.py | 5 +---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index dea4142eb..35f778a37 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -88,6 +88,7 @@ repos: --disable=W1113, --disable=W0221, --disable=R0401, + --disable=W0632, ] - repo: https://github.com/regebro/pyroma rev: "4.0" diff --git a/examples/distributed/distributed_debate.py b/examples/distributed/distributed_debate.py index 7144e1ad6..669b05304 100644 --- a/examples/distributed/distributed_debate.py +++ b/examples/distributed/distributed_debate.py @@ 
-85,13 +85,10 @@ def setup_server(parsed_args: argparse.Namespace) -> None: def run_main_process(parsed_args: argparse.Namespace) -> None: """Setup the main debate competition process""" - agents = agentscope.init( + pro_agent, con_agent, judge_agent = agentscope.init( model_configs="configs/model_configs.json", agent_configs="configs/debate_agent_configs.json", ) - pro_agent = agents[0] - con_agent = agents[1] - judge_agent = agents[2] pro_agent = pro_agent.to_dist( host=parsed_args.pro_host, port=parsed_args.pro_port,