跳转到主要内容

概览

该模块提供全面的健康数据文件处理能力,包括文件上传、健康指标提取和文件删除。系统采用模块化设计,支持多种文件类型,并提供实时进度反馈。

特性

  • 多种上传方式: WebSocket 实时上传和 REST API 批量上传
  • 实时进度反馈: WebSocket 连接提供实时进度更新
  • 智能文件识别: 自动识别文件类型并选择合适的处理器 (Handler)
  • 健康指标提取: 使用 LLM 自动提取健康指标数据
  • 多格式支持: PDF、图片、音频、基因数据等
  • PDF 并行处理: 多页 PDF 并行处理以提高效率
  • 文件摘要生成: 自动生成文件内容摘要
  • 级联删除: 删除文件时自动清理关联的健康数据

支持的文件类型

文件类型MIME 类型处理器 (Handler)描述
PDFapplication/pdfPDFHandler多页并行处理,自动提取健康指标
图片 (Images)image/*ImageHandler支持 JPEG, PNG, GIF, WebP;识别健康报告并提取指标
音频 (Audio)audio/*AudioHandler语音转文字,用于提取口头健康信息
基因数据特定格式GeneticHandler基因检测报告解析

文件上传方式

WebSocket 上传 (推荐)

WebSocket 上传提供实时进度反馈,是上传大文件和需要实时状态更新场景的理想选择。

端点 (Endpoint)

ws://<host>/ws/upload-health-report?token=<auth_token>

连接流程

消息类型

1. 上传开始 (upload_start)
客户端发送:
{
    "type": "upload_start",
    "messageId": "unique-message-id",
    "sessionId": "session-id",
    "query": "用户备注",
    "isFirstMessage": false,
    "query_user_id": "目标用户-id",
    "files": [
        {
            "filename": "report.pdf",
            "contentType": "application/pdf",
            "size": 1024000
        }
    ]
}
字段类型必填描述
typestring必须为 “upload_start”
messageIdstring唯一消息 ID,未提供时自动生成
sessionIdstring会话 ID
querystring用户备注或查询文本
isFirstMessageboolean是否为新会话的第一条消息
query_user_idstring代理上传的目标用户 ID
filesarray文件元数据数组
服务端响应:
{
    "type": "upload_start",
    "messageId": "generated-message-id",
    "sessionId": "session-id",
    "status": "uploading",
    "progress": 0,
    "message": "Ready to receive 1 files",
    "files": [...]
}
2. 上传分块 (upload_chunk)
客户端发送:
{
    "type": "upload_chunk",
    "messageId": "message-id",
    "filename": "report.pdf",
    "chunk": "<base64-encoded-data>",
    "chunkIndex": 0,
    "totalChunks": 10,
    "contentType": "application/pdf",
    "fileSize": 1024000
}
服务端响应:
{
    "type": "file_progress",
    "messageId": "message-id",
    "filename": "report.pdf",
    "progress": 10.0,
    "status": "uploading",
    "message": "Uploading report.pdf: 10.0%"
}
3. 文件接收完成 (file_received)
{
    "type": "file_received",
    "messageId": "message-id",
    "filename": "report.pdf",
    "status": "received",
    "message": "File report.pdf received successfully",
    "size": 1024000
}
4. 处理进度 (upload_progress)
{
    "type": "upload_progress",
    "messageId": "message-id",
    "status": "processing",
    "progress": 65,
    "message": "开始分析 report.pdf 中的健康指标...",
    "filename": "report.pdf",
    "timestamp": "2024-01-15T10:30:00.000Z"
}
5. 上传完成 (upload_completed)
{
    "type": "upload_completed",
    "messageId": "message-id",
    "status": "completed",
    "progress": 100,
    "message": "处理完成:1 个文件成功",
    "successful_files": 1,
    "failed_files": 0,
    "total_files": 1,
    "results": {
        "success": true,
        "message": "文件处理完成",
        "type": "pdf",
        "url_thumb": ["https://..."],
        "url_full": ["https://..."],
        "files": [
            {
                "filename": "report.pdf",
                "type": "pdf",
                "url_thumb": "https://...",
                "url_full": "https://...",
                "file_key": "uploads/xxx.pdf",
                "file_size": 1024000,
                "file_abstract": "健康报告摘要...",
                "file_name": "2024 年度健康报告",
                "success": true
            }
        ]
    }
}
6. 心跳 (ping/pong)
客户端发送:
{
    "type": "ping"
}
服务端响应:
{
    "type": "pong",
    "timestamp": "2024-01-15T10:30:00.000Z"
}

超时机制

类型时长描述
空闲超时5 分钟无活动时自动断开
上传超时30 分钟在活动上传期间延长超时时间
心跳30 秒推荐的 ping 间隔

REST API 上传

REST API 提供了一种简单的文件上传方法,适用于简单的场景或不需要实时进度的应用程序。

端点 (Endpoint)

POST /files/upload
Content-Type: multipart/form-data
Authorization: Bearer <token>

请求参数

参数类型必填描述
filesFile[]要上传的文件列表
folderstring自定义文件夹前缀,默认为 ‘uploads’

响应示例

{
    "code": 0,
    "msg": "所有 2 个文件上传成功",
    "data": [
        {
            "file_url": "https://storage.example.com/uploads/20240115_xxx.pdf",
            "file_name": "report.pdf",
            "file_key": "uploads/20240115_xxx.pdf",
            "file_size": 1024000,
            "file_type": "application/pdf",
            "upload_time": "2024-01-15T10:30:00.000Z",
            "duration": null
        }
    ]
}

响应码

代码描述
0所有上传成功
1部分或全部失败

处理架构

概览

┌───────────────────────────────────────────────────────────────┐
│                         文件上传                              │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐        │
│  │  WebSocket  │    │  REST API   │    │    直接     │        │
│  │    上传     │    │    上传     │    │    上传     │        │
│  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘        │
│         │                  │                  │                │
│         └──────────────────┼──────────────────┘                │
│                            ▼                                   │
│                   ┌─────────────────┐                          │
│                   │  FileProcessor  │                          │
│                   └────────┬────────┘                          │
│                            │                                   │
│         ┌──────────────────┼──────────────────┐                │
│         ▼                  ▼                  ▼                │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐          │
│  │ PDFHandler  │   │ImageHandler │   │AudioHandler │   ...    │
│  └──────┬──────┘   └──────┬──────┘   └──────┬──────┘          │
│         │                  │                  │                │
│         └──────────────────┼──────────────────┘                │
│                            ▼                                   │
│                  ┌───────────────────┐                         │
│                  │IndicatorExtractor │                         │
│                  └─────────┬─────────┘                         │
│                            ▼                                   │
│                   ┌─────────────────┐                          │
│                   │    数据库       │                          │
│                   │  (th_messages,  │                          │
│                   │  th_series_data)│                          │
│                   └─────────────────┘                          │
│└───────────────────────────────────────────────────────────────┘

处理步骤

1. 文件上传阶段 (0-30%)

  • 接收文件数据
  • 验证文件类型和大小
  • 生成唯一文件标识符
  • 上传到对象存储 (S3/OSS/MinIO)

2. 文件类型识别 (30-35%)

系统通过 FileHandlerFactory 自动识别文件类型:
# 处理器选择优先级
1. GeneticHandler  - 基因数据文件
2. ImageHandler    - 图片文件 (image/*)
3. PDFHandler      - PDF 文档
4. AudioHandler    - 音频文件 (audio/*)

3. 内容处理阶段 (35-90%)

PDF 文件处理:
单页/少量页面 PDF:
  35-65%: 文件上传并保存
  65-90%: LLM 健康指标提取

多页 PDF (>2 页):
  35-50%: 文件上传并拆分 PDF
  50-70%: 并行处理各个页面
  70-90%: 结果合并并去重
图片文件处理:
35-55%: 图片上传
55-90%: 识别 + 指标提取

4. 摘要生成阶段 (90-95%)

  • 使用 LLM 生成文件内容摘要
  • 生成智能文件名

5. 结果保存阶段 (95-100%)

  • 将处理结果保存到数据库
  • 将健康指标同步到 th_series_data
  • 更新用户健康个人资料 (User Health Profile)

健康指标提取

配置

config.yaml
ENABLE_INDICATOR_EXTRACTION: 1  # 设置为 1 以启用指标提取

支持的指标类型

  • 血常规 (白细胞, 红细胞, 血红蛋白等)
  • 生化 (肝功能, 肾功能, 血脂等)
  • 体检 (血压, 心率, 体重等)
  • 肿瘤标志物
  • 甲状腺功能
  • 其他医疗检测指标

提取结果格式

{
    "indicators": [
        {
            "original_indicator": "白细胞计数",
            "value": "6.5",
            "unit": "10^9/L",
            "reference_range": "4.0-10.0",
            "status": "normal"
        },
        {
            "original_indicator": "血红蛋白",
            "value": "145",
            "unit": "g/L",
            "reference_range": "120-160",
            "status": "normal"
        }
    ],
    "content_info": {
        "date_time": "2024-01-15",
        "institution": "XX 医院"
    },
    "file_abstract": "2024 年 1 月健康报告,包括血常规、生化全项..."
}

数据存储

提取的指标数据存储在 th_series_data 表中:
字段描述
user_id用户 ID
indicator_id指标 ID (链接到指标维度表)
value指标值
unit计量单位
source_table来源表名
source_table_id来源记录 ID (message_id + file_key)
recorded_at检测日期/时间

文件删除

端点 (Endpoint)

POST /api/v1/data/delete-files
Content-Type: application/json
Authorization: Bearer <token>

请求体

{
    "message_id": "message-uuid",
    "file_keys": ["uploads/xxx.pdf", "uploads/yyy.jpg"]
}
参数类型必填描述
message_idstring消息 ID
file_keysstring[]要删除的文件密钥列表;如果为空,则删除所有文件

响应示例

{
    "code": 0,
    "msg": "成功删除 2 个文件",
    "data": {
        "success": true,
        "message_id": "message-uuid",
        "deleted_files": [
            {
                "file_key": "uploads/xxx.pdf",
                "filename": "report.pdf",
                "type": "pdf",
                "status": "deleted"
            }
        ],
        "failed_deletions": [],
        "remaining_files_count": 0,
        "message_deleted": true
    }
}

级联删除

删除文件时,系统会自动执行级联删除:
  1. 存储删除: 从对象存储 (S3/OSS) 中删除文件
  2. 数据库更新: 更新 th_messages 表中的文件列表
  3. 健康数据清理: 从 th_series_data 中删除关联的健康指标
  4. 基因数据清理: 如果是基因文件,则从 th_genetic_data 中删除数据
  5. 消息标记: 如果所有文件均已删除,则将消息标记为已删除

API 参考

文件服务端点

方法端点描述
WS/ws/upload-health-reportWebSocket 文件上传
POST/files/uploadREST API 文件上传
POST/api/v1/data/delete-files删除文件
GET/files/{file_path}获取文件内容 (代理访问)

身份验证

所有端点都需要有效的身份验证令牌:
  • REST API: 使用 Authorization: Bearer <token> 标头
  • WebSocket: 通过 URL 参数 ?token=<token> 传递

数据模型

FileUploadData

interface FileUploadData {
    file_url: string;      // 文件访问 URL
    file_name: string;     // 原始文件名
    file_key: string;      // 存储密钥
    file_size: number;     // 文件大小 (字节)
    file_type: string;     // MIME 类型
    upload_time: string;   // 上传时间戳
    duration?: number;     // 音频时长 (毫秒,仅限音频文件)
}

FileDeleteRequest

interface FileDeleteRequest {
    message_id: string;        // 消息 ID
    file_keys?: string[];      // 要删除的文件密钥列表
}

FileProcessingResult

interface FileProcessingResult {
    success: boolean;
    message: string;
    type: string;              // 文件类型:pdf, image, audio 等
    filename: string;          // 原始文件名
    full_url: string;          // 完整访问 URL
    file_key: string;          // 存储密钥
    file_abstract: string;     // 文件摘要
    file_name: string;         // AI 生成的文件名
    raw?: string;              // 从提取的数据生成的格式化 Markdown 内容
}

错误处理

WebSocket 错误消息

{
    "type": "upload_error",
    "messageId": "message-id",
    "status": "failed",
    "message": "上传开始失败:<error_details>"
}

REST API 错误响应

{
    "code": 1,
    "msg": "文件上传失败:<error_details>",
    "data": null
}

常见错误

错误类型描述解决方案
Invalid token令牌无效或已过期获取新令牌
File type not supported不支持的文件类型使用支持的文件格式
File is empty文件内容为空检查文件内容
Upload session not found上传会话不存在重新开始上传
Permission denied无操作权限检查用户权限

超时处理

WebSocket 连接在超时时会收到通知:
{
    "type": "connection_timeout",
    "message": "Connection closed due to 5 minutes of inactivity",
    "idle_seconds": 300,
    "timeout_type": "idle",
    "active_uploads_count": 0
}

最佳实践

大文件上传

  • 使用带有分块传输的 WebSocket 上传
  • 推荐分块大小:1MB
  • 实现断点续传机制

批量上传

  • 每次上传限制在 10 个文件以内
  • 总文件大小不应超过 100MB

进度监控

  • 在 WebSocket 上传期间监听 upload_progress 消息
  • 处理 file_progress 以显示单个文件的进度

错误处理

  • 实现重试机制 (推荐:最多 3 次重试)
  • 捕获并显示对用户友好的错误消息

连接保活

  • WebSocket 连接每 30 秒发送一次 ping
  • 处理 pong 响应以确认连接状态

代码示例

Python REST API 上传

import requests
from pathlib import Path

def upload_files(file_paths: list, token: str, folder: str = None) -> dict:
    """上传文件到服务器。"""
    url = "http://localhost:18080/files/upload"
    headers = {"Authorization": f"Bearer {token}"}
    
    files = []
    for file_path in file_paths:
        path = Path(file_path)
        files.append(('files', (path.name, open(path, 'rb'))))
    
    params = {}
    if folder:
        params['folder'] = folder
    
    try:
        response = requests.post(url, headers=headers, files=files, params=params)
        return response.json()
    finally:
        for _, file_tuple in files:
            file_tuple[1].close()

# 用法
result = upload_files(
    file_paths=["./report.pdf", "./lab_results.jpg"],
    token="您的身份验证令牌",
    folder="health-reports"
)

文件删除

def delete_files(message_id: str, file_keys: list, token: str) -> dict:
    """从消息中删除文件。"""
    url = "http://localhost:18080/api/v1/data/delete-files"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "message_id": message_id,
        "file_keys": file_keys
    }
    
    response = requests.post(url, headers=headers, json=payload)
    return response.json()

下一步