概览
Whoop Provider 展示了带自动 Token 刷新的 OAuth 2.0 身份验证实现,并集成了全面的恢复 (Recovery) 和表现 (Performance) 数据,非常适合用来学习现代 OAuth 模式。
核心特性
OAuth 2.0
带刷新 Token 的现代 OAuth 模式
恢复数据
HRV、静息心率、恢复分数
睡眠分析
详细的睡眠阶段和质量分析
运动数据
带强度分区的锻炼会话数据
Provider 结构
复制
from mirobody.pulse.theta.platform.base import BaseThetaProvider
import aiohttp
class ThetaWhoopProvider(BaseThetaProvider):
    """Theta provider that connects to Whoop through OAuth 2.0."""

    def __init__(self):
        super().__init__()

        # Space-separated permission scopes sent with the authorization request.
        self.scopes = "offline read:recovery read:sleep read:cycles read:workout"

        # Whoop endpoints: authorization page, token exchange, and data API root.
        self.api_base_url = "https://api.prod.whoop.com/developer/v2"
        self.token_url = "https://api.prod.whoop.com/oauth/oauth2/token"
        self.auth_url = "https://api.prod.whoop.com/oauth/oauth2/auth"

        # OAuth 2.0 client settings, looked up by key through safe_read_cfg.
        self.redirect_url = safe_read_cfg("WHOOP_REDIRECT_URL")
        self.client_secret = safe_read_cfg("WHOOP_CLIENT_SECRET")
        self.client_id = safe_read_cfg("WHOOP_CLIENT_ID")
数据映射
Whoop 提供了详细的恢复和表现指标:
def _identity(value):
    """Return ``value`` unchanged; used where no conversion is applied."""
    return value


# Per data type ("sleep", "recovery"), maps a Whoop field key to a tuple of
# (standard indicator name, converter callable, unit string).
# NOTE(review): the keys look like dotted paths into the vendor record —
# confirm against the code that consumes this mapping.
WHOOP_INDICATOR_MAPPING = {
    "sleep": {
        "score.stage_summary.total_in_bed_time_milli": (
            StandardIndicator.SLEEP_IN_BED.value.name,
            _identity,
            "ms",
        ),
        "score.stage_summary.total_light_sleep_time_milli": (
            StandardIndicator.SLEEP_ANALYSIS_ASLEEP_CORE.value.name,
            _identity,
            "ms",
        ),
        "score.stage_summary.total_slow_wave_sleep_time_milli": (
            StandardIndicator.SLEEP_ANALYSIS_ASLEEP_DEEP.value.name,
            _identity,
            "ms",
        ),
        "score.sleep_efficiency_percentage": (
            StandardIndicator.SLEEP_EFFICIENCY.value.name,
            _identity,
            "%",
        ),
    },
    "recovery": {
        "score.hrv_rmssd_milli": (
            StandardIndicator.HRV.value.name,
            _identity,
            "ms",
        ),
        "score.resting_heart_rate": (
            StandardIndicator.DAILY_HEART_RATE_RESTING.value.name,
            _identity,
            "count/min",
        ),
        "score.respiratory_rate": (
            StandardIndicator.RESPIRATORY_RATE.value.name,
            _identity,
            "breaths/min",
        ),
    },
}
OAuth 2.0 实现
发起绑定
复制
async def link(self, request: LinkRequest) -> Dict[str, Any]:
    """Start the OAuth 2.0 authorization-code flow for a user.

    Generates a random ``state`` value, stores the user id and the redirect
    URL under that state in Redis for 900 seconds, and builds the
    authorization URL the user must visit.

    Args:
        request: Link request; only ``request.user_id`` is read.

    Returns:
        A dict of the form ``{"link_web_url": <authorization URL>}``.
    """
    user_id = request.user_id
    state_ttl_seconds = 900

    # Random state for CSRF protection; callback() resolves the user from it.
    state = urlencode({"s": str(uuid.uuid4())})

    redis_client = await self._get_redis_client()
    try:
        await redis_client.setex(f"oauth2:state:{state}", state_ttl_seconds, user_id)
        await redis_client.setex(
            f"oauth2:redir:{state}", state_ttl_seconds, self.redirect_url
        )
    finally:
        # Close the client even when a write fails so it is never leaked.
        await redis_client.aclose()

    # Build the authorization URL.
    params = {
        "client_id": self.client_id,
        "response_type": "code",
        "redirect_uri": self.redirect_url,
        "scope": self.scopes,
        "state": state,
    }
    return {"link_web_url": f"{self.auth_url}?{urlencode(params)}"}
