"""CRUD helpers for the ``goouc_fullexam`` models.

Each ``CRUD*`` class specialises ``CRUDBase`` for one model and adds the
queries that were ported from the original PHP code (the PHP statements are
kept as reference comments next to the method that replaces them).  Ready-made
instances of every class are created at the bottom of the module.
"""
from typing import Optional, List, Tuple, Dict, Any

from sqlalchemy.orm import Session
from sqlalchemy import select, func

from mooc.crud.crud_base import CRUDBase
from mooc.models.goouc_fullexam import (
    Advert, Banji, Banner, Category, Cdkey, CdkeyCate, Cdkeys, Exercise,
    Feedback, Gift, IndexBtn, Knowledge, KnowledgeCate, Notice, Order, Paper,
    PaperTest, Phonecode, QYear, School, Setting, ShareRecord, SonSimple,
    Test, TestType, TypeCate, Watermark, Wxtpl, Xuesheng
)
from mooc.schemas.goouc_fullexam import (
    AdvertUpdate, AdvertCreate, BanjiUpdate, BanjiCreate, BannerCreate,
    BannerUpdate, CategoryCreate, CategoryUpdate, CdkeyCreate, CdkeyUpdate,
    CdkeyCateCreate, CdkeyCateUpdate, IndexBtnCreate, IndexBtnUpdate,
    GiftCreate, GiftUpdate, CdkeysCreate, CdkeysUpdate, ExerciseCreate,
    ExerciseUpdate, FeedbackCreate, FeedbackUpdate, TypeCateCreate,
    TypeCateUpdate, KnowledgeCreate, KnowledgeUpdate, KnowledgeCateCreate,
    KnowledgeCateUpdate, NoticeCreate, NoticeUpdate, OrderCreate, OrderUpdate,
    PaperCreate, PaperUpdate, PaperTestCreate, PaperTestUpdate,
    PhoneCodeUpdate, PhoneCodeCreate, QYearCreate, QYearUpdate, SchoolCreate,
    SchoolUpdate, SettingCreate, SettingUpdate, ShareRecordCreate,
    ShareRecordUpdate, SonSimpleCreate, SonSimpleUpdate, TestCreate,
    TestUpdate, TestTypeCreate, TestTypeUpdate, WatermarkCreate,
    WatermarkUpdate, WxTplCreate, WxTplUpdate, XueshengCreate, XueshengUpdate
)

# Columns selected by every "question" query in CRUDTest, in select order.
# The same names are used as the keys of the dicts those methods return.
_QUESTION_FIELDS = (
    "id", "type", "question", "qimage", "qvideo", "qaudio", "a_type",
    "option", "rightkey", "analysis", "aimage", "analysis_audio",
)


class CRUDAdvert(CRUDBase[Advert, AdvertCreate, AdvertUpdate]):
    """CRUD access for ``Advert``."""

    def get_advert(self, db: Session, admin_id: int) -> Optional[Advert]:
        """Look up an advert through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", admin_id)


class CRUDBanji(CRUDBase[Banji, BanjiCreate, BanjiUpdate]):
    """CRUD access for ``Banji``."""

    def get_banji(self, db: Session, banji_id: int) -> Optional[Banji]:
        """Look up a record through ``get_by_field`` on ``banji_id``."""
        return self.get_by_field(db, "banji_id", banji_id)


class CRUDBanner(CRUDBase[Banner, BannerCreate, BannerUpdate]):
    """CRUD access for ``Banner``."""

    def get_banner(self, db: Session, banner_id: int):
        """Look up a banner through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", banner_id)


