概览
Garmin Provider 是一个可用于生产环境的实现示例,展示了 OAuth 1.0 认证与全面的健康数据集成方式。本示例展示的是 Garmin Provider 的真实实现代码,在构建你自己的 Provider 时可以将其作为参考。
核心特性
OAuth 1.0
三方(Three-legged)OAuth 身份验证
全面数据
活动、睡眠、HRV、压力、身体指标
速率限制
遵循 API 速率限制
错误处理
健壮的错误恢复机制
Provider 结构
复制
from mirobody.pulse.theta.platform.base import BaseThetaProvider
from requests_oauthlib import OAuth1Session
class ThetaGarminProvider(BaseThetaProvider):
    """Garmin Connect provider that authenticates with OAuth 1.0."""

    def __init__(self):
        """Load the Garmin client settings and record the service endpoints."""
        super().__init__()

        # Consumer credentials and the callback URL are read from configuration.
        self.client_id = safe_read_cfg("GARMIN_CLIENT_ID")
        self.client_secret = safe_read_cfg("GARMIN_CLIENT_SECRET")
        self.redirect_url = safe_read_cfg("GARMIN_REDIRECT_URL")

        # The request-token and access-token endpoints share one base path.
        oauth_service = "https://connectapi.garmin.com/oauth-service/oauth"
        self.request_token_url = f"{oauth_service}/request_token"
        self.access_token_url = f"{oauth_service}/access_token"

        # User-facing authorization page and the wellness data API root.
        self.auth_url = "https://connect.garmin.com/oauthConfirm/"
        self.api_base_url = "https://apis.garmin.com/wellness-api/rest"
数据映射配置
Garmin 提供了详尽的健康数据。以下是其映射方式:
def _passthrough(value):
    """Return the vendor value unchanged."""
    return value


def _seconds_to_millis(seconds):
    """Convert a duration expressed in seconds to milliseconds."""
    return seconds * 1000


def _simple_field(indicator, unit, converter=_passthrough):
    """Build one ``simple_fields`` entry: indicator name, converter, unit."""
    return {"indicator": indicator, "converter": converter, "unit": unit}


# Maps each Garmin data type ("dailies", "sleeps") to the standard indicators
# its fields feed. "simple_fields" entries carry a converter applied to the
# raw value; "time_series" entries name the value/offset keys of each sample.
GARMIN_DATA_CONFIG = {
    "dailies": {
        "timestamp_source": "calendarDate",
        "simple_fields": {
            "steps": _simple_field(
                StandardIndicator.DAILY_STEPS.value.name, "count"
            ),
            "distanceInMeters": _simple_field(
                StandardIndicator.DAILY_DISTANCE.value.name, "m"
            ),
            "activeKilocalories": _simple_field(
                StandardIndicator.DAILY_CALORIES_ACTIVE.value.name, "kcal"
            ),
            "restingHeartRateInBeatsPerMinute": _simple_field(
                StandardIndicator.DAILY_HEART_RATE_RESTING.value.name, "count/min"
            ),
        },
        "time_series": {
            "timeOffsetHeartRateSamples": {
                "indicator": StandardIndicator.HEART_RATE.value.name,
                "unit": "count/min",
                "format": "list",
                "value_field": "heartRateInBeatsPerMinute",
                "offset_field": "timestampOffsetInSeconds",
            },
        },
    },
    "sleeps": {
        "timestamp_source": "calendarDate",
        "simple_fields": {
            # Garmin reports sleep durations in seconds; they are stored in ms.
            "durationInSeconds": _simple_field(
                StandardIndicator.DAILY_SLEEP_DURATION.value.name,
                "ms",
                _seconds_to_millis,
            ),
            "deepSleepDurationInSeconds": _simple_field(
                StandardIndicator.DAILY_DEEP_SLEEP.value.name,
                "ms",
                _seconds_to_millis,
            ),
        },
    },
}
OAuth 1.0 实现
发起绑定
复制
async def link(self, request: LinkRequest) -> Dict[str, Any]:
    """Start the OAuth 1.0 flow and return the URL the user must visit.

    Obtains a request token from Garmin, stores its secret and the user id
    in Redis for 900 seconds keyed by the token, and builds the
    authorization URL.

    Args:
        request: Link request; only ``request.user_id`` is read here.

    Returns:
        A dict with a single ``link_web_url`` entry.

    Raises:
        RuntimeError: If the request-token call does not return HTTP 200 or
            the response lacks the token fields.
    """
    user_id = request.user_id

    # Session signed with the consumer credentials only (no user token yet).
    oauth = OAuth1Session(
        client_key=self.client_id,
        client_secret=self.client_secret,
    )

    # NOTE(review): OAuth1Session.post is a synchronous HTTP call made from a
    # coroutine; consider moving it off the event loop.
    resp = oauth.post(self.request_token_url)
    if resp.status_code != 200:
        raise RuntimeError(f"获取 request token 失败: {resp.text}")

    # The response body is form-encoded.
    params = parse_qs(resp.text)
    try:
        oauth_token = params['oauth_token'][0]
        oauth_token_secret = params['oauth_token_secret'][0]
    except KeyError as exc:
        # A 200 response without the token fields is still a failure; report
        # it the same way as the non-200 case instead of a bare KeyError.
        raise RuntimeError(f"获取 request token 失败: {resp.text}") from exc

    # Keep the secret and the user id until the callback arrives.
    redis_client = await self._get_redis_client()
    try:
        await redis_client.setex(f"oauth:secret:{oauth_token}", 900, oauth_token_secret)
        await redis_client.setex(f"oauth:user:{oauth_token}", 900, user_id)
    finally:
        # Close the client even when a write fails so it is not leaked.
        await redis_client.aclose()

    auth_params = {
        "oauth_token": oauth_token,
        "oauth_callback": self.redirect_url,
    }
    authorization_url = f"{self.auth_url}?{urlencode(auth_params)}"
    return {"link_web_url": authorization_url}
