diff --git a/mooc/api/v1/endpoints/wxapp.py b/mooc/api/v1/endpoints/wxapp.py index bf51f5a..33cecee 100644 --- a/mooc/api/v1/endpoints/wxapp.py +++ b/mooc/api/v1/endpoints/wxapp.py @@ -120,7 +120,6 @@ async def handle_wxapp_request( test_mapper = CRUDTest(Test) xueshen_mapper = CRUDXueshen(Xuesheng) user_sequence_mapper = CRUDUserSequence(UserSequence) - user_pool_mapper = CRUDUserPool(UserPool) test_type_mapper = CRUDTestType(TestType) users_special_mapper = CRUDUserSpecial(UserSpecial) user_qtype_mapper = CRUDUserQtype(UserQtype) @@ -149,30 +148,30 @@ async def handle_wxapp_request( # part 3 elif do == "ExchangeGiftList": - return await handle_exchange_gift_list(WxappRequest(**data), i, db, user_gift_mapper, gift_mapper) + return await handle_exchange_gift_list(WxappRequest(**data), int(i), db, user_gift_mapper, gift_mapper) elif do == "Aboutus": - return await handle_about_us(WxappRequest(**data), i, db, setting_mapper) + return await handle_about_us(WxappRequest(**data), int(i), db, setting_mapper) elif do == "Mockexam": - return await MockExam(WxappRequest(**data), db, i, user_mapper, setting_mapper, xueshen_mapper, + return await MockExam(WxappRequest(**data), db, int(i), user_mapper, setting_mapper, xueshen_mapper, test_type_mapper, test_mapper, user_pool_mapper, cdkey_mapper, cdkeys_mapper, paper_test_mapper, paper_mapper, user_exam_answer) elif do == "PreExamInfo": - return await per_exam_info(WxappRequest(**data), i, db, setting_mapper, user_doexam, user_mapper, + return await per_exam_info(WxappRequest(**data), int(i), db, setting_mapper, user_doexam, user_mapper, user_do_other_exam_mapper) elif do == "GetExamList": - return await get_exam_list(WxappRequest(**data), i, db, paper_mapper, paper_test_mapper, user_pool_mapper, + return await get_exam_list(WxappRequest(**data), int(i), db, paper_mapper, paper_test_mapper, user_pool_mapper, cdkeys_mapper, user_member_mapper, cdkey_mapper, user_doexam, test_mapper, user_mapper) elif do == "Sequence": - return await handle_sequence(WxappRequest(**data), db, i, user_mapper, xueshen_mapper, user_sequence_mapper, + return await handle_sequence(WxappRequest(**data), db, int(i), user_mapper, xueshen_mapper, user_sequence_mapper, test_mapper, setting_mapper, user_pool_mapper, test_type_mapper, users_special_mapper, cdkeys_mapper, cdkey_mapper, user_member_mapper, user_collection, user_qtype_mapper, son_simple_mapper, exercise_mapper, user_exam_answer, user_wrong_praction) elif do == "TotalqNum": - return await TotalqNum(WxappRequest(**data), db, i, user_mapper, user_sequence_mapper, xueshen_mapper, + return await TotalqNum(WxappRequest(**data), db, int(i), user_mapper, user_sequence_mapper, xueshen_mapper, test_type_mapper, user_pool_mapper, test_mapper) return {"code": 404, "msg": "接口未找到"} @@ -325,7 +324,7 @@ async def handle_sequence(data: WxappRequest, db: Session, uniacid: str, user_ma if uid is None: return {"code": 1, "msg": "传递的参数不存在或失效", "data": "1001"} - user_info = user_mapper.get_user_info(db, uid, uniacid) + user_info = user_mapper.get_user_info(db, uid, ) if user_info is None: return {"code": 1, "msg": "用户不存在", "data": "1002"} @@ -339,7 +338,7 @@ async def handle_sequence(data: WxappRequest, db: Session, uniacid: str, user_ma lib_id = data.data["lib_id"] if op == "sequence": - last_id = user_sequence_mapper.get_last_id(db, uniacid, uid, lib_id) + last_id = user_sequence_mapper.get_last_id_by_lib_id(db, uniacid, uid, lib_id) question_list = test_mapper.get_question_list(db, uniacid, lib_id, 1, 9, 1) total_qnum = len(question_list) @@ -414,7 +413,7 @@ async def handle_sequence(data: WxappRequest, db: Session, uniacid: str, user_ma else: v["is_can"] = 1 - z_vip = user_member_mapper.get_by_field(db, "weid", int(uniacid)) + z_vip = user_member_mapper.get_by_field(db, "weid", uniacid) v["vip_price"] = v["price"] * z_vip.scale v["vip_price"] = round(v["vip_price"], 2) @@ -423,7 +422,7 @@ async def handle_sequence(data: WxappRequest, db: Session, uniacid: str, user_ma v["is_can"] = 1 now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - cdkeys = cdkeys_mapper.get_code_id_list_by_kpool_id_and_type(db, uniacid, v["id"], 2) + cdkeys = cdkeys_mapper.get_all_cdkeys_code_ids(db, uniacid, v["id"], 2) if cdkeys is not None and len(cdkeys) > 0: active = cdkey_mapper.get_user_active(db, uniacid, uid, now_time, cdkeys) @@ -956,7 +955,7 @@ async def handle_sequence(data: WxappRequest, db: Session, uniacid: str, user_ma async def handle_exchange_gift_list( data: WxappRequest, - uniacid: str, + uniacid: int, db: Session, user_gift_mapper: CRUDUserGift, gift_mapper: CRUDGift @@ -1002,7 +1001,7 @@ async def handle_exchange_gift_list( async def handle_about_us( data: WxappRequest, - uniacid: str, + uniacid: int, db: Session, setting: CRUDSetting ): @@ -1017,7 +1016,7 @@ async def handle_about_us( async def MockExam(data: WxappRequest, db: Session, - uniacid: str, + uniacid: int, user_mapper: CRUDUser, setting_mapper: CRUDSetting, xueshen_mapper: CRUDXueshen, @@ -1047,7 +1046,7 @@ async def MockExam(data: WxappRequest, "data": "1002" } - user_info = user_mapper.get_user_info(db, uniacid, uid) + user_info = user_mapper.get_user_info(db, uid, uniacid) if not user_info: return { "code": 1, @@ -1221,7 +1220,7 @@ async def MockExam(data: WxappRequest, if question_list is not None: new_question.extend(question_list) - new_question = Makeformat(db=db, itembank=new_question, uniacid=int(uniacid)) + new_question = Makeformat(db=db, itembank=new_question, uniacid=uniacid) now_total = len(new_question) if now_total != total_qnum: return { @@ -1518,7 +1517,7 @@ async def TotalqNum(data: WxappRequest, async def per_exam_info( data: WxappRequest, - uniacid: str, + uniacid: int, db: Session, setting: CRUDSetting, user_doexam: CRUDUserDoexam, @@ -1631,8 +1630,7 @@ async def get_exam_list( else: value["is_can"] = 1 - user_mapper.get_user_is_member(db, uniacid, uid) - is_member = {} + is_member = user_mapper.get_user_is_member(db, uniacid, uid) z_vip = user_member.get_by_field(db, "weid", uniacid) value["vip_price"] = value["price"] * z_vip.scale @@ -1643,7 +1641,7 @@ async def get_exam_list( value["is_can"] = 1 now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - cdkeys = cdkeys_service.get_all_cdkeys_code_ids(db, uniacid, value["id"]) + cdkeys = cdkeys_service.get_all_cdkeys_code_ids(db, uniacid, value["id"], 1) if cdkeys is not None and len(cdkeys) > 0: active = cdkey_service.get_user_active(db, uniacid, uid, now_time, cdkeys) diff --git a/mooc/crud/crud_goouc_fullexam.py b/mooc/crud/crud_goouc_fullexam.py index b9ba2e4..1757ad7 100644 --- a/mooc/crud/crud_goouc_fullexam.py +++ b/mooc/crud/crud_goouc_fullexam.py @@ -56,9 +56,9 @@ class CRUDCdkey(CRUDBase[Cdkey, CdkeyCreate, CdkeyUpdate]): # display=1 # endtime > unix_timestamp(now()) # kpool > :kpool ", [":weid" = > $_W["uniacid"], "uid" = > $uid, ":kpool" = > $now_time]); - def get_cdkey_list(self, db: Session, uniacid: str, uid: int, now_time: str) -> Optional[List[Cdkey]]: + def get_cdkey_list(self, db: Session, uniacid: int, uid: int, now_time: str) -> Optional[List[Cdkey]]: cdkeys = db.query(self.model).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.uid == uid, self.model.status == 2, self.model.display == 1, @@ -67,11 +67,11 @@ class CRUDCdkey(CRUDBase[Cdkey, CdkeyCreate, CdkeyUpdate]): ).all() return cdkeys - def get_user_active_with_paperid(self, db: Session, uniacid: str, uid: int, now_time: str, + def get_user_active_with_paperid(self, db: Session, uniacid: int, uid: int, now_time: str, cdkeys_code_id_list: List[int]) -> Optional[ Cdkey]: return (db.query(self.model).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.uid == uid, self.model.status == 2, self.model.display == 1, @@ -80,11 +80,11 @@ class CRUDCdkey(CRUDBase[Cdkey, CdkeyCreate, CdkeyUpdate]): self.model.cid.in_(cdkeys_code_id_list) ).first()) - def get_user_active(self, db: Session, uniacid: str, uid: int, now_time: str, cdkeys_code_id_list: List[int]) -> \ + def get_user_active(self, db: Session, uniacid: int, uid: int, now_time: str, cdkeys_code_id_list: List[int]) -> \ Optional[ Cdkey]: return (db.query(self.model).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.uid == uid, self.model.status == 2, self.model.display == 1, @@ -103,18 +103,18 @@ class CRUDCdkeys(CRUDBase[Cdkeys, CdkeysCreate, CdkeysUpdate]): def get_cdkeys(self, db: Session, cdkeys_id: int): return self.get_by_field(db, "id", cdkeys_id) - def get_all_cdkeys_code_ids(self, db: Session, uniacid: str, paper_id: int) -> List[int]: + def get_all_cdkeys_code_ids(self, db: Session, uniacid: int, kpool_id: int, type: int) -> List[int]: results = (db.query(self.model.code_id).filter( - self.model.uniacid == int(uniacid), - self.model.kpool_id == paper_id, - self.model.type == 1) + self.model.weid == uniacid, + self.model.kpool_id == kpool_id, + self.model.type == type) .all()) - return [result[0] for result in results] + return [result[0] for result in results] if results is not None else [] # $pools = pdo_getall("goouc_fullexam_cdkeys", # ["weid" => $_W["uniacid"], "code_id" => $cdkey["cid"], "type" => 2], "kpool_id"); - def get_all_kpool_id(self, db: Session, uniacid: str, cid: int) -> Optional[List[int]]: + def get_all_kpool_id(self, db: Session, uniacid: int, cid: int) -> Optional[List[int]]: pools = (db.query(self.model.kpool_id) .filter(self.model.weid == uniacid, self.model.code_id == cid, @@ -122,30 +122,21 @@ class CRUDCdkeys(CRUDBase[Cdkeys, CdkeysCreate, CdkeysUpdate]): ).all()) return [pool[0] for pool in pools] - def get_code_id_list_by_kpool_id_and_type(self, db: Session, uniacid: str, kpool_id: int, type: int): - results = (db.query(self.model.code_id) - .filter(self.model.weid == int(uniacid), - self.model.kpool_id == kpool_id, - self.model.type == type - ).all()) - # pdo_getall("goouc_fullexam_cdkeys", ["weid" = > $_W["uniacid"], "kpool_id" = > $v["id"], "type" = > 2], ["code_id"]); - return [result[0] for result in results] - class CRUDExercise(CRUDBase[Exercise, ExerciseCreate, ExerciseUpdate]): def get_exercise(self, db: Session, exercise_id: int): return self.get_by_field(db, "id", exercise_id) - def get_testid_list(self, db: Session, uniacid: str, uid: int): + def get_testid_list(self, db: Session, uniacid: int, uid: int): results = db.query(self.model.testid).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.uid == uid ).all() return [result[0] for result in results] - def get_exercise_have_do(self, db: Session, uniacid: str, uid: int, testid: int): + def get_exercise_have_do(self, db: Session, uniacid: int, uid: int, testid: int): results = db.query(self.model.id, self.model.testid, self.model.test_type).filter( - self.model.weid == int(uniacid), self.model.uid == uid, self.model.testid == testid).first() + self.model.weid == uniacid, self.model.uid == uid, self.model.testid == testid).first() return results if results else None @@ -159,9 +150,9 @@ class CRUDGift(CRUDBase[Gift, GiftCreate, GiftUpdate]): def get_gift(self, db: Session, gift_id: int): return self.get_by_field(db, "id", gift_id) - def get_gift_by_id(self, db: Session, gift_id: int, uniacid: str) -> Optional[Gift]: + def get_gift_by_id(self, db: Session, gift_id: int, uniacid: int) -> Optional[Gift]: return (db.query(self.model) - .filter(self.model.weid == int(uniacid), self.model.id == gift_id) + .filter(self.model.weid == uniacid, self.model.id == gift_id) .first()) # $gift = pdo_fetch("SELECT * FROM ".tablename("goouc_fullexam_gift"). # " WHERE weid=:weid AND id=:gid", [":weid" = > $_W["uniacid"], ":gid" = > $val["giftid"]]); @@ -196,18 +187,18 @@ class CRUDPaper(CRUDBase[Paper, PaperCreate, PaperUpdate]): def get_paper(self, db: Session, paper_id: int): return self.get_by_field(db, "id", paper_id) - def get_franction_by_paperid(self, db: Session, uniacid: str, paperid: int) -> Optional[str]: + def get_franction_by_paperid(self, db: Session, uniacid: int, paperid: int) -> Optional[str]: result = (db.query(self.model.franction) - .filter(self.model.weid == int(uniacid), + .filter(self.model.weid == uniacid, self.model.istatus == 1, self.model.status == 1, self.model.id == paperid) .first()) return result[0] if result else None - def get_page_list(self, db: Session, uniacid: str, page_index: int, page_size: int) -> Optional[List[Paper]]: + def get_page_list(self, db: Session, uniacid: int, page_index: int, page_size: int) -> Optional[List[Paper]]: return db.query(self.model).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == 1, self.model.status == 1 ).order_by(self.model.displayorder.desc()).limit(page_size).offset( @@ -217,17 +208,17 @@ class CRUDPaper(CRUDBase[Paper, PaperCreate, PaperUpdate]): # " WHERE weid=:weid AND status=1 AND istatus=1 AND id=:paperid ", # [":weid" = > $_W["uniacid"], ":paperid" = > $paperid]); - def get_paper_by_paperid(self, db: Session, uniacid: str, paperid: int) -> Optional[Paper]: + def get_paper_by_paperid(self, db: Session, uniacid: int, paperid: int) -> Optional[Paper]: return (db.query(self.model) - .filter(self.model.weid == int(uniacid), + .filter(self.model.weid == uniacid, self.model.istatus == 1, self.model.status == 1, self.model.id == paperid) .first()) - def get_total(self, db: Session, uniacid: str): + def get_total(self, db: Session, uniacid: int): return (db.query(self.model) - .filter(self.model.weid == int(uniacid), + .filter(self.model.weid == uniacid, self.model.istatus == 1, self.model.status == 1) .order_by(self.model.displayorder.desc()) @@ -295,11 +286,11 @@ class CRUDSetting(CRUDBase[Setting, SettingCreate, SettingUpdate]): def get_setting(self, db: Session, setting_id: int): return self.get_by_field(db, "id", setting_id) - def get_setting_by_weid(self, db: Session, uniacid: str): - return self.get_by_field(db, "weid", int(uniacid)) + def get_setting_by_weid(self, db: Session, uniacid: int): + return self.get_by_field(db, "weid", uniacid) - def get_about_us(self, db: Session, uniacid: str): - result = self.get_by_field(db, "weid", int(uniacid)) + def get_about_us(self, db: Session, uniacid: int): + result = self.get_by_field(db, "weid", uniacid) # 创建一个空字典 info = {} if result is not None: @@ -317,15 +308,15 @@ class CRUDSonSimple(CRUDBase[SonSimple, SonSimpleCreate, SonSimpleUpdate]): def get_son_simple(self, db: Session, son_simple_id: int): return self.get_by_field(db, "id", son_simple_id) - def get_type_list(self, db: Session, uniacid: str) -> Optional[List[SonSimple]]: + def get_type_list(self, db: Session, uniacid: int) -> Optional[List[SonSimple]]: # $type_list_t = pdo_fetchall(" SELECT * FROM ".tablename("goouc_fullexam_son_simple"). # " WHERE weid = :weid ORDER BY id", [":weid" = > $_W["uniacid"]]); - results = db.query(self.model).filter(self.model.weid == int(uniacid)).order_by(self.model.id).all() + results = db.query(self.model).filter(self.model.weid == uniacid).order_by(self.model.id).all() return results if results is not None else [] - def get_type_list_page(self, db: Session, uniacid: str, pindex: int, psize: int) -> Optional[List[SonSimple]]: + def get_type_list_page(self, db: Session, uniacid: int, pindex: int, psize: int) -> Optional[List[SonSimple]]: results = db.query(self.model).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == 1, self.model.status == 1 ).order_by(self.model.displayorder.desc()).limit(psize).offset( @@ -335,8 +326,8 @@ class CRUDSonSimple(CRUDBase[SonSimple, SonSimpleCreate, SonSimpleUpdate]): # ",". $psize, [":weid" = > $_W["uniacid"]]); return results if results is not None else [] - def get_total(self, db: Session, uniacid: str) -> Optional[List[SonSimple]]: - results = db.query(self.model).filter(self.model.weid == int(uniacid)).order_by(self.model.id.desc).all() + def get_total(self, db: Session, uniacid: int) -> Optional[List[SonSimple]]: + results = db.query(self.model).filter(self.model.weid == uniacid).order_by(self.model.id.desc).all() return results if results is not None else [] @@ -344,7 +335,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): def get_test(self, db: Session, test_id: int): return self.get_by_field(db, "test_id", test_id) - def get_question(self, db: Session, uniacid: str, id: int) -> Optional[Dict[str, Any]]: + def get_question(self, db: Session, uniacid: int, id: int) -> Optional[Dict[str, Any]]: query = db.query( self.model.id, @@ -360,7 +351,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): self.model.aimage, self.model.analysis_audio ).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.id == id, self.model.istatus == 1 ) @@ -375,7 +366,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): results_dict = dict(zip(column_names, results)) if results is not None else None return results_dict - def get_err_list_order_by_id(self, db: Session, uniacid: str, id: int) -> Optional[List[Dict]]: + def get_err_list_order_by_id(self, db: Session, uniacid: int, id: int) -> Optional[List[Dict]]: query = db.query( self.model.id, @@ -391,7 +382,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): self.model.aimage, self.model.analysis_audio ).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.id == id, self.model.istatus == 1 ) @@ -406,17 +397,17 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): return results_dict - def get_pid_by_id(self, db: Session, uniacid: str, id: int) -> Optional[int]: + def get_pid_by_id(self, db: Session, uniacid: int, id: int) -> Optional[int]: result = db.query(self.model.pid).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == 1, self.model.id == id ).first() return result[0] if result is not None else None - def get_test_count(self, db: Session, uniacid: str, pool_ids: List[int]) -> int: + def get_test_count(self, db: Session, uniacid: int, pool_ids: List[int]) -> int: total = db.query(self.model.id).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == 1, self.model.type != 0, self.model.display == 1, @@ -425,15 +416,15 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): ).count() return total if total is not None else 0 - def get_test_by_id_and_type(self, db: Session, uniacid: str, type: int, testid: int) -> Optional[int]: + def get_test_by_id_and_type(self, db: Session, uniacid: int, type: int, testid: int) -> Optional[int]: result = (db.query(self.model.id).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.type == type, self.model.id == testid) .first()) return result[0] if result is not None else None - def mock_exam_get_question_list(self, db: Session, uniacid: str, pool: List[int], type: int, display: int, + def mock_exam_get_question_list(self, db: Session, uniacid: int, pool: List[int], type: int, display: int, num: int) -> Optional[List[Dict[str, Any]]]: query = db.query( self.model.id, @@ -449,7 +440,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): self.model.aimage, self.model.analysis_audio ).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.libraryid.in_(pool), self.model.istatus == 1, self.model.type == type, @@ -475,7 +466,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): return results_dict - def mock_exam_get_question_list_type3(self, db: Session, uniacid: str, pool: List[int], type: int, display: int, + def mock_exam_get_question_list_type3(self, db: Session, uniacid: int, pool: List[int], type: int, display: int, num: int) -> Optional[List[Dict[str, Any]]]: query = db.query( self.model.id, @@ -491,7 +482,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): self.model.aimage, self.model.analysis_audio ).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.libraryid.in_(pool), self.model.istatus == 1, self.model.type == type, @@ -512,7 +503,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): return results_dict - def get_qtye_question_list(self, db: Session, uniacid: str, istatus: int, type: int, display: int, lib_id: int): + def get_qtye_question_list(self, db: Session, uniacid: int, istatus: int, type: int, display: int, lib_id: int): query = db.query( self.model.id, @@ -528,7 +519,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): self.model.aimage, self.model.analysis_audio ).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == istatus, self.model.type == type, self.model.display == display, @@ -546,7 +537,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): return results_dict - def get_randon_question_list(self, db: Session, uniacid: str, lib_id: int, type_max: int, istatus: int, + def get_randon_question_list(self, db: Session, uniacid: int, lib_id: int, type_max: int, istatus: int, display: int) -> Optional[List[Dict[str, Any]]]: query = db.query( self.model.id, @@ -562,7 +553,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): self.model.aimage, self.model.analysis_audio ).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == istatus, self.model.type != 0, self.model.type < type_max, @@ -582,7 +573,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): return results_dict - def get_randon_question_list_batch(self, db: Session, uniacid: str, type: int, type_max: int, istatus: int, + def get_randon_question_list_batch(self, db: Session, uniacid: int, type: int, type_max: int, istatus: int, display: int, pool: List[int], limit: int) -> Optional[List[Dict[str, Any]]]: query = db.query( self.model.id, @@ -598,7 +589,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): self.model.aimage, self.model.analysis_audio ).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.type < type_max, self.model.libraryid.in_(pool), self.model.istatus == istatus, @@ -617,7 +608,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): return results_dict - def get_question_list(self, db: Session, uniacid: str, lib_id: int, istatus: int, type_max: int, display: int) -> \ + def get_question_list(self, db: Session, uniacid: int, lib_id: int, istatus: int, type_max: int, display: int) -> \ Optional[List[Dict[str, Any]]]: # 构建查询 # 构建基础查询 @@ -635,7 +626,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): self.model.aimage, self.model.analysis_audio ).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == istatus, self.model.type != 0, self.model.display == display, @@ -654,7 +645,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): return results_dict def get_question_list_with_limit(self, - db: Session, uniacid: str, lib_id: int, istatus: int, type_max: int, display: int, + db: Session, uniacid: int, lib_id: int, istatus: int, type_max: int, display: int, limit: int) -> Optional[List[Dict[str, Any]]]: # 构建查询 # 构建基础查询 @@ -672,7 +663,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): self.model.aimage, self.model.analysis_audio ).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == istatus, self.model.type != 0, self.model.type < type_max, @@ -690,7 +681,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): return results_dict - def get_question_list_by_son_type(self, db: Session, uniacid: str, qtype: int, lib_id: int, istatus: int, + def get_question_list_by_son_type(self, db: Session, uniacid: int, qtype: int, lib_id: int, istatus: int, son_simple: int, display: int) -> \ Optional[List[Dict[str, Any]]]: # 构建查询 @@ -709,7 +700,7 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): self.model.aimage, self.model.analysis_audio ).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == istatus, self.model.son_simple == son_simple, self.model.type == qtype, @@ -727,10 +718,10 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): return results_dict - def get_q_have_by_type_and_lib_id(self, db: Session, uniacid: str, type: int, lib_id: int, istatus: int, + def get_q_have_by_type_and_lib_id(self, db: Session, uniacid: int, type: int, lib_id: int, istatus: int, son_status: int, display: int) -> Optional[int]: query = (db.query(self.model.id) - .filter(self.model.weid == int(uniacid)) + .filter(self.model.weid == uniacid) .filter(self.model.type == type) .filter(self.model.istatus == istatus) .filter(self.model.display == display) @@ -739,8 +730,8 @@ class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): ) return query.count() - def get_testson_by_pid(self, db: Session, uniacid: str, pid: int) -> Optional[List[Test]]: - results = db.query(self.model).filter(self.model.weid == int(uniacid), self.model.pid == pid).order_by( + def get_testson_by_pid(self, db: Session, uniacid: int, pid: int) -> Optional[List[Test]]: + results = db.query(self.model).filter(self.model.weid == uniacid, self.model.pid == pid).order_by( self.model.id).all() return results if results else [] @@ -749,9 +740,9 @@ class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]): def get_test_type(self, db: Session, test_type_id: int): return self.get_by_field(db, "id", test_type_id) - def get_fpool(self, db: Session, uniacid: str) -> List[int]: + def get_fpool(self, db: Session, uniacid: int) -> List[int]: results = db.query(self.model.id).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == 1, self.model.status == 1, self.model.price <= 0, @@ -760,9 +751,9 @@ class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]): ).all() return [result[0] for result in results] if results else [] - def get_fpool_with_student_check(self, db: Session, uniacid: str) -> List[int]: + def get_fpool_with_student_check(self, db: Session, uniacid: int) -> List[int]: results = db.query(self.model.id).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == 1, self.model.status == 1, self.model.price <= 0, @@ -772,9 +763,9 @@ class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]): ).all() return [result[0] for result in results] if results else [] - def get_test_type_ids(self, db: Session, uniacid: str, pid: int) -> List[int]: + def get_test_type_ids(self, db: Session, uniacid: int, pid: int) -> List[int]: results = db.query(self.model.id).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.pid == pid, self.model.istatus == 1, self.model.status == 1, @@ -782,9 +773,9 @@ class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]): ).all() return [result[0] for result in results] if results else [] - def get_test_type_by_special_id(self, db: Session, uniacid: str, special_id: int) -> Optional[int]: + def get_test_type_by_special_id(self, db: Session, uniacid: int, special_id: int) -> Optional[int]: result = db.query(self.model.id).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.id == special_id, self.model.istatus == 1, self.model.status == 1 @@ -793,9 +784,9 @@ class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]): # "uniacid"], "id" = > $special_id, "istatus" = > 1, "status" = > 1], "pid"); return result[0] if result else None - def get_pool_by_id(self, db: Session, uniacid: str, pid: int) -> List[Dict[str, Any]]: + def get_pool_by_id(self, db: Session, uniacid: int, pid: int) -> List[Dict[str, Any]]: result = (db.query(self.model.id, self.model.name, self.model.price) - .filter(self.model.weid == int(uniacid)) + .filter(self.model.weid == uniacid) .filter(self.model.istatus == 1) .filter(self.model.status == 1) .filter(self.model.id == pid).first()) @@ -808,9 +799,9 @@ class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]): else: return None - def get_all_special_if_is_student(self, db: Session, uniacid: str, uid: int): + def get_all_special_if_is_student(self, db: Session, uniacid: int, uid: int): results = db.query(self.model.id, self.model.name).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == 1, self.model.is_student == 1, self.model.pid == 0, @@ -823,9 +814,9 @@ class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]): return [{"id": result[0], "name": result[1]} for result in results] - def get_all_special_without_is_student(self, db: Session, uniacid: str, uid: int) -> Optional[List[Dict[str, Any]]]: + def get_all_special_without_is_student(self, db: Session, uniacid: int, uid: int) -> Optional[List[Dict[str, Any]]]: results = db.query(self.model.id, self.model.name).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == 1, self.model.is_student == 2, self.model.pid == 0, @@ -838,9 +829,9 @@ class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]): return [{"id": result[0], "name": result[1]} for result in results] if results else None - def get_son_by_pid(self, db: Session, uniacid: str, pid: int, istatus: int, status: int): + def get_son_by_pid(self, db: Session, uniacid: int, pid: int, istatus: int, status: int): results = db.query(self.model.id, self.model.name).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.istatus == istatus, self.model.pid == pid, self.model.status == status @@ -871,9 +862,9 @@ class CRUDXueshen(CRUDBase[Xuesheng, XueshengCreate, XueshengUpdate]): def get_xueshen(self, db: Session, xueshen_id: int): return self.get_by_field(db, "xuesheng_id", xueshen_id) - def get_xuesheng_id(self, db: Session, uniacid: str, user_name: str, user_phone: str) -> Optional[int]: + def get_xuesheng_id(self, db: Session, uniacid: int, user_name: str, user_phone: str) -> Optional[int]: return (db.query(self.model.xuesheng_id).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.user_name == user_name, self.model.user_phone == user_phone) .first()) diff --git a/mooc/crud/crud_goouc_fullexam_user.py b/mooc/crud/crud_goouc_fullexam_user.py index 07e4094..9bc80e8 100644 --- a/mooc/crud/crud_goouc_fullexam_user.py +++ b/mooc/crud/crud_goouc_fullexam_user.py @@ -66,10 +66,10 @@ class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]): def get_user(self, db: Session, user_id: int): return self.get_by_field(db, "id", user_id) - def get_user_info(self, db: Session, uid: int, uniacid: str) -> Optional[User]: - return db.query(self.model).filter(self.model.id == uid, self.model.weid == int(uniacid)).first() + def get_user_info(self, db: Session, uid: int, uniacid: int) -> Optional[User]: + return db.query(self.model).filter(self.model.id == uid, self.model.weid == uniacid).first() - def get_user_is_member(self, db: Session, uniacid: str, uid: int) -> Optional[int]: + def get_user_is_member(self, db: Session, uniacid: int, uid: int) -> Optional[int]: # $is_member = pdo_getcolumn(goouc_fullexam_user, # ["weid" = > $_W["uniacid"], "id" = > $uid], # "ismember"); @@ -84,7 +84,7 @@ class CRUDUserDoexam(CRUDBase[UserDoexam, UserDoexamCreate, UserDoexamUpdate]): def get_user_doexam(self, db: Session, user_doexam_id: int): return self.get_by_field(db, "id", user_doexam_id) - def get_quanzhen_highest(self, db: Session, uniacid: str, uid: int): + def get_quanzhen_highest(self, db: Session, uniacid: int, uid: int): result = (db.query(self.model) .filter_by(self.model.weid == uniacid) .filter(self.model.uid == uid) @@ -92,15 +92,15 @@ class CRUDUserDoexam(CRUDBase[UserDoexam, UserDoexamCreate, UserDoexamUpdate]): .first()) return result[0] if result else None - def get_user_do_number(self, db: Session, uniacid: str, uid: int, examid: int): + def get_user_do_number(self, db: Session, uniacid: int, uid: int, examid: int): return db.query(self.model).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.uid == uid, self.model.examid == examid).count() - def get_user_recordid(self, db: Session, uniacid: str, uid: int, paperid: int): + def get_user_recordid(self, db: Session, uniacid: int, uid: int, paperid: int): result = db.query(self.model.recordid).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.uid == uid, self.model.examid == paperid).order_by(self.model.createtime.desc()).limit(1).first() return result[0] if result else None @@ -116,10 +116,10 @@ class CRUDUserExamAnswer(CRUDBase[UserExamAnswer, UserExamAnswerCreate, UserExam def get_user_exam_answer(self, db: Session, user_exam_answer_id: int): return self.get_by_field(db, "id", user_exam_answer_id) - def get_user_exam_answer(self, db: Session, uniacid: str, uid: int, paperid: int, recordid: int) -> Optional[ + def get_user_exam_answer(self, db: Session, uniacid: int, uid: int, paperid: int, recordid: int) -> Optional[ List[Dict[str, Any]]]: query = (db.query(self.model.uid, self.model.examid, self.model.testid, self.model.test_type) - .filter(self.model.weid == int(uniacid)) + .filter(self.model.weid == uniacid) .filter(self.model.uid == uid) .filter(self.model.examid == paperid) .filter(self.model.recordid == recordid) @@ -137,17 +137,17 @@ class CRUDUserExamAnswer(CRUDBase[UserExamAnswer, UserExamAnswerCreate, UserExam return results_dict - def get_exam_answer_test_id(self, db: Session, uniacid: str, uid: int) -> Optional[List[int]]: + def get_exam_answer_test_id(self, db: Session, uniacid: int, uid: int) -> Optional[List[int]]: results = (db.query(self.model.testid) - .filter(self.model.weid == int(uniacid)) + .filter(self.model.weid == uniacid) .filter(self.model.uid == uid) .order_by(self.model.testid) .all()) return [result[0] for result in results] if results else [] - def get_do_answer(self, db: Session, uniacid: str, uid: int, paperid: int, testid: int) -> Optional[UserExamAnswer]: + def get_do_answer(self, db: Session, uniacid: int, uid: int, paperid: int, testid: int) -> Optional[UserExamAnswer]: return (db.query(self.model).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.uid == uid, self.model.examid == paperid, self.model.testid == testid) @@ -163,9 +163,9 @@ class CRUDUserSpecial(CRUDBase[UserSpecial, UserSpecialCreate, UserSpecialUpdate def get_user_special(self, db: Session, user_special_id: int): return self.get_by_field(db, "id", user_special_id) - def get_last_id_by_special_id(self, db: Session, uniacid: str, uid: int, special_id: int) -> Optional[int]: + def get_last_id_by_special_id(self, db: Session, uniacid: int, uid: int, special_id: int) -> Optional[int]: result = (db.query(self.model.last_id) - .filter(self.model.weid == int(uniacid)) + .filter(self.model.weid == uniacid) .filter(self.model.uid == uid) .filter(self.model.special_id == special_id) .filter(self.model.istatus == 1) @@ -181,16 +181,16 @@ class CRUDUserSequence(CRUDBase[UserSequence, UserSpequenceCreate, UserSpequence def get_user_spequence(self, db: Session, user_spequence_id: int): return self.get_by_field(db, "id", user_spequence_id) - def get_last_id(self, db: Session, uid: int, uniacid: str): + def get_last_id(self, db: Session, uid: int, uniacid: int): result = (db.query(self.model.question_id) - .filter(self.model.weid == int(uniacid), + .filter(self.model.weid == uniacid, self.model.user_id == uid) .first()) return result[0] if result else None - def get_last_id_by_lib_id(self, db: Session, uid: int, uniacid: str, lib_id: int): + def get_last_id_by_lib_id(self, db: Session, uid: int, uniacid: int, lib_id: int): result = (db.query(self.model.question_id) - .filter(self.model.weid == int(uniacid), + .filter(self.model.weid == uniacid, self.model.user_id == uid, self.model.lib_id == lib_id) .first()) return result[0] if result else None @@ -206,9 +206,9 @@ class CRUDUserCollectionPraction( def get_user_collection_praction(self, db: Session, user_collection_praction_id: int): return self.get_by_field(db, "id", user_collection_praction_id) - def get_is_collect(self, db: Session, uniacid: str, uid: int, test_id: int, test_type: int): + def get_is_collect(self, db: Session, uniacid: int, uid: int, test_id: int, test_type: int): result = (db.query(self.model.iscollect) - .filter(self.model.weid == int(uniacid), + .filter(self.model.weid == uniacid, self.model.uid == uid, self.model.testid == test_id, self.model.test_type == test_type) @@ -218,9 +218,9 @@ class CRUDUserCollectionPraction( # ["weid" = > $_W["uniacid"], "uid" = > $uid, "testid" = > $val["id"], "test_type" = > $val["type"]], # "id"); - def get_id_list(self, db: Session, uniacid: str, uid: int, istatus: int): + def get_id_list(self, db: Session, uniacid: int, uid: int, istatus: int): query = (db.query(self.model.testid, self.model.test_type) - .filter(self.model.weid == int(uniacid), + .filter(self.model.weid == uniacid, self.model.uid == uid, self.model.istatus == istatus) .order_by(self.model.id) @@ -234,7 +234,7 @@ class CRUDUserGift(CRUDBase[UserGift, UserGiftCreate, UserGiftUpdate]): def get_user_gift(self, db: Session, user_gift_id: int): return self.get_by_field(db, "id", user_gift_id) - def get_user_gift_list(self, db: Session, uniacid: str, uid: str) -> Optional[List[Tuple[int, int, int]]]: + def get_user_gift_list(self, db: Session, uniacid: int, uid: str) -> Optional[List[Tuple[int, int, int]]]: # [ giftid , createtime , status # (1, 1634567890, 1), # (2, 1634567900, 0) @@ -266,7 +266,7 @@ class CRUDUserDoOtherExam(CRUDBase[UserDootherExam, UserDoOtherExamCreate, UserD def get_user_doother_exam(self, db: Session, user_doother_exam_id: int): return self.get_by_field(db, "id", user_doother_exam_id) - def get_user_other_highest(self, db: Session, uniacid: str, uid: int): + def get_user_other_highest(self, db: Session, uid: int ,uniacid: int): return (db.query(self.model) .filter_by(self.model.weid == uniacid) .filter(self.model.uid == uid) @@ -278,9 +278,9 @@ class CRUDUserWrongPraction(CRUDBase[UserWrongPraction, UserWrongPractionCreate, def get_user_wrong_praction(self, db: Session, user_wrong_praction_id: int): return self.get_by_field(db, "id", user_wrong_praction_id) - def get_err_id_list_order_by_createtime(self, db: Session, uniacid: str, uid: int): + def get_err_id_list_order_by_createtime(self, db: Session, uniacid: int, uid: int): results = (db.query(self.model.testid, self.model.test_type) - .filter(self.model.weid == int(uniacid), self.model.uid == uid, self.model.istatus == 1) + .filter(self.model.weid == uniacid, self.model.uid == uid, self.model.istatus == 1) .order_by(self.model.createtime) .all()) # 转换成字典 @@ -289,9 +289,9 @@ class CRUDUserWrongPraction(CRUDBase[UserWrongPraction, UserWrongPractionCreate, "test_type": result[1] } for result in results] if results else None - def get_id_list_order_by_id(self, db: Session, uniacid: str, uid: int, test_type: int): + def get_id_list_order_by_id(self, db: Session, uniacid: int, uid: int, test_type: int): results = (db.query(self.model.testid, self.model.test_type) - .filter(self.model.weid == int(uniacid), self.model.uid == uid, self.model.istatus == 1) + .filter(self.model.weid == uniacid, self.model.uid == uid, self.model.istatus == 1) .order_by(self.model.id) .all()) # 转换成字典 @@ -300,9 +300,9 @@ class CRUDUserWrongPraction(CRUDBase[UserWrongPraction, UserWrongPractionCreate, "test_type": result[1] } for result in results] if results else None - def get_id_list_with_time(self, db: Session, uniacid: str, uid: int, start: int): + def get_id_list_with_time(self, db: Session, uniacid: int, uid: int, start: int): results = (db.query(self.model.testid, self.model.test_type) - .filter(self.model.weid == int(uniacid), self.model.uid == uid, self.model.istatus == 1, + .filter(self.model.weid == uniacid, self.model.uid == uid, self.model.istatus == 1, self.model.createtime > start) .order_by(self.model.id) .all()) @@ -315,15 +315,15 @@ class CRUDUserWrongPraction(CRUDBase[UserWrongPraction, UserWrongPractionCreate, class CRUDUserPool(CRUDBase[UserPool, UserPoolCreate, UserPoolUpdate]): - def get_user_pool(self, db: Session, user_pool_id: int): - return self.get_by_field(db, "id", user_pool_id) + # def get_user_pool(self, db: Session, user_pool_id: int): + # return self.get_by_field(db, "id", user_pool_id) - def get_kid_id_by_paperid(self, db: Session, uniacid: str, uid: int, paperid: int): + def get_kid_id_by_paperid(self, db: Session, uniacid: int, uid: int, paperid: int): result = (db.query(self.model.id).filter(self.model.weid == uniacid, self.model.uid == uid, self.model.paperid == paperid, self.model.istatus == 1).first()) return result[0] if result else None - def get_user_pool(self, db: Session, uniacid: str, uid: int) -> List[Tuple[int, int]]: + def get_user_pool(self, db: Session, uniacid: int, uid: int) -> List[Tuple[int, int]]: results = db.query(UserPool.id, UserPool.poolid).join( # 左连接 TestType, @@ -343,9 +343,9 @@ class CRUDUserPool(CRUDBase[UserPool, UserPoolCreate, UserPoolUpdate]): # WHERE up.weid=:weid AND up.uid=:uid AND tt.istatus = 1 AND tt.status = 1 # [":weid" => $_W["uniacid"], ":uid" => $uid]); - def get_kid_id_by_poolid(self, db: Session, uniacid: str, uid: int, poolid: int): + def get_kid_id_by_poolid(self, db: Session, uniacid: int, uid: int, poolid: int): result = (db.query(self.model.id) - .filter(self.model.weid == int(uniacid)) + .filter(self.model.weid == uniacid) .filter(self.model.uid == uid) .filter(self.model.poolid == poolid) .filter(self.model.istatus == 1) @@ -359,10 +359,10 @@ class CRUDUserQtype(CRUDBase[UserQtype, UserQTypeCreate, UserQTypeUpdate]): def get_user_qype(self, db: Session, user_qtype_id: int): return self.get_by_field(db, "id", user_qtype_id) - def get_last_id_by_type_id_and_uid(self, db: Session, uniacid: str, uid: int, type_id: int, istatus: int) -> \ + def get_last_id_by_type_id_and_uid(self, db: Session, uniacid: int, uid: int, type_id: int, istatus: int) -> \ Optional[int]: results = (db.query(self.model.last_id).filter( - self.model.weid == int(uniacid), + self.model.weid == uniacid, self.model.uid == uid, self.model.type_id == type_id, self.model.istatus == istatus)