class CRUDCategory(CRUDBase[Category, CategoryCreate, CategoryUpdate]):
    """CRUD access for ``Category``."""

    def get_category(self, db: Session, category_id: int):
        """Look up a category through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", category_id)


class CRUDCdkey(CRUDBase[Cdkey, CdkeyCreate, CdkeyUpdate]):
    """CRUD access for ``Cdkey``."""

    def get_cdkey(self, db: Session, cdkey_id: int):
        """Look up a cdkey through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", cdkey_id)

    def _active_conditions(self, uniacid: int, uid: int, now_time: str) -> tuple:
        """Filter conditions shared by the "active cdkey" queries below."""
        return (
            self.model.weid == uniacid,
            self.model.uid == uid,
            self.model.status == 2,
            self.model.display == 1,
            self.model.endtime > func.unix_timestamp(func.now()),
            self.model.kpool > now_time,
        )

    # Original PHP:
    # $cdkeys = pdo_fetchall(" SELECT * FROM ".tablename("goouc_fullexam_cdkey").
    #     " WHERE weid =:weid uid=:uid status =2 display=1
    #       endtime > unix_timestamp(now()) kpool > :kpool ",
    #     [":weid" => $_W["uniacid"], "uid" => $uid, ":kpool" => $now_time]);
    def get_cdkey_list(self, db: Session, uniacid: int, uid: int,
                       now_time: str) -> Optional[List[Cdkey]]:
        """Return every cdkey row matching the active-cdkey conditions.

        The SQL functions come from ``sqlalchemy.func``; a ``Session`` has no
        ``func`` attribute, so they must not be reached through ``db``.
        """
        return db.query(self.model).filter(
            *self._active_conditions(uniacid, uid, now_time)
        ).all()

    def _first_active_in(self, db: Session, uniacid: int, uid: int, now_time: str,
                         cdkeys_code_id_list: List[int]) -> Optional[Cdkey]:
        """First active cdkey whose ``cid`` is in ``cdkeys_code_id_list``."""
        return db.query(self.model).filter(
            *self._active_conditions(uniacid, uid, now_time),
            self.model.cid.in_(cdkeys_code_id_list),
        ).first()

    def get_user_active_with_paperid(self, db: Session, uniacid: int, uid: int,
                                     now_time: str,
                                     cdkeys_code_id_list: List[int]) -> Optional[Cdkey]:
        """Return the first active cdkey whose ``cid`` is in the given list, or None."""
        return self._first_active_in(db, uniacid, uid, now_time, cdkeys_code_id_list)

    def get_user_active(self, db: Session, uniacid: int, uid: int, now_time: str,
                        cdkeys_code_id_list: List[int]) -> Optional[Cdkey]:
        """Return the first active cdkey whose ``cid`` is in the given list, or None.

        Runs the same query as ``get_user_active_with_paperid``.
        """
        return self._first_active_in(db, uniacid, uid, now_time, cdkeys_code_id_list)


