fix: add retry mechanism for file uploads and response handling in image.py
parent
775aa9148c
commit
ab434b0a9e
|
|
@ -124,20 +124,20 @@ class BaseVideoUnderstandNode(IVideoUnderstandNode):
|
||||||
if self.node.id == val['node_id'] and 'video_list' in val:
|
if self.node.id == val['node_id'] and 'video_list' in val:
|
||||||
if val['dialogue_type'] == 'WORKFLOW':
|
if val['dialogue_type'] == 'WORKFLOW':
|
||||||
return chat_record.get_ai_message()
|
return chat_record.get_ai_message()
|
||||||
return AIMessage(content=val['answer'])
|
return AIMessage(content=val['answer'] or '')
|
||||||
return chat_record.get_ai_message()
|
return chat_record.get_ai_message()
|
||||||
|
|
||||||
def generate_history_human_message_for_details(self, chat_record):
|
def generate_history_human_message_for_details(self, chat_record):
|
||||||
for data in chat_record.details.values():
|
for data in chat_record.details.values():
|
||||||
if self.node.id == data['node_id'] and 'video_list' in data:
|
if self.node.id == data['node_id'] and 'video_list' in data:
|
||||||
video_list = data['video_list']
|
video_list = data['video_list']
|
||||||
if len(video_list) == 0 or data['dialogue_type'] == 'WORKFLOW':
|
# 增加对 None 和空列表的检查
|
||||||
|
if not video_list or len(video_list) == 0 or data['dialogue_type'] == 'WORKFLOW':
|
||||||
return HumanMessage(content=chat_record.problem_text)
|
return HumanMessage(content=chat_record.problem_text)
|
||||||
file_id_list = [video.get('file_id') for video in video_list]
|
file_id_list = [video.get('file_id') for video in video_list]
|
||||||
return HumanMessage(content=[
|
return HumanMessage(content=[
|
||||||
{'type': 'text', 'text': data['question']},
|
{'type': 'text', 'text': data['question']},
|
||||||
*[{'type': 'video_url', 'video_url': {'url': f'./oss/file/{file_id}'}} for file_id in file_id_list]
|
*[{'type': 'video_url', 'video_url': {'url': f'./oss/file/{file_id}'}} for file_id in file_id_list]
|
||||||
|
|
||||||
])
|
])
|
||||||
return HumanMessage(content=chat_record.problem_text)
|
return HumanMessage(content=chat_record.problem_text)
|
||||||
|
|
||||||
|
|
@ -145,7 +145,7 @@ class BaseVideoUnderstandNode(IVideoUnderstandNode):
|
||||||
start_index = len(history_chat_record) - dialogue_number
|
start_index = len(history_chat_record) - dialogue_number
|
||||||
history_message = reduce(lambda x, y: [*x, *y], [
|
history_message = reduce(lambda x, y: [*x, *y], [
|
||||||
[self.generate_history_human_message(history_chat_record[index], video_model),
|
[self.generate_history_human_message(history_chat_record[index], video_model),
|
||||||
self.generate_history_ai_message(history_chat_record[index]), video_model]
|
self.generate_history_ai_message(history_chat_record[index])]
|
||||||
for index in
|
for index in
|
||||||
range(start_index if start_index > 0 else 0, len(history_chat_record))], [])
|
range(start_index if start_index > 0 else 0, len(history_chat_record))], [])
|
||||||
return history_message
|
return history_message
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
# coding=utf-8
|
# coding=utf-8
|
||||||
import datetime
|
import datetime
|
||||||
|
import time
|
||||||
from typing import Dict, Optional, Any, Iterator
|
from typing import Dict, Optional, Any, Iterator
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
|
@ -81,13 +82,24 @@ class QwenVLChatModel(MaxKBBaseModel, BaseChatOpenAI):
|
||||||
return f"oss://{key}"
|
return f"oss://{key}"
|
||||||
|
|
||||||
def upload_file_and_get_url(self, file_stream, file_name):
|
def upload_file_and_get_url(self, file_stream, file_name):
|
||||||
"""上传文件并获取URL"""
|
max_retries = 3
|
||||||
# 1. 获取上传凭证,上传凭证接口有限流,超出限流将导致请求失败
|
|
||||||
policy_data = self.get_upload_policy(self.openai_api_key.get_secret_value(), self.model_name)
|
|
||||||
# 2. 上传文件到OSS
|
|
||||||
oss_url = self.upload_file_to_oss(policy_data, file_stream, file_name)
|
|
||||||
|
|
||||||
return oss_url
|
retry_delay = 1 # 初始重试延迟(秒)
|
||||||
|
|
||||||
|
for attempt in range(max_retries):
|
||||||
|
try:
|
||||||
|
# 1. 获取上传凭证,上传凭证接口有限流,超出限流将导致请求失败
|
||||||
|
policy_data = self.get_upload_policy(self.openai_api_key.get_secret_value(), self.model_name)
|
||||||
|
# 2. 上传文件到OSS
|
||||||
|
oss_url = self.upload_file_to_oss(policy_data, file_stream, file_name)
|
||||||
|
return oss_url
|
||||||
|
except Exception as e:
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
# 指数退避策略
|
||||||
|
time.sleep(retry_delay * (2 ** attempt))
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
raise Exception(f"文件上传失败,已重试{max_retries}次: {str(e)}")
|
||||||
|
|
||||||
def stream(
|
def stream(
|
||||||
self,
|
self,
|
||||||
|
|
@ -129,32 +141,54 @@ class QwenVLChatModel(MaxKBBaseModel, BaseChatOpenAI):
|
||||||
**self.extra_body,
|
**self.extra_body,
|
||||||
"stream": True,
|
"stream": True,
|
||||||
}
|
}
|
||||||
response = requests.post(url, headers=headers, json=data, stream=True)
|
|
||||||
if response.status_code != 200:
|
# 增加重试机制
|
||||||
raise Exception(f"Failed to get response: {response.text}")
|
max_retries = 3
|
||||||
for line in response.iter_lines():
|
retry_delay = 1
|
||||||
if line:
|
|
||||||
try:
|
for attempt in range(max_retries):
|
||||||
decoded_line = line.decode('utf-8')
|
try:
|
||||||
# 检查是否是有效的SSE数据行
|
response = requests.post(url, headers=headers, json=data, stream=True, timeout=30)
|
||||||
if decoded_line.startswith('data: '):
|
if response.status_code != 200:
|
||||||
# 提取JSON部分
|
raise Exception(f"Failed to get response: {response.text}")
|
||||||
json_str = decoded_line[6:] # 移除 'data: ' 前缀
|
|
||||||
# 检查是否是结束标记
|
for line in response.iter_lines():
|
||||||
if json_str.strip() == '[DONE]':
|
if line:
|
||||||
|
try:
|
||||||
|
decoded_line = line.decode('utf-8')
|
||||||
|
# 检查是否是有效的SSE数据行
|
||||||
|
if decoded_line.startswith('data: '):
|
||||||
|
# 提取JSON部分
|
||||||
|
json_str = decoded_line[6:] # 移除 'data: ' 前缀
|
||||||
|
# 检查是否是结束标记
|
||||||
|
if json_str.strip() == '[DONE]':
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 尝试解析JSON
|
||||||
|
chunk_data = json.loads(json_str)
|
||||||
|
|
||||||
|
if 'choices' in chunk_data and chunk_data['choices']:
|
||||||
|
delta = chunk_data['choices'][0].get('delta', {})
|
||||||
|
content = delta.get('content', '')
|
||||||
|
if content:
|
||||||
|
yield AIMessage(content=content)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
# 忽略无法解析的行
|
||||||
continue
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
# 处理其他可能的异常
|
||||||
|
continue
|
||||||
|
break # 成功执行则退出重试循环
|
||||||
|
|
||||||
# 尝试解析JSON
|
except (requests.exceptions.ProxyError, requests.exceptions.ConnectionError) as e:
|
||||||
chunk_data = json.loads(json_str)
|
if attempt < max_retries - 1:
|
||||||
|
time.sleep(retry_delay * (2 ** attempt)) # 指数退避
|
||||||
if 'choices' in chunk_data and chunk_data['choices']:
|
|
||||||
delta = chunk_data['choices'][0].get('delta', {})
|
|
||||||
content = delta.get('content', '')
|
|
||||||
if content:
|
|
||||||
yield AIMessage(content=content)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
# 忽略无法解析的行
|
|
||||||
continue
|
continue
|
||||||
except Exception as e:
|
else:
|
||||||
# 处理其他可能的异常
|
raise Exception(f"网络连接失败,已重试{max_retries}次: {str(e)}")
|
||||||
|
except Exception as e:
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
time.sleep(retry_delay * (2 ** attempt))
|
||||||
continue
|
continue
|
||||||
|
else:
|
||||||
|
raise Exception(f"请求失败,已重试{max_retries}次: {str(e)}")
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,7 @@ class VolcanicEngineImage(MaxKBBaseModel, BaseChatOpenAI):
|
||||||
return f'data:{video_format};base64,{base64_video}'
|
return f'data:{video_format};base64,{base64_video}'
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def get_video_format(file_name):
|
def get_video_format(file_name):
|
||||||
extension = file_name.split('.')[-1].lower()
|
extension = file_name.split('.')[-1].lower()
|
||||||
format_map = {
|
format_map = {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue