概览
Garmin Provider 是一个可用于生产环境的实现示例,展示了 OAuth 1.0 认证与全面的健康数据集成方式。本示例展示 Garmin Provider 的真实实现代码。构建你自己的 Providers 时,可以把它作为参考。
核心特性
OAuth 1.0
Three-legged OAuth 认证
Comprehensive Data
Activities, sleep, HRV, stress, body metrics
Rate Limiting
遵循 API rate limits
Error Handling
稳健的错误恢复
Provider 结构
复制
from mirobody.pulse.theta.platform.base import BaseThetaProvider
from requests_oauthlib import OAuth1Session
class ThetaGarminProvider(BaseThetaProvider):
"""Garmin Connect OAuth 1.0 Provider"""
def __init__(self):
super().__init__()
# Load configuration
self.client_id = safe_read_cfg("GARMIN_CLIENT_ID")
self.client_secret = safe_read_cfg("GARMIN_CLIENT_SECRET")
self.redirect_url = safe_read_cfg("GARMIN_REDIRECT_URL")
# OAuth 1.0 endpoints
self.request_token_url = "https://connectapi.garmin.com/oauth-service/oauth/request_token"
self.auth_url = "https://connect.garmin.com/oauthConfirm/"
self.access_token_url = "https://connectapi.garmin.com/oauth-service/oauth/access_token"
self.api_base_url = "https://apis.garmin.com/wellness-api/rest"
数据映射配置
Garmin 提供了非常丰富的健康数据。下面展示其映射方式:复制
GARMIN_DATA_CONFIG = {
"dailies": {
"timestamp_source": "calendarDate",
"simple_fields": {
"steps": {
"indicator": StandardIndicator.DAILY_STEPS.value.name,
"converter": lambda x: x,
"unit": "count"
},
"distanceInMeters": {
"indicator": StandardIndicator.DAILY_DISTANCE.value.name,
"converter": lambda x: x,
"unit": "m"
},
"activeKilocalories": {
"indicator": StandardIndicator.DAILY_CALORIES_ACTIVE.value.name,
"converter": lambda x: x,
"unit": "kcal"
},
"restingHeartRateInBeatsPerMinute": {
"indicator": StandardIndicator.DAILY_HEART_RATE_RESTING.value.name,
"converter": lambda x: x,
"unit": "count/min"
}
},
"time_series": {
"timeOffsetHeartRateSamples": {
"indicator": StandardIndicator.HEART_RATE.value.name,
"unit": "count/min",
"format": "list",
"value_field": "heartRateInBeatsPerMinute",
"offset_field": "timestampOffsetInSeconds"
}
}
},
"sleeps": {
"timestamp_source": "calendarDate",
"simple_fields": {
"durationInSeconds": {
"indicator": StandardIndicator.DAILY_SLEEP_DURATION.value.name,
"converter": lambda x: x * 1000, # seconds to milliseconds
"unit": "ms"
},
"deepSleepDurationInSeconds": {
"indicator": StandardIndicator.DAILY_DEEP_SLEEP.value.name,
"converter": lambda x: x * 1000,
"unit": "ms"
}
}
}
}
OAuth 1.0 实现
发起链接(Link)
复制
async def link(self, request: LinkRequest) -> Dict[str, Any]:
"""Initiate OAuth 1.0 flow"""
user_id = request.user_id
# Create OAuth 1.0 session
oauth = OAuth1Session(
client_key=self.client_id,
client_secret=self.client_secret,
)
# Get request token
resp = oauth.post(self.request_token_url)
if resp.status_code != 200:
raise RuntimeError(f"Failed to get request token: {resp.text}")
# Parse response
params = parse_qs(resp.text)
oauth_token = params['oauth_token'][0]
oauth_token_secret = params['oauth_token_secret'][0]
# Store token secret in Redis
redis_client = await self._get_redis_client()
await redis_client.setex(f"oauth:secret:{oauth_token}", 900, oauth_token_secret)
await redis_client.setex(f"oauth:user:{oauth_token}", 900, user_id)
await redis_client.aclose()
# Build authorization URL
auth_params = {
"oauth_token": oauth_token,
"oauth_callback": self.redirect_url
}
authorization_url = f"{self.auth_url}?{urlencode(auth_params)}"
return {"link_web_url": authorization_url}
处理回调(Callback)
复制
async def callback(self, oauth_token: str, oauth_verifier: str) -> Dict[str, Any]:
"""Handle OAuth 1.0 callback"""
# Retrieve token secret from Redis
redis_client = await self._get_redis_client()
oauth_token_secret = await redis_client.get(f"oauth:secret:{oauth_token}")
user_id = await redis_client.get(f"oauth:user:{oauth_token}")
await redis_client.delete(f"oauth:secret:{oauth_token}")
await redis_client.delete(f"oauth:user:{oauth_token}")
await redis_client.aclose()
# Create OAuth session with verifier
oauth = OAuth1Session(
client_key=self.client_id,
client_secret=self.client_secret,
resource_owner_key=oauth_token,
resource_owner_secret=oauth_token_secret,
verifier=oauth_verifier
)
# Get access token
resp = oauth.post(self.access_token_url)
if resp.status_code != 200:
raise RuntimeError(f"Failed to get access token: {resp.text}")
# Parse tokens
params = parse_qs(resp.text)
access_token = params['oauth_token'][0]
access_token_secret = params['oauth_token_secret'][0]
# Save credentials
await self.db_service.save_oauth1_credentials(
user_id, self.info.slug, access_token, access_token_secret
)
# Trigger initial data pull
asyncio.create_task(self._pull_and_push_for_user({
"user_id": user_id,
"access_token": access_token,
"access_token_secret": access_token_secret,
}))
return {
"provider_slug": self.info.slug,
"stage": "completed"
}
数据拉取(Data Fetching)
Garmin 针对不同数据类型提供了多个 endpoints:复制
async def pull_from_vendor_api(
self,
access_token: str,
token_secret: str,
days: int = 2
) -> List[Dict[str, Any]]:
"""Pull data from Garmin API"""
# Create OAuth 1.0 session
oauth = OAuth1Session(
client_key=self.client_id,
client_secret=self.client_secret,
resource_owner_key=access_token,
resource_owner_secret=token_secret
)
# Calculate date range
end_date = datetime.now(timezone.utc).date()
start_date = end_date - timedelta(days=days)
all_raw_data = []
# Fetch different data types
endpoints = {
"dailies": f"{self.api_base_url}/dailies",
"sleeps": f"{self.api_base_url}/sleeps",
"hrv": f"{self.api_base_url}/hrv",
"stress": f"{self.api_base_url}/stressDetails"
}
for data_type, endpoint in endpoints.items():
try:
params = {
"uploadStartTimeInSeconds": int(start_date.timestamp()),
"uploadEndTimeInSeconds": int(end_date.timestamp())
}
resp = oauth.get(endpoint, params=params)
if resp.status_code == 200:
data = resp.json()
if data:
all_raw_data.append({
"data_type": data_type,
"data": data,
"timestamp": int(time.time() * 1000)
})
except Exception as e:
logging.error(f"Error fetching {data_type}: {str(e)}")
continue
return all_raw_data
Supported Data Types
The Garmin provider supports:- Activity
- Heart Rate
- Sleep
- Advanced
- Daily steps
- Distance (meters)
- Active calories
- Basal calories
- Floors climbed
- Active time
- Minimum daily
- Maximum daily
- Average daily
- Resting rate
- Time series samples
- Total duration
- Deep sleep
- Light sleep
- REM sleep
- Awake time
- HRV (heart rate variability)
- Stress levels
- Respiration rate
- Body composition
Source Code
The complete Garmin provider source code is available at:复制
connect/theta/mirobody_garmin_connect/provider_garmin.py