Skip to main content

Overview

The Whoop provider demonstrates OAuth 2.0 authentication with token refresh and comprehensive recovery and performance data integration.
This example shows OAuth 2.0 implementation with automatic token refresh. Perfect for learning modern OAuth patterns.

Key Features

OAuth 2.0

Modern OAuth with refresh tokens

Recovery Data

HRV, resting HR, recovery scores

Sleep Analytics

Detailed sleep stages and quality

Workout Data

Exercise sessions with intensity zones

Provider Structure

from mirobody.pulse.theta.platform.base import BaseThetaProvider
import aiohttp

class ThetaWhoopProvider(BaseThetaProvider):
    """Whoop OAuth 2.0 Provider"""
    
    def __init__(self):
        super().__init__()
        
        # OAuth 2.0 configuration
        self.client_id = safe_read_cfg("WHOOP_CLIENT_ID")
        self.client_secret = safe_read_cfg("WHOOP_CLIENT_SECRET")
        self.redirect_url = safe_read_cfg("WHOOP_REDIRECT_URL")
        
        # API endpoints
        self.auth_url = "https://api.prod.whoop.com/oauth/oauth2/auth"
        self.token_url = "https://api.prod.whoop.com/oauth/oauth2/token"
        self.api_base_url = "https://api.prod.whoop.com/developer/v2"
        
        # Scopes
        self.scopes = "offline read:recovery read:sleep read:cycles read:workout"

Data Mapping

Whoop provides detailed recovery and performance metrics:
WHOOP_INDICATOR_MAPPING = {
    "sleep": {
        "score.stage_summary.total_in_bed_time_milli": (
            StandardIndicator.SLEEP_IN_BED.value.name,
            lambda x: x,
            "ms"
        ),
        "score.stage_summary.total_light_sleep_time_milli": (
            StandardIndicator.SLEEP_ANALYSIS_ASLEEP_CORE.value.name,
            lambda x: x,
            "ms"
        ),
        "score.stage_summary.total_slow_wave_sleep_time_milli": (
            StandardIndicator.SLEEP_ANALYSIS_ASLEEP_DEEP.value.name,
            lambda x: x,
            "ms"
        ),
        "score.sleep_efficiency_percentage": (
            StandardIndicator.SLEEP_EFFICIENCY.value.name,
            lambda x: x,
            "%"
        )
    },
    "recovery": {
        "score.hrv_rmssd_milli": (
            StandardIndicator.HRV.value.name,
            lambda x: x,
            "ms"
        ),
        "score.resting_heart_rate": (
            StandardIndicator.DAILY_HEART_RATE_RESTING.value.name,
            lambda x: x,
            "count/min"
        ),
        "score.respiratory_rate": (
            StandardIndicator.RESPIRATORY_RATE.value.name,
            lambda x: x,
            "breaths/min"
        )
    }
}

OAuth 2.0 Implementation

async def link(self, request: LinkRequest) -> Dict[str, Any]:
    """Initiate OAuth 2.0 flow"""
    user_id = request.user_id
    
    # Generate state for CSRF protection
    state_payload = {"s": str(uuid.uuid4())}
    state = urlencode(state_payload)
    
    # Store in Redis
    redis_client = await self._get_redis_client()
    await redis_client.setex(f"oauth2:state:{state}", 900, user_id)
    await redis_client.setex(f"oauth2:redir:{state}", 900, self.redirect_url)
    await redis_client.aclose()
    
    # Build authorization URL
    params = {
        "client_id": self.client_id,
        "response_type": "code",
        "redirect_uri": self.redirect_url,
        "scope": self.scopes,
        "state": state,
    }
    authorization_url = f"{self.auth_url}?{urlencode(params)}"
    
    return {"link_web_url": authorization_url}

Handling the Callback

