Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

customize operator in awel error. #2319

Open
atczyh opened this issue Jan 23, 2025 · 2 comments
Open

customize operator in awel error. #2319

atczyh opened this issue Jan 23, 2025 · 2 comments

Comments

@atczyh
Copy link

atczyh commented Jan 23, 2025

我自定义了一个str2stream_operator.py算子主要用于将Datasource Executor Operator算子输出的内容中data数据提取出来并转成流式输出:
1.算子放在目录dbgpt\core\interface\operators下,参考案例继承StreamifyAbsOperator实现该功能。
2.修改了dbgpt\core\operators的__init__.py文件添加算子的信息
from dbgpt.core.interface.operators.str2stream_operator import ( # noqa: F401
str2streamOperator,
)
3.建立测试工作流测试算子是否能够正常运行,经过测试算子在以下工作流中可正常运行。
Image
Image

4.将算子添加到带LLM Operator的工作流中出现错误。
Image
Image
2025-01-23 08:59:15 DESKTOP-TMVQ7II dbgpt.core.awel.runner.local_runner[9124] INFO Run operator <class 'dbgpt.core.interface.operators.str2stream_operator.str2streamOperator'>(86039032-ed2b-4e1d-85e5-33fc903bb263) error, error message: Traceback (most recent call last):
File "j:\db-gpt-main\dbgpt\core\awel\runner\local_runner.py", line 192, in _execute_node
await node._run(dag_ctx, task_ctx.log_id)
File "j:\db-gpt-main\dbgpt\core\awel\operators\base.py", line 248, in _run
return await self._do_run(dag_ctx)
File "j:\db-gpt-main\dbgpt\core\awel\operators\stream_operator.py", line 27, in _do_run
output = await curr_task_ctx.task_input.parent_outputs[0].task_output.streamify(
File "j:\db-gpt-main\dbgpt\core\awel\task\base.py", line 185, in streamify
raise NotImplementedError
NotImplementedError
INFO: 127.0.0.1:53741 - "GET /api/v1/chat/dialogue/list HTTP/1.1" 200 OK
2025-01-23 08:59:33 DESKTOP-TMVQ7II asyncio[9124] ERROR Task exception was never retrieved
future: <Task finished name='Task-741' coro=<<async_generator_athrow without name>()> exception=RuntimeError('DAG context not found with event loop task id 2159373769280, task_name: Task-741')>
Traceback (most recent call last):
File "j:\db-gpt-main\dbgpt\core\awel\util\chat_util.py", line 87, in safe_chat_stream_with_dag_task
async for output in chat_stream_with_dag_task(
File "j:\db-gpt-main\dbgpt\core\awel\util\chat_util.py", line 162, in chat_stream_with_dag_task
async for output in await task.call_stream(request):
File "j:\db-gpt-main\dbgpt\core\awel\operators\base.py", line 337, in call_stream
out_ctx = await self._runner.execute_workflow(
File "j:\db-gpt-main\dbgpt\core\awel\runner\local_runner.py", line 114, in execute_workflow
await self._execute_node(
File "j:\db-gpt-main\dbgpt\core\awel\runner\local_runner.py", line 213, in _execute_node
raise e
File "j:\db-gpt-main\dbgpt\core\awel\runner\local_runner.py", line 192, in _execute_node
await node._run(dag_ctx, task_ctx.log_id)
File "j:\db-gpt-main\dbgpt\core\awel\operators\base.py", line 248, in _run
return await self._do_run(dag_ctx)
File "j:\db-gpt-main\dbgpt\core\awel\operators\stream_operator.py", line 27, in _do_run
output = await curr_task_ctx.task_input.parent_outputs[0].task_output.streamify(
File "j:\db-gpt-main\dbgpt\core\awel\task\base.py", line 185, in streamify
raise NotImplementedError
NotImplementedError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "j:\db-gpt-main\dbgpt\core\awel\util\chat_util.py", line 95, in safe_chat_stream_with_dag_task
yield ModelOutput(error_code=1, text=simple_error_msg, incremental=incremental)
GeneratorExit
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "j:\db-gpt-main\dbgpt\core\awel\util\chat_util.py", line 98, in safe_chat_stream_with_dag_task
await task.dag._after_dag_end(task.current_event_loop_task_id)
File "j:\db-gpt-main\dbgpt\core\awel\dag\base.py", line 926, in _after_dag_end
raise RuntimeError(
RuntimeError: DAG context not found with event loop task id 2159373769280, task_name: Task-741
5.算子代码如下
https://github.com/atczyh/paddletest/blob/master/str2stream_operator.py
请问以上问题该如何解决?已困扰我很长时间了。
注:我的操作系统是win10,采用Miniconda进行源码部署,python=3.10,部署版本为V0.6.3

@Aries-ckt
Copy link
Collaborator

try:
            print('streamify',data)
            parsed_data=data.replace("data:```vis-db-chart",'').replace("```vis-db-chart\n",'').replace("\n```",'')
            print('streamify2',parsed_data)
            parsed_data = json.loads(parsed_data)
            data_dd=parsed_data['data']
            print(len(data_dd))
            for row in data_dd:

it looks like your func content is not a async method.

@Aries-ckt Aries-ckt changed the title 自定义算子在awel中调用出错 customize operator in awel error. Jan 23, 2025
@atczyh
Copy link
Author

atczyh commented Jan 24, 2025

try:
print('streamify',data)
parsed_data=data.replace("data:vis-db-chart",'').replace("vis-db-chart\n",'').replace("\n```",'')
print('streamify2',parsed_data)
parsed_data = json.loads(parsed_data)
data_dd=parsed_data['data']
print(len(data_dd))
for row in data_dd:
it looks like your func content is not a async method.

这个方法是参考案例http://docs.dbgpt.cn/docs/awel/awel_tutorial/basic_syntax/2.5_streamify_operator 进行编写的,对上级节点Datasource Executor Operator传递的str进行处理。目前感觉问题出在在LLM Operator算子上,如果awel的末端节点是非流式输出则LLM Operator带动Datasource Executor Operator进行非流式输出。如果末端节点是流式输出LLM Operator带动Datasource Executor Operator进行流式输出,这会导致继承自StreamifyAbsOperator自定义算子报错,因为StreamifyAbsOperator只能接收非流式输入。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants