跳转到主要内容

概览

Whoop Provider 展示了带 token refresh 的 OAuth 2.0 认证实现,以及恢复(recovery)与表现(performance)数据的完整集成方式。
本示例展示带自动 token refresh 的 OAuth 2.0 实现,非常适合学习现代 OAuth 模式。

核心特性

OAuth 2.0

带 refresh tokens 的现代 OAuth

Recovery Data

HRV, resting HR, recovery scores

Sleep Analytics

详细的睡眠分期与质量分析

Workout Data

带强度分区的训练数据

Provider 结构

from mirobody.pulse.theta.platform.base import BaseThetaProvider
import aiohttp

class ThetaWhoopProvider(BaseThetaProvider):
    """Whoop OAuth 2.0 Provider"""
    
    def __init__(self):
        super().__init__()
        
        # OAuth 2.0 configuration
        self.client_id = safe_read_cfg("WHOOP_CLIENT_ID")
        self.client_secret = safe_read_cfg("WHOOP_CLIENT_SECRET")
        self.redirect_url = safe_read_cfg("WHOOP_REDIRECT_URL")
        
        # API endpoints
        self.auth_url = "https://api.prod.whoop.com/oauth/oauth2/auth"
        self.token_url = "https://api.prod.whoop.com/oauth/oauth2/token"
        self.api_base_url = "https://api.prod.whoop.com/developer/v2"
        
        # Scopes
        self.scopes = "offline read:recovery read:sleep read:cycles read:workout"

数据映射

Whoop 提供了细粒度的恢复与表现指标:
WHOOP_INDICATOR_MAPPING = {
    "sleep": {
        "score.stage_summary.total_in_bed_time_milli": (
            StandardIndicator.SLEEP_IN_BED.value.name,
            lambda x: x,
            "ms"
        ),
        "score.stage_summary.total_light_sleep_time_milli": (
            StandardIndicator.SLEEP_ANALYSIS_ASLEEP_CORE.value.name,
            lambda x: x,
            "ms"
        ),
        "score.stage_summary.total_slow_wave_sleep_time_milli": (
            StandardIndicator.SLEEP_ANALYSIS_ASLEEP_DEEP.value.name,
            lambda x: x,
            "ms"
        ),
        "score.sleep_efficiency_percentage": (
            StandardIndicator.SLEEP_EFFICIENCY.value.name,
            lambda x: x,
            "%"
        )
    },
    "recovery": {
        "score.hrv_rmssd_milli": (
            StandardIndicator.HRV.value.name,
            lambda x: x,
            "ms"
        ),
        "score.resting_heart_rate": (
            StandardIndicator.DAILY_HEART_RATE_RESTING.value.name,
            lambda x: x,
            "count/min"
        ),
        "score.respiratory_rate": (
            StandardIndicator.RESPIRATORY_RATE.value.name,
            lambda x: x,
            "breaths/min"
        )
    }
}

OAuth 2.0 实现

发起链接(Link)

async def link(self, request: LinkRequest) -> Dict[str, Any]:
    """Initiate OAuth 2.0 flow"""
    user_id = request.user_id
    
    # Generate state for CSRF protection
    state_payload = {"s": str(uuid.uuid4())}
    state = urlencode(state_payload)
    
    # Store in Redis
    redis_client = await self._get_redis_client()
    await redis_client.setex(f"oauth2:state:{state}", 900, user_id)
    await redis_client.setex(f"oauth2:redir:{state}", 900, self.redirect_url)
    await redis_client.aclose()
    
    # Build authorization URL
    params = {
        "client_id": self.client_id,
        "response_type": "code",
        "redirect_uri": self.redirect_url,
        "scope": self.scopes,
        "state": state,
    }
    authorization_url = f"{self.auth_url}?{urlencode(params)}"
    
    return {"link_web_url": authorization_url}

处理回调(Callback)

