跳转到主要内容

概览

Garmin Provider 是一个可用于生产环境的实现示例,展示了 OAuth 1.0 认证与全面的健康数据集成方式。
本示例展示了 Garmin Provider 的真实实现代码。在构建你自己的 Provider 时,可以将其作为参考。

核心特性

OAuth 1.0

三方(three-legged)OAuth 身份验证

全面数据

活动、睡眠、HRV、压力、身体指标

速率限制

遵循 API 速率限制

错误处理

健壮的错误恢复机制

Provider 结构

from mirobody.pulse.theta.platform.base import BaseThetaProvider
from requests_oauthlib import OAuth1Session

class ThetaGarminProvider(BaseThetaProvider):
    """Garmin Connect provider that authenticates with OAuth 1.0.

    The constructor reads the consumer credentials and the redirect URL
    through ``safe_read_cfg`` and records the Garmin endpoint URLs that the
    provider's other methods use.
    """

    def __init__(self):
        super().__init__()

        # Consumer key, consumer secret and callback URL, looked up in
        # configuration in that order.
        self.client_id, self.client_secret, self.redirect_url = (
            safe_read_cfg(cfg_key)
            for cfg_key in (
                "GARMIN_CLIENT_ID",
                "GARMIN_CLIENT_SECRET",
                "GARMIN_REDIRECT_URL",
            )
        )

        # OAuth 1.0 token endpoints and the user-facing confirmation page.
        self.request_token_url = "https://connectapi.garmin.com/oauth-service/oauth/request_token"
        self.access_token_url = "https://connectapi.garmin.com/oauth-service/oauth/access_token"
        self.auth_url = "https://connect.garmin.com/oauthConfirm/"

        # Base URL for the data endpoints.
        self.api_base_url = "https://apis.garmin.com/wellness-api/rest"

数据映射配置

Garmin 提供了详尽的健康数据。以下是其映射方式:
def _passthrough(value):
    """Return the vendor value unchanged."""
    return value


def _seconds_to_millis(seconds):
    """Convert a duration expressed in seconds to milliseconds."""
    return seconds * 1000


def _simple_field(indicator, unit, converter=_passthrough):
    """Build one ``simple_fields`` entry: indicator name, converter, unit."""
    return {
        "indicator": indicator.value.name,
        "converter": converter,
        "unit": unit,
    }


# Maps each Garmin summary type to the standard indicators it feeds.
# "simple_fields" are scalar values keyed by the vendor field name;
# "time_series" entries name the value/offset fields of each sample.
GARMIN_DATA_CONFIG = {
    "dailies": {
        "timestamp_source": "calendarDate",
        "simple_fields": {
            "steps": _simple_field(StandardIndicator.DAILY_STEPS, "count"),
            "distanceInMeters": _simple_field(
                StandardIndicator.DAILY_DISTANCE, "m"
            ),
            "activeKilocalories": _simple_field(
                StandardIndicator.DAILY_CALORIES_ACTIVE, "kcal"
            ),
            "restingHeartRateInBeatsPerMinute": _simple_field(
                StandardIndicator.DAILY_HEART_RATE_RESTING, "count/min"
            ),
        },
        "time_series": {
            "timeOffsetHeartRateSamples": {
                "indicator": StandardIndicator.HEART_RATE.value.name,
                "unit": "count/min",
                "format": "list",
                "value_field": "heartRateInBeatsPerMinute",
                "offset_field": "timestampOffsetInSeconds",
            },
        },
    },
    "sleeps": {
        "timestamp_source": "calendarDate",
        "simple_fields": {
            # Durations arrive in seconds and are stored in milliseconds.
            "durationInSeconds": _simple_field(
                StandardIndicator.DAILY_SLEEP_DURATION, "ms", _seconds_to_millis
            ),
            "deepSleepDurationInSeconds": _simple_field(
                StandardIndicator.DAILY_DEEP_SLEEP, "ms", _seconds_to_millis
            ),
        },
    },
}

OAuth 1.0 实现

发起绑定

