概览
Whoop Provider 展示了带自动 token refresh 的 OAuth 2.0 认证实现,以及恢复(recovery)与表现(performance)数据的完整集成方式,非常适合用来学习现代 OAuth 模式。
核心特性
OAuth 2.0
带 refresh tokens 的现代 OAuth
Recovery Data
HRV, resting HR, recovery scores
Sleep Analytics
详细的睡眠分期与质量分析
Workout Data
带强度分区的训练数据
Provider 结构
复制
from mirobody.pulse.theta.platform.base import BaseThetaProvider
import aiohttp
class ThetaWhoopProvider(BaseThetaProvider):
    """Whoop provider that authenticates through OAuth 2.0.

    Construction reads the OAuth client settings through ``safe_read_cfg``
    and records the Whoop endpoint URLs and the scopes to request.
    """

    def __init__(self):
        """Initialise the base provider, then load the Whoop OAuth settings."""
        super().__init__()

        # Client credentials and redirect target, read from configuration.
        self.client_id = safe_read_cfg("WHOOP_CLIENT_ID")
        self.client_secret = safe_read_cfg("WHOOP_CLIENT_SECRET")
        self.redirect_url = safe_read_cfg("WHOOP_REDIRECT_URL")

        # Authorization endpoint, token endpoint, and data API root.
        self.auth_url = "https://api.prod.whoop.com/oauth/oauth2/auth"
        self.token_url = "https://api.prod.whoop.com/oauth/oauth2/token"
        self.api_base_url = "https://api.prod.whoop.com/developer/v2"

        # Space-separated scopes sent with the authorization request.
        # NOTE(review): "offline" is presumably what makes Whoop issue a
        # refresh token - confirm against the Whoop developer docs.
        self.scopes = "offline read:recovery read:sleep read:cycles read:workout"
数据映射
Whoop 提供了细粒度的恢复与表现指标:
# Layout: data type -> vendor field key -> (standard indicator name,
# value transform, unit string). Every transform below is the identity.
# NOTE(review): the dotted keys look like paths into a Whoop record -
# confirm against the code that consumes this mapping.
WHOOP_INDICATOR_MAPPING = {
    "sleep": {
        "score.stage_summary.total_in_bed_time_milli": (
            StandardIndicator.SLEEP_IN_BED.value.name,
            lambda value: value,
            "ms",
        ),
        "score.stage_summary.total_light_sleep_time_milli": (
            StandardIndicator.SLEEP_ANALYSIS_ASLEEP_CORE.value.name,
            lambda value: value,
            "ms",
        ),
        "score.stage_summary.total_slow_wave_sleep_time_milli": (
            StandardIndicator.SLEEP_ANALYSIS_ASLEEP_DEEP.value.name,
            lambda value: value,
            "ms",
        ),
        "score.sleep_efficiency_percentage": (
            StandardIndicator.SLEEP_EFFICIENCY.value.name,
            lambda value: value,
            "%",
        ),
    },
    "recovery": {
        "score.hrv_rmssd_milli": (
            StandardIndicator.HRV.value.name,
            lambda value: value,
            "ms",
        ),
        "score.resting_heart_rate": (
            StandardIndicator.DAILY_HEART_RATE_RESTING.value.name,
            lambda value: value,
            "count/min",
        ),
        "score.respiratory_rate": (
            StandardIndicator.RESPIRATORY_RATE.value.name,
            lambda value: value,
            "breaths/min",
        ),
    },
}
OAuth 2.0 实现
发起链接(Link)
复制
async def link(self, request: LinkRequest) -> Dict[str, Any]:
    """Initiate the OAuth 2.0 authorization-code flow.

    Generates a random ``state`` value, stores the requesting user id and
    the redirect URL in Redis under that state for 15 minutes, and builds
    the authorization URL the user should be sent to.

    Args:
        request: Link request; only ``request.user_id`` is read here.

    Returns:
        A dict with a single key, ``"link_web_url"``, holding the
        authorization URL.
    """
    user_id = request.user_id

    # Random state for CSRF protection; it is the lookup key in callback().
    state_payload = {"s": str(uuid.uuid4())}
    state = urlencode(state_payload)

    # Lifetime of the pending-authorization entries, in seconds.
    state_ttl_seconds = 900

    redis_client = await self._get_redis_client()
    try:
        await redis_client.setex(f"oauth2:state:{state}", state_ttl_seconds, user_id)
        await redis_client.setex(
            f"oauth2:redir:{state}", state_ttl_seconds, self.redirect_url
        )
    finally:
        # Close the client even when a write fails so the connection
        # is not leaked.
        await redis_client.aclose()

    # Build authorization URL
    params = {
        "client_id": self.client_id,
        "response_type": "code",
        "redirect_uri": self.redirect_url,
        "scope": self.scopes,
        "state": state,
    }
    authorization_url = f"{self.auth_url}?{urlencode(params)}"
    return {"link_web_url": authorization_url}
