diff --git a/mineru/cli/fast_api.py b/mineru/cli/fast_api.py
index d393c75..23e95f3 100644
--- a/mineru/cli/fast_api.py
+++ b/mineru/cli/fast_api.py
@@ -5,6 +5,8 @@ import re
 import tempfile
 import asyncio
 import threading
+import hashlib
+import json
 import uvicorn
 import click
 import zipfile
@@ -36,13 +38,64 @@ _request_semaphore: Optional[asyncio.Semaphore] = None
 
 # --- 任务进度跟踪 ---
 _task_progress: dict = {}
+_task_progress_dir = Path(
+    os.getenv("MINERU_TASK_PROGRESS_DIR") or Path(tempfile.gettempdir()) / "mineru-task-progress"
+)
+
+
+def _task_progress_path(task_id: str) -> Path:
+    """Return the JSON file path for *task_id*; the ID is hashed so any string is a safe name."""
+    task_key = hashlib.sha256(task_id.encode("utf-8")).hexdigest()
+    return _task_progress_dir / f"{task_key}.json"
+
+
+def _store_task_progress(task_id: str, state: dict) -> None:
+    """Store progress in memory and on disk so API worker processes share it."""
+    state = dict(state)
+    state["task_id"] = task_id
+    _task_progress[task_id] = state
+
+    temp_path: Optional[Path] = None
+    try:
+        _task_progress_dir.mkdir(parents=True, exist_ok=True)
+        progress_path = _task_progress_path(task_id)
+        temp_path = progress_path.with_name(
+            f"{progress_path.name}.{os.getpid()}.{threading.get_ident()}.tmp"
+        )
+        with open(temp_path, "w", encoding="utf-8") as fp:
+            json.dump(state, fp, ensure_ascii=False)
+        os.replace(temp_path, progress_path)
+    except Exception as exc:
+        logger.warning(f"Failed to persist task progress for {task_id}: {exc}")
+        # Best effort: do not leave a half-written temp file behind when the write fails.
+        if temp_path is not None:
+            try:
+                temp_path.unlink()
+            except OSError:
+                pass
+
+
+def _get_task_progress(task_id: str) -> Optional[dict]:
+    """Load progress from disk, falling back to this process's in-memory copy."""
+    try:
+        with open(_task_progress_path(task_id), "r", encoding="utf-8") as fp:
+            state = json.load(fp)
+        _task_progress[task_id] = state
+        return state
+    except (OSError, ValueError):
+        # JSONDecodeError and UnicodeDecodeError (corrupt file) are both ValueError subclasses.
+        return _task_progress.get(task_id)
 
 
 def _update_task_progress(task_id: Optional[str], progress: int, stage: str):
     """更新任务进度(安全调用,task_id 为 None 时静默跳过)"""
-    if task_id and task_id in _task_progress:
-        _task_progress[task_id]["progress"] = min(progress, 100)
-        _task_progress[task_id]["stage"] = stage
+    if not task_id:
+        return
+    state = _get_task_progress(task_id)
+    if state is not None:
+        state["progress"] = min(progress, 100)
+        state["stage"] = stage
+        _store_task_progress(task_id, state)
 
 
 class ProgressTracker:
@@ -248,12 +301,29 @@ def get_infer_result(file_suffix_identifier: str, pdf_name: str, parse_dir: str)
 api_router = APIRouter(prefix="/api")
 
 
+@api_router.post("/parse_tasks/{task_id}", status_code=201)
+async def create_parse_task(task_id: str):
+    """Register a task before the multipart upload starts."""
+    state = {
+        "progress": 0,
+        "stage": "等待上传",
+        "status": "pending",
+        "error": None,
+        "file_names": "",
+    }
+    _store_task_progress(task_id, state)
+    logger.info(f"Registered parse task pid={os.getpid()} task_id={task_id}")
+    return state
+
+
 @api_router.get("/parse_progress/{task_id}")
 async def get_parse_progress(task_id: str):
     """查询解析任务的实时进度"""
-    if task_id not in _task_progress:
+    state = _get_task_progress(task_id)
+    if state is None:
+        logger.warning(f"Parse task not found pid={os.getpid()} task_id={task_id}")
         raise HTTPException(status_code=404, detail="Task not found")
-    return _task_progress[task_id]
+    return state
 
 
 @api_router.post(path="/file_parse", dependencies=[Depends(limit_concurrency)])
@@ -274,6 +344,7 @@ async def parse_pdf(
         response_format_zip: bool = Form(False),
         start_page_id: int = Form(0),
         end_page_id: int = Form(99999),
+        form_task_id: Optional[str] = Form(None, alias="task_id"),
         x_task_id: Optional[str] = Header(None),
 ):
     # 从 app 实例状态中获取配置 (FastAPI 实例会在下方创建)