async def link(self, request: LinkRequest) -> Dict[str, Any]:
    """Start the OAuth 1.0 flow and return the URL the user must visit.

    Requests a temporary request token, stores its secret and the requesting
    user id in Redis for 900 seconds (keyed by the request token), and builds
    the authorization URL from ``self.auth_url``.

    Args:
        request: Link request; only ``request.user_id`` is read here.

    Returns:
        ``{"link_web_url": <authorization URL>}``.

    Raises:
        RuntimeError: If the request-token endpoint does not answer 200, or
            answers 200 without a token/secret pair.
    """
    user_id = request.user_id

    # Session signed with the consumer credentials only; no user token yet.
    oauth = OAuth1Session(
        client_key=self.client_id,
        client_secret=self.client_secret,
    )

    # Fetch the temporary request token.
    # NOTE(review): this is a blocking HTTP call inside a coroutine.
    resp = oauth.post(self.request_token_url)
    if resp.status_code != 200:
        raise RuntimeError(f"获取 request token 失败: {resp.text}")

    # The body is form-encoded; parse_qs maps each key to a list of values.
    params = parse_qs(resp.text)
    try:
        oauth_token = params['oauth_token'][0]
        oauth_token_secret = params['oauth_token_secret'][0]
    except KeyError as exc:
        # Fail the same way as a non-200 answer, but keep the body (which may
        # hold half of the credential pair) out of the message.
        raise RuntimeError(
            "获取 request token 失败: 响应缺少 oauth_token 或 oauth_token_secret"
        ) from exc

    # Persist what the callback needs; close the client even if a write fails.
    redis_client = await self._get_redis_client()
    try:
        await redis_client.setex(f"oauth:secret:{oauth_token}", 900, oauth_token_secret)
        await redis_client.setex(f"oauth:user:{oauth_token}", 900, user_id)
    finally:
        await redis_client.aclose()

    # Build the authorization URL the user is sent to.
    auth_params = {
        "oauth_token": oauth_token,
        "oauth_callback": self.redirect_url
    }
    authorization_url = f"{self.auth_url}?{urlencode(auth_params)}"

    return {"link_web_url": authorization_url}

处理回调

async def callback(self, oauth_token: str, oauth_verifier: str) -> Dict[str, Any]:
    """Finish the OAuth 1.0 flow after the user has authorized access.

    Looks up (and removes) the request-token secret and user id stored in
    Redis under ``oauth_token``, exchanges the verifier for an access token,
    saves the credentials through ``self.db_service`` and schedules an
    initial data pull in the background.

    Args:
        oauth_token: Request token echoed back on the callback.
        oauth_verifier: Verifier echoed back on the callback.

    Returns:
        ``{"provider_slug": self.info.slug, "stage": "completed"}``.

    Raises:
        RuntimeError: If no pending state exists for ``oauth_token`` (for
            example it expired), if the access-token endpoint does not answer
            200, or if its answer lacks a token/secret pair.
    """
    # Fetch and clear the pending state; always release the client.
    redis_client = await self._get_redis_client()
    try:
        oauth_token_secret = await redis_client.get(f"oauth:secret:{oauth_token}")
        user_id = await redis_client.get(f"oauth:user:{oauth_token}")
        await redis_client.delete(f"oauth:secret:{oauth_token}")
        await redis_client.delete(f"oauth:user:{oauth_token}")
    finally:
        await redis_client.aclose()

    # A missing key means the state expired or the token is unknown. Stop
    # here instead of signing with None and saving credentials for no user.
    # NOTE(review): if the Redis client does not decode responses these are
    # bytes, not str — confirm against _get_redis_client.
    if oauth_token_secret is None or user_id is None:
        raise RuntimeError("OAuth 会话已过期或无效: 未找到对应的 request token")

    # Session signed with the request token plus the verifier.
    oauth = OAuth1Session(
        client_key=self.client_id,
        client_secret=self.client_secret,
        resource_owner_key=oauth_token,
        resource_owner_secret=oauth_token_secret,
        verifier=oauth_verifier
    )

    # Exchange the request token for an access token.
    # NOTE(review): this is a blocking HTTP call inside a coroutine.
    resp = oauth.post(self.access_token_url)
    if resp.status_code != 200:
        raise RuntimeError(f"获取 access token 失败: {resp.text}")

    # The body is form-encoded; parse_qs maps each key to a list of values.
    params = parse_qs(resp.text)
    try:
        access_token = params['oauth_token'][0]
        access_token_secret = params['oauth_token_secret'][0]
    except KeyError as exc:
        # Keep the body (possibly half of the credentials) out of the message.
        raise RuntimeError(
            "获取 access token 失败: 响应缺少 oauth_token 或 oauth_token_secret"
        ) from exc

    # Persist the long-lived credentials.
    await self.db_service.save_oauth1_credentials(
        user_id, self.info.slug, access_token, access_token_secret
    )

    # Kick off the initial pull without blocking the callback response.
    pull_task = asyncio.create_task(self._pull_and_push_for_user({
        "user_id": user_id,
        "access_token": access_token,
        "access_token_secret": access_token_secret,
    }))
    # The event loop keeps only weak references to tasks, so hold a strong
    # one until the task finishes to keep it from being garbage-collected.
    background_tasks = getattr(self, "_background_tasks", None)
    if background_tasks is None:
        background_tasks = self._background_tasks = set()
    background_tasks.add(pull_task)
    pull_task.add_done_callback(background_tasks.discard)

    return {
        "provider_slug": self.info.slug,
        "stage": "completed"
    }

