From 97a61189ea549652fd3b016fc563944a314cf459 Mon Sep 17 00:00:00 2001 From: Basyc <12016870+basyc@user.noreply.gitee.com> Date: Fri, 7 Mar 2025 21:13:18 +0800 Subject: [PATCH] =?UTF-8?q?feat(wxapp):=20=E5=AE=9E=E7=8E=B0=E5=88=86?= =?UTF-8?q?=E4=BA=AB=E5=8A=9F=E8=83=BD=E5=B9=B6=E4=BC=98=E5=8C=96=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E4=BF=A1=E6=81=AF=E7=9B=B8=E5=85=B3=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增分享功能处理逻辑,包括获取设置、记录分享次数和更新用户积分 - 优化用户信息获取和更新操作,提高代码复用性和可维护性 - 添加错题收藏功能,支持用户对错题进行收藏和取消收藏 - 重构部分代码,提高可读性和性能 --- mooc/api/v1/endpoints/wxapp.py | 457 +++++++++++++++++--------- mooc/crud/crud_goouc_fullexam.py | 61 ++++ mooc/crud/crud_goouc_fullexam_user.py | 88 ++++- 3 files changed, 445 insertions(+), 161 deletions(-) diff --git a/mooc/api/v1/endpoints/wxapp.py b/mooc/api/v1/endpoints/wxapp.py index b17db0f..40aeba6 100644 --- a/mooc/api/v1/endpoints/wxapp.py +++ b/mooc/api/v1/endpoints/wxapp.py @@ -1,10 +1,12 @@ -from fastapi import APIRouter, Depends, Request, Form, Body, UploadFile,File +from fastapi import APIRouter, Depends, Request, Form, Body, UploadFile, File from fastapi.responses import PlainTextResponse from typing import Optional, Dict, Any +from mooc.models.goouc_fullexam import ShareRecord from pydantic import BaseModel from sqlalchemy.orm import Session from mooc.db.database import get_db from mooc.crud.crud_goouc_fullexam_user import ( + CRUDFullExamUser, CRUDUserDoexam, CRUDUserExamAnswer, CRUDUserWrongPraction, @@ -20,32 +22,33 @@ from mooc.models.goouc_fullexam_user import ( ) from mooc.core.config import settings from mooc.utils.wechat_client import wx_api -from datetime import datetime +from datetime import datetime, time import os from mooc.core.logger import get_logger import httpx from mooc.core.response import ResponseFactory import re -from mooc.crud.crud_goouc_fullexam import setting, xueshen -from mooc.crud.crud_goouc_fullexam_user import full_exam_user, user_member +from mooc.crud.crud_goouc_fullexam import CRUDSetting, CRUDShareRecord, setting, xueshen +from mooc.crud.crud_goouc_fullexam_user import full_exam_user, user_member, user_collection_praction wxapp_router = APIRouter() # 创建模块级别的日志记录器 logger = get_logger(__name__) + class WxappRequest(BaseModel): uid: Optional[str] = None op: Optional[str] = None m: Optional[str] = None data: Dict[str, Any] = {} code: Optional[str] = None - + # 启用额外字段支持 model_config = { "extra": "allow" # 允许存储模型中未定义的字段 } - + # 添加一个方法便于获取任意字段,带默认值 def get_field(self, field_name: str, default=None): """获取字段值,优先从直接属性获取,然后从data字典获取""" @@ -53,17 +56,18 @@ class WxappRequest(BaseModel): return getattr(self, field_name) return self.data.get(field_name, default) + @wxapp_router.post("/index") async def handle_wxapp_request( - request: Request, - i: str, - t: Optional[str] = None, - v: Optional[str] = None, - c: Optional[str] = "entry", - a: Optional[str] = "wxapp", - do: Optional[str] = None, - db: Session = Depends(get_db), - file: UploadFile = None + request: Request, + i: str, + t: Optional[str] = None, + v: Optional[str] = None, + c: Optional[str] = "entry", + a: Optional[str] = "wxapp", + do: Optional[str] = None, + db: Session = Depends(get_db), + file: UploadFile = None ): # 获取表单数据 try: @@ -71,7 +75,7 @@ async def handle_wxapp_request( logger.debug(f"Form data: {form_data}") # 将表单数据转换为字典 data = dict(form_data) - + # 检查是否包含上传文件 if "upfile" in form_data: file = form_data["upfile"] @@ -92,8 +96,8 @@ async def handle_wxapp_request( if do == "login2": # 针对login2,直接传递表单数据 if "code" in data: - return await handle_login2(WxappRequest(code=data.get("code"), - data=data), db) + return await handle_login2(WxappRequest(code=data.get("code"), + data=data), db) else: # 如果没有code,仍然尝试正常处理 return await handle_login2(WxappRequest(**data), db) @@ -121,23 +125,31 @@ async def handle_wxapp_request( elif do == "UpdateHeadimg": return await handle_update_headimg(WxappRequest(**data), db) + # part 4编写 + + elif do == "Share": + return await handle_share(WxappRequest(**data), int(i), db) + elif do == "AddWrong": + return await handle_add_wrong(WxappRequest(**data), db) + elif do == "Feedback": + return await handle_feedback(WxappRequest(**data), db) - return {"code": 404, "message": "接口未找到"} + async def handle_update_headimg(data: WxappRequest, db: Session): """处理更新用户头像的请求""" logger.info(f"处理更新头像请求: uid={data.uid},data={data}") - + try: # 验证必要参数 uid = data.uid # headimg = data.data.get("headimg") - + # if not uid or not headimg: # logger.error("参数不足: uid或headimg缺失") # return ResponseFactory.error(code=1, message="传递的参数不存在", data="1001") - + # 获取用户对象,包含weid条件 user = full_exam_user.get_by_id_and_weid(db, int(uid), settings.WECHAT_UNIACID) if not user: @@ -154,11 +166,12 @@ async def handle_update_headimg(data: WxappRequest, db: Session): except Exception as e: logger.error(f"头像更新失败: {str(e)}") return ResponseFactory.error(code=1, message="error", data="error") - + except Exception as e: logger.exception(f"处理更新头像请求过程中发生异常: {str(e)}") return ResponseFactory.error(code=1, message=str(e), data="error") + async def handle_upload_image(data: WxappRequest, db: Session, file: UploadFile = File(...)): """处理图片上传功能""" import os @@ -167,113 +180,116 @@ async def handle_upload_image(data: WxappRequest, db: Session, file: UploadFile import string import shutil from pathlib import Path - + logger.info(f"处理图片上传请求: uid={data.uid}, module={data.m}") - + try: # 定义允许的图片类型 allowed_types = [ - "image/jpg", "image/jpeg", "image/png", + "image/jpg", "image/jpeg", "image/png", "image/pjpeg", "image/gif", "image/bmp", "image/x-png" ] - + # 最大文件大小(2MB) max_file_size = 2000000 - + # 检查文件是否存在 if not file: logger.error("图片不存在!") return ResponseFactory.error(code=400, message="图片不存在!") - + # 检查文件内容类型 content_type = file.content_type if content_type not in allowed_types: logger.error(f"文件类型不符: {content_type}") return ResponseFactory.error(code=400, message=f"文件类型不符: {content_type}") - + # 读取文件内容以检查大小 contents = await file.read() if len(contents) > max_file_size: logger.error(f"文件太大: {len(contents)} bytes") return ResponseFactory.error(code=400, message="文件太大!") - + # 重置文件位置指针 await file.seek(0) - + # 获取上传目标文件夹路径 module_name = data.m or "default" - + # 使用绝对路径并确保目录存在 destination_folder = os.path.join(settings.UPLOAD_IMG_DIR, module_name) logger.debug(f"上传目标文件夹: {destination_folder}") - + # 确保目标目录存在 Path(destination_folder).mkdir(parents=True, exist_ok=True) - + # 创建更安全的随机文件名 (时间戳 + 随机数) current_time = int(time.time()) random_string = ''.join(random.choices(string.digits + string.ascii_lowercase, k=8)) file_extension = os.path.splitext(file.filename)[1].lower() # 转为小写以增加一致性 - + # 验证扩展名安全性 safe_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp'] if file_extension not in safe_extensions: file_extension = '.jpg' # 默认使用安全扩展名 - + filename = f"{current_time}_{random_string}{file_extension}" - + # 完整的保存路径 file_path = os.path.join(destination_folder, filename) logger.debug(f"文件将保存至: {file_path}") - + # 保存文件 with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) - + # 检查文件是否成功保存 if not os.path.exists(file_path): logger.error(f"文件保存失败: {file_path}") return ResponseFactory.error(code=500, message="文件保存失败") - + # 构建文件URL路径 - url_module_path = module_name.replace('\\', '/') - + url_module_path = module_name.replace('\\', '/') + # 使用相对路径作为存储路径 relative_path = f"{url_module_path}/{filename}" - + # 构建完整的外网访问URL # 方法1: 使用BASE_URL配置构建绝对URL absolute_url = f"{settings.BASE_URL}{settings.ATTACH_URL}{relative_path}" - + logger.info(f"文件上传成功: {absolute_url}") - + # 直接返回完整URL字符串 return PlainTextResponse(content=absolute_url) - + except Exception as e: logger.exception(f"文件上传过程中发生异常: {str(e)}") # 直接返回错误消息字符串 return str(e) + + def validate_phone(phone: str) -> bool: # 基础格式验证 if not re.match(r"^1(3\d|4[5-9]|5[0-35-9]|6[2567]|7[0-8]|8\d|9[0-35-9])\d{8}$", phone): return False - + # 特殊号段白名单 special_prefix = ["191", "192", "197"] # 根据实际情况更新 if any(phone.startswith(p) for p in special_prefix): return True - + # 虚拟运营商号段二次验证 if phone.startswith(("170", "171", "172")): return validate_virtual_operator(phone) # 调用专用验证接口 - + return True + async def handle_set_user_info(data: WxappRequest, db: Session): """处理用户信息相关操作""" logger.info(f"处理用户信息请求: op={data.op}, uid={data.uid},data={data}") - + operations = { "getinfo": get_user_info, "checkStudent": check_student, @@ -281,7 +297,7 @@ async def handle_set_user_info(data: WxappRequest, db: Session): "getcode": get_verification_code, "checkcode": check_verification_code, } - + # 如果op为空,则默认执行更新用户信息操作 if not data.op: logger.info(f"未指定操作类型,默认执行更新用户信息: uid={data.uid}") @@ -292,12 +308,12 @@ async def handle_set_user_info(data: WxappRequest, db: Session): update_data["name"] = data.name if hasattr(data, "phone") and data.phone != "null": update_data["phone"] = data.phone - + # 添加请求中的其他字段 for field in ["student_id", "id_card", "school", "level"]: if hasattr(data, field): update_data["field"] = getattr(data, field) - + logger.debug(f"更新用户数据: {update_data}") result = await update_user_info(data.uid, update_data, db) return ResponseFactory.success( @@ -310,7 +326,7 @@ async def handle_set_user_info(data: WxappRequest, db: Session): code=1, message=str(e) ) - + # 处理指定操作类型 operation = operations.get(data.op) logger.debug(f"operation: {operation}") @@ -320,22 +336,22 @@ async def handle_set_user_info(data: WxappRequest, db: Session): code=1, message=f"不支持的操作: {data.op}" ) - + try: # 处理特殊情况:getcode操作需要直接获取phone属性 if data.op == "getcode": # 确保将phone从对象属性传递到data字典 data_dict = dict(data.data) if data.data else {} # 复制现有数据 - + # 从对象中获取phone属性 if hasattr(data, "phone"): data_dict["phone"] = data.phone - + result = await operation(data.uid, data_dict, db) else: # 其他操作保持原样 result = await operation(data.uid, data.data, db) - + logger.debug(f"result: {result}") return ResponseFactory.success( data=result, @@ -348,23 +364,24 @@ async def handle_set_user_info(data: WxappRequest, db: Session): message=str(e) ) + # 获取用户信息 async def get_user_info(uid: str, data: Dict[str, Any], db: Session) -> Dict[str, Any]: """获取用户详细信息""" logger.debug(f"获取用户信息: uid={uid},data={data}") if not uid: raise ValueError("用户ID不存在") - + # 从请求获取协议和域名 protocol = "https://" # 在实际部署中可能需要根据请求头判断 domain_name = "yourdomain.com" # 这里需要从请求中获取 http = f"{protocol}{domain_name}" - + # 获取用户基本信息 - 已经使用CRUD方法 user = full_exam_user.get_by_fields(db, {"id": int(uid), "weid": settings.WECHAT_UNIACID, "istatus": 1}) if not user or user.istatus != 1: raise ValueError("用户不存在或已被删除") - + # 构建用户信息字典 info = { "id": user.id, @@ -384,49 +401,49 @@ async def get_user_info(uid: str, data: Dict[str, Any], db: Session) -> Dict[str "openid": user.openid } logger.debug(f"用户信息: {info}") - + # 检查会员是否过期 current_time = int(datetime.now().timestamp()) if user.ismember == 1 and user.member_endtime and int(user.member_endtime) < current_time: # 更新会员状态为非会员 - 使用CRUD方法 full_exam_user.update(db, db_obj=user, obj_in={"ismember": 2}) info["ismember"] = 2 - + # 格式化会员到期时间 if user.member_endtime: info["member_endtime"] = datetime.fromtimestamp(int(user.member_endtime)).strftime("%Y-%m-%d") - + # 获取系统设置 - 使用CRUD替代原始SQL system_setting = setting.get_by_fields(db, {"weid": user.weid}) - + # 默认用户信息状态为完整 is_ok = 1 - + # 检查用户信息是否完整 if system_setting and system_setting.info_status == 1: if not user.phone or not user.name: is_ok = 2 - + # 添加系统设置信息 if system_setting: info["IOS"] = system_setting.IOS info["customer_service"] = system_setting.customer_service - + # 检查会员系统是否开启 - 使用CRUD替代原始SQL member_setting = user_member.get_by_fields(db, { - "weid": user.weid, + "weid": user.weid, "istatus": 1 }) info["is_open"] = member_setting.status if member_setting else 0 - + # 用户信息完整性状态 info["is_ok"] = is_ok - + # 默认学生信息 info["stu_have"] = 2 info["student_id"] = 0 info["xuesheng_id"] = 0 - + # 如果用户有姓名和手机号,检查是否是学生 - 使用CRUD替代原始SQL if user.phone and user.name: student = xueshen.get_by_fields(db, { @@ -434,42 +451,40 @@ async def get_user_info(uid: str, data: Dict[str, Any], db: Session) -> Dict[str "name": user.name, "phone": user.phone }) - + if student: info["student_id"] = student.student_name info["xuesheng_id"] = student.xuesheng_id info["stu_have"] = 1 - + # 检查是否绑定微信 info["is_bind"] = 1 if user.openid else 0 - + return {"info": info, "http": http} + # 检查学生信息 async def check_student(uid: str, data: Dict[str, Any], db: Session) -> Dict[str, Any]: """检查学生信息是否存在""" phone = data.get("phone") name = data.get("name") - + if not phone or not name: raise ValueError("传递的参数不存在") - - user = full_exam_user.get(db, int(uid)) if not user: raise ValueError("用户不存在") - - + student = xueshen.get_by_fields(db, { "weid": user.weid, "name": name, "phone": phone }) - + if not student: raise ValueError("学生信息不存在") - + # 将结果转换为字典 student_info = { "xuesheng_id": student.xuesheng_id, @@ -478,47 +493,47 @@ async def check_student(uid: str, data: Dict[str, Any], db: Session) -> Dict[str "student_name": student.student_name, "sex": student.sex } - + return {"list": student_info} + # 获取验证码 async def get_verification_code(uid: str, data: Dict[str, Any], db: Session) -> Dict[str, Any]: """发送手机验证码""" phone = data.get("phone") - + # 验证手机号 if not validate_phone(phone): raise ValueError("手机号有误") - - + user = full_exam_user.get(db, int(uid)) if not user: raise ValueError("用户不存在") - + # 使用CRUD替代SQL查询系统配置 system_config = setting.get_by_fields(db, {"weid": user.weid}) - + # 生成4位随机验证码 import random code = ''.join(random.choices('0123456789', k=4)) - + # 发送验证码 - 这里需要替换为实际的短信发送逻辑 try: # 假设这是成功发送 send_result = {"Code": "OK"} - + if send_result["Code"] == "OK": # 保存验证码记录 - + from mooc.schemas.goouc_fullexam import PhoneCodeCreate - + phone_code_data = PhoneCodeCreate( phone=phone, code=int(code), # 确保代码和模型匹配 createtime=int(datetime.now().timestamp()), weid=user.weid ) - + phone_code.create(db, obj_in=phone_code_data) return {"status": 1} @@ -528,15 +543,16 @@ async def get_verification_code(uid: str, data: Dict[str, Any], db: Session) -> logger.error(f"发送验证码失败: {str(e)}") raise ValueError(f"发送验证码失败: {str(e)}") + # 检查验证码 async def check_verification_code(uid: str, data: Dict[str, Any], db: Session) -> Dict[str, Any]: """验证手机验证码""" input_code = data.get("code", "").strip() phone = data.get("phone") - + if not input_code or not phone: raise ValueError("传递的参数不存在") - + # 获取最新的验证码记录 from sqlalchemy import text code_query = text(""" @@ -544,45 +560,45 @@ async def check_verification_code(uid: str, data: Dict[str, Any], db: Session) - WHERE weid = :weid AND phone = :phone ORDER BY createtime DESC LIMIT 1 """) - + user = full_exam_user.get(db, int(uid)) if not user: raise ValueError("用户不存在") - + code_record = db.execute(code_query, { - "weid": user.weid, + "weid": user.weid, "phone": phone }).fetchone() - + if not code_record: raise ValueError("验证码不存在") - + # 检查验证码是否过期(5分钟有效期) current_time = int(datetime.now().timestamp()) if code_record.createtime + 300 < current_time: raise ValueError("验证码已过期") - + # 验证码是否正确 if str(code_record.code) != input_code: raise ValueError("验证码错误") - + # 使用CRUD检查手机号是否已绑定其他账号 other_users = full_exam_user.get_multi_by_fields(db, { "weid": user.weid, "phone": phone }) - + # 过滤掉当前用户 other_users = [u for u in other_users if u.id != int(uid)] - + if other_users: raise ValueError("该手机号已绑定其他账号!") - + # 更新用户手机号 try: # 使用CRUD更新用户手机号 full_exam_user.update(db, db_obj=user, obj_in={"phone": phone}) - + # 如果用户有姓名,尝试更新学生信息 if user.name: # 查找学生记录 @@ -590,49 +606,50 @@ async def check_verification_code(uid: str, data: Dict[str, Any], db: Session) - "weid": user.weid, "name": user.name }) - + # 如果找到学生记录,更新其手机号 if student: xueshen.update(db, db_obj=student, obj_in={"phone": phone}) - + return {"status": "success"} except Exception as e: logger.error(f"更新手机号失败: {str(e)}") db.rollback() raise ValueError(f"更新手机号失败: {str(e)}") + # 更新用户信息 async def update_user_info(uid: str, data: Dict[str, Any], db: Session) -> Dict[str, Any]: """更新用户基本信息""" if not uid: raise ValueError("传递的参数不存在") - + # 获取用户 user = full_exam_user.get_by_id_and_weid(db, int(uid), settings.WECHAT_UNIACID) if not user or user.istatus != 1: raise ValueError("非法用户ID") - + # 需要更新的字段 update_data = {} - + # 姓名字段 if "name" in data: update_data["name"] = data.get("name", "").strip() - + # 处理其他可能需要更新的字段 fields = ["student_id", "id_card", "school", "level", "phone"] for field in fields: if field in data and data.get(field) != "null": update_data[field] = data.get(field) - + # 如果没有要更新的数据 if not update_data: return {"status": 0, "message": "没有提供要更新的数据"} - + try: # 更新用户信息 full_exam_user.update(db, db_obj=user, obj_in=update_data) - + # 如果更新了姓名,并且用户有手机号,则尝试更新学生信息 if "name" in update_data and user.phone: from sqlalchemy import text @@ -641,24 +658,25 @@ async def update_user_info(uid: str, data: Dict[str, Any], db: Session) -> Dict[ SET name = :name WHERE weid = :weid AND phone = :phone """) - + db.execute(update_student_query, { - "name": update_data["name"], - "weid": user.weid, + "name": update_data["name"], + "weid": user.weid, "phone": user.phone }) - + return {"status": 1, "message": "信息保存成功"} except Exception as e: logger.error(f"更新用户信息失败: {str(e)}") db.rollback() raise ValueError(f"信息保存失败: {str(e)}") + async def handle_exam_operation( - data: WxappRequest, - db: Session, - user_doexam: CRUDUserDoexam, - user_exam_answer: CRUDUserExamAnswer + data: WxappRequest, + db: Session, + user_doexam: CRUDUserDoexam, + user_exam_answer: CRUDUserExamAnswer ): """处理考试相关操作""" operations = { @@ -666,14 +684,14 @@ async def handle_exam_operation( "get_history": get_exam_history, "get_detail": get_exam_detail, } - + operation = operations.get(data.op) if not operation: return { "code": 1, "message": f"Unsupported operation: {data.op}" } - + try: result = await operation(data.uid, data.data, db, user_doexam, user_exam_answer) return { @@ -687,10 +705,11 @@ async def handle_exam_operation( "message": str(e) } + async def handle_collection( - data: WxappRequest, - db: Session, - user_collection: CRUDUserCollectionPraction + data: WxappRequest, + db: Session, + user_collection: CRUDUserCollectionPraction ): """处理收藏相关操作""" operations = { @@ -698,14 +717,14 @@ async def handle_collection( "remove": remove_collection, "list": list_collections, } - + operation = operations.get(data.op) if not operation: return { "code": 1, "message": f"Unsupported operation: {data.op}" } - + try: result = await operation(data.uid, data.data, db, user_collection) return { @@ -719,10 +738,11 @@ async def handle_collection( "message": str(e) } + async def handle_wrong_question( - data: WxappRequest, - db: Session, - user_wrong_praction: CRUDUserWrongPraction + data: WxappRequest, + db: Session, + user_wrong_praction: CRUDUserWrongPraction ): """处理错题相关操作""" operations = { @@ -730,14 +750,14 @@ async def handle_wrong_question( "remove": remove_wrong_question, "list": list_wrong_questions, } - + operation = operations.get(data.op) if not operation: return { "code": 1, "message": f"Unsupported operation: {data.op}" } - + try: result = await operation(data.uid, data.data, db, user_wrong_praction) return { @@ -759,13 +779,13 @@ async def handle_login2(data: WxappRequest, db: Session): # 修改这里,查找code的位置 # 首先检查data.data中是否有code code = data.data.get("code") if isinstance(data.data, dict) else None - + # 如果在data.data中没有找到code,尝试直接从data对象获取 if not code and hasattr(data, "code"): code = data.code - + logger.debug(f"登录请求数据: {data}") - + if not code: logger.warning(f"登录失败: code为空 - UID: {data.uid}") return { @@ -809,11 +829,11 @@ async def handle_login2(data: WxappRequest, db: Session): # 如果无法转换为整数,使用默认值 logger.warning(f"WECHAT_UNIACID '{settings.WECHAT_UNIACID}' 无法转换为整数,使用默认值1") weid = 1 - + # 查询用户 - 注意这里是用openid和weid查询 users = full_exam_user.get_by_openid_and_weid( - db, - wx_info["openid"], + db, + wx_info["openid"], weid # 使用整数weid ) @@ -840,10 +860,10 @@ async def handle_login2(data: WxappRequest, db: Session): logger.debug(f"创建新用户: {user_data}") # 使用UserMemberCreate schema需要导入 from mooc.schemas.goouc_fullexam_user import FullExamUserCreate - + # 转换为schema对象 user_create_schema = FullExamUserCreate(**user_data) - + # 使用CRUD实例创建用户 new_user = full_exam_user.create(db, obj_in=user_create_schema) uid = new_user.id @@ -859,29 +879,29 @@ async def handle_login2(data: WxappRequest, db: Session): try: logger.debug(f"开始生成二维码") access_token = await wx_api.get_access_token() - + # 创建二维码目录 qrcode_path = "../addons/goouc_fullexam/Qrcode/" if not os.path.exists(qrcode_path): os.makedirs(qrcode_path, mode=0o777, exist_ok=True) filename = f"qrcode_{uid}.jpg" - + # 生成二维码并保存 res = await wx_api.save_unlimited_qrcode( - uid, - qrcode_path, - filename, + uid, + qrcode_path, + filename, access_token ) - + if res: # 更新用户二维码路径 - 使用CRUD实例 from mooc.schemas.goouc_fullexam_user import FullExamUserUpdate - + update_data = {"qrcode": os.path.join(qrcode_path, filename)} update_schema = FullExamUserUpdate(**update_data) - + # 使用CRUD实例更新用户 full_exam_user.update(db, db_obj=new_user, obj_in=update_schema) @@ -907,10 +927,10 @@ async def handle_login2(data: WxappRequest, db: Session): try: logger.debug(f"开始更新用户信息: {user_data}") from mooc.schemas.goouc_fullexam_user import FullExamUserUpdate - + update_schema = FullExamUserUpdate(**user_data) full_exam_user.update(db, db_obj=users, obj_in=update_schema) - + uid = users.id if users.phone: info_status = 1 @@ -923,7 +943,7 @@ async def handle_login2(data: WxappRequest, db: Session): } logger.info(f"登录成功,用户ID: {uid}, 状态: {info_status}") - + return ResponseFactory.success( data={"uid": uid, "info_status": info_status, "info": {"uid": uid, "info_status": info_status}}, message="登录成功" @@ -935,4 +955,129 @@ async def handle_login2(data: WxappRequest, db: Session): code=1, message=f"登录失败: {str(e)}", data=None - ) \ No newline at end of file + ) + + +async def handle_share( + data: WxappRequest, + uniacid, + db: Session, + setting_mapper: CRUDSetting, + sharing_mapper: CRUDShareRecord, + user_mapping: CRUDFullExamUser +): + uid = int(data.uid) + if not uid: + return {"code": 1, + "message": "传递的参数不存在", + "data": "1001"} + user_info = user_mapping.get_user_info(db, uniacid, uid) + if not user_info: + return {"code": 1, + "message": "用户不存在", + "data": "1001"} + if user_info["status"] != 1: + return {"code": 1, + "message": "用户被禁用", + "data": "1002"} + + setting = setting_mapper.get_setting_1(db, uniacid) + day_num = sharing_mapper.get_share_record_1(db, uniacid, uid) + + if setting["shareupper"] == 0: + data = {} + if setting["share_integral"] != 0: + if day_num: + update = { + "num": day_num["num"] + setting["share_integral"], + "day": datetime.now().strftime("%Y-%m-%d"), + "createtime": int(time.time()) + } + result = sharing_mapper.update_share_record(db, update, day_num["id"], day_num["uid"]) + if result > 0: + data["integral"] = user_info["integral"] + setting["share_integral"] + res = user_mapping.update_user_info(db, data, user_info["id"]) + else: + insert = { + "uid": uid, + "weid": uniacid, + "num": setting["share_integral"], + "istatus": 1, + "day": datetime.now().strftime("%Y-%m-%d"), + "createtime": int(time.time()) + } + result = sharing_mapper.create(db, obj_in=insert) + if result > 0: + data["integral"] = user_info["integral"] + setting["share_integral"] + res = user_mapping.update_user_info(db, data, user_info["id"]) + else: + if setting["share_integral"] != 0: + data = {} + if day_num: + if day_num["num"] < setting["shareupper"]: + can_num = setting["shareupper"] - day_num["num"] + if setting["share_integral"] <= can_num: + update = { + "num": day_num["num"] + setting["share_integral"], + "day": datetime.now().strftime("%Y-%m-%d"), + "createtime": int(time.time()) + } + result = sharing_mapper.update_share_record(db, update, day_num["id"], day_num["uid"]) + if result: + data["integral"] = user_info["integral"] + setting["share_integral"] + res = user_mapping.update_user_info(db, data, user_info["id"]) + else: + insert = { + "uid": uid, + "weid": uniacid, + "num": setting["share_integral"], + "istatus": 1, + "day": datetime.now().strftime("%Y-%m-%d"), + "createtime": int(time.time()) + } + result = sharing_mapper.create(db, obj_in=insert) + if result > 0: + data["integral"] = user_info["integral"] + setting["share_integral"] + res = user_mapping.update_user_info(db, data, user_info["id"]) + + if res: + return {"status": 0, "message": "分享成功", "data": {"code": 1, "num": setting["share_integral"]}} + else: + return {"status": 1, "message": "分享失败", "data": {"code": 2}} + + +async def handle_add_wrong( + data: WxappRequest, + uniacid, + db: Session): + uid = data.uid + tid = data.data["tid"] + test_type = data.data.get("test_type") + iscollect = data.data.get("wrong_have") + if not uid or not tid or not test_type or not iscollect: + return {"code": 1, "message": "传递的参数不存在或失效", "data": "1001"} + user_info = full_exam_user.get_user_info(db, uniacid, int(uid)) + if not user_info: + return {"code": 1, "message": "用户不存在", "data": "1002"} + if user_info["status"] != 1: + return {"code": 1, "message": "用户被禁用", "data": "1003"} + + have = user_collection_praction.get_user_collection_praction_1(db, uniacid, uid, tid) + mess = 0 + if not have: + insert = { + "weid": uniacid, + "uid": uid, + "testid": tid, + "test_type": test_type, + "istatus": 1, + "createtime": int(time.time()) + } + res = user_collection_praction.create(db, obj_in=insert) + mess = 1 + else: + res = user_collection_praction.delete_user_collection_praction(db, uniacid, uid, tid) + if res: + return {"code": 0, "message": "操作成功", "data": mess} + else: + return {"code": 1, "message": "操作失败", "data": "1005"} diff --git a/mooc/crud/crud_goouc_fullexam.py b/mooc/crud/crud_goouc_fullexam.py index 4e3548d..511c791 100644 --- a/mooc/crud/crud_goouc_fullexam.py +++ b/mooc/crud/crud_goouc_fullexam.py @@ -111,6 +111,7 @@ class CRUDPaperTest(CRUDBase[PaperTest, PaperTestCreate, PaperTestUpdate]): class CRUDPhonecode(CRUDBase[Phonecode, PhoneCodeCreate, PhoneCodeUpdate]): def get_phonecode(self, db: Session, phonecode_id: int): return self.get_by_field(db, "id", phonecode_id) + def get_latest_by_phone(self, db: Session, phone: str, weid: int): """获取指定手机号的最新验证码记录""" return db.query(self.model).filter( @@ -118,6 +119,7 @@ class CRUDPhonecode(CRUDBase[Phonecode, PhoneCodeCreate, PhoneCodeUpdate]): self.model.weid == weid ).order_by(self.model.createtime.desc()).first() + class CRUDQYear(CRUDBase[QYear, QYearCreate, QYearUpdate]): def get_qyear(self, db: Session, qyear_id: int): return self.get_by_field(db, "id", qyear_id) @@ -132,11 +134,70 @@ class CRUDSetting(CRUDBase[Setting, SettingCreate, SettingUpdate]): def get_setting(self, db: Session, setting_id: int): return self.get_by_field(db, "id", setting_id) + def get_setting_1(self, session: Session, uniacid: int): + # 执行查询并获取第一条结果 + result = session.query( + self.model.share_integral, + self.model.share_title, + self.model.shareupper + ).filter( + self.model.weid == uniacid + ).first() + + # 将结果转换为字典格式 + if result: + return { + "share_integral": result[0], + "share_title": result[1], + "shareupper": result[2] + } + return {} + class CRUDShareRecord(CRUDBase[ShareRecord, ShareRecordCreate, ShareRecordUpdate]): def get_share_record(self, db: Session, share_record_id: int): return self.get_by_field(db, "id", share_record_id) + def get_share_record_1(self, session: Session, uniacid: int, uid: int): + # 生成当天日期字符串 + current_day = datetime.now().strftime("%Y-%m-%d") + + # 构建查询并获取结果 + result = session.query( + self.model.id, + self.model.uid, + self.model.num + ).filter( + self.model.weid == uniacid, + self.model.uid == uid, + self.model.day == current_day + ).first() + + # 转换结果类型 + # return result._asdict() if result else {} # 适用于 SQLAlchemy 2.x 的 Row 对象 + # 或者手动构建字典(兼容更多版本) + return dict(zip(['id', 'uid', 'num'], result)) if result else {} + + def update_share_record( + self, + session: Session, + update_data: dict, + record_id: int, + user_id: int + ) -> int: + try: + # 执行更新操作 + affected_rows = session.query(self.model).filter( + self.model.id == record_id, + self.model.uid == user_id + ).update(update_data) + session.commit() + return affected_rows + except Exception as e: + session.rollback() + print(f"更新失败: {str(e)}") + return 0 + class CRUDsonSimple(CRUDBase[SonSimple, SonSimpleCreate, SonSimpleUpdate]): def get_son_simple(self, db: Session, son_simple_id: int): diff --git a/mooc/crud/crud_goouc_fullexam_user.py b/mooc/crud/crud_goouc_fullexam_user.py index e93b4e0..3148591 100644 --- a/mooc/crud/crud_goouc_fullexam_user.py +++ b/mooc/crud/crud_goouc_fullexam_user.py @@ -84,7 +84,7 @@ class CRUDUserSpequence(CRUDBase[UserSpequence, UserSpequenceCreate, UserSpequen class CRUDUserMember(CRUDBase[UserMember, UserMemberCreate, UserMemberUpdate]): def get_user_member(self, db: Session, user_member_id: int): return self.get_by_field(db, "id", user_member_id) - + def get_by_openid_and_weid(self, db: Session, openid: str, weid: int, istatus: int = 1): """通过openid和weid获取用户""" return db.query(self.model).filter( @@ -98,6 +98,47 @@ class CRUDUserCollectionPraction( CRUDBase[UserCollectionPraction, UserCollectionPractionCreate, UserCollectionPractionUpdate]): def get_user_collection_praction(self, db: Session, user_collection_praction_id: int): return self.get_by_field(db, "id", user_collection_praction_id) + def get_user_collection_praction_1( + self, + session: Session, + uniacid: int, + uid: int, + testid: int + ) -> dict: + result = session.query( + self.model + ).filter( + self.model.weid == uniacid, + self.model.istatus == 1, + self.model.uid == uid, + self.model.testid == testid + ).first() + + return result.__dict__ if result else {} + + def delete_user_collection_praction( + self, + session: Session, + uniacid: int, + uid: int, + testid: int + ) -> int: + try: + # 执行删除操作 + affected_rows = session.query( + self.model + ).filter( + self.model.weid == uniacid, + self.model.uid == uid, + self.model.testid == testid + ).delete() + + session.commit() + return affected_rows + except Exception as e: + session.rollback() + print(f"删除失败: {str(e)}") + return 0 class CRUDUserGift(CRUDBase[UserGift, UserGiftCreate, UserGiftUpdate]): @@ -134,10 +175,48 @@ class CRUDUserPool(CRUDBase[UserPool, UserPoolCreate, UserPoolUpdate]): def get_user_pool(self, db: Session, user_pool_id: int): return self.get_by_field(db, "id", user_pool_id) + class CRUDFullExamUser(CRUDBase[FullExamUser, FullExamUserCreate, FullExamUserUpdate]): def get_full_exam_user(self, db: Session, full_exam_user_id: int): return self.get_by_field(db, "id", full_exam_user_id) - + + def get_user_info(self, session: Session, uniacid: int, uid: int): + # 构建查询并获取第一条结果 + result = session.query( + self.model.id, + self.model.integral, + self.model.status + ).filter( + self.model.weid == uniacid, + self.model.id == uid + ).first() + + # 转换为字典格式 + fields = ["id", "integral", "status"] + if result is not None: + return dict(zip(fields, result)) + return {} + + def update_user_info( + self, + session: Session, + update_data: dict, + user_id: int + ) -> int: + try: + # 执行更新操作 + affected_rows = session.query(self.model).filter( + self.model.id == user_id + ).update(update_data) + session.commit() + return affected_rows + except Exception as e: + session.rollback() + print(f"更新失败: {str(e)}") + return 0 + + + def get_by_openid_and_weid(self, db: Session, openid: str, weid: int, istatus: int = 1): """通过openid和weid获取用户""" return db.query(self.model).filter( @@ -145,7 +224,7 @@ class CRUDFullExamUser(CRUDBase[FullExamUser, FullExamUserCreate, FullExamUserUp self.model.weid == weid, self.model.istatus == istatus ).first() - + def get_by_id_and_weid(self, db: Session, id: int, weid: int, istatus: int = 1): """通过id和weid获取用户""" return db.query(self.model).filter( @@ -154,6 +233,7 @@ class CRUDFullExamUser(CRUDBase[FullExamUser, FullExamUserCreate, FullExamUserUp self.model.istatus == istatus ).first() + # 创建实例 user_doexam = CRUDUserDoexam(UserDoexam) user_doother_exam_answer = CRUDUserDoOtherExamAnswer(UserDoOtherExamAnswer) @@ -170,5 +250,3 @@ user_doother_exam = CRUDUserDoOtherExam(UserDootherExam) user_wrong_praction = CRUDUserWrongPraction(UserWrongPraction) user_pool = CRUDUserPool(UserPool) full_exam_user = CRUDFullExamUser(FullExamUser) - -