跳转到主要内容

系统架构

Mirobody Health 采用模块化、分层架构,将关注点分离,并支持轻松扩展。

核心组件

API 层

基于 FastAPI 的 REST API,用于对外集成

提供商系统

面向健康设备的可插拔 OAuth 集成

数据流水线

健康数据转换的 ETL 流水线

AI 服务

面向对话的多 LLM 提供商集成

MCP 协议

面向 AI agent 的 Model Context Protocol

存储层

PostgreSQL 与 Redis 用于数据持久化

架构分层

1. API 层

API 层为系统交互提供 RESTful 端点(endpoints):
HTTP Server (FastAPI)
├── /api/v1/pulse/          # 健康提供商端点
├── /api/chat               # AI 聊天接口
├── /api/history            # 会话历史
└── /mcp                    # MCP 协议端点
核心特性:
  • 使用 FastAPI 进行异步请求处理
  • 自动生成 API 文档(OpenAPI/Swagger)
  • 为 Web 客户端提供 CORS 支持
  • 使用 Pydantic 模型进行请求校验
  • 基于 JWT 的身份验证

2. 提供商系统

面向健康设备集成的可插拔架构: 组件:
  • BaseThetaProvider: 所有提供商的抽象基类
  • ThetaDataFormatter: 标准数据转换工具
  • ThetaTimeUtils: 时区和时间戳处理
  • 提供商特定实现(Garmin, Whoop 等)

3. 数据流水线 (TBD)

健康数据的 ETL(提取、转换、加载)流水线:
┌─────────────┐
│  Pull Data  │ ─┐
└─────────────┘  │

┌─────────────┐  │  ┌──────────────┐
│  Save Raw   │ ◄┴─▶│ Push Service │
└─────────────┘     └──────────────┘
        │                   │
        ▼                   ▼
┌─────────────┐    ┌──────────────┐
│  Format     │    │  Upload      │
└─────────────┘    └──────────────┘
        │                   │
        ▼                   ▼
┌─────────────┐    ┌──────────────┐
│ Standardize │    │  Platform    │
└─────────────┘    └──────────────┘
流程:
  1. Pull (拉取):从厂商 API 获取数据
  2. Save (保存):将原始数据存入数据库
  3. Format (格式化):转换为标准格式
  4. Upload (上传):推送到 Mirobody 平台

4. AI 服务

面向智能健康洞察的多 LLM 提供商集成:
AI Services
├── OpenRouter (多模型)
├── OpenAI 
├── Google
├── Anthropic
└── 本地 LLM (即将推出)
特性:
  • 与提供商无关的接口
  • 对话上下文管理
  • 用于健康数据访问的工具调用 (Tool calling)
  • 会话历史持久化

5. MCP 协议

支持通过 Model Context Protocol 集成 AI agent:
MCP Server
├── Tools/        # 可用工具
│   ├── get_user_health_profile
│   ├── get_health_indicator
│   ├── genetic_service
│   └── 您的自定义工具
├── Resources/    # 可用资源
能力:
  • JSON-RPC 2.0 接口
  • 工具发现与执行
  • 资源管理
  • Agent 编排

6. 存储层

面向不同数据类型的双存储系统:
用于:
  • 用户凭据(加密)
  • OAuth 令牌与刷新令牌
  • 健康原始数据(JSONB)
  • 提供商配置
  • 会话历史

数据流

OAuth 身份验证流程

健康数据同步

设计模式

插件架构

提供商是动态发现并加载的:
# 提供商发现
THETA_PROVIDER_DIRS = ["connect/theta"]

# 自动加载
for directory in THETA_PROVIDER_DIRS:
    providers = discover_providers(directory)
    for provider_class in providers:
        provider = provider_class.create_provider(config)
        if provider:
            register_provider(provider)

工厂模式

