refactor(wechat): update WeChatClient to WeChatAPI and enhance functionality

This commit is contained in:
?..濡.. 2025-01-09 07:00:06 +08:00
parent 0e27355932
commit 12318114f5
2 changed files with 97 additions and 5 deletions

View File

@ -1,9 +1,9 @@
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from typing import Dict, Any from typing import Dict, Any
from mooc.utils.wechat_client import WeChatClient from mooc.utils.wechat_client import WeChatAPI
router = APIRouter() router = APIRouter()
wechat_client = WeChatClient() wechat_client = WeChatAPI()
@router.get("/access_token") @router.get("/access_token")
async def get_access_token() -> Dict[str, Any]: async def get_access_token() -> Dict[str, Any]:

View File

@ -1,9 +1,9 @@
import json import json
import httpx import httpx
from typing import Optional, Dict, Any from typing import Optional, Dict, Any, Union
from mooc.core.config import settings from mooc.core.config import settings
class WeChatClient: class WeChatAPI:
def __init__(self, appid: str = None, appsecret: str = None): def __init__(self, appid: str = None, appsecret: str = None):
self.appid = appid or settings.WECHAT_APPID self.appid = appid or settings.WECHAT_APPID
self.appsecret = appsecret or settings.WECHAT_APPSECRET self.appsecret = appsecret or settings.WECHAT_APPSECRET
@ -52,3 +52,95 @@ class WeChatClient:
access_token = await self.get_access_token() access_token = await self.get_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/message/wxopen/template/send?access_token={access_token}" url = f"https://api.weixin.qq.com/cgi-bin/message/wxopen/template/send?access_token={access_token}"
return await self._post(url, data) return await self._post(url, data)
async def get_callback_ip(self, access_token: str = None) -> Dict[str, Any]:
"""获取微信服务器IP地址"""
if not access_token:
access_token = await self.get_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/getcallbackip?access_token={access_token}"
return await self._get(url)
async def check_callback(self, access_token: str = None, action: str = "all") -> Dict[str, Any]:
"""检查回调配置"""
if not access_token:
access_token = await self.get_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/callback/check?access_token={access_token}"
data = {"action": action, "check_operator": "DEFAULT"}
return await self._post(url, data)
async def create_menu(self, menu_data: Dict[str, Any], access_token: str = None) -> Dict[str, Any]:
"""创建自定义菜单"""
if not access_token:
access_token = await self.get_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/menu/create?access_token={access_token}"
return await self._post(url, menu_data)
async def get_menu(self, access_token: str = None) -> Dict[str, Any]:
"""获取自定义菜单"""
if not access_token:
access_token = await self.get_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/menu/get?access_token={access_token}"
return await self._get(url)
def get_oauth_url(self, redirect_uri: str, scope: str, state: str = "") -> str:
"""生成网页授权URL"""
url = (f"https://open.weixin.qq.com/connect/oauth2/authorize?"
f"appid={self.appid}&redirect_uri={redirect_uri}&response_type=code&"
f"scope={scope}#wechat_redirect")
if state:
url += f"&state={state}"
return url
async def get_user_token(self, code: str) -> Dict[str, Any]:
"""获取用户访问令牌"""
url = (f"https://api.weixin.qq.com/sns/oauth2/access_token?"
f"appid={self.appid}&secret={self.appsecret}&code={code}&"
f"grant_type=authorization_code")
result = await self._get(url)
if "access_token" not in result:
return {"code": 4001, "message": "获取用户授权失败"}
return result
async def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
"""刷新访问令牌"""
url = (f"https://api.weixin.qq.com/sns/oauth2/refresh_token?"
f"appid={self.appid}&grant_type=refresh_token&refresh_token={refresh_token}")
return await self._get(url)
async def check_user_token(self, access_token: str, openid: str) -> bool:
"""检查用户访问令牌是否有效"""
url = f"https://api.weixin.qq.com/sns/auth?access_token={access_token}&openid={openid}"
result = await self._get(url)
return result.get("errcode", -1) == 0
async def get_user_info(self, access_token: str, openid: str,
refresh_token: str = "") -> Dict[str, Any]:
"""获取用户信息"""
if await self.check_user_token(access_token, openid):
url = (f"https://api.weixin.qq.com/sns/userinfo?"
f"access_token={access_token}&openid={openid}&lang=zh_CN")
return await self._get(url)
elif refresh_token:
result = await self.refresh_token(refresh_token)
new_token = result.get("access_token")
if new_token:
return await self.get_user_info(new_token, openid)
return {"code": 4002, "message": "获取用户详细信息失败请重新获取code"}
async def save_unlimited_qrcode(self, uid: Union[str, int],
path: str, filename: str,
access_token: str = None) -> bool:
"""生成并保存小程序码到文件"""
if not access_token:
access_token = await self.get_access_token()
url = f"https://api.weixin.qq.com/wxa/getwxacodeunlimit?access_token={access_token}"
data = {"scene": f"uid={uid}"}
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data, verify=False)
if response.status_code == 200:
full_path = f"{path}{filename}"
with open(full_path, 'wb') as f:
f.write(response.content)
return True
return False