处理回调
复制
async def callback(self, oauth_token: str, oauth_verifier: str) -> Dict[str, Any]:
    """Finish the OAuth 1.0 flow after the user has authorized access.

    Looks up the request-token secret and user id stored under
    ``oauth_token``, exchanges the verifier for an access token, saves the
    credentials, and schedules an initial data pull in the background.

    Args:
        oauth_token: Request token returned to the callback URL.
        oauth_verifier: Verifier returned to the callback URL.

    Returns:
        A dict with ``provider_slug`` and ``stage`` set to ``"completed"``.

    Raises:
        RuntimeError: If no pending link exists for ``oauth_token``, the
            access-token call does not return HTTP 200, or the response
            lacks the token fields.
    """
    secret_key = f"oauth:secret:{oauth_token}"
    user_key = f"oauth:user:{oauth_token}"

    redis_client = await self._get_redis_client()
    try:
        oauth_token_secret = await redis_client.get(secret_key)
        user_id = await redis_client.get(user_key)
        # The entries are single-use; remove them once read.
        await redis_client.delete(secret_key)
        await redis_client.delete(user_key)
    finally:
        # Close the client even when a Redis call fails so it is not leaked.
        await redis_client.aclose()

    # A missing entry means the link expired or the token is unknown. Stop
    # here rather than signing a request with a None secret or saving
    # credentials under a None user id.
    if oauth_token_secret is None or user_id is None:
        raise RuntimeError("OAuth 会话不存在或已过期,请重新发起绑定")

    # Redis clients return bytes unless configured to decode responses.
    if isinstance(oauth_token_secret, bytes):
        oauth_token_secret = oauth_token_secret.decode("utf-8")
    if isinstance(user_id, bytes):
        user_id = user_id.decode("utf-8")

    oauth = OAuth1Session(
        client_key=self.client_id,
        client_secret=self.client_secret,
        resource_owner_key=oauth_token,
        resource_owner_secret=oauth_token_secret,
        verifier=oauth_verifier,
    )

    # NOTE(review): OAuth1Session.post is a synchronous HTTP call made from a
    # coroutine; consider moving it off the event loop.
    resp = oauth.post(self.access_token_url)
    if resp.status_code != 200:
        raise RuntimeError(f"获取 access token 失败: {resp.text}")

    params = parse_qs(resp.text)
    try:
        access_token = params['oauth_token'][0]
        access_token_secret = params['oauth_token_secret'][0]
    except KeyError as exc:
        raise RuntimeError(f"获取 access token 失败: {resp.text}") from exc

    await self.db_service.save_oauth1_credentials(
        user_id, self.info.slug, access_token, access_token_secret
    )

    # Start the initial pull without blocking the callback response.
    task = asyncio.create_task(self._pull_and_push_for_user({
        "user_id": user_id,
        "access_token": access_token,
        "access_token_secret": access_token_secret,
    }))
    # The event loop keeps only weak references to tasks, so hold a strong
    # one until the task completes to prevent it from being collected.
    pending = getattr(self, "_garmin_initial_pull_tasks", None)
    if pending is None:
        pending = self._garmin_initial_pull_tasks = set()
    pending.add(task)
    task.add_done_callback(pending.discard)

    return {
        "provider_slug": self.info.slug,
        "stage": "completed",
    }