处理回调(Callback)
复制
async def callback(self, code: str, state: str) -> Dict[str, Any]:
    """Handle the OAuth 2.0 redirect and store the resulting tokens.

    Looks up the user id and redirect URI saved under ``state``, removes
    both entries, exchanges ``code`` for tokens at the token endpoint,
    saves the credentials, and starts a background data sync.

    Args:
        code: Authorization code returned by the provider.
        state: State value that was generated by ``link``.

    Returns:
        ``{"provider_slug": <slug>, "stage": "completed"}``.

    Raises:
        ValueError: No user id is stored for ``state``.
        RuntimeError: The token endpoint answered with a non-200 status.
    """
    redis_client = await self._get_redis_client()
    try:
        user_id = await redis_client.get(f"oauth2:state:{state}")
        redirect_uri = await redis_client.get(f"oauth2:redir:{state}")
        # Remove the entries so the same state cannot be used twice.
        await redis_client.delete(f"oauth2:state:{state}")
        await redis_client.delete(f"oauth2:redir:{state}")
    finally:
        # Close the client even when a Redis call fails.
        await redis_client.aclose()

    if not user_id:
        raise ValueError("Invalid or expired state")

    # Exchange code for tokens
    data = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": redirect_uri,
        "client_id": self.client_id,
        "client_secret": self.client_secret,
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(self.token_url, data=data) as resp:
            if resp.status != 200:
                raise RuntimeError(f"Token exchange failed: {await resp.text()}")
            token_data = await resp.json()

    # Save credentials with expiration
    access_token = token_data["access_token"]
    refresh_token = token_data.get("refresh_token")
    expires_in = token_data.get("expires_in")
    # Absolute expiry as a Unix timestamp; None when no lifetime was given.
    expires_at = (int(time.time()) + int(expires_in)) if expires_in else None

    await self.db_service.save_oauth2_credentials(
        user_id, self.info.slug, access_token, refresh_token, expires_at
    )

    # Trigger data sync without blocking the callback response.
    sync_task = asyncio.create_task(self._pull_and_push_for_user({
        "user_id": user_id,
        "access_token": access_token,
        "refresh_token": refresh_token,
    }))
    # The event loop only holds weak references to tasks, so keep a strong
    # one until the task finishes; otherwise it may be collected mid-run.
    background_tasks = getattr(self, "_background_tasks", None)
    if background_tasks is None:
        background_tasks = self._background_tasks = set()
    background_tasks.add(sync_task)
    sync_task.add_done_callback(background_tasks.discard)

    return {"provider_slug": self.info.slug, "stage": "completed"}