提供商使用工厂方法进行有条件的实例化:
@classmethod
def create_provider(cls, config: Dict) -> Optional['Provider']:
    """工厂方法 - 如果配置无效则返回 None"""
    if not cls._validate_config(config):
        return None
    return cls()

策略模式

针对 OAuth 1.0 vs 2.0 的不同 OAuth 策略:
class BaseThetaProvider:
    def link(self, request) -> Dict:
        """由 OAuth1 或 OAuth2 策略实现"""
        pass
    
    def callback(self, *args, **kwargs) -> Dict:
        """由 OAuth1 或 OAuth2 策略实现"""
        pass

仓库模式

通过服务抽象数据库操作:
class DatabaseService:
    async def save_oauth2_credentials(self, ...): pass
    async def get_user_credentials(self, ...): pass
    async def delete_user_theta_provider(self, ...): pass

安全架构

  • 基于 JWT 的 API 身份验证
  • 用于提供商访问的 OAuth 1.0/2.0
  • 加密存储凭据
  • 按用户隔离的提供商访问控制
  • 数据库级别加密
  • 加密 OAuth 令牌
  • 所有外部通信使用 HTTPS
  • 敏感数据绝不记录到日志
  • 用户范围的数据访问
  • 提供商特定的权限
  • API 速率限制
  • CORS 策略
  • 外部 API 使用 TLS/SSL
  • 针对 OAuth CSRF 保护的 state 参数
  • 令牌刷新机制
  • 安全的会话管理

可扩展性考虑

水平扩展

系统支持水平扩展:

无状态 API

API 服务器是无状态的,可在负载均衡器后实现水平扩展

分布式缓存

Redis 支持集群,用于分布式缓存

数据库连接池

连接池支持多个 API 实例并发访问

异步处理

Async/await 提升了每个实例的并发处理能力

部署架构

Docker Compose 架构

services:
  backend:
    image: mirobody-backend
    ports: ["18080:18080"]
    depends_on: [db, redis]
    
  db:
    image: postgres:15
    volumes: ["pgdata:/var/lib/postgresql/data"]
    
  redis:
    image: redis:7
    volumes: ["redis-data:/data"]

volumes:
  pgdata:
  redis-data:

监控与可观测性

日志

整个系统使用结构化日志:
# 日志级别
LOG_LEVEL: INFO  # DEBUG, INFO, WARNING, ERROR, CRITICAL

# 上下文日志
logging.info(
    "Data pull completed",
    extra={
        "user_id": user_id,
        "provider": provider_slug,
        "records": len(data),
        "duration_ms": duration
    }
)

指标

追踪关键性能指标:
  • 请求延迟
  • 提供商成功率
  • 数据处理吞吐量
  • 按提供商分类的错误率
  • 令牌刷新率

健康检查

GET /health
{
  "status": "healthy",
  "version": "1.0.1",
  "services": {
    "database": "connected",
    "redis": "connected"
  }
}

技术栈

  • Python 3.12+: 核心语言
  • FastAPI: Web 框架
  • asyncio: 异步 I/O
  • Pydantic: 数据校验
  • SQLAlchemy: 数据库 ORM

扩展点

该架构提供了多个扩展点:
新增健康设备集成请参见开发
  1. 请参阅添加自定义工具以为 AI agent 添加新工具
  2. 请参阅添加自定义 MCP以为 AI agent 添加新 MCP
添加新的健康指标:
  1. StandardIndicator 枚举中定义
  2. 指定标准单位
  3. 更新提供商映射
添加新的 LLM 提供商:
  1. 实现提供商接口
  2. 添加配置
  3. 在 agent 系统中注册
  4. 部署

最佳实践

关注点分离

每一层只负责单一职责

依赖注入

通过注入使用服务,而非硬编码

面向接口设计

面向接口编程,而非面向实现

配置驱动

行为通过配置控制

异步优先

所有 I/O 操作使用 Async/await

错误处理

优雅降级与恢复

下一步