数据获取
Garmin 为不同的数据类型提供了多个端点:
async def pull_from_vendor_api(
    self,
    access_token: str,
    token_secret: str,
    days: int = 2
) -> List[Dict[str, Any]]:
    """Fetch recent raw data for one user from the Garmin API.

    Queries the dailies, sleeps, hrv and stressDetails endpoints for the
    upload window covering the last ``days`` days. A failure on one endpoint
    is logged and skipped so the remaining endpoints are still fetched.

    Args:
        access_token: The user's OAuth 1.0 access token.
        token_secret: The secret paired with ``access_token``.
        days: Length of the upload window in days, ending now (UTC).

    Returns:
        One dict per endpoint that returned non-empty data, with keys
        ``data_type``, ``data`` and ``timestamp`` (fetch time in ms).
    """
    oauth = OAuth1Session(
        client_key=self.client_id,
        client_secret=self.client_secret,
        resource_owner_key=access_token,
        resource_owner_secret=token_secret,
    )

    # Use timezone-aware datetimes: a ``date`` object has no ``timestamp()``
    # method, so building the window from ``.date()`` raised AttributeError
    # for every endpoint and the method always returned an empty list.
    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=days)
    # The window is identical for every endpoint, so build it once.
    params = {
        "uploadStartTimeInSeconds": int(window_start.timestamp()),
        "uploadEndTimeInSeconds": int(window_end.timestamp()),
    }

    endpoints = {
        "dailies": f"{self.api_base_url}/dailies",
        "sleeps": f"{self.api_base_url}/sleeps",
        "hrv": f"{self.api_base_url}/hrv",
        "stress": f"{self.api_base_url}/stressDetails",
    }

    all_raw_data = []
    for data_type, endpoint in endpoints.items():
        try:
            # NOTE(review): OAuth1Session.get is a synchronous HTTP call made
            # from a coroutine; consider moving it off the event loop.
            resp = oauth.get(endpoint, params=params)
            if resp.status_code != 200:
                # Surface rejected requests instead of dropping them silently.
                logging.warning(f"获取 {data_type} 失败: HTTP {resp.status_code}")
                continue
            data = resp.json()
            if data:
                all_raw_data.append({
                    "data_type": data_type,
                    "data": data,
                    "timestamp": int(time.time() * 1000),
                })
        except Exception as e:
            # Deliberately broad: this is a best-effort pull and one failing
            # endpoint must not prevent the others from being fetched.
            logging.error(f"获取 {data_type} 出错: {str(e)}")
            continue
    return all_raw_data
支持的数据类型
Garmin Provider 支持:
- 活动
- 心率
- 睡眠
- 高级指标
- 每日步数
- 距离(米)
- 活动消耗卡路里
- 基础代谢卡路里
- 爬楼层数
- 活动时间
- 每日最低心率
- 每日最高心率
- 每日平均心率
- 静息心率
- 心率时间序列样本
- 总睡眠时长
- 深睡时长
- 浅睡时长
- REM 睡眠时长
- 清醒时间
- HRV(心率变异性)
- 压力水平
- 呼吸频率
- 身体成分
源代码
完整的 Garmin Provider 源代码位于:
connect/theta/mirobody_garmin_connect/provider_garmin.py