async def callback(self, code: str, state: str) -> Dict[str, Any]:
    """Handle OAuth 2.0 callback"""
    
    # Validate state
    redis_client = await self._get_redis_client()
    user_id = await redis_client.get(f"oauth2:state:{state}")
    redirect_uri = await redis_client.get(f"oauth2:redir:{state}")
    await redis_client.delete(f"oauth2:state:{state}")
    await redis_client.delete(f"oauth2:redir:{state}")
    await redis_client.aclose()
    
    if not user_id:
        raise ValueError("Invalid or expired state")
    
    # Exchange code for tokens
    async with aiohttp.ClientSession() as session:
        data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": redirect_uri,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }
        
        async with session.post(self.token_url, data=data) as resp:
            if resp.status != 200:
                raise RuntimeError(f"Token exchange failed: {await resp.text()}")
            token_data = await resp.json()
    
    # Save credentials with expiration
    access_token = token_data["access_token"]
    refresh_token = token_data.get("refresh_token")
    expires_in = token_data.get("expires_in")
    expires_at = int(time.time()) + int(expires_in) if expires_in else None
    
    await self.db_service.save_oauth2_credentials(
        user_id, self.info.slug, access_token, refresh_token, expires_at
    )
    
    # Trigger data sync
    asyncio.create_task(self._pull_and_push_for_user({
        "user_id": user_id,
        "access_token": access_token,
        "refresh_token": refresh_token,
    }))
    
    return {"provider_slug": self.info.slug, "stage": "completed"}

Token Refresh

Automatic token refresh ensures uninterrupted access:
async def get_valid_access_token(self, user_id: str) -> Optional[str]:
    """Get valid access token, refreshing if necessary"""
    
    # Get stored credentials
    creds = await self.db_service.get_user_credentials(
        user_id, self.info.slug, LinkType.OAUTH2
    )
    
    if not creds:
        return None
    
    access_token = creds["access_token"]
    expires_at = creds.get("expires_at")
    
    # Check if still valid
    if expires_at and time.time() < expires_at:
        return access_token
    
    # Token expired, refresh it
    refresh_token = creds.get("refresh_token")
    if not refresh_token:
        return None
    
    async with aiohttp.ClientSession() as session:
        data = {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }
        
        async with session.post(self.token_url, data=data) as resp:
            if resp.status != 200:
                # Refresh failed, remove invalid credentials
                await self.db_service.delete_user_theta_provider(
                    user_id, self.info.slug
                )
                return None
            
            token_data = await resp.json()
    
    # Update credentials
    new_access_token = token_data["access_token"]
    new_refresh_token = token_data.get("refresh_token", refresh_token)
    new_expires_in = token_data.get("expires_in")
    new_expires_at = int(time.time()) + int(new_expires_in) if new_expires_in else None
    
    await self.db_service.save_oauth2_credentials(
        user_id, self.info.slug, new_access_token, new_refresh_token, new_expires_at
    )
    
    return new_access_token

Data Fetching with Pagination

async def pull_from_vendor_api(
    self,
    access_token: str,
    refresh_token: Optional[str] = None,
    days: int = 2
) -> List[Dict[str, Any]]:
    """Pull data from Whoop API with pagination"""
    
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json"
    }
    
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=days)
    
    all_raw_data = []
    
    async with aiohttp.ClientSession() as session:
        endpoints = {
            "sleep": f"{self.api_base_url}/activity/sleep",
            "recovery": f"{self.api_base_url}/recovery",
            "workout": f"{self.api_base_url}/activity/workout",
            "cycle": f"{self.api_base_url}/cycle"
        }
        
        for data_type, endpoint in endpoints.items():
            try:
                params = {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    "limit": 25
                }
                
                # Handle pagination
                data = await self._fetch_paginated_data(
                    session, endpoint, headers, params
                )
                
                if data:
                    all_raw_data.append({
                        "data_type": data_type,
                        "data": data,
                        "timestamp": int(time.time() * 1000)
                    })
            except Exception as e:
                logging.error(f"Error fetching {data_type}: {str(e)}")
                continue
    
    return all_raw_data

Supported Data Types

  • Total in bed time
  • Light sleep
  • Deep sleep (slow-wave)
  • REM sleep
  • Sleep efficiency
  • Disturbances

Source Code

The complete Whoop provider source code is available at:
connect/theta/mirobody_whoop/provider_whoop.py

Key Differences from OAuth 1.0

OAuth 2.0 uses bearer tokens, OAuth 1.0 uses signed requests
OAuth 2.0 tokens expire and need refresh, OAuth 1.0 tokens don’t expire
OAuth 2.0 is simpler - no signature calculation required
OAuth 2.0 has built-in token refresh, OAuth 1.0 requires re-authorization

Next Steps