处理回调
复制
async def callback(self, code: str, state: str) -> Dict[str, Any]:
    """Handle the OAuth 2.0 redirect and finish linking the user.

    Resolves the user from the one-time ``state``, exchanges ``code`` for
    tokens, stores the credentials, and schedules an initial data sync.

    Args:
        code: Authorization code returned by the provider.
        state: State value previously issued by ``link``.

    Returns:
        ``{"provider_slug": <slug>, "stage": "completed"}``.

    Raises:
        ValueError: If ``state`` is unknown or has expired.
        RuntimeError: If the token endpoint does not answer with HTTP 200.
    """
    state_key = f"oauth2:state:{state}"
    redir_key = f"oauth2:redir:{state}"

    # Look up and consume the state so it cannot be replayed.
    # NOTE(review): assumes the Redis client returns str values — confirm.
    redis_client = await self._get_redis_client()
    try:
        user_id = await redis_client.get(state_key)
        redirect_uri = await redis_client.get(redir_key)
        await redis_client.delete(state_key)
        await redis_client.delete(redir_key)
    finally:
        # Close the client even when a Redis call fails so it is never leaked.
        await redis_client.aclose()

    if not user_id:
        raise ValueError("无效或已过期的 state")

    # Exchange the authorization code for tokens.
    async with aiohttp.ClientSession() as session:
        data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": redirect_uri,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }
        async with session.post(self.token_url, data=data) as resp:
            if resp.status != 200:
                raise RuntimeError(f"Token 交换失败: {await resp.text()}")
            token_data = await resp.json()

    # Persist the credentials together with their absolute expiry time.
    access_token = token_data["access_token"]
    refresh_token = token_data.get("refresh_token")
    expires_in = token_data.get("expires_in")
    expires_at = int(time.time()) + int(expires_in) if expires_in else None
    await self.db_service.save_oauth2_credentials(
        user_id, self.info.slug, access_token, refresh_token, expires_at
    )

    # Kick off the initial sync in the background. The event loop keeps only a
    # weak reference to tasks, so hold a strong one until the task finishes;
    # otherwise it may be garbage-collected mid-execution.
    sync_task = asyncio.create_task(self._pull_and_push_for_user({
        "user_id": user_id,
        "access_token": access_token,
        "refresh_token": refresh_token,
    }))
    background_tasks = getattr(self, "_background_tasks", None)
    if background_tasks is None:
        background_tasks = self._background_tasks = set()
    background_tasks.add(sync_task)
    sync_task.add_done_callback(background_tasks.discard)

    return {"provider_slug": self.info.slug, "stage": "completed"}