async def callback(self, code: str, state: str) -> Dict[str, Any]:
    """Handle OAuth 2.0 callback"""
    
    # Validate state
    redis_client = await self._get_redis_client()
    user_id = await redis_client.get(f"oauth2:state:{state}")
    redirect_uri = await redis_client.get(f"oauth2:redir:{state}")
    await redis_client.delete(f"oauth2:state:{state}")
    await redis_client.delete(f"oauth2:redir:{state}")
    await redis_client.aclose()
    
    if not user_id:
        raise ValueError("Invalid or expired state")
    
    # Exchange code for tokens
    async with aiohttp.ClientSession() as session:
        data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": redirect_uri,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }
        
        async with session.post(self.token_url, data=data) as resp:
            if resp.status != 200:
                raise RuntimeError(f"Token exchange failed: {await resp.text()}")
            token_data = await resp.json()
    
    # Save credentials with expiration
    access_token = token_data["access_token"]
    refresh_token = token_data.get("refresh_token")
    expires_in = token_data.get("expires_in")
    expires_at = int(time.time()) + int(expires_in) if expires_in else None
    
    await self.db_service.save_oauth2_credentials(
        user_id, self.info.slug, access_token, refresh_token, expires_at
    )
    
    # Trigger data sync
    asyncio.create_task(self._pull_and_push_for_user({
        "user_id": user_id,
        "access_token": access_token,
        "refresh_token": refresh_token,
    }))
    
    return {"provider_slug": self.info.slug, "stage": "completed"}

Token Refresh

自动 token refresh 可确保持续可用的访问能力:
async def get_valid_access_token(self, user_id: str) -> Optional[str]:
    """Get valid access token, refreshing if necessary"""
    
    # Get stored credentials
    creds = await self.db_service.get_user_credentials(
        user_id, self.info.slug, LinkType.OAUTH2
    )
    
    if not creds:
        return None
    
    access_token = creds["access_token"]
    expires_at = creds.get("expires_at")
    
    # Check if still valid
    if expires_at and time.time() < expires_at:
        return access_token
    
    # Token expired, refresh it
    refresh_token = creds.get("refresh_token")
    if not refresh_token:
        return None
    
    async with aiohttp.ClientSession() as session:
        data = {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }
        
        async with session.post(self.token_url, data=data) as resp:
            if resp.status != 200:
                # Refresh failed, remove invalid credentials
                await self.db_service.delete_user_theta_provider(
                    user_id, self.info.slug
                )
                return None
            
            token_data = await resp.json()
    
    # Update credentials
    new_access_token = token_data["access_token"]
    new_refresh_token = token_data.get("refresh_token", refresh_token)
    new_expires_in = token_data.get("expires_in")
    new_expires_at = int(time.time()) + int(new_expires_in) if new_expires_in else None
    
    await self.db_service.save_oauth2_credentials(
        user_id, self.info.slug, new_access_token, new_refresh_token, new_expires_at
    )
    
    return new_access_token

Data Fetching with Pagination

async def pull_from_vendor_api(
    self,
    access_token: str,
    refresh_token: Optional[str] = None,
    days: int = 2
) -> List[Dict[str, Any]]:
    """Pull data from Whoop API with pagination"""
    
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json"
    }
    
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=days)
    
    all_raw_data = []
    
    async with aiohttp.ClientSession() as session:
        endpoints = {
            "sleep": f"{self.api_base_url}/activity/sleep",
            "recovery": f"{self.api_base_url}/recovery",
            "workout": f"{self.api_base_url}/activity/workout",
            "cycle": f"{self.api_base_url}/cycle"
        }
        
        for data_type, endpoint in endpoints.items():
            try:
                params = {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    "limit": 25
                }
                
                # Handle pagination
                data = await self._fetch_paginated_data(
                    session, endpoint, headers, params
                )
                
                if data:
                    all_raw_data.append({
                        "data_type": data_type,
                        "data": data,
                        "timestamp": int(time.time() * 1000)
                    })
            except Exception as e:
                logging.error(f"Error fetching {data_type}: {str(e)}")
                continue
    
    return all_raw_data

Supported Data Types

  • Total in bed time
  • Light sleep
  • Deep sleep (slow-wave)
  • REM sleep
  • Sleep efficiency
  • Disturbances

Source Code

The complete Whoop provider source code is available at:
connect/theta/mirobody_whoop/provider_whoop.py

Key Differences from OAuth 1.0

OAuth 2.0 uses bearer tokens, OAuth 1.0 uses signed requests
OAuth 2.0 tokens expire and need refresh, OAuth 1.0 tokens don’t expire
OAuth 2.0 is simpler - no signature calculation required
OAuth 2.0 has built-in token refresh, OAuth 1.0 requires re-authorization

Next Steps