import json import httpx from typing import Optional, Dict, Any, Union from mooc.core.config import settings class WeChatAPI: def __init__(self, appid: str = None, appsecret: str = None): self.appid = appid or settings.WECHAT_APPID self.appsecret = appsecret or settings.WECHAT_APPSECRET self._access_token = "" async def _get(self, url: str) -> Dict[str, Any]: async with httpx.AsyncClient() as client: response = await client.get(url, verify=False) return response.json() async def _post(self, url: str, data: Dict[str, Any]) -> Dict[str, Any]: async with httpx.AsyncClient() as client: response = await client.post(url, json=data, verify=False) return response.json() async def get_access_token(self) -> str: """获取access token""" url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={self.appid}&secret={self.appsecret}" result = await self._get(url) if "access_token" in result: self._access_token = result["access_token"] return self._access_token raise Exception(f"Failed to get access token: {result}") async def code2session(self, code: str) -> Dict[str, Any]: """小程序登录凭证校验""" url = f"https://api.weixin.qq.com/sns/jscode2session?appid={self.appid}&secret={self.appsecret}&js_code={code}&grant_type=authorization_code" result = await self._get(url) if "errcode" in result and result["errcode"] != 0: raise Exception(f"Code2Session failed: {result['errmsg']}") return result async def get_unlimited_qrcode(self, scene: str, access_token: str = None) -> bytes: """获取小程序码""" if not access_token: access_token = await self.get_access_token() url = f"https://api.weixin.qq.com/wxa/getwxacodeunlimit?access_token={access_token}" data = {"scene": scene} async with httpx.AsyncClient() as client: response = await client.post(url, json=data, verify=False) return response.content async def send_template_message(self, data: Dict[str, Any], access_token: str = None) -> Dict[str, Any]: """发送订阅消息""" if not access_token: access_token = await self.get_access_token() url = f"https://api.weixin.qq.com/cgi-bin/message/wxopen/template/send?access_token={access_token}" return await self._post(url, data) async def get_callback_ip(self, access_token: str = None) -> Dict[str, Any]: """获取微信服务器IP地址""" if not access_token: access_token = await self.get_access_token() url = f"https://api.weixin.qq.com/cgi-bin/getcallbackip?access_token={access_token}" return await self._get(url) async def check_callback(self, access_token: str = None, action: str = "all") -> Dict[str, Any]: """检查回调配置""" if not access_token: access_token = await self.get_access_token() url = f"https://api.weixin.qq.com/cgi-bin/callback/check?access_token={access_token}" data = {"action": action, "check_operator": "DEFAULT"} return await self._post(url, data) async def create_menu(self, menu_data: Dict[str, Any], access_token: str = None) -> Dict[str, Any]: """创建自定义菜单""" if not access_token: access_token = await self.get_access_token() url = f"https://api.weixin.qq.com/cgi-bin/menu/create?access_token={access_token}" return await self._post(url, menu_data) async def get_menu(self, access_token: str = None) -> Dict[str, Any]: """获取自定义菜单""" if not access_token: access_token = await self.get_access_token() url = f"https://api.weixin.qq.com/cgi-bin/menu/get?access_token={access_token}" return await self._get(url) def get_oauth_url(self, redirect_uri: str, scope: str, state: str = "") -> str: """生成网页授权URL""" url = (f"https://open.weixin.qq.com/connect/oauth2/authorize?" f"appid={self.appid}&redirect_uri={redirect_uri}&response_type=code&" f"scope={scope}#wechat_redirect") if state: url += f"&state={state}" return url async def get_user_token(self, code: str) -> Dict[str, Any]: """获取用户访问令牌""" url = (f"https://api.weixin.qq.com/sns/oauth2/access_token?" f"appid={self.appid}&secret={self.appsecret}&code={code}&" f"grant_type=authorization_code") result = await self._get(url) if "access_token" not in result: return {"code": 4001, "message": "获取用户授权失败"} return result async def refresh_token(self, refresh_token: str) -> Dict[str, Any]: """刷新访问令牌""" url = (f"https://api.weixin.qq.com/sns/oauth2/refresh_token?" f"appid={self.appid}&grant_type=refresh_token&refresh_token={refresh_token}") return await self._get(url) async def check_user_token(self, access_token: str, openid: str) -> bool: """检查用户访问令牌是否有效""" url = f"https://api.weixin.qq.com/sns/auth?access_token={access_token}&openid={openid}" result = await self._get(url) return result.get("errcode", -1) == 0 async def get_user_info(self, access_token: str, openid: str, refresh_token: str = "") -> Dict[str, Any]: """获取用户信息""" if await self.check_user_token(access_token, openid): url = (f"https://api.weixin.qq.com/sns/userinfo?" f"access_token={access_token}&openid={openid}&lang=zh_CN") return await self._get(url) elif refresh_token: result = await self.refresh_token(refresh_token) new_token = result.get("access_token") if new_token: return await self.get_user_info(new_token, openid) return {"code": 4002, "message": "获取用户详细信息失败,请重新获取code"} async def save_unlimited_qrcode(self, uid: Union[str, int], path: str, filename: str, access_token: str = None) -> bool: """生成并保存小程序码到文件""" if not access_token: access_token = await self.get_access_token() url = f"https://api.weixin.qq.com/wxa/getwxacodeunlimit?access_token={access_token}" data = {"scene": f"uid={uid}"} async with httpx.AsyncClient() as client: response = await client.post(url, json=data, verify=False) if response.status_code == 200: full_path = f"{path}{filename}" with open(full_path, 'wb') as f: f.write(response.content) return True return False