UnisKB/apps/application/chat_pipeline/pipeline_manage.py

58 lines
2.0 KiB
Python
Raw Normal View History

2024-01-16 08:46:54 +00:00
# coding=utf-8
"""
@project: maxkb
@Author
@file pipeline_manage.py
@date2024/1/9 17:40
@desc:
"""
import time
from functools import reduce
from typing import List, Type, Dict
from application.chat_pipeline.I_base_chat_pipeline import IBaseChatPipelineStep
2024-09-09 06:47:25 +00:00
from common.handle.base_to_response import BaseToResponse
from common.handle.impl.response.system_to_response import SystemToResponse
2024-01-16 08:46:54 +00:00
2024-04-15 02:59:50 +00:00
class PipelineManage:
2024-09-09 06:47:25 +00:00
def __init__(self, step_list: List[Type[IBaseChatPipelineStep]],
base_to_response: BaseToResponse = SystemToResponse()):
2024-01-16 08:46:54 +00:00
# 步骤执行器
self.step_list = [step() for step in step_list]
# 上下文
self.context = {'message_tokens': 0, 'answer_tokens': 0}
2024-09-09 06:47:25 +00:00
self.base_to_response = base_to_response
2024-01-16 08:46:54 +00:00
def run(self, context: Dict = None):
self.context['start_time'] = time.time()
if context is not None:
for key, value in context.items():
self.context[key] = value
for step in self.step_list:
step.run(self)
def get_details(self):
return reduce(lambda x, y: {**x, **y}, [{item.get('step_type'): item} for item in
filter(lambda r: r is not None,
[row.get_details(self) for row in self.step_list])], {})
2024-09-09 06:47:25 +00:00
def get_base_to_response(self):
return self.base_to_response
2024-01-16 08:46:54 +00:00
class builder:
def __init__(self):
self.step_list: List[Type[IBaseChatPipelineStep]] = []
2024-09-09 06:47:25 +00:00
self.base_to_response = SystemToResponse()
2024-01-16 08:46:54 +00:00
def append_step(self, step: Type[IBaseChatPipelineStep]):
self.step_list.append(step)
return self
2024-09-09 06:47:25 +00:00
def add_base_to_response(self, base_to_response: BaseToResponse):
self.base_to_response = base_to_response
return self
2024-01-16 08:46:54 +00:00
def build(self):
2024-09-09 06:47:25 +00:00
return PipelineManage(step_list=self.step_list, base_to_response=self.base_to_response)