diff --git a/mooc/api/v1/endpoints/wechat.py b/mooc/api/v1/endpoints/wechat.py index 1169d89..ebc8914 100644 --- a/mooc/api/v1/endpoints/wechat.py +++ b/mooc/api/v1/endpoints/wechat.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, HTTPException from typing import Dict, Any from mooc.utils.wechat_client import WeChatAPI @@ -14,6 +14,7 @@ async def get_access_token() -> Dict[str, Any]: except Exception as e: raise HTTPException(status_code=400, detail=str(e)) + @router.post("/code2session") async def code_to_session(code: str) -> Dict[str, Any]: """小程序登录""" @@ -23,6 +24,7 @@ async def code_to_session(code: str) -> Dict[str, Any]: except Exception as e: raise HTTPException(status_code=400, detail=str(e)) + @router.post("/qrcode") async def generate_qrcode(scene: str) -> bytes: """生成小程序码""" @@ -32,6 +34,7 @@ async def generate_qrcode(scene: str) -> bytes: except Exception as e: raise HTTPException(status_code=400, detail=str(e)) + @router.post("/send_template") async def send_template_message(data: Dict[str, Any]) -> Dict[str, Any]: """发送订阅消息""" @@ -39,4 +42,4 @@ async def send_template_message(data: Dict[str, Any]) -> Dict[str, Any]: result = await wechat_client.send_template_message(data) return result except Exception as e: - raise HTTPException(status_code=400, detail=str(e)) \ No newline at end of file + raise HTTPException(status_code=400, detail=str(e)) diff --git a/mooc/api/v1/endpoints/wxapp.py b/mooc/api/v1/endpoints/wxapp.py index 96aab3b..bf51f5a 100644 --- a/mooc/api/v1/endpoints/wxapp.py +++ b/mooc/api/v1/endpoints/wxapp.py @@ -1,39 +1,87 @@ -from fastapi import APIRouter, Depends, Request, Form, Body -from typing import Optional, Dict, Any +import copy +import html +import math +import re +from datetime import datetime +from typing import Optional, Dict, Any, List + +import phpserialize +from fastapi import APIRouter, Depends, Request from pydantic import BaseModel from sqlalchemy.orm import Session -from mooc.db.database import get_db + +from mooc.crud.crud_goouc_fullexam import ( + CRUDSetting, + CRUDGift, + CRUDPaper, + CRUDPaperTest, + CRUDCdkeys, + CRUDCdkey, + CRUDXueshen, + CRUDTestType, + CRUDTest, + CRUDSonSimple, + CRUDExercise +) from mooc.crud.crud_goouc_fullexam_user import ( CRUDUserDoexam, CRUDUserExamAnswer, CRUDUserWrongPraction, - CRUDUserCollectionPraction + CRUDUserCollectionPraction, + CRUDUserGift, + CRUDUser, + CRUDUserDoOtherExam, + CRUDUserPool, + CRUDUserMember, + CRUDUserSequence, + CRUDUserSpecial, + CRUDUserQtype +) +from mooc.db.database import get_db +from mooc.models.goouc_fullexam import ( + Setting, + Gift, + Paper, + Cdkeys, Cdkey, + Test, + SonSimple, + Exercise, Xuesheng, TestType, PaperTest ) from mooc.models.goouc_fullexam_user import ( UserDoexam, UserExamAnswer, UserWrongPraction, - UserCollectionPraction + UserCollectionPraction, + User, + UserGift, + UserDootherExam, + UserPool, + UserSpecial, UserMember, UserSequence, UserQtype +) +from mooc.schemas.goouc_fullexam import ( + GiftResponse ) wxapp_router = APIRouter() + class WxappRequest(BaseModel): uid: Optional[str] = None op: Optional[str] = None m: Optional[str] = None data: Dict[str, Any] = {} + @wxapp_router.post("/index") async def handle_wxapp_request( - request: Request, - i: str, - t: Optional[str] = None, - v: Optional[str] = None, - c: Optional[str] = "entry", - a: Optional[str] = "wxapp", - do: Optional[str] = None, - db: Session = Depends(get_db) + request: Request, + i: str, + t: Optional[str] = None, + v: Optional[str] = None, + c: Optional[str] = "entry", + a: Optional[str] = "wxapp", + do: Optional[str] = None, + db: Session = Depends(get_db) ): # 获取表单数据 try: @@ -59,6 +107,26 @@ async def handle_wxapp_request( user_exam_answer = CRUDUserExamAnswer(UserExamAnswer) user_wrong_praction = CRUDUserWrongPraction(UserWrongPraction) user_collection = CRUDUserCollectionPraction(UserCollectionPraction) + user_mapper = CRUDUser(User) + user_gift_mapper = CRUDUserGift(UserGift) + setting_mapper = CRUDSetting(Setting) + gift_mapper = CRUDGift(Gift) + user_do_other_exam_mapper = CRUDUserDoOtherExam(UserDootherExam) + paper_mapper = CRUDPaper(Paper) + user_pool_mapper = CRUDUserPool(UserPool) + cdkeys_mapper = CRUDCdkeys(Cdkeys) + cdkey_mapper = CRUDCdkey(Cdkey) + user_member_mapper = CRUDUserMember(UserMember) + test_mapper = CRUDTest(Test) + xueshen_mapper = CRUDXueshen(Xuesheng) + user_sequence_mapper = CRUDUserSequence(UserSequence) + user_pool_mapper = CRUDUserPool(UserPool) + test_type_mapper = CRUDTestType(TestType) + users_special_mapper = CRUDUserSpecial(UserSpecial) + user_qtype_mapper = CRUDUserQtype(UserQtype) + son_simple_mapper = CRUDSonSimple(SonSimple) + exercise_mapper = CRUDExercise(Exercise) + paper_test_mapper = CRUDPaperTest(PaperTest) # 根据do参数处理不同的业务逻辑 if do == "Setuserinfo": @@ -78,9 +146,38 @@ async def handle_wxapp_request( elif do == "Advert": # 添加广告处理逻辑 return {"code": 0, "data": [], "msg": "success"} - + + # part 3 + elif do == "ExchangeGiftList": + return await handle_exchange_gift_list(WxappRequest(**data), i, db, user_gift_mapper, gift_mapper) + elif do == "Aboutus": + return await handle_about_us(WxappRequest(**data), i, db, setting_mapper) + elif do == "Mockexam": + return await MockExam(WxappRequest(**data), db, i, user_mapper, setting_mapper, xueshen_mapper, + test_type_mapper, test_mapper, user_pool_mapper, cdkey_mapper, cdkeys_mapper, + paper_test_mapper, paper_mapper, user_exam_answer) + elif do == "PreExamInfo": + return await per_exam_info(WxappRequest(**data), i, db, setting_mapper, user_doexam, user_mapper, + user_do_other_exam_mapper) + + elif do == "GetExamList": + return await get_exam_list(WxappRequest(**data), i, db, paper_mapper, paper_test_mapper, user_pool_mapper, + cdkeys_mapper, user_member_mapper, cdkey_mapper, user_doexam, test_mapper, + user_mapper) + elif do == "Sequence": + return await handle_sequence(WxappRequest(**data), db, i, user_mapper, xueshen_mapper, user_sequence_mapper, + test_mapper, setting_mapper, user_pool_mapper, test_type_mapper, + users_special_mapper, cdkeys_mapper, + cdkey_mapper, user_member_mapper, user_collection, user_qtype_mapper, + son_simple_mapper, + exercise_mapper, user_exam_answer, user_wrong_praction) + elif do == "TotalqNum": + return await TotalqNum(WxappRequest(**data), db, i, user_mapper, user_sequence_mapper, xueshen_mapper, + test_type_mapper, user_pool_mapper, test_mapper) + return {"code": 404, "msg": "接口未找到"} + async def handle_user_info(data: WxappRequest, db: Session): """处理用户信息相关操作""" operations = { @@ -89,14 +186,14 @@ async def handle_user_info(data: WxappRequest, db: Session): "bind": bind_user, "unbind": unbind_user, } - + operation = operations.get(data.op) if not operation: return { "code": 1, "msg": f"Unsupported operation: {data.op}" } - + try: result = await operation(data.uid, data.data, db) return { @@ -110,11 +207,12 @@ async def handle_user_info(data: WxappRequest, db: Session): "msg": str(e) } + async def handle_exam_operation( - data: WxappRequest, - db: Session, - user_doexam: CRUDUserDoexam, - user_exam_answer: CRUDUserExamAnswer + data: WxappRequest, + db: Session, + user_doexam: CRUDUserDoexam, + user_exam_answer: CRUDUserExamAnswer ): """处理考试相关操作""" operations = { @@ -122,14 +220,14 @@ async def handle_exam_operation( "get_history": get_exam_history, "get_detail": get_exam_detail, } - + operation = operations.get(data.op) if not operation: return { "code": 1, "msg": f"Unsupported operation: {data.op}" } - + try: result = await operation(data.uid, data.data, db, user_doexam, user_exam_answer) return { @@ -143,10 +241,11 @@ async def handle_exam_operation( "msg": str(e) } + async def handle_collection( - data: WxappRequest, - db: Session, - user_collection: CRUDUserCollectionPraction + data: WxappRequest, + db: Session, + user_collection: CRUDUserCollectionPraction ): """处理收藏相关操作""" operations = { @@ -154,14 +253,14 @@ async def handle_collection( "remove": remove_collection, "list": list_collections, } - + operation = operations.get(data.op) if not operation: return { "code": 1, "msg": f"Unsupported operation: {data.op}" } - + try: result = await operation(data.uid, data.data, db, user_collection) return { @@ -175,10 +274,11 @@ async def handle_collection( "msg": str(e) } + async def handle_wrong_question( - data: WxappRequest, - db: Session, - user_wrong_praction: CRUDUserWrongPraction + data: WxappRequest, + db: Session, + user_wrong_praction: CRUDUserWrongPraction ): """处理错题相关操作""" operations = { @@ -186,14 +286,14 @@ async def handle_wrong_question( "remove": remove_wrong_question, "list": list_wrong_questions, } - + operation = operations.get(data.op) if not operation: return { "code": 1, "msg": f"Unsupported operation: {data.op}" } - + try: result = await operation(data.uid, data.data, db, user_wrong_praction) return { @@ -207,6 +307,1375 @@ async def handle_wrong_question( "msg": str(e) } + +async def handle_sequence(data: WxappRequest, db: Session, uniacid: str, user_mapper: CRUDUser, + xueshen_mapper: CRUDXueshen, + user_sequence_mapper: CRUDUserSequence, + test_mapper: CRUDTest, + setting_mapper: CRUDSetting, + user_pool_mapper: CRUDUserPool, test_type_mapper: CRUDTestType, + users_special_mapper: CRUDUserSpecial, cdkeys_mapper: CRUDCdkeys, cdkey_mapper: CRUDCdkey, + user_member_mapper: CRUDUserMember, user_collect_praction_mapper: CRUDUserCollectionPraction, + user_qtype_mapper: CRUDUserQtype, son_simple_mapper: CRUDSonSimple, + exercise_mapper: CRUDExercise, user_do_exam_mapper: CRUDUserDoexam, + user_exam_answer_mapper: CRUDUserExamAnswer, + user_wrong_praction_mapper: CRUDUserWrongPraction): + uid = data.uid + op = data.op + if uid is None: + return {"code": 1, "msg": "传递的参数不存在或失效", "data": "1001"} + + user_info = user_mapper.get_user_info(db, uid, uniacid) + + if user_info is None: + return {"code": 1, "msg": "用户不存在", "data": "1002"} + + if user_info.status != 1: + return {"code": 1, "msg": "用户被禁用", "data": "1003"} + + is_student = xueshen_mapper.get_xuesheng_id(db, uniacid, user_info.name, user_info.phone) + + freepool = [] + lib_id = data.data["lib_id"] + + if op == "sequence": + last_id = user_sequence_mapper.get_last_id(db, uniacid, uid, lib_id) + question_list = test_mapper.get_question_list(db, uniacid, lib_id, 1, 9, 1) + + total_qnum = len(question_list) + last_id = int(last_id) + + if total_qnum < last_id: + last_id = total_qnum - 1 + elif last_id == total_qnum - 1: + last_id = 0 + + if question_list is not None: + for question in question_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, uid, question["id"], + question["type"]) + if is_collect is not None: + question["wrong_have"] = 1 + else: + question["wrong_have"] = 2 + + question_list = Makeformat(db=db, itembank=question_list, uniacid=int(uniacid)) + return {"code": 0, + "msg": "顺序练习试题,总题数,上次练习试题ID", + "data": {"list": question_list, + "total_qnum": total_qnum, + "last_id": last_id + } + } + else: + return {"code": 0, "msg": "暂无顺序练习试题数据", "data": {"list": [], "total_qnum": 0, "have": 2}} + elif op == "randoms": + question_list = test_mapper.get_randon_question_list(db, uniacid, lib_id, 9, 1, 1) + + total_qnum = len(question_list) + if question_list is not None: + for question in question_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, uid, question["id"], + question["type"]) + if is_collect is not None: + question["wrong_have"] = 1 + else: + question["wrong_have"] = 2 + + question_list = Makeformat(question_list) + return {"code": 0, + "msg": "随机练习试题,总题数", + "data": {"list": question_list, + "total_qnum": total_qnum + } + } + else: + return {"code": 0, "msg": "暂无随机练习试题数据", "data": {"list": [], "total_qnum": 0, "have": 2}} + elif op == "getallspecial": + pay_open = setting_mapper.get_setting_by_weid(db, uniacid).pay_open + freepoolnum = setting_mapper.get_setting_by_weid(db, uniacid).freepoolnum + + if is_student is not None: + test_type_list = test_type_mapper.get_all_special_if_is_student(db, uniacid, uid) + else: + test_type_list = test_type_mapper.get_all_special_without_is_student(db, uniacid, uid) + + if test_type_list is not None and len(test_type_list) > 0: + for test_type in test_type_list: + son = test_type_mapper.get_son_by_pid(db, uniacid, test_type["id"], 1, 1) + + if son is not None: + for v in son: + v["is_can"] = 0 + if v["price"] > 0: + kid = user_pool_mapper.get_kid_id_by_poolid(db, uniacid, uid, v["id"]) + if kid is not None: + v["is_can"] = 1 + else: + v["is_can"] = 1 + + z_vip = user_member_mapper.get_by_field(db, "weid", int(uniacid)) + v["vip_price"] = v["price"] * z_vip.scale + v["vip_price"] = round(v["vip_price"], 2) + + is_member = user_mapper.get_user_is_member(db, uniacid, uid) + if is_member is not None and is_member["ismember"] == 1 and v["vip_price"] <= 0: + v["is_can"] = 1 + + now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + cdkeys = cdkeys_mapper.get_code_id_list_by_kpool_id_and_type(db, uniacid, v["id"], 2) + + if cdkeys is not None and len(cdkeys) > 0: + active = cdkey_mapper.get_user_active(db, uniacid, uid, now_time, cdkeys) + + if active is not None: + v["is_can"] = 1 + + vson = test_type_mapper.get_son_by_pid(db, uniacid, v["id"], 1, 1) + + if vson is not None and len(vson) > 0: + for k in vson: + is_can = 0 + if v["is_can"] == 1: + is_can = 1 + else: + if k["id"] < 1: + is_can = 1 + k["is_can"] = is_can + v["son"] = copy.deepcopy(vson) + test_type["son"] = copy.deepcopy(son) + + return {"code": 0, + "msg": "获取专项", + "data": {"list": test_type_list, + "pay_open": pay_open, + "freenum": freepoolnum + } + } + + elif op == "special": + special_id = int(data.data["typeId"]) + pid = test_type_mapper.get_test_type_by_special_id(db, uniacid, special_id) + pool = test_type_mapper.get_pool_by_id(db, uniacid, pid) + + is_can = 0 + if pool["price"] <= 0: + is_can = 1 + else: + kid_id = user_pool_mapper.get_kid_id_by_poolid(db, uniacid, uid, pool["id"]) + if kid_id is not None: + is_can = 1 + if is_can == 1: + question_list = test_mapper.get_question_list(db, uniacid, special_id, 1, 9, 1) + + total_qnum = len(question_list) + else: + freepoolnum = setting_mapper.get_setting_by_weid(db, uniacid).freepoolnum + question_list = test_mapper.get_question_list_with_limit(db, uniacid, special_id, 1, 9, 1, freepoolnum) + + total_qnum = len(question_list) + last_id = users_special_mapper.get_last_id_by_special_id(db, uniacid, uid, special_id) + last_id = int(last_id) + + if last_id is not None and total_qnum <= last_id: + last_id = total_qnum - 1 + elif last_id == total_qnum - 1: + last_id = 0 + + if question_list is not None and len(question_list) > 0: + for question in question_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, uid, question["id"], + question["type"]) + if is_collect is not None and is_collect != 0: + question["wrong_have"] = 1 + else: + question["wrong_have"] = 2 + + question_list = Makeformat(db=db, itembank=question_list, uniacid=int(uniacid)) + return {"code": 0, "msg": "知识点专项练习试题,总题数,进度下标", + "data": {"list": question_list, "total_qnum": total_qnum, "last_id": last_id}} + else: + return {"code": 0, "msg": "暂无该知识点试题数据", "data": {"list": [], "total_qnum": 0, "last_id": 0}} + + elif op == "getalltypes": + type_list = [] + type_list.insert(1, "单选题") + type_list.insert(2, "多选题") + type_list.insert(3, "判断题") + type_list.insert(4, "填空题") + type_list.insert(5, "语音题") + type_list.insert(6, "完形填空") + type_list.insert(7, "阅读理解") + type_list.insert(8, "简答题") + + typelist = [] + + for index, value in enumerate(type_list): + if index == 5: + q_have = test_mapper.get_q_have_by_type_and_lib_id(db, uniacid, index, lib_id, 1, 1, 1) + if q_have == 0: + continue + else: + # 没有son + q_have = test_mapper.get_q_have_by_type_and_lib_id(db, uniacid, index, lib_id, 1, None, 1) + if q_have == 0: + continue + + typelist.append({"id": index, "name": value}) + type_list_t = son_simple_mapper.get_type_list(db, uniacid) + + if type_list is not None and len(type_list) > 0: + son = 2 + if type_list_t is not None and len(type_list_t) > 0: + son = 1 + return { + "code": 0, + "msg": "获取题型", + "data": { + "type_list": type_list, + "son": son + } + } + else: + return { + "code": 0, + "msg": "暂无题型", + "data": { + "type_list": [], + "son": 2 + } + } + + elif op == "getallsimple": + psize = 20 + pindex = max(1, data.data["page"]) + + type_list_t = son_simple_mapper.get_type_list_page(db, uniacid, pindex, psize) + + total = son_simple_mapper.get_total(db, uniacid) + total = len(total) + 1 + + # todo 后续验证 + # if (!empty($type_list_t)) { + # foreach($type_list as $key = > $value) { + # array_unshift($type_list_t, $value); + # } + # foreach($type_list_t as $k = > $v) { + # $type_list_t[$k]["son_title"] = $v["son_title"]; + # } + # } + # 将 type_list 中的每个元素插入到 type_list_t 的开头 + # 定义 type_list + type_list = [{"id": "0", "son_title": "简答题"}] + # 将 type_list 中的每个元素插入到 type_list_t 的开头 + type_list_t = type_list + type_list_t + + if type_list_t is not None and len(type_list_t) > 0: + return { + "code": 0, + "msg": "获取题型", + "data": { + "type_list": type_list_t, + "total": total, + "psize": psize, + } + } + else: + return { + "code": 0, + "msg": "暂无题型", + "data": { + "type_list": [], + } + } + + elif op == "qtype": + if data.data["typeId"] is None or uid is None: + return { + "code": 1, + "msg": "传递的参数不存在或失效", + "data": "1009", + } + qtype = int(data.data["typeId"]) + uid = int(uid) + last_id = user_qtype_mapper.get_last_id_by_type_id_and_uid(db, uniacid, uid, qtype, 1) + + question_list = test_mapper.get_qtye_question_list(db, uniacid, 1, qtype, 1, lib_id) + + total_qnum = len(question_list) + last_id = int(last_id) + if total_qnum <= last_id: + last_id = total_qnum - 1 + elif last_id == total_qnum - 1: + last_id = 0 + + if question_list is not None and len(question_list) > 0: + for question in question_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, uid, question["id"], + question["type"]) + if is_collect: + question["wrong_have"] = 1 + else: + question["wrong_have"] = 2 + + question_list = Makeformat(question_list) + return { + "code": 0, + "msg": "题型练习试题,总题数", + "data": { + "list": question_list, + "total_qnum": total_qnum, + "last_id": last_id, + }, + } + else: + return { + "code": 0, + "msg": "暂无该题型试题数据", + "data": { + "list": [], + "total_qnum": 0, + "have": 2, + }, + } + elif op == "simple_type": + if data.data["qtype"] is None or uid is None: + return { + "code": 1, + "msg": "传递的参数不存在或失效", + "data": "1009", + } + qtype = int(data.data["qtype"]) + son_type = int(data.data["son_type"]) + last_id = user_qtype_mapper.get_last_id_by_type_id_and_uid(db, uniacid, uid, qtype, 1) + question_list = test_mapper.get_question_list_by_son_type(db, uniacid, qtype, lib_id, 1, son_type, 1) + + total_qnum = len(question_list) + last_id = int(last_id) + + if total_qnum <= last_id: + last_id = total_qnum - 1 + elif last_id == total_qnum - 1: + last_id = 0 + + if question_list is not None and len(question_list) > 0: + for question in question_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, uid, question["id"], + question["type"]) + if is_collect is not None: + question["wrong_have"] = 1 + else: + question["wrong_have"] = 2 + + pass + question_list = Makeformat(question_list) + return { + "code": 0, + "msg": "题型练习试题,总题数", + "data": {"list": question_list, "total_qnum": total_qnum, "last_id": last_id}} + else: + return { + "code": 0, + "msg": "暂无该题型试题数据", + "data": { + "list": [], + "total_qnum": 0, + "have": 2, + }, + } + + elif op == "notdone": + + testid_arr = exercise_mapper.get_testid_list(db, uniacid, uid) + newid_arr = [] + + if testid_arr is not None and len(testid_arr) > 0: + for testid in testid_arr: + newid_arr.append(testid["testid"]) + # 把newid_arr传进去 + question_list = test_mapper.get_question_list(db, uniacid, lib_id, 1, 8, 1) + + if question_list is not None and len(question_list) > 0: + for question in question_list: + if question["type"] == 5: + son_list = test_mapper.get_testson_by_pid(db, uniacid, question["id"]) + for son in son_list: + have_do = exercise_mapper.get_exercise_have_do(db, uniacid, uid, son.id) + + if have_do is not None: + question_list.remove(question) + break + if question_list is not None and len(question_list) > 0: + for question in question_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, uid, question["id"], + question["type"]) + if is_collect is not None: + question["wrong_have"] = 1 + else: + question["wrong_have"] = 2 + question_list = Makeformat(question_list) + have = 1 + else: + question_list = [] + have = 2 + total_qnum = len(question_list) + return { + "code": 0, + "msg": "未做练习试题,总题数", + "data": {"list": question_list, "total_qnum": total_qnum, "have": have} + } + else: + return { + "code": 0, + "msg": "暂无未做练习试题", + "data": { + "list": [], + "total_qnum": 0, + "have": 2, + }, + } + else: + question_list = test_mapper.get_question_list(db, uniacid, lib_id, 1, 8, 1) + + if question_list is not None and len(question_list) > 0: + for question in question_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, uid, question["id"], + question["type"]) + + if is_collect is not None: + question["wrong_have"] = 1 + else: + question["wrong_have"] = 2 + + question_list = Makeformat(db=db, itembank=question_list, uniacid=int(uniacid)) + have = 1 + total_qnum = len(question_list) + return { + "code": 0, + "msg": "未做练习试题,总题数", + "data": {"list": question_list, "total_qnum": total_qnum, "have": have} + } + else: + return { + "code": 0, + "msg": "暂无未做练习试题", + "data": { + "list": [], + "total_qnum": 0, + "have": 2, + }, + } + elif op == "todayerr_question": + # 获取当前日期的开始时间戳(00:00:00) + # 获取当前日期 + current_date = datetime.now().date() # 返回一个 date 对象,格式为 YYYY-MM-DD + + # 将日期转换为时间戳(Unix 时间戳) + start = datetime.combine(current_date, datetime.min.time()).timestamp() + + id_list = user_wrong_praction_mapper.get_id_list_with_time(db, uniacid, uid, int(start)) + err_id_list = user_wrong_praction_mapper.get_err_id_list_order_by_createtime(db, uniacid, uid) + + if id_list is None or len(id_list) == 0: + return { + "code": 1, + "msg": "今日暂无错题", + "data": "1004" + } + else: + err_list = [] + errid_list = [] + for id in err_id_list: + if id["test_type"] > 4: + pid = test_mapper.get_pid_by_id(db, uniacid, id["testid"]) + if pid not in errid_list: + errid_list.append(pid) + temp = test_mapper.get_question(db, uniacid, pid) + if temp is not None: + err_list.append(temp) + else: + temp = test_mapper.get_err_list_order_by_id(db, uniacid, id["testid"]) + if temp is not None: + err_list.append(temp) + for err in err_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, uid, err["id"], err["type"]) + if is_collect is not None: + err["wrong_have"] = 1 + else: + err["wrong_have"] = 2 + + err_list = Makeformat(err_list) + total = len(err_list) + return { + "code": 0, + "msg": "今日错题", + "data": {"list": err_list, "total": total} + } + + elif op == "allerr_question": + err_id_list = user_wrong_praction_mapper.get_err_id_list_order_by_createtime(db, uniacid, uid) + if err_id_list is None or len(err_id_list) <= 0: + return { + "code": 1, + "msg": "暂无错题", + "data": "1004" + } + err_list = [] + errid_list = [] + + for err_id in err_id_list: + if err_id["test_type"] > 4: + pid = test_mapper.get_pid_by_id(db, uniacid, err_id["testid"]) + + if pid not in errid_list: + errid_list.append(pid) + temp = test_mapper.get_question(db, uniacid, pid) + if temp is not None: + err_list.append(temp) + else: + temp = test_mapper.get_question(db, uniacid, err_id["testid"]) + if temp is not None: + err_list.append(temp) + + for err in err_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, uid, err["id"], err["type"]) + + if is_collect is not None: + err["wrong_have"] = 1 + else: + err["wrong_have"] = 2 + + err_list = Makeformat(db=db, itembank=err_list, uniacid=int(uniacid)) + return { + "code": 0, + "msg": "全部错题", + "data": {"list": err_list} + } + + elif op == "geterr_bytype": + typeId = int(data.data["typeId"]) + id_arr = [] + user_wrong_praction_mapper.get_id_list_order_by_createtime(db, uniacid, uid, typeId) + if id_arr is None or len(id_arr) <= 0: + return { + "code": 0, + "msg": "暂无该题型错题", + "data": "1005" + } + err_list = [] + errid_list = [] + + for id in id_arr: + if id["test_type"] > 4: + pid = test_mapper.get_pid_by_id(db, uniacid, id["testid"]) + if pid not in errid_list: + errid_list.append(pid) + temp = test_mapper.get_question(db, uniacid, pid) + if temp is not None: + err_list.append(temp) + else: + temp = test_mapper.get_question(db, uniacid, id["testid"]) + if temp is not None: + err_list.append(temp) + + for err in err_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, uid, err["id"], err["type"]) + + if is_collect is not None: + err["wrong_have"] = 1 + else: + err["wrong_have"] = 2 + + err_list = Makeformat(err_list) + total = len(err_list) + return { + "code": 0, + "msg": "该题型错题列表,总数", + "data": {"list": err_list, "total": total} + } + elif op == "seeErr": + if data.data["paperid"] is None or uid is None: + return {"code": 1, "msg": "传递的参数不存在", "data": {"code": "1001"}} + + paperid = int(data.data["paperid"]) + uid = int(uid) + + record_id = user_do_exam_mapper.get_user_recordid(db, uniacid, uid, paperid) + answer_list = user_exam_answer_mapper.get_user_exam_answer(db, uniacid, uid, paperid, record_id) + + err_list = [] + errid_list = [] + # 拷贝 + err_id_list = copy.deepcopy(answer_list) + + for err_id in err_id_list: + if err_id["test_type"] > 4 and err_id["test_type"] < 8: + pid = 0 + test_mapper.get_pid_by_id(db, uniacid, err_id["test_id"]) + if pid not in errid_list: + errid_list.append(pid) + temp = test_mapper.get_question(db, uniacid, pid) + if temp is not None: + err_list.append(temp) + else: + temp = test_mapper.get_question(db, uniacid, err_id["test_id"]) + if temp is not None: + err_list.append(temp) + for err in err_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, uid, err["id"], err["type"]) + if is_collect is not None: + err["wrong_have"] = 1 + else: + err["wrong_have"] = 2 + err_list = Makeformat(db, err_list, uid=uid, paperid=paperid, uniacid=uniacid) + total = len(err_list) + return {"code": 0, "msg": "本次考试错题列表,总数", "data": {"list": err_list, "total": total}} + elif op == "collection_question": + id_list = user_collect_praction_mapper.get_id_list(db, uniacid, uid, 1) + + if id_list is not None and len(id_list) > 0: + question_list = [] + for id in id_list: + temp = test_mapper.get_question(db, uniacid, id["testid"]) + question_list.append(temp) + if question_list is not None and len(question_list) > 0: + for question in question_list: + is_collect = user_collect_praction_mapper.get_is_collect(db, uniacid, question["id"], + question["type"]) + if is_collect is not None: + question["wrong_have"] = 1 + else: + question["wrong_have"] = 2 + + question_list = Makeformat(db=db, itembank=question_list, uniacid=int(uniacid)) + return {"code": 0, "msg": "全部收藏", "data": {"list": question_list}} + + else: + return {"code": 1, "msg": "暂无收藏", "data": "1004"} + + +async def handle_exchange_gift_list( + data: WxappRequest, + uniacid: str, + db: Session, + user_gift_mapper: CRUDUserGift, + gift_mapper: CRUDGift +): + uid = data.uid + if uid: + return { + "code": 1, + "msg": "传递的参数不存在或失效" + } + + # [ giftid , createtime , status + # (1, 1634567890, 1), + # (2, 1634567900, 0) + # ] + user_gift_list = user_gift_mapper.get_user_gift_list(db, uniacid, uid) + data = [] + if user_gift_list: + for user_gift in user_gift_list: + gift = gift_mapper.get_gift_by_id(db, user_gift[0], uniacid) + + gift_dict = gift.__dict__ + gift_dict['createtime'] = user_gift[1] + # 假设 attachurl 是一个字符串 + # gift_dict['image'] = attachurl + gift.image + gift_dict['status'] = user_gift[2] + + # 创建新的 Pydantic 模型实例 + gift_vo = GiftResponse(**gift_dict) + data.append(gift_vo) + return { + "code": 0, + "msg": "兑换记录", + "data": data + } + return { + "code": 1, + "msg": "暂无兑换记录" + } + + pass + + +async def handle_about_us( + data: WxappRequest, + uniacid: str, + db: Session, + setting: CRUDSetting +): + print(data) + info = setting.get_about_us(db, uniacid) + return { + "code": 0, + "data": info, + "msg": "success" + } + + +async def MockExam(data: WxappRequest, + db: Session, + uniacid: str, + user_mapper: CRUDUser, + setting_mapper: CRUDSetting, + xueshen_mapper: CRUDXueshen, + test_type_mapper: CRUDTestType, + test_mapper: CRUDTest, + user_pool_mapper: CRUDUserPool, + cdkey_mapper: CRUDCdkey, + cdkeys_mapper: CRUDCdkeys, + paper_test_mapper: CRUDPaperTest, + paper_mapper: CRUDPaper, + user_exam_answer_mapper: CRUDUserExamAnswer + ): + uid = data.uid + + exam_type = int(data.data["exam_type"]) + paperid = int(data.data["paperid"]) + if not uid: + return { + "code": 1, + "msg": "传递的参数不存在或失效", + "data": "1001" + } + if not exam_type: + return { + "code": 1, + "msg": "传递的参数不存在或失效", + "data": "1002" + } + + user_info = user_mapper.get_user_info(db, uniacid, uid) + if not user_info: + return { + "code": 1, + "msg": "用户不存在", + "data": "1003" + } + + if user_info.status != 1: + return { + "code": 1, + "msg": "用户被禁用", + "data": "1004" + } + + is_student = xueshen_mapper.get_xuesheng_id(db, uniacid, user_info.name, user_info.phone) + freepool = [] + fpool = [] + if not is_student: + fpool = test_type_mapper.get_fpool(db, uniacid) + else: + fpool = test_type_mapper.get_fpool_with_student_check(db, uniacid) + if not fpool: + for val in fpool: + test_type_ids = test_type_mapper.get_test_type_ids(db, uniacid, val) + if not test_type_ids: + freepool.extend(test_type_ids) + pool = [] + if not freepool: + for val in freepool: + pool.append(val) + userpool = [] + # todo 使用了连接查询,这里需要测试是否正常工作 + upool = user_pool_mapper.get_user_pool(db, uniacid, uid) + + if upool: + for val in upool: + test_type_ids_ist = test_type_mapper.get_test_type_ids(db, uniacid, val[1]) + if not test_type_ids_ist: + for val in test_type_ids_ist: + userpool.append(val[0]) + if userpool: + for val in pool: + pool.append(val) + + now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + cdkey_list = cdkey_mapper.get_cdkey_list(db, uniacid, uid, int(now_time)) + active_pool = [] + + for cdkey in cdkey_list: + pools = cdkeys_mapper.get_all_kpool_id(db, uniacid, cdkey.cid) + active_pool.extend(pools) + + t_pools = [] + for a_pool in active_pool: + t_pool = test_type_mapper.get_test_type_ids(db, a_pool) + t_pools.extend(t_pool) + + active_pools = [val for val in t_pools] + pool.extend(active_pools) + # 去重 + pool = list(set(pool)) + # 拼接成字符串 + # $poolstr = ""; + # if (!empty($pool)) { + # $poolstr = implode(",", $pool); + # } + + setting = setting_mapper.get_setting_by_weid(db, uniacid) + # $franction = unserialize($setting["franction"]); + franction = phpserialize.loads(setting.franction.encode('utf-8'), decode_strings=True) + paper_time = setting.paper_time + data = [] + total_score = 0 + total_qnum = 0 + # 判断fraction不为空 + if franction: + for key, val in enumerate(franction): + data[f"type_num_{key}"] = val[f"num_{key}"] + data[f"type_score_{key}"] = int(val[f"num_{key}"]) * int(val[f"franction_{key}"]) + total_score += data[f"type_score_{key}"] + total_qnum += int(data[f"type_num_{key}"]) + else: + return { + "code": 1, + "msg": "暂未设置考试信息", + "data": "1005" + } + + question_list = [] + if exam_type == 1: + if paperid: + return { + "code": 1, + "msg": "传递的参数不存在或失效", + "data": "1006" + } + paper = paper_mapper.get_paper_by_paperid(db, uniacid, paperid) + + question_list = paper_test_mapper.get_questions_by_paperid(db, paper.id) + + if question_list: + question_list = Makeformat(question_list) + return { + "code": 0, + "msg": "获取全真试卷失败", + "data": { + "paperid": paper.id, + "list": question_list, + "total_score": total_score, + "paper_time": paper_time, + "total_qnum": total_qnum + } + } + else: + return {"code": 0, "msg": "获取全真试卷失败", "data": {"list": [], "have": 2}} + elif exam_type == 2: + testid_arr = user_exam_answer_mapper.get_exam_answer_test_id(db, uniacid, 1) + newid_arr = [val for val in testid_arr] + + if newid_arr: + i = 0 + for val in data: + i += 1 + if i <= 5: + temp = test_mapper.mock_exam_get_question_list(db, uniacid, pool, i, 1, data[f"type_num_{i}"]) + question_list.extend(temp) + new_question = [] + if question_list: + for question in question_list: + new_question.append(question) + + new_question_list = Makeformat(new_question) + now_total = len(new_question_list) + if now_total != total_qnum: + return {"code": 0, + "msg": "获取优先未做试卷试题失败1", + "data": {"list": [], "have": 2} + } + else: + return { + "code": 0, + "msg": "优先未做试卷试题", + "data": { + "list": new_question_list, + "total_score": total_score, + "paper_time": paper_time, + "total_qnum": total_qnum, + "ceshi": 1 + } + } + else: + return { + "code": 0, + "msg": "获取优先未做试卷试题失败2", + "data": { + "list": [], + "total_qnum": 0, + "have": 2 + } + } + else: + i = 0 + for value in data: + i += 1 + if i <= 7: + temp = test_mapper.get_randon_question_list_batch(db, uniacid, i, 8, 1, 1, pool, + data[f"type_num_{i}"]) + question_list.extend(temp) + + new_question = [] + if question_list is not None: + new_question.extend(question_list) + + new_question = Makeformat(db=db, itembank=new_question, uniacid=int(uniacid)) + now_total = len(new_question) + if now_total != total_qnum: + return { + "code": 0, + "msg": "获取优先未做试卷试题失败3", + "data": { + "list": [], + "have": 2 + } + } + else: + return { + "code": 0, + "msg": "优先未做试卷试题", + "data": { + "list": new_question, + "total_score": total_score, + "paper_time": paper_time, + "total_qnum": total_qnum, + "ceshi": 2 + } + } + else: + return { + "code": 0, + "msg": "获取优先未做试卷试题失败4", + "data": { + "list": [], + "total_qnum": 0, + "have": 2 + } + } + elif exam_type == 3: + i = 0 + for val in data: + i += 1 + if i <= 7: + question_list.extend( + test_mapper.mock_exam_get_question_list_type3(db, uniacid, pool, i, 1, data[f"type_num_{i}"])) + new_question = [] + if question_list is not None and len(question_list) > 0: + for val in question_list: + new_question.append(val) + + new_question = Makeformat(db=db, itembank=new_question, uniacid=int(uniacid)) + + now_total = len(new_question) + if now_total != total_qnum: + return { + "code": 0, + "msg": "获取智能考试试卷试题失败1", + "data": { + "list": [], + "have": 2 + } + } + else: + return { + "code": 0, + "msg": "智能考试试卷试题", + "data": { + "list": new_question, + "total_score": total_score, + "paper_time": paper_time, + "total_qnum": total_qnum, + } + } + else: + return { + "code": 0, + "msg": "获取智能考试试卷试题失败2", + "data": { + "list": [], + "total_qnum": 0, + "have": 2 + } + } + + +def mb_rtrim(s, suffix): + while s.endswith(suffix): + s = s[:-len(suffix)] + return s + + +# todo $_W["attachurl"]需要解决 +def Makeformat(db: Session, itembank: List, uid=None, paperid=None, uniacid=None): + attachurl = "" + + test_mapper = CRUDTest(Test) + user_exam_answer_mapper = CRUDUserExamAnswer(UserExamAnswer) + paper_mapper = CRUDPaper(Paper) + + for item in itembank: + # 处理qaudio + if item.get('qaudio'): + item['qaudio'] = attachurl + item['qaudio'] + + # 处理qimage + if item.get('qimage'): + try: + qimage = phpserialize.loads(item['qimage'].encode('utf-8'), decode_strings=True) + qimage_list = [] + if isinstance(qimage, dict): + qimage = [qimage[k] for k in sorted(qimage.keys())] + for vv in qimage: + if not re.match(r'^(http://|https://)', vv, re.I): + vv = attachurl + vv + qimage_list.append(vv) + item['qimage'] = qimage_list + except Exception as e: + item['qimage'] = [] + + # 处理question + if item.get('question'): + item['question'] = html.unescape(item['question']) + + # 处理aimage + if item.get('aimage'): + try: + aimage = phpserialize.loads(item['aimage'].encode('utf-8'), decode_strings=True) + aimage_list = [] + if isinstance(aimage, dict): + aimage = [aimage[k] for k in sorted(aimage.keys())] + for vv in aimage: + if not re.match(r'^(http://|https://)', vv, re.I): + vv = attachurl + vv + aimage_list.append(vv) + item['aimage'] = aimage_list + except Exception as e: + item['aimage'] = [] + + # 处理analysis_audio + if item.get('analysis_audio'): + item['analysis_audio'] = attachurl + item['analysis_audio'] + + # 处理analysis + if item.get('analysis'): + item['analysis'] = html.unescape(item['analysis']) + + # 处理option + if 'option' in item: + try: + option_data = phpserialize.loads(item['option'].encode('utf-8'), decode_strings=True) + options = [] + if isinstance(option_data, dict): + for key in sorted(option_data.keys()): + vv = html.unescape(option_data[key]) + if item.get('a_type', 0) != 0: + if vv and not re.match(r'^(http://|https://)', vv, re.I): + vv = attachurl + vv + options.append({'o': key, 'p': vv}) + item['option'] = options + except Exception as e: + item['option'] = [] + + # 处理type=4 + if item.get('type') == 4: + rightkey = item.get('rightkey', '') + if rightkey: + parts = rightkey.split('|') + rightarray = [] + for part in parts: + part = mb_rtrim(part, '【') + part = mb_rtrim(part, '】') + rightarray.extend(part.split('】【')) + item['rightarray'] = rightarray + + # 处理type5-7(子题目) + if 5 <= item.get('type', 0) <= 7: + son_list = test_mapper.get_testson_by_pid(db, uniacid, item['id']) + if son_list is not None and len(son_list) > 0: + son_dicts = [son.to_dict() for son in son_list] + item['list'] = Makeformat(db=db, itembank=son_dicts, uniacid=uniacid) + + # 处理type=8(用户答案) + if item.get('type') == 8: + do_answer = user_exam_answer_mapper.get_do_answer(db, uniacid, uid, paperid, item['id']) + + if not do_answer: + item['comments'] = '未填写评语', + else: + item['comments'] = do_answer.comments + + item['uanswer'] = do_answer.uanswer + item["get_simple_scores"] = do_answer.simple_score + + franction = paper_mapper.get_franction_by_paperid(db, uniacid, paperid) + + if franction is not None: + try: + simples = phpserialize.loads(franction.encode('utf-8'), decode_strings=True) + item['simple_scores'] = simples.get('8', {}).get('franction_8', 0) + except: + item['simple_scores'] = 0 + else: + item['simple_scores'] = 0 + + return itembank + + +async def TotalqNum(data: WxappRequest, + db: Session, + uniacid: str, + user_mapper: CRUDUser, + user_spequence_mapper: CRUDUserSequence, + xueshen_mapper: CRUDXueshen, + test_type_mapper: CRUDTestType, + user_pool_mapper: CRUDUserPool, + test_mapper: CRUDTest + ): + uid = data.uid + if not uid: + user_info = user_mapper.get_user_info(db, uid, uniacid) + if user_info: + return { + "code": 1, "msg": "用户不存在", "data": "1002" + } + if user_info.status != 1: + return { + "code": 1, "msg": "用户被禁用", "data": "1003" + } + last_id = int(user_spequence_mapper.get_last_id(db, uid, uniacid)) + + is_student = xueshen_mapper.get_xuesheng_id(db, uniacid, user_info.name, user_info.phone) + freepool = [] + fpool = [] + + if not is_student: + fpool = test_type_mapper.get_fpool(db, uniacid) + else: + fpool = test_type_mapper.get_fpool_with_student_check(db, uniacid) + if not fpool: + for val in fpool: + test_type_ids = test_type_mapper.get_test_type_ids(db, uniacid, val) + if not test_type_ids: + freepool.extend(test_type_ids) + pool = [] + if not freepool: + for val in freepool: + pool.append(val) + userpool = [] + # todo 使用了连接查询,这里需要测试是否正常工作 + upool = user_pool_mapper.get_user_pool(db, uniacid, uid) + + if not upool: + for val in upool: + test_type_ids_ist = test_type_mapper.get_test_type_ids(db, uniacid, val[1]) + if not test_type_ids_ist: + for val in test_type_ids_ist: + userpool.append(val[0]) + + if not userpool: + for val in pool: + pool.append(val) + + pool = list(dict.fromkeys(pool)) + total = 0 + if not pool: + total = test_mapper.get_test_count(db, uniacid, pool) + + if total < last_id: + last_id = total + elif last_id == total - 1: + last_id = 0 + + if total > 0: + return { + "code": 0, + "msg": "总题数,进度ID", + "data": { + "total": int(total), + "last_id": int(last_id) + } + } + else: + return { + "code": 1, + "msg": "暂无试题", + "data": { + "total": 0, "last_id": 0, "is_can": 0 + } + } + + else: + return { + "code": 1, + "msg": "请先登录", + "data": { + "total": 0, "last_id": 0, "is_can": 2 + } + } + + +async def per_exam_info( + data: WxappRequest, + uniacid: str, + db: Session, + setting: CRUDSetting, + user_doexam: CRUDUserDoexam, + user_mapper: CRUDUser, + user_do_other_exam: CRUDUserDoOtherExam +): + uid = int(data.uid) + if not uid: + return { + "code": 1, + "msg": "传递的参数不存在或失效", + "data": "1001" + } + user_info = user_mapper.get_user_info(db, uid, uniacid) + if not user_info: + return { + "code": 1, + "msg": "用户不存在", + "data": "1002" + } + if user_info.status != 1: + return { + "code": 1, + "msg": "用户被禁用", + "data": "1003" + } + + quanzhen_highest = user_doexam.get_quanzhen_highest(db, uid, uniacid) + other_highest = user_do_other_exam.get_user_other_highest(db, uid, uniacid) + + # 将空值转换为0,确保可以正确参与比较 + quanzhen_highest = quanzhen_highest if quanzhen_highest else 0 + other_highest = other_highest if other_highest else 0 + + # 比较两个分数,取最大值 + highest = max(quanzhen_highest, other_highest) + + setting_info = setting.get_about_us(db, uniacid) + + franction = phpserialize.loads(setting.franction.encode('utf-8'), decode_strings=True) + + paper_time = setting_info.paper_time + data = [] + total_score = 0 + total_qnum = 0 + if not franction: + for key, val in enumerate(franction): + data[f"type_num_{key}"] = val[f"num_{key}"] + data[f"type_score_{key}"] = int(val[f"num_{key}"]) * int(val[f"franction_{key}"]) + total_score += data[f"type_score_{key}"] + total_qnum += int(data[f"type_num_{key}"]) + pass_score = round(total_score * 0.6, 1) + return { + "code": 0, + "msg": "总分,时长,及格,历史最高", + "data": {"total_score": total_score, + "paper_time": paper_time, + "pass_score": pass_score, + "highest": highest + } + } + else: + return { + "code": 1, + "msg": "暂未设置考试信息", + "data": "1004" + } + + +async def get_exam_list( + data: WxappRequest, + uniacid: str, + db: Session, + paper_mapper: CRUDPaper, + paper_test: CRUDPaperTest, + user_pool: CRUDUserPool, + cdkeys_service: CRUDCdkeys, + user_member: CRUDUserMember, + cdkey_service: CRUDCdkey, + user_doexam_service: CRUDUserDoexam, + test_mapper: CRUDTest, + user_mapper: CRUDUser + +): + uid = data.uid + page_index = int(data.data.page) + page_size = 10 + paper_list = paper_mapper.get_page_list(db, uniacid, page_index, page_size) + total = paper_mapper.get_total(db, uniacid) + + if paper_list: + have = 2 + return { + "code": 0, + "msg": "暂无全真试卷", + "data": {"list": paper_list, "have": have} + } + + for paper in paper_list: + value = paper.__dict__ + value["name"] = value["title"] + value["qnum"] = paper_test.get_qnum(db, value["id"]) + value["createtime"] = datetime.fromtimestamp(value["createtime"]).strftime('%Y-%m-%d') + value["is_can"] = 0 + kid = "" + if value["price"] > 0: + kid = user_pool.get_kid_id_by_paperid(db, uniacid, uid, value["id"]) + if not kid: + value["is_can"] = 1 + else: + value["is_can"] = 1 + + user_mapper.get_user_is_member(db, uniacid, uid) + is_member = {} + z_vip = user_member.get_by_field(db, "weid", uniacid) + + value["vip_price"] = value["price"] * z_vip.scale + value["vip_price"] = math.floor(value["vip_price"] * 100) / 100 + + value["simple"] = 2 + if is_member == 1 and value["vip_price"] <= 0: + value["is_can"] = 1 + + now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + cdkeys = cdkeys_service.get_all_cdkeys_code_ids(db, uniacid, value["id"]) + + if cdkeys is not None and len(cdkeys) > 0: + active = cdkey_service.get_user_active(db, uniacid, uid, now_time, cdkeys) + + # todo查询用户是否有符合条件的激活记录 + if active is not None: + value["is_can"] = 1 + + paper_test_id_list = paper_test.get_all_test_id(db, value["id"]) + for paper_test_id in paper_test_id_list: + res = test_mapper.get_test_by_id_and_type(db, uniacid, 8, paper_test_id) + + if res: + value["simple"] = 1 + value["is_repeat"] = 2 + break + # 查询用户做过的试卷数量 + do_number = user_doexam_service.get_user_do_number(db, uniacid, uid, value["id"]) + value["do_number"] = do_number + have = 1 + return { + "code": 0, + "msg": "全真卷列表", + "data": { + "list": paper_list, + "have": have, + "total": total, + "psize": page_size + } + } + + # 具体的操作函数实现... # 用户信息相关 async def get_user_info(uid: str, data: Dict[str, Any], db: Session): @@ -214,65 +1683,83 @@ async def get_user_info(uid: str, data: Dict[str, Any], db: Session): print("get_user_info", uid, data, db) pass + async def update_user_info(uid: str, data: Dict[str, Any], db: Session): # 实现更新用户信息的逻辑 print("update_user_info", uid, data, db) pass + async def bind_user(uid: str, data: Dict[str, Any], db: Session): # 实现绑定用户的逻辑 print("bind_user", uid, data, db) pass + async def unbind_user(uid: str, data: Dict[str, Any], db: Session): # 实现解绑用户的逻辑 print("unbind_user", uid, data, db) pass + # 考试相关 async def submit_exam(uid: str, data: Dict[str, Any], db: Session, user_doexam, user_exam_answer): # 实现提交考试的逻辑 print("submit_exam", uid, data, db, user_doexam, user_exam_answer) pass + async def get_exam_history(uid: str, data: Dict[str, Any], db: Session, user_doexam, user_exam_answer): # 实现获取考试历史的逻辑 print("get_exam_history", uid, data, db, user_doexam, user_exam_answer) pass + async def get_exam_detail(uid: str, data: Dict[str, Any], db: Session, user_doexam, user_exam_answer): # 实现获取考试详情的逻辑 print("get_exam_detail", uid, data, db, user_doexam, user_exam_answer) pass + # 收藏相关 async def add_collection(uid: str, data: Dict[str, Any], db: Session, user_collection): # 实现添加收藏的逻辑 print("add_collection", uid, data, db, user_collection) pass + async def remove_collection(uid: str, data: Dict[str, Any], db: Session, user_collection): # 实现移除收藏的逻辑 print("remove_collection", uid, data, db, user_collection) pass + async def list_collections(uid: str, data: Dict[str, Any], db: Session, user_collection): # 实现列出收藏的逻辑 print("list_collections", uid, data, db, user_collection) pass -# 错题相关 + # 错题相关 + + async def add_wrong_question(uid: str, data: Dict[str, Any], db: Session, user_wrong_praction): # 实现添加错题的逻辑 print("add_wrong_question", uid, data, db, user_wrong_praction) pass + async def remove_wrong_question(uid: str, data: Dict[str, Any], db: Session, user_wrong_praction): # 实现移除错题的逻辑 print("remove_wrong_question", uid, data, db, user_wrong_praction) pass + async def list_wrong_questions(uid: str, data: Dict[str, Any], db: Session, user_wrong_praction): # 实现列出错题的逻辑 print("list_wrong_questions", uid, data, db, user_wrong_praction) - pass \ No newline at end of file + pass + + +async def test(data: Dict[str, Any], db: Session): + test_mapper = CRUDTest(Test) + print(test_mapper.get_q_have_by_type_and_lib_id(db, "2", 1, 16, 1, None, 1)) diff --git a/mooc/crud/crud_goouc_fullexam.py b/mooc/crud/crud_goouc_fullexam.py index aaa44fd..b9ba2e4 100644 --- a/mooc/crud/crud_goouc_fullexam.py +++ b/mooc/crud/crud_goouc_fullexam.py @@ -1,5 +1,6 @@ -from typing import Optional +from typing import Optional, List, Tuple, Dict, Any from sqlalchemy.orm import Session +from sqlalchemy import select, func from mooc.crud.crud_base import CRUDBase from mooc.models.goouc_fullexam import (Advert, Banji, Banner, Category, Cdkey, CdkeyCate, Cdkeys, Exercise, Feedback, @@ -47,6 +48,51 @@ class CRUDCdkey(CRUDBase[Cdkey, CdkeyCreate, CdkeyUpdate]): def get_cdkey(self, db: Session, cdkey_id: int): return self.get_by_field(db, "id", cdkey_id) + # $cdkeys = pdo_fetchall(" SELECT * FROM ".tablename("goouc_fullexam_cdkey"). + # " WHERE + # weid =:weid + # uid=:uid + # status =2 + # display=1 + # endtime > unix_timestamp(now()) + # kpool > :kpool ", [":weid" = > $_W["uniacid"], "uid" = > $uid, ":kpool" = > $now_time]); + def get_cdkey_list(self, db: Session, uniacid: str, uid: int, now_time: str) -> Optional[List[Cdkey]]: + cdkeys = db.query(self.model).filter( + self.model.weid == int(uniacid), + self.model.uid == uid, + self.model.status == 2, + self.model.display == 1, + self.model.endtime > db.func.unix_timestamp(db.func.now()), + self.model.kpool > now_time + ).all() + return cdkeys + + def get_user_active_with_paperid(self, db: Session, uniacid: str, uid: int, now_time: str, + cdkeys_code_id_list: List[int]) -> Optional[ + Cdkey]: + return (db.query(self.model).filter( + self.model.weid == int(uniacid), + self.model.uid == uid, + self.model.status == 2, + self.model.display == 1, + self.model.endtime > func.unix_timestamp(func.now()), + self.model.kpool > now_time, + self.model.cid.in_(cdkeys_code_id_list) + ).first()) + + def get_user_active(self, db: Session, uniacid: str, uid: int, now_time: str, cdkeys_code_id_list: List[int]) -> \ + Optional[ + Cdkey]: + return (db.query(self.model).filter( + self.model.weid == int(uniacid), + self.model.uid == uid, + self.model.status == 2, + self.model.display == 1, + self.model.endtime > func.unix_timestamp(func.now()), + self.model.kpool > now_time, + self.model.cid.in_(cdkeys_code_id_list) + ).first()) + class CRUDCdkeyCate(CRUDBase[CdkeyCate, CdkeyCateCreate, CdkeyCateUpdate]): def get_cdkey_cate(self, db: Session, cdkey_cate_id: int): @@ -57,11 +103,52 @@ class CRUDCdkeys(CRUDBase[Cdkeys, CdkeysCreate, CdkeysUpdate]): def get_cdkeys(self, db: Session, cdkeys_id: int): return self.get_by_field(db, "id", cdkeys_id) + def get_all_cdkeys_code_ids(self, db: Session, uniacid: str, paper_id: int) -> List[int]: + results = (db.query(self.model.code_id).filter( + self.model.uniacid == int(uniacid), + self.model.kpool_id == paper_id, + self.model.type == 1) + .all()) + + return [result[0] for result in results] + + # $pools = pdo_getall("goouc_fullexam_cdkeys", + # ["weid" => $_W["uniacid"], "code_id" => $cdkey["cid"], "type" => 2], "kpool_id"); + def get_all_kpool_id(self, db: Session, uniacid: str, cid: int) -> Optional[List[int]]: + pools = (db.query(self.model.kpool_id) + .filter(self.model.weid == uniacid, + self.model.code_id == cid, + self.model.type == 2 + ).all()) + return [pool[0] for pool in pools] + + def get_code_id_list_by_kpool_id_and_type(self, db: Session, uniacid: str, kpool_id: int, type: int): + results = (db.query(self.model.code_id) + .filter(self.model.weid == int(uniacid), + self.model.kpool_id == kpool_id, + self.model.type == type + ).all()) + # pdo_getall("goouc_fullexam_cdkeys", ["weid" = > $_W["uniacid"], "kpool_id" = > $v["id"], "type" = > 2], ["code_id"]); + return [result[0] for result in results] + class CRUDExercise(CRUDBase[Exercise, ExerciseCreate, ExerciseUpdate]): def get_exercise(self, db: Session, exercise_id: int): return self.get_by_field(db, "id", exercise_id) + def get_testid_list(self, db: Session, uniacid: str, uid: int): + results = db.query(self.model.testid).filter( + self.model.weid == int(uniacid), + self.model.uid == uid + ).all() + return [result[0] for result in results] + + def get_exercise_have_do(self, db: Session, uniacid: str, uid: int, testid: int): + results = db.query(self.model.id, self.model.testid, self.model.test_type).filter( + self.model.weid == int(uniacid), self.model.uid == uid, self.model.testid == testid).first() + + return results if results else None + class CRUDFeedback(CRUDBase[Feedback, FeedbackCreate, FeedbackUpdate]): def get_feedback(self, db: Session, feedback_id: int): @@ -72,6 +159,13 @@ class CRUDGift(CRUDBase[Gift, GiftCreate, GiftUpdate]): def get_gift(self, db: Session, gift_id: int): return self.get_by_field(db, "id", gift_id) + def get_gift_by_id(self, db: Session, gift_id: int, uniacid: str) -> Optional[Gift]: + return (db.query(self.model) + .filter(self.model.weid == int(uniacid), self.model.id == gift_id) + .first()) + # $gift = pdo_fetch("SELECT * FROM ".tablename("goouc_fullexam_gift"). + # " WHERE weid=:weid AND id=:gid", [":weid" = > $_W["uniacid"], ":gid" = > $val["giftid"]]); + class CRUDIndexBtn(CRUDBase[IndexBtn, IndexBtnCreate, IndexBtnUpdate]): def get_index_btn(self, db: Session, index_btn_id: int): @@ -102,11 +196,85 @@ class CRUDPaper(CRUDBase[Paper, PaperCreate, PaperUpdate]): def get_paper(self, db: Session, paper_id: int): return self.get_by_field(db, "id", paper_id) + def get_franction_by_paperid(self, db: Session, uniacid: str, paperid: int) -> Optional[str]: + result = (db.query(self.model.franction) + .filter(self.model.weid == int(uniacid), + self.model.istatus == 1, + self.model.status == 1, + self.model.id == paperid) + .first()) + return result[0] if result else None + + def get_page_list(self, db: Session, uniacid: str, page_index: int, page_size: int) -> Optional[List[Paper]]: + return db.query(self.model).filter( + self.model.weid == int(uniacid), + self.model.istatus == 1, + self.model.status == 1 + ).order_by(self.model.displayorder.desc()).limit(page_size).offset( + (page_index - 1) * page_size).all() + + # $paper = pdo_fetch("SELECT id,title,total_franction FROM ".tablename("goouc_fullexam_paper"). + # " WHERE weid=:weid AND status=1 AND istatus=1 AND id=:paperid ", + # [":weid" = > $_W["uniacid"], ":paperid" = > $paperid]); + + def get_paper_by_paperid(self, db: Session, uniacid: str, paperid: int) -> Optional[Paper]: + return (db.query(self.model) + .filter(self.model.weid == int(uniacid), + self.model.istatus == 1, + self.model.status == 1, + self.model.id == paperid) + .first()) + + def get_total(self, db: Session, uniacid: str): + return (db.query(self.model) + .filter(self.model.weid == int(uniacid), + self.model.istatus == 1, + self.model.status == 1) + .order_by(self.model.displayorder.desc()) + .count()) + class CRUDPaperTest(CRUDBase[PaperTest, PaperTestCreate, PaperTestUpdate]): def get_paper_test(self, db: Session, paper_test_id: int): return self.get_by_field(db, "id", paper_test_id) + def get_qnum(self, db: Session, paper_test_id: int) -> Optional[int]: + return (db.query(self.model) + .filter(self.model.paperid == paper_test_id) + .filter(self.model.istatus == 1) + .filter(self.model.status == 1) + .count()) + + def get_all_test_id(self, db: Session, paperid: int) -> Optional[List[int]]: + return db.query(self.model.testid).filter(self.model.paperid == paperid).all() + + # pdo_fetchall("SELECT + # q.id as id,q.type,q.question,q.qimage,q.qvideo,q.qaudio,q.a_type,q.option,q.rightkey,q.analysis,q.aimage,q.analysis_audio + # tablename("goouc_fullexam_paper_test") . " AS pt JOIN " . tablename("goouc_fullexam_test") . " + # AS q ON pt.testid=q.id WHERE pt.paperid=:paperid AND q.istatus=1 ", [":paperid" => $paper["id"]]); + def get_questions_by_paperid(self, db: Session, paperid: int): + questions = ( + db.query( + Test.id.label('id'), + Test.type, + Test.question, + Test.qimage, + Test.qvideo, + Test.qaudio, + Test.a_type, + Test.option, + Test.rightkey, + Test.analysis, + Test.aimage, + Test.analysis_audio + ) + .join(PaperTest, Test.id == PaperTest.testid) + .filter(PaperTest.paperid == paperid, + Test.istatus == 1) + .all() + ) + return questions + class CRUDPhonecode(CRUDBase[Phonecode, PhoneCodeCreate, PhoneCodeUpdate]): def get_phonecode(self, db: Session, phonecode_id: int): @@ -127,26 +295,562 @@ class CRUDSetting(CRUDBase[Setting, SettingCreate, SettingUpdate]): def get_setting(self, db: Session, setting_id: int): return self.get_by_field(db, "id", setting_id) + def get_setting_by_weid(self, db: Session, uniacid: str): + return self.get_by_field(db, "weid", int(uniacid)) + + def get_about_us(self, db: Session, uniacid: str): + result = self.get_by_field(db, "weid", int(uniacid)) + # 创建一个空字典 + info = {} + if result is not None: + info["id"] = result.id + info["about"] = result.about + return info + class CRUDShareRecord(CRUDBase[ShareRecord, ShareRecordCreate, ShareRecordUpdate]): def get_share_record(self, db: Session, share_record_id: int): return self.get_by_field(db, "id", share_record_id) -class CRUDsonSimple(CRUDBase[SonSimple, SonSimpleCreate, SonSimpleUpdate]): +class CRUDSonSimple(CRUDBase[SonSimple, SonSimpleCreate, SonSimpleUpdate]): def get_son_simple(self, db: Session, son_simple_id: int): return self.get_by_field(db, "id", son_simple_id) + def get_type_list(self, db: Session, uniacid: str) -> Optional[List[SonSimple]]: + # $type_list_t = pdo_fetchall(" SELECT * FROM ".tablename("goouc_fullexam_son_simple"). + # " WHERE weid = :weid ORDER BY id", [":weid" = > $_W["uniacid"]]); + results = db.query(self.model).filter(self.model.weid == int(uniacid)).order_by(self.model.id).all() + return results if results is not None else [] + + def get_type_list_page(self, db: Session, uniacid: str, pindex: int, psize: int) -> Optional[List[SonSimple]]: + results = db.query(self.model).filter( + self.model.weid == int(uniacid), + self.model.istatus == 1, + self.model.status == 1 + ).order_by(self.model.displayorder.desc()).limit(psize).offset( + (pindex - 1) * psize).all() + # $type_list_t = pdo_fetchall(" SELECT * FROM ".tablename("goouc_fullexam_son_simple"). + # " WHERE weid = :weid ORDER BY id DESC LIMIT ".($pindex - 1) * $psize. + # ",". $psize, [":weid" = > $_W["uniacid"]]); + return results if results is not None else [] + + def get_total(self, db: Session, uniacid: str) -> Optional[List[SonSimple]]: + results = db.query(self.model).filter(self.model.weid == int(uniacid)).order_by(self.model.id.desc).all() + return results if results is not None else [] + class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): def get_test(self, db: Session, test_id: int): return self.get_by_field(db, "test_id", test_id) + def get_question(self, db: Session, uniacid: str, id: int) -> Optional[Dict[str, Any]]: + + query = db.query( + self.model.id, + self.model.type, + self.model.question, + self.model.qimage, + self.model.qvideo, + self.model.qaudio, + self.model.a_type, + self.model.option, + self.model.rightkey, + self.model.analysis, + self.model.aimage, + self.model.analysis_audio + ).filter( + self.model.weid == int(uniacid), + self.model.id == id, + self.model.istatus == 1 + ) + + # 只选择指定字段 + # 将result元组转化为字典返回 + column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey", + "analysis", "aimage", "analysis_audio"] + + # 执行查询并转换结果为字典 + results = query.first() + results_dict = dict(zip(column_names, results)) if results is not None else None + return results_dict + + def get_err_list_order_by_id(self, db: Session, uniacid: str, id: int) -> Optional[List[Dict]]: + + query = db.query( + self.model.id, + self.model.type, + self.model.question, + self.model.qimage, + self.model.qvideo, + self.model.qaudio, + self.model.a_type, + self.model.option, + self.model.rightkey, + self.model.analysis, + self.model.aimage, + self.model.analysis_audio + ).filter( + self.model.weid == int(uniacid), + self.model.id == id, + self.model.istatus == 1 + ) + # 只选择指定字段 + # 将result元组转化为字典返回 + column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey", + "analysis", "aimage", "analysis_audio"] + + # 执行查询并转换结果为字典 + results = query.order_by(self.model.id).all() + results_dict = [dict(zip(column_names, result)) for result in results] + + return results_dict + + def get_pid_by_id(self, db: Session, uniacid: str, id: int) -> Optional[int]: + result = db.query(self.model.pid).filter( + self.model.weid == int(uniacid), + self.model.istatus == 1, + self.model.id == id + ).first() + return result[0] if result is not None else None + + def get_test_count(self, db: Session, uniacid: str, pool_ids: List[int]) -> int: + total = db.query(self.model.id).filter( + self.model.weid == int(uniacid), + self.model.istatus == 1, + self.model.type != 0, + self.model.display == 1, + self.model.type < 9, + self.model.libraryid.in_(pool_ids) + ).count() + return total if total is not None else 0 + + def get_test_by_id_and_type(self, db: Session, uniacid: str, type: int, testid: int) -> Optional[int]: + result = (db.query(self.model.id).filter( + self.model.weid == int(uniacid), + self.model.type == type, + self.model.id == testid) + .first()) + return result[0] if result is not None else None + + def mock_exam_get_question_list(self, db: Session, uniacid: str, pool: List[int], type: int, display: int, + num: int) -> Optional[List[Dict[str, Any]]]: + query = db.query( + self.model.id, + self.model.type, + self.model.question, + self.model.qimage, + self.model.qvideo, + self.model.qaudio, + self.model.a_type, + self.model.option, + self.model.rightkey, + self.model.analysis, + self.model.aimage, + self.model.analysis_audio + ).filter( + self.model.weid == int(uniacid), + self.model.libraryid.in_(pool), + self.model.istatus == 1, + self.model.type == type, + self.model.display == display, + self.model.type < 8 + ) + + # 添加动态条件 + if type == 5: + query = query.filter(self.model.son_status == 1) + + if num <= 0: + raise ValueError("Invalid limit value") + + # 只选择指定字段 + # 将result元组转化为字典返回 + column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey", + "analysis", "aimage", "analysis_audio"] + + # 执行查询并转换结果为字典 + results = query.order_by(func.rand()).limit(num).all() + results_dict = [dict(zip(column_names, result)) for result in results] + + return results_dict + + def mock_exam_get_question_list_type3(self, db: Session, uniacid: str, pool: List[int], type: int, display: int, + num: int) -> Optional[List[Dict[str, Any]]]: + query = db.query( + self.model.id, + self.model.type, + self.model.question, + self.model.qimage, + self.model.qvideo, + self.model.qaudio, + self.model.a_type, + self.model.option, + self.model.rightkey, + self.model.analysis, + self.model.aimage, + self.model.analysis_audio + ).filter( + self.model.weid == int(uniacid), + self.model.libraryid.in_(pool), + self.model.istatus == 1, + self.model.type == type, + self.model.display == display, + ).order_by(func.rand()).limit(num).all() + + if num <= 0: + raise ValueError("Invalid limit value") + + # 只选择指定字段 + # 将result元组转化为字典返回 + column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey", + "analysis", "aimage", "analysis_audio"] + + # 执行查询并转换结果为字典 + results = query.order_by(func.rand()).limit(num).all() + results_dict = [dict(zip(column_names, result)) for result in results] + + return results_dict + + def get_qtye_question_list(self, db: Session, uniacid: str, istatus: int, type: int, display: int, lib_id: int): + + query = db.query( + self.model.id, + self.model.type, + self.model.question, + self.model.qimage, + self.model.qvideo, + self.model.qaudio, + self.model.a_type, + self.model.option, + self.model.rightkey, + self.model.analysis, + self.model.aimage, + self.model.analysis_audio + ).filter( + self.model.weid == int(uniacid), + self.model.istatus == istatus, + self.model.type == type, + self.model.display == display, + self.model.libraryid == lib_id + ).order_by(self.model.id) + + # 只选择指定字段 + # 将result元组转化为字典返回 + column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey", + "analysis", "aimage", "analysis_audio"] + + # 执行查询并转换结果为字典 + results = query.all() + results_dict = [dict(zip(column_names, result)) for result in results] + + return results_dict + + def get_randon_question_list(self, db: Session, uniacid: str, lib_id: int, type_max: int, istatus: int, + display: int) -> Optional[List[Dict[str, Any]]]: + query = db.query( + self.model.id, + self.model.type, + self.model.question, + self.model.qimage, + self.model.qvideo, + self.model.qaudio, + self.model.a_type, + self.model.option, + self.model.rightkey, + self.model.analysis, + self.model.aimage, + self.model.analysis_audio + ).filter( + self.model.weid == int(uniacid), + self.model.istatus == istatus, + self.model.type != 0, + self.model.type < type_max, + self.model.display == display, + self.model.libraryid == lib_id + + ).order_by(func.rand()) + + # 只选择指定字段 + # 将result元组转化为字典返回 + column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey", + "analysis", "aimage", "analysis_audio"] + + # 执行查询并转换结果为字典 + results = query.all() + results_dict = [dict(zip(column_names, result)) for result in results] + + return results_dict + + def get_randon_question_list_batch(self, db: Session, uniacid: str, type: int, type_max: int, istatus: int, + display: int, pool: List[int], limit: int) -> Optional[List[Dict[str, Any]]]: + query = db.query( + self.model.id, + self.model.type, + self.model.question, + self.model.qimage, + self.model.qvideo, + self.model.qaudio, + self.model.a_type, + self.model.option, + self.model.rightkey, + self.model.analysis, + self.model.aimage, + self.model.analysis_audio + ).filter( + self.model.weid == int(uniacid), + self.model.type < type_max, + self.model.libraryid.in_(pool), + self.model.istatus == istatus, + self.model.type == type, + self.model.display == display + ).order_by(func.rand()).limit(limit) + + # 只选择指定字段 + # 将result元组转化为字典返回 + column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey", + "analysis", "aimage", "analysis_audio"] + + # 执行查询并转换结果为字典 + results = query.all() + results_dict = [dict(zip(column_names, result)) for result in results] + + return results_dict + + def get_question_list(self, db: Session, uniacid: str, lib_id: int, istatus: int, type_max: int, display: int) -> \ + Optional[List[Dict[str, Any]]]: + # 构建查询 + # 构建基础查询 + query = db.query( + self.model.id, + self.model.type, + self.model.question, + self.model.qimage, + self.model.qvideo, + self.model.qaudio, + self.model.a_type, + self.model.option, + self.model.rightkey, + self.model.analysis, + self.model.aimage, + self.model.analysis_audio + ).filter( + self.model.weid == int(uniacid), + self.model.istatus == istatus, + self.model.type != 0, + self.model.display == display, + self.model.type < type_max, + self.model.libraryid == lib_id + ).order_by(self.model.id) + + # 将result元组转化为字典返回 + column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey", + "analysis", "aimage", "analysis_audio"] + + # 执行查询并转换结果为字典 + results = query.all() + results_dict = [dict(zip(column_names, result)) for result in results] + + return results_dict + + def get_question_list_with_limit(self, + db: Session, uniacid: str, lib_id: int, istatus: int, type_max: int, display: int, + limit: int) -> Optional[List[Dict[str, Any]]]: + # 构建查询 + # 构建基础查询 + query = db.query( + self.model.id, + self.model.type, + self.model.question, + self.model.qimage, + self.model.qvideo, + self.model.qaudio, + self.model.a_type, + self.model.option, + self.model.rightkey, + self.model.analysis, + self.model.aimage, + self.model.analysis_audio + ).filter( + self.model.weid == int(uniacid), + self.model.istatus == istatus, + self.model.type != 0, + self.model.type < type_max, + self.model.display == display, + self.model.libraryid == lib_id + ).order_by(self.model.id).limit(limit) + + # 将result元组转化为字典返回 + column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey", + "analysis", "aimage", "analysis_audio"] + + # 执行查询并转换结果为字典 + results = query.all() + results_dict = [dict(zip(column_names, result)) for result in results] + + return results_dict + + def get_question_list_by_son_type(self, db: Session, uniacid: str, qtype: int, lib_id: int, istatus: int, + son_simple: int, display: int) -> \ + Optional[List[Dict[str, Any]]]: + # 构建查询 + # 构建基础查询 + query = db.query( + self.model.id, + self.model.type, + self.model.question, + self.model.qimage, + self.model.qvideo, + self.model.qaudio, + self.model.a_type, + self.model.option, + self.model.rightkey, + self.model.analysis, + self.model.aimage, + self.model.analysis_audio + ).filter( + self.model.weid == int(uniacid), + self.model.istatus == istatus, + self.model.son_simple == son_simple, + self.model.type == qtype, + self.model.display == display, + self.model.libraryid == lib_id + ).order_by(self.model.id) + + # 将result元组转化为字典返回 + column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey", + "analysis", "aimage", "analysis_audio"] + + # 执行查询并转换结果为字典 + results = query.all() + results_dict = [dict(zip(column_names, result)) for result in results] + + return results_dict + + def get_q_have_by_type_and_lib_id(self, db: Session, uniacid: str, type: int, lib_id: int, istatus: int, + son_status: int, display: int) -> Optional[int]: + query = (db.query(self.model.id) + .filter(self.model.weid == int(uniacid)) + .filter(self.model.type == type) + .filter(self.model.istatus == istatus) + .filter(self.model.display == display) + .filter(self.model.son_status == son_status) + .filter(self.model.libraryid == lib_id) + ) + return query.count() + + def get_testson_by_pid(self, db: Session, uniacid: str, pid: int) -> Optional[List[Test]]: + results = db.query(self.model).filter(self.model.weid == int(uniacid), self.model.pid == pid).order_by( + self.model.id).all() + return results if results else [] + class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]): def get_test_type(self, db: Session, test_type_id: int): return self.get_by_field(db, "id", test_type_id) + def get_fpool(self, db: Session, uniacid: str) -> List[int]: + results = db.query(self.model.id).filter( + self.model.weid == int(uniacid), + self.model.istatus == 1, + self.model.status == 1, + self.model.price <= 0, + self.model.pid != 0, + self.model.gpid == 0 + ).all() + return [result[0] for result in results] if results else [] + + def get_fpool_with_student_check(self, db: Session, uniacid: str) -> List[int]: + results = db.query(self.model.id).filter( + self.model.weid == int(uniacid), + self.model.istatus == 1, + self.model.status == 1, + self.model.price <= 0, + self.model.is_student != 1, + self.model.pid != 0, + self.model.gpid == 0 + ).all() + return [result[0] for result in results] if results else [] + + def get_test_type_ids(self, db: Session, uniacid: str, pid: int) -> List[int]: + results = db.query(self.model.id).filter( + self.model.weid == int(uniacid), + self.model.pid == pid, + self.model.istatus == 1, + self.model.status == 1, + self.model.gpid != 0 + ).all() + return [result[0] for result in results] if results else [] + + def get_test_type_by_special_id(self, db: Session, uniacid: str, special_id: int) -> Optional[int]: + result = db.query(self.model.id).filter( + self.model.weid == int(uniacid), + self.model.id == special_id, + self.model.istatus == 1, + self.model.status == 1 + ).first() + # $pid = pdo_getcolumn("goouc_fullexam_test_type", ["weid" = > $_W[ + # "uniacid"], "id" = > $special_id, "istatus" = > 1, "status" = > 1], "pid"); + return result[0] if result else None + + def get_pool_by_id(self, db: Session, uniacid: str, pid: int) -> List[Dict[str, Any]]: + result = (db.query(self.model.id, self.model.name, self.model.price) + .filter(self.model.weid == int(uniacid)) + .filter(self.model.istatus == 1) + .filter(self.model.status == 1) + .filter(self.model.id == pid).first()) + # $pool = pdo_fetchall("SELECT id,name,price FROM ".tablename("goouc_fullexam_test_type"). + # " WHERE weid=:weid AND istatus = 1 AND status = 1 AND id=:id ", [":weid" = > $_W["uniacid"], + # ":id" = > $pid]); + if result is not None: + # 将result从元组转化为字典返回 + return {"id": result[0], "name": result[1], "price": result[2]} + else: + return None + + def get_all_special_if_is_student(self, db: Session, uniacid: str, uid: int): + results = db.query(self.model.id, self.model.name).filter( + self.model.weid == int(uniacid), + self.model.istatus == 1, + self.model.is_student == 1, + self.model.pid == 0, + self.model.status == 1 + ).order_by(self.model.id).all() + + # pdo_fetchall("SELECT `id`,`name` FROM ".tablename("goouc_fullexam_test_type"). + # " WHERE weid=:weid AND istatus=1 AND pid = 0 AND status = 1 ORDER BY id ASC ", + # [":weid" = > $_W["uniacid"]]); + + return [{"id": result[0], "name": result[1]} for result in results] + + def get_all_special_without_is_student(self, db: Session, uniacid: str, uid: int) -> Optional[List[Dict[str, Any]]]: + results = db.query(self.model.id, self.model.name).filter( + self.model.weid == int(uniacid), + self.model.istatus == 1, + self.model.is_student == 2, + self.model.pid == 0, + self.model.status == 1 + ).order_by(self.model.id).all() + + # $list = pdo_fetchall("SELECT `id`,`name` FROM ".tablename("goouc_fullexam_test_type"). + # " WHERE weid=:weid AND istatus=1 AND is_student = 2 AND pid = 0 AND status = 1 ORDER BY id ASC ", + # [":weid" = > $_W["uniacid"]]); + + return [{"id": result[0], "name": result[1]} for result in results] if results else None + + def get_son_by_pid(self, db: Session, uniacid: str, pid: int, istatus: int, status: int): + results = db.query(self.model.id, self.model.name).filter( + self.model.weid == int(uniacid), + self.model.istatus == istatus, + self.model.pid == pid, + self.model.status == status + ).order_by(self.model.id).all() + # pdo_fetchall("SELECT `id`,`name` FROM ".tablename("goouc_fullexam_test_type"). + # " WHERE weid=:weid AND istatus=1 AND pid=:pid AND status = 1 ORDER BY id ASC ", + # [":weid" = > $_W["uniacid"], ":pid" = > $v["id"]]); + + return [{"id": result[0], "name": result[1]} for result in results] if results else None + class CRUDTypeCate(CRUDBase[TypeCate, TypeCateCreate, TypeCateUpdate]): def get_type_cate(self, db: Session, type_cate_id: int): @@ -167,6 +871,13 @@ class CRUDXueshen(CRUDBase[Xuesheng, XueshengCreate, XueshengUpdate]): def get_xueshen(self, db: Session, xueshen_id: int): return self.get_by_field(db, "xuesheng_id", xueshen_id) + def get_xuesheng_id(self, db: Session, uniacid: str, user_name: str, user_phone: str) -> Optional[int]: + return (db.query(self.model.xuesheng_id).filter( + self.model.weid == int(uniacid), + self.model.user_name == user_name, + self.model.user_phone == user_phone) + .first()) + advert = CRUDAdvert(Advert) banji = CRUDBase(Banji) @@ -190,7 +901,7 @@ q_year = CRUDSchool(School) school = CRUDSchool(School) setting = CRUDSetting(Setting) share_record = CRUDShareRecord(ShareRecord) -son_simple = CRUDsonSimple(SonSimple) +son_simple = CRUDSonSimple(SonSimple) test = CRUDTest(Test) test_type = CRUDTestType(TestType) type_cate = CRUDTypeCate(TypeCate) diff --git a/mooc/crud/crud_goouc_fullexam_user.py b/mooc/crud/crud_goouc_fullexam_user.py index 46f4c47..07e4094 100644 --- a/mooc/crud/crud_goouc_fullexam_user.py +++ b/mooc/crud/crud_goouc_fullexam_user.py @@ -1,14 +1,16 @@ -from typing import Optional +from typing import Optional, Tuple, List, Dict, Any + from sqlalchemy.orm import Session from mooc.crud.crud_base import CRUDBase from mooc.models.goouc_fullexam_user import ( # 导入全部 + User, UserDoexam, UserDoOtherExamAnswer, UserExamAnswer, UserSpecial, - UserSpequence, + UserSequence, UserMember, UserCollectionPraction, UserGift, @@ -18,9 +20,12 @@ from mooc.models.goouc_fullexam_user import ( UserDootherExam, UserWrongPraction, UserPool, + UserQtype ) from mooc.schemas.goouc_fullexam_user import ( # 导入全部create和update + UserCreate, + UserUpdate, UserDoexamCreate, UserDoexamUpdate, UserDoOtherExamAnswerCreate, @@ -49,13 +54,57 @@ from mooc.schemas.goouc_fullexam_user import ( UserWrongPractionUpdate, UserPoolCreate, UserPoolUpdate, + UserQTypeCreate, + UserQTypeUpdate, ) +from mooc.models.goouc_fullexam import ( + TestType +) + + +class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]): + def get_user(self, db: Session, user_id: int): + return self.get_by_field(db, "id", user_id) + + def get_user_info(self, db: Session, uid: int, uniacid: str) -> Optional[User]: + return db.query(self.model).filter(self.model.id == uid, self.model.weid == int(uniacid)).first() + + def get_user_is_member(self, db: Session, uniacid: str, uid: int) -> Optional[int]: + # $is_member = pdo_getcolumn(goouc_fullexam_user, + # ["weid" = > $_W["uniacid"], "id" = > $uid], + # "ismember"); + result = (db.query(self.model.ismember). + filter(self.model.weid == uniacid, + self.model.id == uid) + .first()) + return result[0] if result else None class CRUDUserDoexam(CRUDBase[UserDoexam, UserDoexamCreate, UserDoexamUpdate]): def get_user_doexam(self, db: Session, user_doexam_id: int): return self.get_by_field(db, "id", user_doexam_id) + def get_quanzhen_highest(self, db: Session, uniacid: str, uid: int): + result = (db.query(self.model) + .filter_by(self.model.weid == uniacid) + .filter(self.model.uid == uid) + .order_by(self.model.franction.desc()) + .first()) + return result[0] if result else None + + def get_user_do_number(self, db: Session, uniacid: str, uid: int, examid: int): + return db.query(self.model).filter( + self.model.weid == int(uniacid), + self.model.uid == uid, + self.model.examid == examid).count() + + def get_user_recordid(self, db: Session, uniacid: str, uid: int, paperid: int): + result = db.query(self.model.recordid).filter( + self.model.weid == int(uniacid), + self.model.uid == uid, + self.model.examid == paperid).order_by(self.model.createtime.desc()).limit(1).first() + return result[0] if result else None + class CRUDUserDoOtherExamAnswer( CRUDBase[UserDoOtherExamAnswer, UserDoOtherExamAnswerCreate, UserDoOtherExamAnswerUpdate]): @@ -67,16 +116,85 @@ class CRUDUserExamAnswer(CRUDBase[UserExamAnswer, UserExamAnswerCreate, UserExam def get_user_exam_answer(self, db: Session, user_exam_answer_id: int): return self.get_by_field(db, "id", user_exam_answer_id) + def get_user_exam_answer(self, db: Session, uniacid: str, uid: int, paperid: int, recordid: int) -> Optional[ + List[Dict[str, Any]]]: + query = (db.query(self.model.uid, self.model.examid, self.model.testid, self.model.test_type) + .filter(self.model.weid == int(uniacid)) + .filter(self.model.uid == uid) + .filter(self.model.examid == paperid) + .filter(self.model.recordid == recordid) + .filter(self.model.test_type != 0) + .filter(self.model.isright == 0) + .order_by(self.model.testid) + ) + + # 将result元组转化为字典返回 + column_names = ["uid", "examid", "testid", "test_type"] + + # 执行查询并转换结果为字典 + results = query.all() + results_dict = [dict(zip(column_names, result)) for result in results] + + return results_dict + + def get_exam_answer_test_id(self, db: Session, uniacid: str, uid: int) -> Optional[List[int]]: + results = (db.query(self.model.testid) + .filter(self.model.weid == int(uniacid)) + .filter(self.model.uid == uid) + .order_by(self.model.testid) + .all()) + return [result[0] for result in results] if results else [] + + def get_do_answer(self, db: Session, uniacid: str, uid: int, paperid: int, testid: int) -> Optional[UserExamAnswer]: + return (db.query(self.model).filter( + self.model.weid == int(uniacid), + self.model.uid == uid, + self.model.examid == paperid, + self.model.testid == testid) + .order_by(self.model.createtime.desc()).first()) + + # $do_answer = pdo_fetch( + # " SELECT * FROM ".tablename($this->t_user_exam_answer)." + # WHERE weid = :weid AND uid = :uid AND testid = :testid AND examid = :examid order by createtime DESC ", [ + # ":weid" = > $_W["uniacid"], ":uid" = > $uid, ":examid" = > $paperid, "testid" = > $v["id"]]); + class CRUDUserSpecial(CRUDBase[UserSpecial, UserSpecialCreate, UserSpecialUpdate]): def get_user_special(self, db: Session, user_special_id: int): return self.get_by_field(db, "id", user_special_id) + def get_last_id_by_special_id(self, db: Session, uniacid: str, uid: int, special_id: int) -> Optional[int]: + result = (db.query(self.model.last_id) + .filter(self.model.weid == int(uniacid)) + .filter(self.model.uid == uid) + .filter(self.model.special_id == special_id) + .filter(self.model.istatus == 1) + .first()) + return result[0] if result else None -class CRUDUserSpequence(CRUDBase[UserSpequence, UserSpequenceCreate, UserSpequenceUpdate]): + # $last_id = pdo_fetchcolumn("SELECT last_id FROM ".tablename("goouc_fullexam_user_special"). + # " WHERE weid=:weid AND uid=:uid AND special_id=:special_id AND istatus=1 ", + # [":weid" = > $_W["uniacid"], ":uid" = > $uid, ":special_id" = > $special_id]); + + +class CRUDUserSequence(CRUDBase[UserSequence, UserSpequenceCreate, UserSpequenceUpdate]): def get_user_spequence(self, db: Session, user_spequence_id: int): return self.get_by_field(db, "id", user_spequence_id) + def get_last_id(self, db: Session, uid: int, uniacid: str): + result = (db.query(self.model.question_id) + .filter(self.model.weid == int(uniacid), + self.model.user_id == uid) + .first()) + return result[0] if result else None + + def get_last_id_by_lib_id(self, db: Session, uid: int, uniacid: str, lib_id: int): + result = (db.query(self.model.question_id) + .filter(self.model.weid == int(uniacid), + self.model.user_id == uid, self.model.lib_id == lib_id) + .first()) + return result[0] if result else None + class CRUDUserMember(CRUDBase[UserMember, UserMemberCreate, UserMemberUpdate]): def get_user_member(self, db: Session, user_member_id: int): @@ -88,11 +206,46 @@ class CRUDUserCollectionPraction( def get_user_collection_praction(self, db: Session, user_collection_praction_id: int): return self.get_by_field(db, "id", user_collection_praction_id) + def get_is_collect(self, db: Session, uniacid: str, uid: int, test_id: int, test_type: int): + result = (db.query(self.model.iscollect) + .filter(self.model.weid == int(uniacid), + self.model.uid == uid, + self.model.testid == test_id, + self.model.test_type == test_type) + .first()) + return result[0] if result else None + # $iscollect = pdo_getcolumn("goouc_fullexam_user_collection_praction", + # ["weid" = > $_W["uniacid"], "uid" = > $uid, "testid" = > $val["id"], "test_type" = > $val["type"]], + # "id"); + + def get_id_list(self, db: Session, uniacid: str, uid: int, istatus: int): + query = (db.query(self.model.testid, self.model.test_type) + .filter(self.model.weid == int(uniacid), + self.model.uid == uid, + self.model.istatus == istatus) + .order_by(self.model.id) + ) + + results = query.all() + return [{"testid": result[0], "test_type": result[1]} for result in results] + class CRUDUserGift(CRUDBase[UserGift, UserGiftCreate, UserGiftUpdate]): def get_user_gift(self, db: Session, user_gift_id: int): return self.get_by_field(db, "id", user_gift_id) + def get_user_gift_list(self, db: Session, uniacid: str, uid: str) -> Optional[List[Tuple[int, int, int]]]: + # [ giftid , createtime , status + # (1, 1634567890, 1), + # (2, 1634567900, 0) + # ] + return (db.query(self.model.giftid, self.model.createtime, self.model.status) + .filter(self.model.weid == uniacid).filter( + self.model.uid == uid).all()) + + def get_user_gift(self, db: Session, user_gift_id: int): + return self.get_by_field(db, "id", user_gift_id) + class CRUDUserQHigh(CRUDBase[UserQhigh, UserQHighCreate, UserQHighUpdate]): def get_user_qhigh(self, db: Session, user_qhigh_id: int): @@ -113,12 +266,109 @@ class CRUDUserDoOtherExam(CRUDBase[UserDootherExam, UserDoOtherExamCreate, UserD def get_user_doother_exam(self, db: Session, user_doother_exam_id: int): return self.get_by_field(db, "id", user_doother_exam_id) + def get_user_other_highest(self, db: Session, uniacid: str, uid: int): + return (db.query(self.model) + .filter_by(self.model.weid == uniacid) + .filter(self.model.uid == uid) + .order_by(self.model.franction.desc()) + .first()) + class CRUDUserWrongPraction(CRUDBase[UserWrongPraction, UserWrongPractionCreate, UserWrongPractionUpdate]): def get_user_wrong_praction(self, db: Session, user_wrong_praction_id: int): return self.get_by_field(db, "id", user_wrong_praction_id) + def get_err_id_list_order_by_createtime(self, db: Session, uniacid: str, uid: int): + results = (db.query(self.model.testid, self.model.test_type) + .filter(self.model.weid == int(uniacid), self.model.uid == uid, self.model.istatus == 1) + .order_by(self.model.createtime) + .all()) + # 转换成字典 + return [{ + "testid": result[0], + "test_type": result[1] + } for result in results] if results else None + + def get_id_list_order_by_id(self, db: Session, uniacid: str, uid: int, test_type: int): + results = (db.query(self.model.testid, self.model.test_type) + .filter(self.model.weid == int(uniacid), self.model.uid == uid, self.model.istatus == 1) + .order_by(self.model.id) + .all()) + # 转换成字典 + return [{ + "testid": result[0], + "test_type": result[1] + } for result in results] if results else None + + def get_id_list_with_time(self, db: Session, uniacid: str, uid: int, start: int): + results = (db.query(self.model.testid, self.model.test_type) + .filter(self.model.weid == int(uniacid), self.model.uid == uid, self.model.istatus == 1, + self.model.createtime > start) + .order_by(self.model.id) + .all()) + + # 转换成字典 + return [{ + "testid": result[0], + "test_type": result[1] + } for result in results] if results else None + class CRUDUserPool(CRUDBase[UserPool, UserPoolCreate, UserPoolUpdate]): def get_user_pool(self, db: Session, user_pool_id: int): return self.get_by_field(db, "id", user_pool_id) + + def get_kid_id_by_paperid(self, db: Session, uniacid: str, uid: int, paperid: int): + result = (db.query(self.model.id).filter(self.model.weid == uniacid, self.model.uid == uid, + self.model.paperid == paperid, self.model.istatus == 1).first()) + return result[0] if result else None + + def get_user_pool(self, db: Session, uniacid: str, uid: int) -> List[Tuple[int, int]]: + results = db.query(UserPool.id, UserPool.poolid).join( + # 左连接 + TestType, + UserPool.poolid == TestType.id, + isouter=True + ).filter( + UserPool.weid == uniacid, + UserPool.uid == uid, + TestType.istatus == 1, + TestType.status == 1 + ).all() + return results if results else [] + # pdo_fetchall("SELECT up.id,up.poolid FROM " + # tablename("goouc_fullexam_user_pool") + # as up LEFT JOIN " . tablename("goouc_fullexam_test_type") + # AS tt ON up.poolid=tt.id + # WHERE up.weid=:weid AND up.uid=:uid AND tt.istatus = 1 AND tt.status = 1 + # [":weid" => $_W["uniacid"], ":uid" => $uid]); + + def get_kid_id_by_poolid(self, db: Session, uniacid: str, uid: int, poolid: int): + result = (db.query(self.model.id) + .filter(self.model.weid == int(uniacid)) + .filter(self.model.uid == uid) + .filter(self.model.poolid == poolid) + .filter(self.model.istatus == 1) + .first()) + return result[0] if result else None + # $kid = pdo_getcolumn("goouc_fullexam_user_pool", + # ["weid" = > $_W["uniacid"], "uid" = > $uid, "poolid" = > $pool["id"], "istatus" = > 1], "id"); + + +class CRUDUserQtype(CRUDBase[UserQtype, UserQTypeCreate, UserQTypeUpdate]): + def get_user_qype(self, db: Session, user_qtype_id: int): + return self.get_by_field(db, "id", user_qtype_id) + + def get_last_id_by_type_id_and_uid(self, db: Session, uniacid: str, uid: int, type_id: int, istatus: int) -> \ + Optional[int]: + results = (db.query(self.model.last_id).filter( + self.model.weid == int(uniacid), + self.model.uid == uid, + self.model.type_id == type_id, + self.model.istatus == istatus) + .first()) + # $last_id = pdo_fetchcolumn("SELECT last_id FROM ".tablename("goouc_fullexam_user_qtype"). + # " WHERE weid=:weid AND uid=:uid AND type_id=:type_id AND istatus=1 ", [":weid" = > $_W[ + # "uniacid"], ":uid" = > $uid, ":type_id" = > $qtype]); + + return results[0] if results else None diff --git a/mooc/models/goouc_fullexam_user.py b/mooc/models/goouc_fullexam_user.py index b8acabb..349b338 100644 --- a/mooc/models/goouc_fullexam_user.py +++ b/mooc/models/goouc_fullexam_user.py @@ -369,7 +369,7 @@ class UserSpecial(Base): orm_mode = True -class UserSpequence(Base): +class UserSequence(Base): __tablename__ = 'ims_goouc_fullexam_user_spequence' __table_args__ = ( Index('idx_weid', 'weid'), diff --git a/mooc/schemas/goouc_fullexam.py b/mooc/schemas/goouc_fullexam.py index 45f0b38..340de20 100644 --- a/mooc/schemas/goouc_fullexam.py +++ b/mooc/schemas/goouc_fullexam.py @@ -464,8 +464,16 @@ class GiftInDB(GiftBase): orm_mode = True -class GiftResponse(GiftInDB): - pass # 可以根据需要添加额外的字段或调整现有字段 +class GiftResponse(BaseModel): + # 可以根据需要添加额外的字段或调整现有字段 + weid: int = Field(..., ge=0, description="站点 ID,默认为0") + name: str = Field(..., max_length=100, description="礼品名称") + price: int = Field(100, ge=0, description="礼品市场价格,默认为100") + coins: int = Field(500, ge=0, description="礼品所需金币,默认为500") + image: str = Field(..., max_length=255, description="礼品图片路径") + about: str = Field(..., description="礼品描述") + status: str = Field(..., description="礼品状态") + createtime: Optional[int] = Field(None, description="创建时间,Unix 时间戳") # 用于批量操作的模型 @@ -821,7 +829,7 @@ class PaperTestListResponse(BaseModel): class PhoneCodeBase(BaseModel): weid: int = Field(..., ge=0, description="站点 ID") - phone: str = Field(..., min_length=11, max_length=11, regex=r"^\d{11}$", description="手机号码") + phone: str = Field(..., min_length=11, max_length=11, description="手机号码") code: int = Field(..., ge=100000, le=999999, description="手机验证码,通常为6位数字") createtime: int = Field(..., description="创建时间,Unix 时间戳")