跳转到主要内容

概览

Whoop Provider 展示了带 Token 刷新的 OAuth 2.0 身份验证实现,以及全面的恢复 (Recovery) 和表现 (Performance) 数据集成。
本示例展示了带自动 Token 刷新的 OAuth 2.0 实现,非常适合学习现代 OAuth 模式。

核心特性

OAuth 2.0

带刷新 Token 的现代 OAuth 模式

恢复数据

HRV、静息心率、恢复分数

睡眠分析

详细的睡眠阶段和质量分析

运动数据

带强度分区的锻炼会话数据

Provider 结构

from mirobody.pulse.theta.platform.base import BaseThetaProvider
import aiohttp

class ThetaWhoopProvider(BaseThetaProvider):
    """Theta provider for Whoop, authenticated with OAuth 2.0.

    The constructor only collects static configuration: the Whoop endpoint
    URLs, the scope string used when building the authorization URL, and
    the client settings read through ``safe_read_cfg``.
    """

    def __init__(self):
        super().__init__()

        # Whoop endpoints: authorization page, token endpoint, data API root.
        self.auth_url = "https://api.prod.whoop.com/oauth/oauth2/auth"
        self.token_url = "https://api.prod.whoop.com/oauth/oauth2/token"
        self.api_base_url = "https://api.prod.whoop.com/developer/v2"

        # Requested scopes, kept as a single space-separated string.
        self.scopes = "offline read:recovery read:sleep read:cycles read:workout"

        # OAuth 2.0 client settings, looked up in the same order as before:
        # client id, client secret, redirect URL.
        self.client_id, self.client_secret, self.redirect_url = map(
            safe_read_cfg,
            ("WHOOP_CLIENT_ID", "WHOOP_CLIENT_SECRET", "WHOOP_REDIRECT_URL"),
        )

数据映射

Whoop 提供了详细的恢复和表现指标:
def _identity(value):
    """Return ``value`` unchanged (no conversion is applied)."""
    return value


# Layout: data type -> field key -> (standard indicator name, converter, unit).
# The field keys look like dotted paths into a Whoop record -- confirm against
# the code that consumes this mapping.
WHOOP_INDICATOR_MAPPING = {
    "sleep": {
        "score.stage_summary.total_in_bed_time_milli":
            (StandardIndicator.SLEEP_IN_BED.value.name, _identity, "ms"),
        "score.stage_summary.total_light_sleep_time_milli":
            (StandardIndicator.SLEEP_ANALYSIS_ASLEEP_CORE.value.name, _identity, "ms"),
        "score.stage_summary.total_slow_wave_sleep_time_milli":
            (StandardIndicator.SLEEP_ANALYSIS_ASLEEP_DEEP.value.name, _identity, "ms"),
        "score.sleep_efficiency_percentage":
            (StandardIndicator.SLEEP_EFFICIENCY.value.name, _identity, "%"),
    },
    "recovery": {
        "score.hrv_rmssd_milli":
            (StandardIndicator.HRV.value.name, _identity, "ms"),
        "score.resting_heart_rate":
            (StandardIndicator.DAILY_HEART_RATE_RESTING.value.name, _identity, "count/min"),
        "score.respiratory_rate":
            (StandardIndicator.RESPIRATORY_RATE.value.name, _identity, "breaths/min"),
    },
}

OAuth 2.0 实现

发起绑定

async def link(self, request: LinkRequest) -> Dict[str, Any]:
    """Start the OAuth 2.0 authorization-code flow for a user.

    A random ``state`` value is generated, stored in Redis together with
    the requesting user id and the redirect URL (both for 900 seconds), and
    embedded in the authorization URL that is returned to the caller.

    Args:
        request: Link request; only ``request.user_id`` is read here.

    Returns:
        A dict with one key, ``"link_web_url"``, holding the authorization
        URL the user should be sent to.

    Raises:
        Whatever the Redis client raises; the client is closed either way.
    """
    user_id = request.user_id
    state_ttl_seconds = 900

    # Random state value used for CSRF protection; it is URL-encoded once
    # here ("s=<uuid>") and that encoded form is what gets stored and sent.
    state_payload = {"s": str(uuid.uuid4())}
    state = urlencode(state_payload)

    # Remember who started the flow and which redirect URL was used. The
    # client is released in ``finally`` so a failing write cannot leak it.
    redis_client = await self._get_redis_client()
    try:
        await redis_client.setex(f"oauth2:state:{state}", state_ttl_seconds, user_id)
        await redis_client.setex(
            f"oauth2:redir:{state}", state_ttl_seconds, self.redirect_url
        )
    finally:
        await redis_client.aclose()

    # Build the authorization URL.
    params = {
        "client_id": self.client_id,
        "response_type": "code",
        "redirect_uri": self.redirect_url,
        "scope": self.scopes,
        "state": state,
    }
    authorization_url = f"{self.auth_url}?{urlencode(params)}"

    return {"link_web_url": authorization_url}

