Program/mooc/utils/wechat_client.py
烟雨如花 4f34f5b8be init
2024-12-31 22:27:04 +08:00

54 lines
2.5 KiB
Python

import json
import httpx
from typing import Optional, Dict, Any
from mooc.core.config import settings
class WeChatClient:
def __init__(self, appid: str = None, appsecret: str = None):
self.appid = appid or settings.WECHAT_APPID
self.appsecret = appsecret or settings.WECHAT_APPSECRET
self._access_token = ""
async def _get(self, url: str) -> Dict[str, Any]:
async with httpx.AsyncClient() as client:
response = await client.get(url, verify=False)
return response.json()
async def _post(self, url: str, data: Dict[str, Any]) -> Dict[str, Any]:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data, verify=False)
return response.json()
async def get_access_token(self) -> str:
"""获取access token"""
url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={self.appid}&secret={self.appsecret}"
result = await self._get(url)
if "access_token" in result:
self._access_token = result["access_token"]
return self._access_token
raise Exception(f"Failed to get access token: {result}")
async def code2session(self, code: str) -> Dict[str, Any]:
"""小程序登录凭证校验"""
url = f"https://api.weixin.qq.com/sns/jscode2session?appid={self.appid}&secret={self.appsecret}&js_code={code}&grant_type=authorization_code"
result = await self._get(url)
if "errcode" in result and result["errcode"] != 0:
raise Exception(f"Code2Session failed: {result['errmsg']}")
return result
async def get_unlimited_qrcode(self, scene: str, access_token: str = None) -> bytes:
"""获取小程序码"""
if not access_token:
access_token = await self.get_access_token()
url = f"https://api.weixin.qq.com/wxa/getwxacodeunlimit?access_token={access_token}"
data = {"scene": scene}
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data, verify=False)
return response.content
async def send_template_message(self, data: Dict[str, Any], access_token: str = None) -> Dict[str, Any]:
"""发送订阅消息"""
if not access_token:
access_token = await self.get_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/message/wxopen/template/send?access_token={access_token}"
return await self._post(url, data)