54 lines
2.5 KiB
Python
54 lines
2.5 KiB
Python
![]() |
import json
|
||
|
import httpx
|
||
|
from typing import Optional, Dict, Any
|
||
|
from mooc.core.config import settings
|
||
|
|
||
|
class WeChatClient:
    """Small async client for a handful of WeChat Mini Program server APIs.

    Every call opens a short-lived ``httpx.AsyncClient``. Query parameters
    are always passed through ``params=`` (never interpolated into the URL)
    so that values such as the login ``code`` are URL-encoded and cannot
    inject extra query parameters.
    """

    # Host shared by every endpoint used below.
    API_BASE = "https://api.weixin.qq.com"

    def __init__(
        self,
        appid: Optional[str] = None,
        appsecret: Optional[str] = None,
        *,
        verify: bool = True,
    ):
        """Create a client.

        Args:
            appid: Mini Program app id; falls back to ``settings.WECHAT_APPID``
                when empty or ``None``.
            appsecret: Mini Program app secret; falls back to
                ``settings.WECHAT_APPSECRET`` when empty or ``None``.
            verify: Whether TLS certificates are verified. Defaults to
                ``True`` because the app secret is sent with requests; pass
                ``False`` only for environments that genuinely require it.
        """
        self.appid = appid or settings.WECHAT_APPID
        self.appsecret = appsecret or settings.WECHAT_APPSECRET
        self.verify = verify
        # Most recently fetched access token ("" until the first fetch).
        self._access_token = ""

    async def _get(
        self, url: str, params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Send a GET request and return the decoded JSON body.

        Args:
            url: Absolute URL to request.
            params: Optional query parameters; encoded by httpx.
        """
        # ``verify`` is a client-level option in httpx; passing it to
        # ``client.get`` raises TypeError.
        async with httpx.AsyncClient(verify=self.verify) as client:
            response = await client.get(url, params=params)
            return response.json()

    async def _post(
        self,
        url: str,
        data: Dict[str, Any],
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send ``data`` as a JSON POST body and return the decoded JSON reply.

        Args:
            url: Absolute URL to request.
            data: Payload serialized as the JSON request body.
            params: Optional query parameters; encoded by httpx.
        """
        async with httpx.AsyncClient(verify=self.verify) as client:
            response = await client.post(url, params=params, json=data)
            return response.json()

    @staticmethod
    def _image_or_raise(content: bytes) -> bytes:
        """Return ``content`` unless it is a JSON error object.

        NOTE(review): assumes the QR-code endpoint reports failures as a JSON
        object with a non-zero ``errcode`` instead of image bytes -- confirm
        against the WeChat API documentation.

        Raises:
            Exception: If ``content`` decodes to a JSON object whose
                ``errcode`` is present and non-zero.
        """
        # Image data never starts with "{", so this cheap check skips JSON
        # parsing for every successful response.
        if content[:1] != b"{":
            return content
        try:
            payload = json.loads(content)
        except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
            return content
        if isinstance(payload, dict) and payload.get("errcode", 0) != 0:
            raise Exception(f"Get unlimited qrcode failed: {payload}")
        return content

    async def get_access_token(self) -> str:
        """Fetch a new access token and remember it on the instance.

        Returns:
            The ``access_token`` value from the API response.

        Raises:
            Exception: If the response has no ``access_token`` key.
        """
        result = await self._get(
            f"{self.API_BASE}/cgi-bin/token",
            params={
                "grant_type": "client_credential",
                "appid": self.appid,
                "secret": self.appsecret,
            },
        )
        if "access_token" in result:
            self._access_token = result["access_token"]
            return self._access_token
        raise Exception(f"Failed to get access token: {result}")

    async def code2session(self, code: str) -> Dict[str, Any]:
        """Validate a Mini Program login credential.

        Args:
            code: The login code supplied by the client. It is sent as the
                ``js_code`` query parameter and URL-encoded by httpx.

        Returns:
            The decoded JSON response.

        Raises:
            Exception: If the response carries a non-zero ``errcode``.
        """
        result = await self._get(
            f"{self.API_BASE}/sns/jscode2session",
            params={
                "appid": self.appid,
                "secret": self.appsecret,
                "js_code": code,
                "grant_type": "authorization_code",
            },
        )
        if result.get("errcode", 0) != 0:
            # Fall back to the whole payload so a missing ``errmsg`` does not
            # turn the intended error into a KeyError.
            raise Exception(f"Code2Session failed: {result.get('errmsg', result)}")
        return result

    async def get_unlimited_qrcode(
        self, scene: str, access_token: Optional[str] = None
    ) -> bytes:
        """Request a Mini Program code for ``scene``.

        Args:
            scene: Value sent as the ``scene`` field of the request body.
            access_token: Token to use; a new one is fetched when omitted
                or empty.

        Returns:
            The raw response body.

        Raises:
            Exception: If the response body is a JSON object with a
                non-zero ``errcode`` rather than image data.
        """
        if not access_token:
            access_token = await self.get_access_token()
        # Not routed through ``_post``: that helper decodes the reply as
        # JSON, while this endpoint's successful reply is raw bytes.
        async with httpx.AsyncClient(verify=self.verify) as client:
            response = await client.post(
                f"{self.API_BASE}/wxa/getwxacodeunlimit",
                params={"access_token": access_token},
                json={"scene": scene},
            )
            return self._image_or_raise(response.content)

    async def send_template_message(
        self, data: Dict[str, Any], access_token: Optional[str] = None
    ) -> Dict[str, Any]:
        """Send a message through the ``wxopen/template/send`` endpoint.

        NOTE(review): the original docstring described this as sending a
        *subscription* message, but the URL is the template-message
        endpoint -- confirm which API is intended.

        Args:
            data: Request payload, sent as the JSON body.
            access_token: Token to use; a new one is fetched when omitted
                or empty.

        Returns:
            The decoded JSON response, returned without inspecting
            ``errcode``.
        """
        if not access_token:
            access_token = await self.get_access_token()
        return await self._post(
            f"{self.API_BASE}/cgi-bin/message/wxopen/template/send",
            data,
            params={"access_token": access_token},
        )
|