Token Refresh
自动 token refresh 可确保持续可用的访问能力:
async def get_valid_access_token(self, user_id: str) -> Optional[str]:
    """Return a usable access token for the user, refreshing it if needed.

    The stored token is returned as-is while its ``expires_at`` lies in
    the future. Otherwise the stored refresh token is sent to the token
    endpoint and the new credentials are saved.

    Args:
        user_id: User whose stored credentials should be used.

    Returns:
        The access token, or ``None`` when no credentials are stored, no
        refresh token is available, or the refresh request fails.
    """
    creds = await self.db_service.get_user_credentials(
        user_id, self.info.slug, LinkType.OAUTH2
    )
    if not creds:
        return None

    access_token = creds["access_token"]
    expires_at = creds.get("expires_at")

    # Still valid: nothing to do.
    if expires_at and time.time() < expires_at:
        return access_token

    # Expired (or no expiry recorded): a refresh token is required.
    refresh_token = creds.get("refresh_token")
    if not refresh_token:
        return None

    data = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": self.client_id,
        "client_secret": self.client_secret,
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(self.token_url, data=data) as resp:
            if resp.status != 200:
                # Only 400/401 mean the grant or client was rejected, so
                # only then are the stored credentials removed. Other
                # statuses (5xx, 429, ...) may be transient; keeping the
                # credentials lets a later call retry without forcing the
                # user to link again.
                if resp.status in (400, 401):
                    await self.db_service.delete_user_theta_provider(
                        user_id, self.info.slug
                    )
                return None
            token_data = await resp.json()

    # Update credentials
    new_access_token = token_data["access_token"]
    # Keep the old refresh token when the response does not include a new one.
    new_refresh_token = token_data.get("refresh_token", refresh_token)
    new_expires_in = token_data.get("expires_in")
    new_expires_at = (
        int(time.time()) + int(new_expires_in) if new_expires_in else None
    )

    await self.db_service.save_oauth2_credentials(
        user_id, self.info.slug, new_access_token, new_refresh_token, new_expires_at
    )
    return new_access_token
Data Fetching with Pagination
复制
async def pull_from_vendor_api(
    self,
    access_token: str,
    refresh_token: Optional[str] = None,
    days: int = 2
) -> List[Dict[str, Any]]:
    """Fetch sleep, recovery, workout and cycle data for the last ``days`` days.

    Each endpoint is queried through ``self._fetch_paginated_data``. An
    error on one endpoint is logged and the remaining endpoints are still
    attempted.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.
        refresh_token: Accepted but not used in this method.
        days: Size of the time window ending now (UTC).

    Returns:
        One dict per endpoint that returned data, with keys ``data_type``,
        ``data`` and ``timestamp`` (fetch time in milliseconds).
    """
    request_headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json"
    }

    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=days)

    collected = []

    async with aiohttp.ClientSession() as session:
        resource_urls = {
            "sleep": f"{self.api_base_url}/activity/sleep",
            "recovery": f"{self.api_base_url}/recovery",
            "workout": f"{self.api_base_url}/activity/workout",
            "cycle": f"{self.api_base_url}/cycle"
        }

        for data_type, endpoint in resource_urls.items():
            # Fresh query dict per endpoint, since the pagination helper
            # receives this object directly.
            query = {
                "start": window_start.isoformat(),
                "end": window_end.isoformat(),
                "limit": 25
            }
            try:
                records = await self._fetch_paginated_data(
                    session, endpoint, request_headers, query
                )
            except Exception as e:
                # Best effort: one failing endpoint must not stop the others.
                logging.error(f"Error fetching {data_type}: {str(e)}")
                continue

            if not records:
                continue

            collected.append({
                "data_type": data_type,
                "data": records,
                "timestamp": int(time.time() * 1000)
            })

    return collected
Supported Data Types
- Sleep
  - Total in bed time
  - Light sleep
  - Deep sleep (slow-wave)
  - REM sleep
  - Sleep efficiency
  - Disturbances
- Recovery
  - HRV (RMSSD)
  - Resting heart rate
  - Respiratory rate
  - SpO2
  - Skin temperature
  - Recovery score
- Cycles
  - Strain score
  - Kilojoules
  - Average heart rate
  - Max heart rate
- Workouts
  - Activity type
  - Duration
  - Intensity zones
  - Calories burned
  - Heart rate stats
Source Code
The complete Whoop provider source code is available at:
connect/theta/mirobody_whoop/provider_whoop.py
Key Differences from OAuth 1.0
Token Types
OAuth 2.0 uses bearer tokens, OAuth 1.0 uses signed requests
Token Expiration
OAuth 2.0 tokens expire and need refresh, OAuth 1.0 tokens don’t expire
Simplicity
OAuth 2.0 is simpler - no signature calculation required
Refresh Flow
OAuth 2.0 has built-in token refresh, OAuth 1.0 requires re-authorization