数据获取

Garmin 为不同的数据类型提供了多个端点:
async def pull_from_vendor_api(
    self,
    access_token: str,
    token_secret: str,
    days: int = 2
) -> List[Dict[str, Any]]:
    """Pull recent raw data for one user from the Garmin API.

    Queries the dailies, sleeps, hrv and stressDetails endpoints for records
    uploaded within the last ``days`` days. A failure on one endpoint is
    logged and does not prevent the remaining endpoints from being queried.

    Args:
        access_token: The user's OAuth 1.0 access token.
        token_secret: The secret paired with ``access_token``.
        days: Size of the look-back window, in days.

    Returns:
        One entry per endpoint that answered 200 with a non-empty payload:
        ``{"data_type": <endpoint key>, "data": <decoded JSON>,
        "timestamp": <fetch time in epoch milliseconds>}``.
    """
    # Session signed with the consumer credentials and the user's token.
    oauth = OAuth1Session(
        client_key=self.client_id,
        client_secret=self.client_secret,
        resource_owner_key=access_token,
        resource_owner_secret=token_secret
    )

    # Upload-time window in epoch seconds. This must be computed from aware
    # datetimes: ``date`` objects have no ``timestamp()`` method, so the
    # previous date-based version raised on every endpoint and the function
    # always returned an empty list.
    # NOTE(review): the vendor API may cap the window size per request —
    # confirm against the Garmin API specification for days > 1.
    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=days)
    params = {
        "uploadStartTimeInSeconds": int(window_start.timestamp()),
        "uploadEndTimeInSeconds": int(window_end.timestamp())
    }

    all_raw_data = []

    # One endpoint per data type.
    endpoints = {
        "dailies": f"{self.api_base_url}/dailies",
        "sleeps": f"{self.api_base_url}/sleeps",
        "hrv": f"{self.api_base_url}/hrv",
        "stress": f"{self.api_base_url}/stressDetails"
    }

    for data_type, endpoint in endpoints.items():
        try:
            # NOTE(review): this is a blocking HTTP call inside a coroutine.
            resp = oauth.get(endpoint, params=params)
            if resp.status_code != 200:
                # Surface rejected requests instead of dropping them silently.
                logging.warning(f"获取 {data_type} 失败: HTTP {resp.status_code}")
                continue

            data = resp.json()
            if data:
                all_raw_data.append({
                    "data_type": data_type,
                    "data": data,
                    "timestamp": int(time.time() * 1000)
                })
        except Exception as e:
            # Best effort: one failing endpoint must not abort the others.
            logging.error(f"获取 {data_type} 出错: {str(e)}")
            continue

    return all_raw_data

支持的数据类型

Garmin Provider 支持:
  • 每日步数
  • 距离(米)
  • 活动消耗卡路里
  • 基础代谢卡路里
  • 爬楼层数
  • 活动时间

源代码

完整的 Garmin Provider 源代码位于:
connect/theta/mirobody_garmin_connect/provider_garmin.py

下一步