处理回调

async def callback(self, code: str, state: str) -> Dict[str, Any]:
    """Handle the OAuth 2.0 redirect and finish linking the account.

    Looks up (and consumes) the ``state`` entries in Redis, exchanges the
    authorization ``code`` for tokens at ``self.token_url``, stores the
    credentials via ``self.db_service`` and schedules a background data
    sync for the user.

    Args:
        code: Authorization code received on the redirect.
        state: The ``state`` value received on the redirect.

    Returns:
        ``{"provider_slug": <slug>, "stage": "completed"}``.

    Raises:
        ValueError: No user id is stored for ``state``.
        RuntimeError: The token endpoint answered with a non-200 status.
    """
    # Validate the state. Both keys are deleted so a state is single-use,
    # and the client is closed even if one of the Redis calls fails.
    redis_client = await self._get_redis_client()
    try:
        user_id = await redis_client.get(f"oauth2:state:{state}")
        redirect_uri = await redis_client.get(f"oauth2:redir:{state}")
        await redis_client.delete(f"oauth2:state:{state}")
        await redis_client.delete(f"oauth2:redir:{state}")
    finally:
        await redis_client.aclose()

    if not user_id:
        raise ValueError("无效或已过期的 state")

    # NOTE(review): a Redis client that is not configured to decode
    # responses returns bytes; normalise so downstream code sees str.
    if isinstance(user_id, bytes):
        user_id = user_id.decode("utf-8")
    if isinstance(redirect_uri, bytes):
        redirect_uri = redirect_uri.decode("utf-8")
    # The redirect URI stored by ``link`` is the configured one, so fall
    # back to it if the entry is missing.
    if not redirect_uri:
        redirect_uri = self.redirect_url

    # Exchange the authorization code for tokens.
    async with aiohttp.ClientSession() as session:
        data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": redirect_uri,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }

        async with session.post(self.token_url, data=data) as resp:
            if resp.status != 200:
                raise RuntimeError(f"Token 交换失败: {await resp.text()}")
            token_data = await resp.json()

    # Persist the credentials; the absolute expiry is stored in epoch
    # seconds, or None when the response carries no ``expires_in``.
    access_token = token_data["access_token"]
    refresh_token = token_data.get("refresh_token")
    expires_in = token_data.get("expires_in")
    expires_at = (int(time.time()) + int(expires_in)) if expires_in else None

    await self.db_service.save_oauth2_credentials(
        user_id, self.info.slug, access_token, refresh_token, expires_at
    )

    # Kick off the first data sync without blocking the response. The event
    # loop keeps only a weak reference to tasks, so hold a strong one until
    # the task finishes; otherwise it may be garbage-collected mid-run.
    sync_task = asyncio.create_task(self._pull_and_push_for_user({
        "user_id": user_id,
        "access_token": access_token,
        "refresh_token": refresh_token,
    }))
    background_tasks = getattr(self, "_background_tasks", None)
    if background_tasks is None:
        background_tasks = set()
        self._background_tasks = background_tasks
    background_tasks.add(sync_task)
    sync_task.add_done_callback(background_tasks.discard)

    return {"provider_slug": self.info.slug, "stage": "completed"}

Token 刷新