@@ -281,12 +352,16 @@ async def parse_pdf(
     config = getattr(app.state, "config", {})
 
     # 初始化进度跟踪
-    task_id = x_task_id or str(uuid.uuid4())
+    task_id = x_task_id or form_task_id or str(uuid.uuid4())
     file_names_str = ", ".join(f.filename or "unknown" for f in files)
-    _task_progress[task_id] = {
+    _store_task_progress(task_id, {
         "progress": 0, "stage": "准备中", "status": "processing",
         "error": None, "file_names": file_names_str,
-    }
+    })
+    logger.info(
+        f"Started parse task pid={os.getpid()} task_id={task_id} "
+        f"header_task_id={x_task_id} form_task_id={form_task_id}"
+    )
 
     try:
         unique_dir = os.path.join(output_dir, str(uuid.uuid4()))
@@ -353,7 +428,9 @@ async def parse_pdf(
         _update_task_progress(task_id, 97, "生成结果文件")
 
         _update_task_progress(task_id, 100, "转换完成")
-        _task_progress[task_id]["status"] = "completed"
+        completed_state = _get_task_progress(task_id) or {}
+        completed_state["status"] = "completed"
+        _store_task_progress(task_id, completed_state)
 
         # 清理日志捕获和 stderr 捕获
         stderr_capture.stop()
@@ -416,8 +493,10 @@ async def parse_pdf(
             _current_task_id = None
         except Exception:
             pass
-        _task_progress[task_id]["status"] = "failed"
-        _task_progress[task_id]["error"] = str(e)
+        failed_state = _get_task_progress(task_id) or {}
+        failed_state["status"] = "failed"
+        failed_state["error"] = str(e)
+        _store_task_progress(task_id, failed_state)
         return JSONResponse(status_code=500, content={"error": f"Internal Error: {str(e)}"})
 
 
@@ -481,4 +560,4 @@ def main(ctx, host, port, reload, **kwargs):
 
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
diff --git a/web_ui/src/api/document.ts b/web_ui/src/api/document.ts
index a1addbc..170a99f 100644
--- a/web_ui/src/api/document.ts
+++ b/web_ui/src/api/document.ts
@@ -32,7 +32,7 @@ export interface ParseResult {
 export interface ParseProgress {
   progress: number
   stage: string
-  status: 'processing' | 'completed' | 'failed'
+  status: 'pending' | 'processing' | 'completed' | 'failed'
   error: string | null
   file_names: string
 }
@@ -65,6 +65,9 @@ export const documentApi = {
     if (params.server_url) {
       formData.append('server_url', params.server_url)
     }
+    if (taskId) {
+      formData.append('task_id', taskId)
+    }
 
     return request.post('/api/file_parse', formData, {
       headers: {
@@ -80,6 +83,15 @@ export const documentApi = {
     })
   },
 
+  /**
+   * Register the task with the server before the upload starts.
+   */
+  createParseTask(taskId: string): Promise<ParseProgress> {
+    return request.post(`/api/parse_tasks/${encodeURIComponent(taskId)}`).then(result => {
+      return result as unknown as ParseProgress
+    })
+  },
+
   /**
    * 查询解析进度
    */
@@ -88,4 +100,4 @@ export const documentApi = {
       return result as unknown as ParseProgress
     })
   }
-}
\ No newline at end of file
+}
diff --git a/web_ui/src/composables/useDocumentProcessor.ts b/web_ui/src/composables/useDocumentProcessor.ts
index c29f22c..ec22cc3 100644
--- a/web_ui/src/composables/useDocumentProcessor.ts
+++ b/web_ui/src/composables/useDocumentProcessor.ts
@@ -200,9 +200,10 @@ export function useDocumentProcessor() {
 
     // 生成任务ID
     const taskId = `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`
-    startProgressPolling(taskId)
 
     try {
+      await documentApi.createParseTask(taskId)
+      startProgressPolling(taskId)
       processingStage.value = '提交文档到解析服务'
       const params: ParseParams = {
         files: uploadedFiles.value,
diff --git a/web_ui/vite.config.ts b/web_ui/vite.config.ts
index 70d39ee..d697784 100644
--- a/web_ui/vite.config.ts
+++ b/web_ui/vite.config.ts
@@ -13,7 +13,7 @@ export default defineConfig({
     port: 3000,
     proxy: {
       '/api': {
-        target: 'http://localhost:8000',
+        target: process.env.VITE_API_PROXY_TARGET || 'http://localhost:8000',
         changeOrigin: true
       }
     }