257 lines
9.0 KiB
Python
257 lines
9.0 KiB
Python
"""
|
||
MinerU Tianshu - Unified Startup Script
|
||
天枢统一启动脚本
|
||
|
||
一键启动所有服务:API Server + LitServe Workers + Task Scheduler
|
||
"""
|
||
import subprocess
|
||
import signal
|
||
import sys
|
||
import time
|
||
import os
|
||
from loguru import logger
|
||
from pathlib import Path
|
||
import argparse
|
||
|
||
|
||
class TianshuLauncher:
|
||
"""天枢服务启动器"""
|
||
|
||
def __init__(
|
||
self,
|
||
output_dir='/tmp/mineru_tianshu_output',
|
||
api_port=8000,
|
||
worker_port=9000,
|
||
workers_per_device=1,
|
||
devices='auto',
|
||
accelerator='auto'
|
||
):
|
||
self.output_dir = output_dir
|
||
self.api_port = api_port
|
||
self.worker_port = worker_port
|
||
self.workers_per_device = workers_per_device
|
||
self.devices = devices
|
||
self.accelerator = accelerator
|
||
self.processes = []
|
||
|
||
def start_services(self):
|
||
"""启动所有服务"""
|
||
logger.info("=" * 70)
|
||
logger.info("🚀 MinerU Tianshu - Starting All Services")
|
||
logger.info("=" * 70)
|
||
logger.info("天枢 - 企业级多GPU文档解析服务")
|
||
logger.info("")
|
||
|
||
try:
|
||
# 1. 启动 API Server
|
||
logger.info("📡 [1/3] Starting API Server...")
|
||
env = os.environ.copy()
|
||
env['API_PORT'] = str(self.api_port)
|
||
api_proc = subprocess.Popen(
|
||
[sys.executable, 'api_server.py'],
|
||
cwd=Path(__file__).parent,
|
||
env=env
|
||
)
|
||
self.processes.append(('API Server', api_proc))
|
||
time.sleep(3)
|
||
|
||
if api_proc.poll() is not None:
|
||
logger.error("❌ API Server failed to start!")
|
||
return False
|
||
|
||
logger.info(f" ✅ API Server started (PID: {api_proc.pid})")
|
||
logger.info(f" 📖 API Docs: http://localhost:{self.api_port}/docs")
|
||
logger.info("")
|
||
|
||
# 2. 启动 LitServe Worker Pool
|
||
logger.info("⚙️ [2/3] Starting LitServe Worker Pool...")
|
||
worker_cmd = [
|
||
sys.executable, 'litserve_worker.py',
|
||
'--output-dir', self.output_dir,
|
||
'--accelerator', self.accelerator,
|
||
'--workers-per-device', str(self.workers_per_device),
|
||
'--port', str(self.worker_port),
|
||
'--devices', str(self.devices) if isinstance(self.devices, str) else ','.join(map(str, self.devices))
|
||
]
|
||
|
||
worker_proc = subprocess.Popen(
|
||
worker_cmd,
|
||
cwd=Path(__file__).parent
|
||
)
|
||
self.processes.append(('LitServe Workers', worker_proc))
|
||
time.sleep(5)
|
||
|
||
if worker_proc.poll() is not None:
|
||
logger.error("❌ LitServe Workers failed to start!")
|
||
return False
|
||
|
||
logger.info(f" ✅ LitServe Workers started (PID: {worker_proc.pid})")
|
||
logger.info(f" 🔌 Worker Port: {self.worker_port}")
|
||
logger.info(f" 👷 Workers per Device: {self.workers_per_device}")
|
||
logger.info("")
|
||
|
||
# 3. 启动 Task Scheduler
|
||
logger.info("🔄 [3/3] Starting Task Scheduler...")
|
||
scheduler_cmd = [
|
||
sys.executable, 'task_scheduler.py',
|
||
'--litserve-url', f'http://localhost:{self.worker_port}/predict',
|
||
'--wait-for-workers'
|
||
]
|
||
|
||
scheduler_proc = subprocess.Popen(
|
||
scheduler_cmd,
|
||
cwd=Path(__file__).parent
|
||
)
|
||
self.processes.append(('Task Scheduler', scheduler_proc))
|
||
time.sleep(3)
|
||
|
||
if scheduler_proc.poll() is not None:
|
||
logger.error("❌ Task Scheduler failed to start!")
|
||
return False
|
||
|
||
logger.info(f" ✅ Task Scheduler started (PID: {scheduler_proc.pid})")
|
||
logger.info("")
|
||
|
||
# 启动成功
|
||
logger.info("=" * 70)
|
||
logger.info("✅ All Services Started Successfully!")
|
||
logger.info("=" * 70)
|
||
logger.info("")
|
||
logger.info("📚 Quick Start:")
|
||
logger.info(f" • API Documentation: http://localhost:{self.api_port}/docs")
|
||
logger.info(f" • Submit Task: POST http://localhost:{self.api_port}/api/v1/tasks/submit")
|
||
logger.info(f" • Query Status: GET http://localhost:{self.api_port}/api/v1/tasks/{{task_id}}")
|
||
logger.info(f" • Queue Stats: GET http://localhost:{self.api_port}/api/v1/queue/stats")
|
||
logger.info("")
|
||
logger.info("🔧 Service Details:")
|
||
for name, proc in self.processes:
|
||
logger.info(f" • {name:20s} PID: {proc.pid}")
|
||
logger.info("")
|
||
logger.info("⚠️ Press Ctrl+C to stop all services")
|
||
logger.info("=" * 70)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Failed to start services: {e}")
|
||
self.stop_services()
|
||
return False
|
||
|
||
def stop_services(self, signum=None, frame=None):
|
||
"""停止所有服务"""
|
||
logger.info("")
|
||
logger.info("=" * 70)
|
||
logger.info("⏹️ Stopping All Services...")
|
||
logger.info("=" * 70)
|
||
|
||
for name, proc in self.processes:
|
||
if proc.poll() is None: # 进程仍在运行
|
||
logger.info(f" Stopping {name} (PID: {proc.pid})...")
|
||
proc.terminate()
|
||
|
||
# 等待所有进程结束
|
||
for name, proc in self.processes:
|
||
try:
|
||
proc.wait(timeout=10)
|
||
logger.info(f" ✅ {name} stopped")
|
||
except subprocess.TimeoutExpired:
|
||
logger.warning(f" ⚠️ {name} did not stop gracefully, forcing...")
|
||
proc.kill()
|
||
proc.wait()
|
||
|
||
logger.info("=" * 70)
|
||
logger.info("✅ All Services Stopped")
|
||
logger.info("=" * 70)
|
||
sys.exit(0)
|
||
|
||
def wait(self):
|
||
"""等待所有服务"""
|
||
try:
|
||
while True:
|
||
time.sleep(1)
|
||
|
||
# 检查进程状态
|
||
for name, proc in self.processes:
|
||
if proc.poll() is not None:
|
||
logger.error(f"❌ {name} unexpectedly stopped!")
|
||
self.stop_services()
|
||
return
|
||
|
||
except KeyboardInterrupt:
|
||
self.stop_services()
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
parser = argparse.ArgumentParser(
|
||
description='MinerU Tianshu - 统一启动脚本',
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="""
|
||
示例:
|
||
# 使用默认配置启动(自动检测GPU)
|
||
python start_all.py
|
||
|
||
# 使用CPU模式
|
||
python start_all.py --accelerator cpu
|
||
|
||
# 指定输出目录和端口
|
||
python start_all.py --output-dir /data/output --api-port 8080
|
||
|
||
# 每个GPU启动2个worker
|
||
python start_all.py --accelerator cuda --workers-per-device 2
|
||
|
||
# 只使用指定的GPU
|
||
python start_all.py --accelerator cuda --devices 0,1
|
||
"""
|
||
)
|
||
|
||
parser.add_argument('--output-dir', type=str, default='/tmp/mineru_tianshu_output',
|
||
help='输出目录 (默认: /tmp/mineru_tianshu_output)')
|
||
parser.add_argument('--api-port', type=int, default=8000,
|
||
help='API服务器端口 (默认: 8000)')
|
||
parser.add_argument('--worker-port', type=int, default=9000,
|
||
help='Worker服务器端口 (默认: 9000)')
|
||
parser.add_argument('--accelerator', type=str, default='auto',
|
||
choices=['auto', 'cuda', 'cpu', 'mps'],
|
||
help='加速器类型 (默认: auto,自动检测)')
|
||
parser.add_argument('--workers-per-device', type=int, default=1,
|
||
help='每个GPU的worker数量 (默认: 1)')
|
||
parser.add_argument('--devices', type=str, default='auto',
|
||
help='使用的GPU设备,逗号分隔 (默认: auto,使用所有GPU)')
|
||
|
||
args = parser.parse_args()
|
||
|
||
# 处理 devices 参数
|
||
devices = args.devices
|
||
if devices != 'auto':
|
||
try:
|
||
devices = [int(d) for d in devices.split(',')]
|
||
except:
|
||
logger.warning(f"Invalid devices format: {devices}, using 'auto'")
|
||
devices = 'auto'
|
||
|
||
# 创建启动器
|
||
launcher = TianshuLauncher(
|
||
output_dir=args.output_dir,
|
||
api_port=args.api_port,
|
||
worker_port=args.worker_port,
|
||
workers_per_device=args.workers_per_device,
|
||
devices=devices,
|
||
accelerator=args.accelerator
|
||
)
|
||
|
||
# 设置信号处理
|
||
signal.signal(signal.SIGINT, launcher.stop_services)
|
||
signal.signal(signal.SIGTERM, launcher.stop_services)
|
||
|
||
# 启动服务
|
||
if launcher.start_services():
|
||
launcher.wait()
|
||
else:
|
||
sys.exit(1)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|
||
|