自动 Token 刷新确保持续的访问:
async def get_valid_access_token(self, user_id: str) -> Optional[str]:
    """Return a usable access token for ``user_id``, refreshing it if needed.

    The stored token is returned as-is while ``expires_at`` lies in the
    future. Otherwise the stored refresh token is exchanged at
    ``self.token_url`` and the new credentials are saved.

    Args:
        user_id: User whose stored OAuth 2.0 credentials should be used.

    Returns:
        An access token, or ``None`` when there are no stored credentials,
        no refresh token, or the refresh request did not succeed.
    """
    import logging  # local import: this block does not rely on file-level imports

    # Load the stored credentials.
    creds = await self.db_service.get_user_credentials(
        user_id, self.info.slug, LinkType.OAUTH2
    )

    if not creds:
        return None

    access_token = creds["access_token"]
    expires_at = creds.get("expires_at")

    # Still valid: nothing to do. A missing ``expires_at`` is treated as
    # expired, so such a token always goes through the refresh path.
    if expires_at and time.time() < expires_at:
        return access_token

    # Expired: a refresh token is required to continue.
    refresh_token = creds.get("refresh_token")
    if not refresh_token:
        return None

    async with aiohttp.ClientSession() as session:
        data = {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }

        async with session.post(self.token_url, data=data) as resp:
            if resp.status in (400, 401):
                # The token endpoint rejected the request itself, so the
                # stored credentials are removed.
                await self.db_service.delete_user_theta_provider(
                    user_id, self.info.slug
                )
                return None

            if resp.status != 200:
                # Any other failure (e.g. 5xx, 429) may be temporary; keep
                # the credentials so a later call can retry the refresh
                # instead of forcing the user to link again.
                logging.warning(
                    "Whoop Token 刷新暂时失败 (HTTP %s),保留已存储的凭据",
                    resp.status,
                )
                return None

            token_data = await resp.json()

    # Save the refreshed credentials; keep the old refresh token when the
    # response does not include a new one.
    new_access_token = token_data["access_token"]
    new_refresh_token = token_data.get("refresh_token", refresh_token)
    new_expires_in = token_data.get("expires_in")
    new_expires_at = (
        (int(time.time()) + int(new_expires_in)) if new_expires_in else None
    )

    await self.db_service.save_oauth2_credentials(
        user_id, self.info.slug, new_access_token, new_refresh_token, new_expires_at
    )

    return new_access_token

带分页的数据获取

async def pull_from_vendor_api(
    self,
    access_token: str,
    refresh_token: Optional[str] = None,
    days: int = 2
) -> List[Dict[str, Any]]:
    """Fetch the last ``days`` days of data from each Whoop endpoint.

    Every endpoint is queried through ``self._fetch_paginated_data`` with
    the same time window. A failure on one endpoint is logged and the loop
    moves on to the next one.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.
        refresh_token: Accepted but not used inside this function.
        days: Size of the time window, counted back from the current UTC time.

    Returns:
        One dict per endpoint that returned data, with the keys
        ``"data_type"``, ``"data"`` and ``"timestamp"`` (epoch milliseconds
        at collection time).
    """
    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=days)

    request_headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json"
    }

    collected = []

    async with aiohttp.ClientSession() as session:
        # (data type, URL) pairs, visited in this order.
        routes = (
            ("sleep", f"{self.api_base_url}/activity/sleep"),
            ("recovery", f"{self.api_base_url}/recovery"),
            ("workout", f"{self.api_base_url}/activity/workout"),
            ("cycle", f"{self.api_base_url}/cycle"),
        )

        for data_type, endpoint in routes:
            try:
                # A fresh query dict is built for every endpoint.
                query = dict(
                    start=window_start.isoformat(),
                    end=window_end.isoformat(),
                    limit=25,
                )

                # Pagination is handled by the helper.
                records = await self._fetch_paginated_data(
                    session, endpoint, request_headers, query
                )

                if not records:
                    continue

                collected.append({
                    "data_type": data_type,
                    "data": records,
                    "timestamp": int(time.time() * 1000)
                })
            except Exception as exc:
                logging.error(f"获取 {data_type} 出错: {str(exc)}")
                continue

    return collected

支持的数据类型

睡眠 (Sleep) 数据:

  • 总在床时间
  • 浅睡时间
  • 深睡时间 (Slow-wave)
  • REM 睡眠时间
  • 睡眠效率
  • 睡眠中断情况

源代码

完整的 Whoop Provider 源代码位于:
connect/theta/mirobody_whoop/provider_whoop.py

与 OAuth 1.0 的主要区别

OAuth 2.0 使用 Bearer Token,OAuth 1.0 使用签名请求
OAuth 2.0 Token 会过期并需要刷新,OAuth 1.0 Token 通常不会过期
OAuth 2.0 更简单 - 无需计算复杂的签名
OAuth 2.0 内置了 Token 刷新机制,Token 过期后无需用户参与即可续期;OAuth 1.0 的 Token 一旦失效或被撤销,通常只能让用户重新授权

下一步