import asyncio
import socket
import logging
from typing import Dict, List, Tuple

logger = logging.getLogger(__name__)


async def check_connection(
    host: str, port: int = 443, timeout: float = 5
) -> Tuple[bool, str]:
    """Check whether a TCP connection to ``host:port`` can be established.

    Only a plain TCP connection is opened (``asyncio.open_connection`` is
    called without an ``ssl`` argument), so no TLS handshake is attempted
    even for port 443.

    Args:
        host: Host name or IP address to connect to.
        port: TCP port to connect to.
        timeout: Maximum number of seconds to wait for the connection to be
            established. The same bound is applied to the shutdown of the
            probe connection.

    Returns:
        A ``(success, message)`` tuple. ``success`` is ``True`` when the
        connection was established; ``message`` is a human-readable
        description of the outcome. Failures are reported through the return
        value rather than raised.
    """
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
    except asyncio.TimeoutError:
        return False, f"连接到 {host}:{port} 超时"
    except socket.gaierror:
        return False, f"无法解析主机名 {host}"
    except Exception as e:
        # Diagnostic boundary: every failure is turned into a result message
        # so that callers get a verdict for each host instead of a traceback.
        logger.debug("Connection to %s:%s failed", host, port, exc_info=True)
        return False, f"连接到 {host}:{port} 失败: {e}"

    # The connection is established at this point, so the verdict is already
    # "success". Errors or stalls while tearing the probe connection down
    # (e.g. the peer resetting it) must not turn that into a failure.
    writer.close()
    try:
        await asyncio.wait_for(writer.wait_closed(), timeout=timeout)
    except Exception:
        logger.debug(
            "Ignoring error while closing probe connection to %s:%s",
            host,
            port,
            exc_info=True,
        )
    return True, f"成功连接到 {host}:{port}"


async def diagnose_wechat_api() -> Dict[str, str]:
    """Diagnose connectivity to the WeChat API domains.

    Each domain is probed with ``check_connection``; the probes run
    concurrently, so the total wait is bounded by the slowest probe rather
    than by the sum of all of them.

    Returns:
        A dict mapping each probed domain name to the result message
        produced by ``check_connection``, in the order the domains are
        listed below.
    """
    # WeChat API domains to probe.
    endpoints = (
        ("api.weixin.qq.com", 443),
        ("mp.weixin.qq.com", 443),
    )
    # check_connection reports failures via its return value, so gather()
    # needs no special exception handling here. Results keep input order.
    outcomes = await asyncio.gather(
        *(check_connection(domain, port) for domain, port in endpoints)
    )
    return {
        domain: message
        for (domain, _port), (_success, message) in zip(endpoints, outcomes)
    }


# Usage example
# async def run_diagnostic():
#     results = await diagnose_wechat_api()
#     for domain, result in results.items():
#         print(f"{domain}: {result}")