class CRUDCdkeyCate(CRUDBase[CdkeyCate, CdkeyCateCreate, CdkeyCateUpdate]):
    """CRUD access for ``CdkeyCate``."""

    def get_cdkey_cate(self, db: Session, cdkey_cate_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", cdkey_cate_id)


class CRUDCdkeys(CRUDBase[Cdkeys, CdkeysCreate, CdkeysUpdate]):
    """CRUD access for ``Cdkeys``."""

    def get_cdkeys(self, db: Session, cdkeys_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", cdkeys_id)

    def get_all_cdkeys_code_ids(self, db: Session, uniacid: int, kpool_id: int,
                                type: int) -> List[int]:
        """Return the ``code_id`` of every row matching weid, kpool_id and type."""
        rows = db.query(self.model.code_id).filter(
            self.model.weid == uniacid,
            self.model.kpool_id == kpool_id,
            self.model.type == type,
        ).all()
        # Each row is a one-column result; unwrap it.
        return [row[0] for row in rows]

    # Original PHP:
    # $pools = pdo_getall("goouc_fullexam_cdkeys",
    #     ["weid" => $_W["uniacid"], "code_id" => $cdkey["cid"], "type" => 2], "kpool_id");
    def get_all_kpool_id(self, db: Session, uniacid: int, cid: int) -> Optional[List[int]]:
        """Return the ``kpool_id`` of every type-2 row with the given ``code_id``."""
        rows = db.query(self.model.kpool_id).filter(
            self.model.weid == uniacid,
            self.model.code_id == cid,
            self.model.type == 2,
        ).all()
        return [row[0] for row in rows]


class CRUDExercise(CRUDBase[Exercise, ExerciseCreate, ExerciseUpdate]):
    """CRUD access for ``Exercise``."""

    def get_exercise(self, db: Session, exercise_id: int):
        """Look up an exercise through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", exercise_id)

    def get_testid_list(self, db: Session, uniacid: int, uid: int):
        """Return the ``testid`` of every exercise row for this weid and uid."""
        rows = db.query(self.model.testid).filter(
            self.model.weid == uniacid,
            self.model.uid == uid,
        ).all()
        return [row[0] for row in rows]

    def get_exercise_have_do(self, db: Session, uniacid: int, uid: int, testid: int):
        """Return the first ``(id, testid, test_type)`` row for this test, or None."""
        return db.query(
            self.model.id, self.model.testid, self.model.test_type
        ).filter(
            self.model.weid == uniacid,
            self.model.uid == uid,
            self.model.testid == testid,
        ).first()


class CRUDFeedback(CRUDBase[Feedback, FeedbackCreate, FeedbackUpdate]):
    """CRUD access for ``Feedback``."""

    def get_feedback(self, db: Session, feedback_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", feedback_id)


class CRUDGift(CRUDBase[Gift, GiftCreate, GiftUpdate]):
    """CRUD access for ``Gift``."""

    def get_gift(self, db: Session, gift_id: int):
        """Look up a gift through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", gift_id)

    # Original PHP:
    # $gift = pdo_fetch("SELECT * FROM ".tablename("goouc_fullexam_gift").
    #     " WHERE weid=:weid AND id=:gid",
    #     [":weid" => $_W["uniacid"], ":gid" => $val["giftid"]]);
    def get_gift_by_id(self, db: Session, gift_id: int, uniacid: int) -> Optional[Gift]:
        """Return the gift with this id and weid, or None."""
        return db.query(self.model).filter(
            self.model.weid == uniacid,
            self.model.id == gift_id,
        ).first()


class CRUDIndexBtn(CRUDBase[IndexBtn, IndexBtnCreate, IndexBtnUpdate]):
    """CRUD access for ``IndexBtn``."""

    def get_index_btn(self, db: Session, index_btn_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", index_btn_id)


class CRUDKnowledge(CRUDBase[Knowledge, KnowledgeCreate, KnowledgeUpdate]):
    """CRUD access for ``Knowledge``."""

    def get_knowledge(self, db: Session, knowledge_id: int):
        """Look up a record through ``get_by_field`` on ``knowledge_id``."""
        return self.get_by_field(db, "knowledge_id", knowledge_id)


class CRUDKnowledgeCate(CRUDBase[KnowledgeCate, KnowledgeCateCreate, KnowledgeCateUpdate]):
    """CRUD access for ``KnowledgeCate``."""

    def get_knowledge_cate(self, db: Session, knowledge_cate_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", knowledge_cate_id)


class CRUDNotice(CRUDBase[Notice, NoticeCreate, NoticeUpdate]):
    """CRUD access for ``Notice``."""

    def get_notice(self, db: Session, notice_id: int):
        """Look up a notice through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", notice_id)


class CRUDOrder(CRUDBase[Order, OrderCreate, OrderUpdate]):
    """CRUD access for ``Order``."""

    def get_order(self, db: Session, order_id: int):
        """Look up an order through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", order_id)


class CRUDPaper(CRUDBase[Paper, PaperCreate, PaperUpdate]):
    """CRUD access for ``Paper``."""

    def get_paper(self, db: Session, paper_id: int):
        """Look up a paper through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", paper_id)

    def _enabled_conditions(self, uniacid: int) -> tuple:
        """Conditions ``weid == uniacid AND istatus == 1 AND status == 1``."""
        return (
            self.model.weid == uniacid,
            self.model.istatus == 1,
            self.model.status == 1,
        )

    def get_franction_by_paperid(self, db: Session, uniacid: int,
                                 paperid: int) -> Optional[str]:
        """Return the ``franction`` column of the enabled paper, or None if absent."""
        row = db.query(self.model.franction).filter(
            *self._enabled_conditions(uniacid),
            self.model.id == paperid,
        ).first()
        return row[0] if row else None

    def get_page_list(self, db: Session, uniacid: int, page_index: int,
                      page_size: int) -> Optional[List[Paper]]:
        """Return one page of enabled papers, highest ``displayorder`` first.

        ``page_index`` is 1-based: the offset is ``(page_index - 1) * page_size``.
        """
        skip = (page_index - 1) * page_size
        return (
            db.query(self.model)
            .filter(*self._enabled_conditions(uniacid))
            .order_by(self.model.displayorder.desc())
            .limit(page_size)
            .offset(skip)
            .all()
        )

    # Original PHP:
    # $paper = pdo_fetch("SELECT id,title,total_franction FROM ".tablename("goouc_fullexam_paper").
    #     " WHERE weid=:weid AND status=1 AND istatus=1 AND id=:paperid ",
    #     [":weid" => $_W["uniacid"], ":paperid" => $paperid]);
    def get_paper_by_paperid(self, db: Session, uniacid: int,
                             paperid: int) -> Optional[Paper]:
        """Return the enabled paper with this id, or None."""
        return db.query(self.model).filter(
            *self._enabled_conditions(uniacid),
            self.model.id == paperid,
        ).first()

    def get_total(self, db: Session, uniacid: int):
        """Return the number of enabled papers for this weid."""
        # No ORDER BY here: ordering cannot change a row count.
        return db.query(self.model).filter(*self._enabled_conditions(uniacid)).count()


class CRUDPaperTest(CRUDBase[PaperTest, PaperTestCreate, PaperTestUpdate]):
    """CRUD access for ``PaperTest``."""

    def get_paper_test(self, db: Session, paper_test_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", paper_test_id)

    def get_qnum(self, db: Session, paper_test_id: int) -> Optional[int]:
        """Count enabled rows whose ``paperid`` equals ``paper_test_id``.

        Despite its name, the argument is compared against the ``paperid``
        column, not the row's own id.
        """
        return db.query(self.model).filter(
            self.model.paperid == paper_test_id,
            self.model.istatus == 1,
            self.model.status == 1,
        ).count()

    def get_all_test_id(self, db: Session, paperid: int) -> Optional[List[Any]]:
        """Return the ``testid`` rows of a paper.

        The result of ``.all()`` is returned untouched, so every element is a
        one-column row rather than a bare int; index ``[0]`` to get the id.
        """
        return db.query(self.model.testid).filter(self.model.paperid == paperid).all()

    # Original PHP:
    # pdo_fetchall("SELECT q.id as id,q.type,q.question,q.qimage,q.qvideo,q.qaudio,
    #     q.a_type,q.option,q.rightkey,q.analysis,q.aimage,q.analysis_audio "
    #     tablename("goouc_fullexam_paper_test") . " AS pt JOIN " .
    #     tablename("goouc_fullexam_test") . " AS q ON pt.testid=q.id
    #     WHERE pt.paperid=:paperid AND q.istatus=1 ", [":paperid" => $paper["id"]]);
    def get_questions_by_paperid(self, db: Session, paperid: int):
        """Return the question columns of every enabled test linked to a paper.

        Joins ``Test`` to ``PaperTest`` on ``Test.id == PaperTest.testid`` and
        returns the rows produced by ``.all()`` (not dicts).
        """
        selected = db.query(
            Test.id.label('id'),
            Test.type,
            Test.question,
            Test.qimage,
            Test.qvideo,
            Test.qaudio,
            Test.a_type,
            Test.option,
            Test.rightkey,
            Test.analysis,
            Test.aimage,
            Test.analysis_audio,
        )
        joined = selected.join(PaperTest, Test.id == PaperTest.testid)
        return joined.filter(PaperTest.paperid == paperid, Test.istatus == 1).all()


class CRUDPhonecode(CRUDBase[Phonecode, PhoneCodeCreate, PhoneCodeUpdate]):
    """CRUD access for ``Phonecode``."""

    def get_phonecode(self, db: Session, phonecode_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", phonecode_id)


class CRUDQYear(CRUDBase[QYear, QYearCreate, QYearUpdate]):
    """CRUD access for ``QYear``."""

    def get_qyear(self, db: Session, qyear_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", qyear_id)


class CRUDSchool(CRUDBase[School, SchoolCreate, SchoolUpdate]):
    """CRUD access for ``School``."""

    def get_school(self, db: Session, school_id: int):
        """Look up a school through ``get_by_field`` on ``school_id``."""
        return self.get_by_field(db, "school_id", school_id)


class CRUDSetting(CRUDBase[Setting, SettingCreate, SettingUpdate]):
    """CRUD access for ``Setting``."""

    def get_setting(self, db: Session, setting_id: int):
        """Look up a setting through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", setting_id)

    def get_setting_by_weid(self, db: Session, uniacid: int):
        """Look up a setting through ``get_by_field`` on the ``weid`` field."""
        return self.get_by_field(db, "weid", uniacid)

    def get_about_us(self, db: Session, uniacid: int):
        """Return ``{"id": ..., "about": ...}`` for this weid's setting.

        An empty dict is returned when ``get_by_field`` yields None.
        """
        setting_row = self.get_by_field(db, "weid", uniacid)
        if setting_row is None:
            return {}
        return {"id": setting_row.id, "about": setting_row.about}


class CRUDShareRecord(CRUDBase[ShareRecord, ShareRecordCreate, ShareRecordUpdate]):
    """CRUD access for ``ShareRecord``."""

    def get_share_record(self, db: Session, share_record_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", share_record_id)


class CRUDSonSimple(CRUDBase[SonSimple, SonSimpleCreate, SonSimpleUpdate]):
    """CRUD access for ``SonSimple``."""

    def get_son_simple(self, db: Session, son_simple_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", son_simple_id)

    # Original PHP:
    # $type_list_t = pdo_fetchall(" SELECT * FROM ".tablename("goouc_fullexam_son_simple").
    #     " WHERE weid = :weid ORDER BY id", [":weid" => $_W["uniacid"]]);
    def get_type_list(self, db: Session, uniacid: int) -> Optional[List[SonSimple]]:
        """Return every row for this weid ordered by ascending id."""
        return (
            db.query(self.model)
            .filter(self.model.weid == uniacid)
            .order_by(self.model.id)
            .all()
        )

    # Original PHP:
    # $type_list_t = pdo_fetchall(" SELECT * FROM ".tablename("goouc_fullexam_son_simple").
    #     " WHERE weid = :weid ORDER BY id DESC LIMIT ".($pindex - 1) * $psize.
    #     ",". $psize, [":weid" => $_W["uniacid"]]);
    def get_type_list_page(self, db: Session, uniacid: int, pindex: int,
                           psize: int) -> Optional[List[SonSimple]]:
        """Return one page of enabled rows, highest ``displayorder`` first.

        ``pindex`` is 1-based: the offset is ``(pindex - 1) * psize``.

        NOTE(review): the PHP reference above has no istatus/status filter and
        orders by ``id DESC``; confirm which behaviour is intended.
        """
        skip = (pindex - 1) * psize
        return (
            db.query(self.model)
            .filter(
                self.model.weid == uniacid,
                self.model.istatus == 1,
                self.model.status == 1,
            )
            .order_by(self.model.displayorder.desc())
            .limit(psize)
            .offset(skip)
            .all()
        )

    def get_total(self, db: Session, uniacid: int) -> Optional[List[SonSimple]]:
        """Return every row for this weid ordered by descending id.

        Despite the name this returns the rows themselves, not a count.
        """
        # ``desc`` must be called: passing the bound method itself to
        # ``order_by`` is not a valid ORDER BY expression.
        return (
            db.query(self.model)
            .filter(self.model.weid == uniacid)
            .order_by(self.model.id.desc())
            .all()
        )


class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]):
    """CRUD access for ``Test`` (exam questions)."""

    def _question_query(self, db: Session):
        """Start a query that selects only the ``_QUESTION_FIELDS`` columns."""
        return db.query(*[getattr(self.model, name) for name in _QUESTION_FIELDS])

    @staticmethod
    def _rows_to_dicts(rows) -> List[Dict[str, Any]]:
        """Convert result rows into dicts keyed by ``_QUESTION_FIELDS``."""
        return [dict(zip(_QUESTION_FIELDS, row)) for row in rows]

    def get_test(self, db: Session, test_id: int):
        """Look up a test through ``get_by_field`` on the ``test_id`` field."""
        return self.get_by_field(db, "test_id", test_id)

    def get_question(self, db: Session, uniacid: int, id: int) -> Optional[Dict[str, Any]]:
        """Return one enabled question as a dict, or None when it does not exist."""
        row = self._question_query(db).filter(
            self.model.weid == uniacid,
            self.model.id == id,
            self.model.istatus == 1,
        ).first()
        if row is None:
            return None
        return dict(zip(_QUESTION_FIELDS, row))

    def get_err_list_order_by_id(self, db: Session, uniacid: int,
                                 id: int) -> Optional[List[Dict]]:
        """Return the enabled question(s) with this id as dicts, ordered by id."""
        rows = self._question_query(db).filter(
            self.model.weid == uniacid,
            self.model.id == id,
            self.model.istatus == 1,
        ).order_by(self.model.id).all()
        return self._rows_to_dicts(rows)

    def get_pid_by_id(self, db: Session, uniacid: int, id: int) -> Optional[int]:
        """Return the ``pid`` of the enabled test with this id, or None."""
        row = db.query(self.model.pid).filter(
            self.model.weid == uniacid,
            self.model.istatus == 1,
            self.model.id == id,
        ).first()
        return None if row is None else row[0]

    def get_test_count(self, db: Session, uniacid: int, pool_ids: List[int]) -> int:
        """Count enabled, displayed tests with ``0 != type < 9`` in the given libraries."""
        return db.query(self.model.id).filter(
            self.model.weid == uniacid,
            self.model.istatus == 1,
            self.model.type != 0,
            self.model.display == 1,
            self.model.type < 9,
            self.model.libraryid.in_(pool_ids),
        ).count()

    def get_test_by_id_and_type(self, db: Session, uniacid: int, type: int,
                                testid: int) -> Optional[int]:
        """Return ``testid`` if a test with that id and type exists, else None."""
        row = db.query(self.model.id).filter(
            self.model.weid == uniacid,
            self.model.type == type,
            self.model.id == testid,
        ).first()
        return None if row is None else row[0]

    def mock_exam_get_question_list(self, db: Session, uniacid: int, pool: List[int],
                                    type: int, display: int,
                                    num: int) -> Optional[List[Dict[str, Any]]]:
        """Return up to ``num`` randomly ordered questions of one type as dicts.

        Only enabled tests with ``type < 8`` from the libraries in ``pool`` are
        considered; for ``type == 5`` the row must also have ``son_status == 1``.

        Raises:
            ValueError: if ``num`` is not positive.
        """
        if num <= 0:
            raise ValueError("Invalid limit value")
        query = self._question_query(db).filter(
            self.model.weid == uniacid,
            self.model.libraryid.in_(pool),
            self.model.istatus == 1,
            self.model.type == type,
            self.model.display == display,
            self.model.type < 8,
        )
        # Extra condition that only applies to type 5.
        if type == 5:
            query = query.filter(self.model.son_status == 1)
        rows = query.order_by(func.rand()).limit(num).all()
        return self._rows_to_dicts(rows)

    def mock_exam_get_question_list_type3(self, db: Session, uniacid: int,
                                          pool: List[int], type: int, display: int,
                                          num: int) -> Optional[List[Dict[str, Any]]]:
        """Return up to ``num`` randomly ordered questions of one type as dicts.

        Same as ``mock_exam_get_question_list`` but without the ``type < 8``
        and ``son_status`` conditions.

        Raises:
            ValueError: if ``num`` is not positive.
        """
        # Validate before touching the database, and execute the query exactly
        # once: ``.all()`` returns a list, which cannot be re-ordered/limited.
        if num <= 0:
            raise ValueError("Invalid limit value")
        rows = self._question_query(db).filter(
            self.model.weid == uniacid,
            self.model.libraryid.in_(pool),
            self.model.istatus == 1,
            self.model.type == type,
            self.model.display == display,
        ).order_by(func.rand()).limit(num).all()
        return self._rows_to_dicts(rows)

    def get_qtye_question_list(self, db: Session, uniacid: int, istatus: int,
                               type: int, display: int, lib_id: int):
        """Return the questions of one type in one library as dicts, ordered by id."""
        rows = self._question_query(db).filter(
            self.model.weid == uniacid,
            self.model.istatus == istatus,
            self.model.type == type,
            self.model.display == display,
            self.model.libraryid == lib_id,
        ).order_by(self.model.id).all()
        return self._rows_to_dicts(rows)

    def get_randon_question_list(self, db: Session, uniacid: int, lib_id: int,
                                 type_max: int, istatus: int,
                                 display: int) -> Optional[List[Dict[str, Any]]]:
        """Return a library's questions with ``0 != type < type_max`` in random order."""
        rows = self._question_query(db).filter(
            self.model.weid == uniacid,
            self.model.istatus == istatus,
            self.model.type != 0,
            self.model.type < type_max,
            self.model.display == display,
            self.model.libraryid == lib_id,
        ).order_by(func.rand()).all()
        return self._rows_to_dicts(rows)

    def get_randon_question_list_batch(self, db: Session, uniacid: int, type: int,
                                       type_max: int, istatus: int, display: int,
                                       pool: List[int],
                                       limit: int) -> Optional[List[Dict[str, Any]]]:
        """Return up to ``limit`` random questions of one type from several libraries."""
        rows = self._question_query(db).filter(
            self.model.weid == uniacid,
            self.model.type < type_max,
            self.model.libraryid.in_(pool),
            self.model.istatus == istatus,
            self.model.type == type,
            self.model.display == display,
        ).order_by(func.rand()).limit(limit).all()
        return self._rows_to_dicts(rows)

    def get_question_list(self, db: Session, uniacid: int, lib_id: int, istatus: int,
                          type_max: int, display: int) -> Optional[List[Dict[str, Any]]]:
        """Return a library's questions with ``0 != type < type_max``, ordered by id."""
        rows = self._question_query(db).filter(
            self.model.weid == uniacid,
            self.model.istatus == istatus,
            self.model.type != 0,
            self.model.display == display,
            self.model.type < type_max,
            self.model.libraryid == lib_id,
        ).order_by(self.model.id).all()
        return self._rows_to_dicts(rows)

    def get_question_list_with_limit(self, db: Session, uniacid: int, lib_id: int,
                                     istatus: int, type_max: int, display: int,
                                     limit: int) -> Optional[List[Dict[str, Any]]]:
        """Like ``get_question_list`` but returns at most ``limit`` questions."""
        rows = self._question_query(db).filter(
            self.model.weid == uniacid,
            self.model.istatus == istatus,
            self.model.type != 0,
            self.model.type < type_max,
            self.model.display == display,
            self.model.libraryid == lib_id,
        ).order_by(self.model.id).limit(limit).all()
        return self._rows_to_dicts(rows)

    def get_question_list_by_son_type(self, db: Session, uniacid: int, qtype: int,
                                      lib_id: int, istatus: int, son_simple: int,
                                      display: int) -> Optional[List[Dict[str, Any]]]:
        """Return questions of ``qtype`` with the given ``son_simple``, ordered by id."""
        rows = self._question_query(db).filter(
            self.model.weid == uniacid,
            self.model.istatus == istatus,
            self.model.son_simple == son_simple,
            self.model.type == qtype,
            self.model.display == display,
            self.model.libraryid == lib_id,
        ).order_by(self.model.id).all()
        return self._rows_to_dicts(rows)

    def get_q_have_by_type_and_lib_id(self, db: Session, uniacid: int, type: int,
                                      lib_id: int, istatus: int, son_status: int,
                                      display: int) -> Optional[int]:
        """Count the tests matching type, istatus, display, son_status and library."""
        return db.query(self.model.id).filter(
            self.model.weid == uniacid,
            self.model.type == type,
            self.model.istatus == istatus,
            self.model.display == display,
            self.model.son_status == son_status,
            self.model.libraryid == lib_id,
        ).count()

    def get_testson_by_pid(self, db: Session, uniacid: int,
                           pid: int) -> Optional[List[Test]]:
        """Return every test whose ``pid`` matches, ordered by id."""
        return (
            db.query(self.model)
            .filter(self.model.weid == uniacid, self.model.pid == pid)
            .order_by(self.model.id)
            .all()
        )


class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]):
    """CRUD access for ``TestType``."""

    def get_test_type(self, db: Session, test_type_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", test_type_id)

    def get_fpool(self, db: Session, uniacid: int) -> List[int]:
        """Return ids of enabled rows with ``price <= 0``, ``pid != 0`` and ``gpid == 0``."""
        rows = db.query(self.model.id).filter(
            self.model.weid == uniacid,
            self.model.istatus == 1,
            self.model.status == 1,
            self.model.price <= 0,
            self.model.pid != 0,
            self.model.gpid == 0,
        ).all()
        return [row[0] for row in rows]

    def get_fpool_with_student_check(self, db: Session, uniacid: int) -> List[int]:
        """Same as ``get_fpool`` but also requires ``is_student != 1``."""
        rows = db.query(self.model.id).filter(
            self.model.weid == uniacid,
            self.model.istatus == 1,
            self.model.status == 1,
            self.model.price <= 0,
            self.model.is_student != 1,
            self.model.pid != 0,
            self.model.gpid == 0,
        ).all()
        return [row[0] for row in rows]

    def get_test_type_ids(self, db: Session, uniacid: int, pid: int) -> List[int]:
        """Return ids of enabled rows with the given ``pid`` and ``gpid != 0``."""
        rows = db.query(self.model.id).filter(
            self.model.weid == uniacid,
            self.model.pid == pid,
            self.model.istatus == 1,
            self.model.status == 1,
            self.model.gpid != 0,
        ).all()
        return [row[0] for row in rows]

    # Original PHP:
    # $pid = pdo_getcolumn("goouc_fullexam_test_type",
    #     ["weid" => $_W["uniacid"], "id" => $special_id, "istatus" => 1, "status" => 1], "pid");
    def get_test_type_by_special_id(self, db: Session, uniacid: int,
                                    special_id: int) -> Optional[int]:
        """Return the id of the enabled row with id ``special_id``, or None.

        NOTE(review): the PHP reference above read the ``pid`` column, whereas
        this query selects ``id``; confirm which column callers expect.
        """
        row = db.query(self.model.id).filter(
            self.model.weid == uniacid,
            self.model.id == special_id,
            self.model.istatus == 1,
            self.model.status == 1,
        ).first()
        return row[0] if row else None

    # Original PHP:
    # $pool = pdo_fetchall("SELECT id,name,price FROM ".tablename("goouc_fullexam_test_type").
    #     " WHERE weid=:weid AND istatus = 1 AND status = 1 AND id=:id ",
    #     [":weid" => $_W["uniacid"], ":id" => $pid]);
    def get_pool_by_id(self, db: Session, uniacid: int,
                       pid: int) -> Optional[Dict[str, Any]]:
        """Return ``{"id", "name", "price"}`` for the enabled row with id ``pid``.

        Returns None when there is no such row.  A single dict is returned,
        not a list.
        """
        row = db.query(self.model.id, self.model.name, self.model.price).filter(
            self.model.weid == uniacid,
            self.model.istatus == 1,
            self.model.status == 1,
            self.model.id == pid,
        ).first()
        if row is None:
            return None
        # Turn the result tuple into a dict.
        return {"id": row[0], "name": row[1], "price": row[2]}

    # Original PHP:
    # pdo_fetchall("SELECT `id`,`name` FROM ".tablename("goouc_fullexam_test_type").
    #     " WHERE weid=:weid AND istatus=1 AND pid = 0 AND status = 1 ORDER BY id ASC ",
    #     [":weid" => $_W["uniacid"]]);
    def get_all_special_if_is_student(self, db: Session, uniacid: int, uid: int):
        """Return ``{"id", "name"}`` dicts for enabled top-level rows with ``is_student == 1``.

        ``uid`` is accepted but not used by the query.  An empty list is
        returned when nothing matches.

        NOTE(review): the PHP reference above has no ``is_student`` condition.
        """
        rows = db.query(self.model.id, self.model.name).filter(
            self.model.weid == uniacid,
            self.model.istatus == 1,
            self.model.is_student == 1,
            self.model.pid == 0,
            self.model.status == 1,
        ).order_by(self.model.id).all()
        return [{"id": row[0], "name": row[1]} for row in rows]

    # Original PHP:
    # $list = pdo_fetchall("SELECT `id`,`name` FROM ".tablename("goouc_fullexam_test_type").
    #     " WHERE weid=:weid AND istatus=1 AND is_student = 2 AND pid = 0 AND status = 1
    #       ORDER BY id ASC ", [":weid" => $_W["uniacid"]]);
    def get_all_special_without_is_student(self, db: Session, uniacid: int,
                                           uid: int) -> Optional[List[Dict[str, Any]]]:
        """Return ``{"id", "name"}`` dicts for enabled top-level rows with ``is_student == 2``.

        ``uid`` is accepted but not used by the query.  Returns None (not an
        empty list) when nothing matches.
        """
        rows = db.query(self.model.id, self.model.name).filter(
            self.model.weid == uniacid,
            self.model.istatus == 1,
            self.model.is_student == 2,
            self.model.pid == 0,
            self.model.status == 1,
        ).order_by(self.model.id).all()
        if not rows:
            return None
        return [{"id": row[0], "name": row[1]} for row in rows]

    # Original PHP:
    # pdo_fetchall("SELECT `id`,`name` FROM ".tablename("goouc_fullexam_test_type").
    #     " WHERE weid=:weid AND istatus=1 AND pid=:pid AND status = 1 ORDER BY id ASC ",
    #     [":weid" => $_W["uniacid"], ":pid" => $v["id"]]);
    def get_son_by_pid(self, db: Session, uniacid: int, pid: int, istatus: int,
                       status: int):
        """Return ``{"id", "name"}`` dicts for the children of ``pid``, ordered by id.

        Returns None (not an empty list) when nothing matches.
        """
        rows = db.query(self.model.id, self.model.name).filter(
            self.model.weid == uniacid,
            self.model.istatus == istatus,
            self.model.pid == pid,
            self.model.status == status,
        ).order_by(self.model.id).all()
        if not rows:
            return None
        return [{"id": row[0], "name": row[1]} for row in rows]


class CRUDTypeCate(CRUDBase[TypeCate, TypeCateCreate, TypeCateUpdate]):
    """CRUD access for ``TypeCate``."""

    def get_type_cate(self, db: Session, type_cate_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", type_cate_id)


class CRUDWatermark(CRUDBase[Watermark, WatermarkCreate, WatermarkUpdate]):
    """CRUD access for ``Watermark``."""

    def get_watermark(self, db: Session, watermark_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", watermark_id)


class CRUDWxTpl(CRUDBase[Wxtpl, WxTplCreate, WxTplUpdate]):
    """CRUD access for ``Wxtpl``."""

    def get_wx_tpl(self, db: Session, wx_tpl_id: int):
        """Look up a record through ``get_by_field`` on the ``id`` field."""
        return self.get_by_field(db, "id", wx_tpl_id)


class CRUDXueshen(CRUDBase[Xuesheng, XueshengCreate, XueshengUpdate]):
    """CRUD access for ``Xuesheng``."""

    def get_xueshen(self, db: Session, xueshen_id: int):
        """Look up a record through ``get_by_field`` on ``xuesheng_id``."""
        return self.get_by_field(db, "xuesheng_id", xueshen_id)

    def get_xuesheng_id(self, db: Session, uniacid: int, user_name: str,
                        user_phone: str) -> Optional[Any]:
        """Find the ``xuesheng_id`` for a weid / name / phone combination.

        The result of ``.first()`` is returned untouched: a one-column row
        (index ``[0]`` for the id) or None when nothing matches.
        """
        return db.query(self.model.xuesheng_id).filter(
            self.model.weid == uniacid,
            self.model.user_name == user_name,
            self.model.user_phone == user_phone,
        ).first()


# Shared instances, one per model.
advert = CRUDAdvert(Advert)
# Use the specialised class so ``banji.get_banji`` is available.
banji = CRUDBanji(Banji)
banner = CRUDBanner(Banner)
category = CRUDCategory(Category)
cdkey = CRUDCdkey(Cdkey)
cdkey_cate = CRUDCdkeyCate(CdkeyCate)
cdkeys = CRUDCdkeys(Cdkeys)
exercise = CRUDExercise(Exercise)
feedback = CRUDFeedback(Feedback)
gift = CRUDGift(Gift)
index_btn = CRUDIndexBtn(IndexBtn)
knowledge = CRUDKnowledge(Knowledge)
knowledge_cate = CRUDKnowledgeCate(KnowledgeCate)
notice = CRUDNotice(Notice)
order = CRUDOrder(Order)
paper = CRUDPaper(Paper)
paper_test = CRUDPaperTest(PaperTest)
phone_code = CRUDPhonecode(Phonecode)
# Bound to QYear; it was previously a second CRUDSchool(School) instance.
q_year = CRUDQYear(QYear)
school = CRUDSchool(School)
setting = CRUDSetting(Setting)
share_record = CRUDShareRecord(ShareRecord)
son_simple = CRUDSonSimple(SonSimple)
test = CRUDTest(Test)
test_type = CRUDTestType(TestType)
type_cate = CRUDTypeCate(TypeCate)
watermark = CRUDWatermark(Watermark)
wx_tpl = CRUDWxTpl(Wxtpl)
xueshen = CRUDXueshen(Xuesheng)