Token 刷新
自动 Token 刷新确保持续的访问:
async def get_valid_access_token(self, user_id: str) -> Optional[str]:
    """Return a usable access token for ``user_id``, refreshing it if needed.

    Args:
        user_id: User whose stored OAuth 2.0 credentials are looked up.

    Returns:
        The stored access token while it is unexpired, a newly refreshed
        token otherwise, or ``None`` when there are no credentials, no
        refresh token, or the refresh request fails.
    """
    # Load the stored credentials.
    creds = await self.db_service.get_user_credentials(
        user_id, self.info.slug, LinkType.OAUTH2
    )
    if not creds:
        return None

    access_token = creds["access_token"]
    expires_at = creds.get("expires_at")

    # Still valid: reuse it. A missing expiry falls through to a refresh.
    if expires_at and time.time() < expires_at:
        return access_token

    # Expired: a refresh token is required to continue.
    refresh_token = creds.get("refresh_token")
    if not refresh_token:
        return None

    async with aiohttp.ClientSession() as session:
        data = {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }
        async with session.post(self.token_url, data=data) as resp:
            if resp.status != 200:
                # Only a 400/401 is treated as the server rejecting the
                # grant, so only then is the stored link removed. Other
                # failures (rate limiting, 5xx, ...) may be transient:
                # keep the credentials so a later call can retry instead
                # of forcing the user to re-authorize.
                if resp.status in (400, 401):
                    await self.db_service.delete_user_theta_provider(
                        user_id, self.info.slug
                    )
                return None
            token_data = await resp.json()

    # Store the refreshed credentials; keep the old refresh token when the
    # response does not include a new one.
    new_access_token = token_data["access_token"]
    new_refresh_token = token_data.get("refresh_token", refresh_token)
    new_expires_in = token_data.get("expires_in")
    new_expires_at = int(time.time()) + int(new_expires_in) if new_expires_in else None
    await self.db_service.save_oauth2_credentials(
        user_id, self.info.slug, new_access_token, new_refresh_token, new_expires_at
    )
    return new_access_token
带分页的数据获取
复制
async def pull_from_vendor_api(
    self,
    access_token: str,
    refresh_token: Optional[str] = None,
    days: int = 2
) -> List[Dict[str, Any]]:
    """Pull the last ``days`` days of Whoop data, one paginated fetch per type.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.
        refresh_token: Accepted for interface compatibility; not used here.
        days: Size of the look-back window ending now (UTC).

    Returns:
        One entry per data type that returned data, each a dict with
        ``data_type``, ``data`` and a millisecond ``timestamp``. A data type
        whose fetch raises is logged and skipped.
    """
    request_headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json"
    }
    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=days)

    collected = []
    async with aiohttp.ClientSession() as session:
        sources = (
            ("sleep", f"{self.api_base_url}/activity/sleep"),
            ("recovery", f"{self.api_base_url}/recovery"),
            ("workout", f"{self.api_base_url}/activity/workout"),
            ("cycle", f"{self.api_base_url}/cycle"),
        )
        for data_type, endpoint in sources:
            try:
                # A new query dict is built for every endpoint; the pagination
                # helper is not visible here and may modify what it receives.
                query = {
                    "start": window_start.isoformat(),
                    "end": window_end.isoformat(),
                    "limit": 25
                }
                records = await self._fetch_paginated_data(
                    session, endpoint, request_headers, query
                )
                if records:
                    entry = {
                        "data_type": data_type,
                        "data": records,
                        "timestamp": int(time.time() * 1000)
                    }
                    collected.append(entry)
            except Exception as e:
                # Best effort: one failing endpoint must not abort the others.
                logging.error(f"获取 {data_type} 出错: {str(e)}")
    return collected
支持的数据类型
- 睡眠
- 恢复
- 周期 (Cycles)
- 锻炼
- 总在床时间
- 浅睡时间
- 深睡时间 (Slow-wave)
- REM 睡眠时间
- 睡眠效率
- 睡眠中断情况
- HRV (RMSSD)
- 静息心率
- 呼吸频率
- 血氧 (SpO2)
- 皮肤温度
- 恢复分数
- 压力分数 (Strain score)
- 千焦耳 (Kilojoules)
- 平均心率
- 最大心率
- 活动类型
- 持续时间
- 强度分区
- 卡路里消耗
- 心率统计
源代码
完整的 Whoop Provider 源代码位于:
connect/theta/mirobody_whoop/provider_whoop.py
与 OAuth 1.0 的主要区别
Token 类型
Token 类型
OAuth 2.0 使用 Bearer Token,OAuth 1.0 使用签名请求
Token 过期
Token 过期
OAuth 2.0 Token 会过期并需要刷新,OAuth 1.0 Token 通常不会过期
便捷性
便捷性
OAuth 2.0 更简单 - 无需计算复杂的签名
刷新流程
刷新流程
OAuth 2.0 内置了 Token 刷新机制,OAuth 1.0 通常需要重新授权