"""CRUD helpers for the goouc_fullexam user-related models.

Each class binds one model from ``mooc.models.goouc_fullexam_user`` to its
Create/Update schemas through ``CRUDBase`` and adds the read queries used for
that model. Throughout this module the ``uniacid`` argument is matched against
the model's ``weid`` column.
"""
from typing import Optional, Tuple, List, Dict, Any

from sqlalchemy.orm import Session

from mooc.crud.crud_base import CRUDBase
from mooc.models.goouc_fullexam_user import (  # import all user models
    User,
    UserDoexam,
    UserDoOtherExamAnswer,
    UserExamAnswer,
    UserSpecial,
    UserSequence,
    UserMember,
    UserCollectionPraction,
    UserGift,
    UserQhigh,
    UserQintensive,
    UserRead,
    UserDootherExam,
    UserWrongPraction,
    UserPool,
    UserQtype
)
from mooc.schemas.goouc_fullexam_user import (  # import all Create and Update schemas
    UserCreate,
    UserUpdate,
    UserDoexamCreate,
    UserDoexamUpdate,
    UserDoOtherExamAnswerCreate,
    UserDoOtherExamAnswerUpdate,
    UserExamAnswerCreate,
    UserExamAnswerUpdate,
    UserSpecialCreate,
    UserSpecialUpdate,
    UserSpequenceCreate,
    UserSpequenceUpdate,
    UserMemberCreate,
    UserMemberUpdate,
    UserCollectionPractionCreate,
    UserCollectionPractionUpdate,
    UserGiftCreate,
    UserGiftUpdate,
    UserQHighCreate,
    UserQHighUpdate,
    UserQIntensiveCreate,
    UserQIntensiveUpdate,
    UserReadCreate,
    UserReadUpdate,
    UserDoOtherExamCreate,
    UserDoOtherExamUpdate,
    UserWrongPractionCreate,
    UserWrongPractionUpdate,
    UserPoolCreate,
    UserPoolUpdate,
    UserQTypeCreate,
    UserQTypeUpdate,
)
from mooc.models.goouc_fullexam import (
    TestType
)


class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
    """Queries against the ``User`` model."""

    def get_user(self, db: Session, user_id: int):
        """Look up a user by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_id)

    def get_user_info(self, db: Session, uid: int, uniacid: int) -> Optional[User]:
        """Return the first user with ``id == uid`` and ``weid == uniacid``, or ``None``."""
        return (
            db.query(self.model)
            .filter(self.model.id == uid, self.model.weid == uniacid)
            .first()
        )

    def get_user_is_member(self, db: Session, uniacid: int, uid: int) -> Optional[int]:
        """Return the ``ismember`` column of the matching user, or ``None`` if no row matches."""
        # PHP reference:
        #   $is_member = pdo_getcolumn(goouc_fullexam_user,
        #       ["weid" => $_W["uniacid"], "id" => $uid], "ismember");
        row = (
            db.query(self.model.ismember)
            .filter(self.model.weid == uniacid, self.model.id == uid)
            .first()
        )
        return row[0] if row else None


class CRUDUserDoexam(CRUDBase[UserDoexam, UserDoexamCreate, UserDoexamUpdate]):
    """Queries against the ``UserDoexam`` model."""

    def get_user_doexam(self, db: Session, user_doexam_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_doexam_id)

    def get_quanzhen_highest(self, db: Session, uniacid: int, uid: int) -> Optional[UserDoexam]:
        """Return the user's record with the highest ``franction``, or ``None``.

        The previous implementation passed a positional criterion to
        ``filter_by`` (which only accepts keyword arguments) and then indexed
        the returned entity with ``[0]`` as if it were a column row. It now
        uses ``filter`` and returns the entity itself, the same shape
        ``CRUDUserDoOtherExam.get_user_other_highest`` returns.
        """
        return (
            db.query(self.model)
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .order_by(self.model.franction.desc())
            .first()
        )

    def get_user_do_number(self, db: Session, uniacid: int, uid: int, examid: int) -> int:
        """Count the records matching ``weid``, ``uid`` and ``examid``."""
        return (
            db.query(self.model)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.examid == examid,
            )
            .count()
        )

    def get_user_recordid(self, db: Session, uniacid: int, uid: int, paperid: int):
        """Return ``recordid`` of the newest record (by ``createtime``) for the paper, or ``None``."""
        row = (
            db.query(self.model.recordid)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.examid == paperid,
            )
            .order_by(self.model.createtime.desc())
            .limit(1)
            .first()
        )
        return row[0] if row else None


class CRUDUserDoOtherExamAnswer(
    CRUDBase[UserDoOtherExamAnswer, UserDoOtherExamAnswerCreate, UserDoOtherExamAnswerUpdate]):
    """Queries against the ``UserDoOtherExamAnswer`` model."""

    def get_user_doother_exam_answer(self, db: Session, user_doother_exam_answer_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_doother_exam_answer_id)


class CRUDUserExamAnswer(CRUDBase[UserExamAnswer, UserExamAnswerCreate, UserExamAnswerUpdate]):
    """Queries against the ``UserExamAnswer`` model."""

    def get_user_exam_answer_by_id(self, db: Session, user_exam_answer_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``.

        This lookup used to be declared as ``get_user_exam_answer`` and was
        shadowed by the later definition of the same name in this class, so it
        could never be called. It is exposed under a distinct name instead.
        """
        return self.get_by_field(db, "id", user_exam_answer_id)

    def get_user_exam_answer(self, db: Session, uniacid: int, uid: int, paperid: int, recordid: int) -> Optional[
            List[Dict[str, Any]]]:
        """List answers with ``isright == 0`` and ``test_type != 0`` for one exam record.

        Rows are ordered by ``testid``. Each element is a dict with the keys
        ``uid``, ``examid``, ``testid`` and ``test_type``; the list is empty
        when nothing matches.
        """
        query = (
            db.query(self.model.uid, self.model.examid, self.model.testid, self.model.test_type)
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .filter(self.model.examid == paperid)
            .filter(self.model.recordid == recordid)
            .filter(self.model.test_type != 0)
            .filter(self.model.isright == 0)
            .order_by(self.model.testid)
        )
        # Keys follow the order of the selected columns so zip() pairs them correctly.
        column_names = ["uid", "examid", "testid", "test_type"]
        return [dict(zip(column_names, row)) for row in query.all()]

    def get_exam_answer_test_id(self, db: Session, uniacid: int, uid: int) -> Optional[List[int]]:
        """Return every ``testid`` stored for the user, ordered by ``testid`` (empty list if none)."""
        rows = (
            db.query(self.model.testid)
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .order_by(self.model.testid)
            .all()
        )
        return [row[0] for row in rows]

    def get_do_answer(self, db: Session, uniacid: int, uid: int, paperid: int, testid: int) -> Optional[UserExamAnswer]:
        """Return the newest answer (by ``createtime``) for one question of one paper, or ``None``."""
        # PHP reference:
        #   $do_answer = pdo_fetch(
        #       " SELECT * FROM " . tablename($this->t_user_exam_answer) .
        #       " WHERE weid = :weid AND uid = :uid AND testid = :testid AND examid = :examid
        #         order by createtime DESC ",
        #       [":weid" => $_W["uniacid"], ":uid" => $uid, ":examid" => $paperid, "testid" => $v["id"]]);
        return (
            db.query(self.model)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.examid == paperid,
                self.model.testid == testid,
            )
            .order_by(self.model.createtime.desc())
            .first()
        )


class CRUDUserSpecial(CRUDBase[UserSpecial, UserSpecialCreate, UserSpecialUpdate]):
    """Queries against the ``UserSpecial`` model."""

    def get_user_special(self, db: Session, user_special_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_special_id)

    def get_last_id_by_special_id(self, db: Session, uniacid: int, uid: int, special_id: int) -> Optional[int]:
        """Return ``last_id`` of the first row with ``istatus == 1`` for the special, or ``None``."""
        # PHP reference:
        #   $last_id = pdo_fetchcolumn("SELECT last_id FROM " . tablename("goouc_fullexam_user_special") .
        #       " WHERE weid=:weid AND uid=:uid AND special_id=:special_id AND istatus=1 ",
        #       [":weid" => $_W["uniacid"], ":uid" => $uid, ":special_id" => $special_id]);
        row = (
            db.query(self.model.last_id)
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .filter(self.model.special_id == special_id)
            .filter(self.model.istatus == 1)
            .first()
        )
        return row[0] if row else None


class CRUDUserSequence(CRUDBase[UserSequence, UserSpequenceCreate, UserSpequenceUpdate]):
    """Queries against the ``UserSequence`` model."""

    # NOTE(review): "spequence" looks like a misspelling of "sequence"; the
    # name is kept as-is because callers may depend on it.
    def get_user_spequence(self, db: Session, user_spequence_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_spequence_id)

    def get_last_id(self, db: Session, uid: int, uniacid: int):
        """Return ``question_id`` of the first row matching ``weid`` and ``user_id``, or ``None``."""
        row = (
            db.query(self.model.question_id)
            .filter(self.model.weid == uniacid, self.model.user_id == uid)
            .first()
        )
        return row[0] if row else None

    def get_last_id_by_lib_id(self, db: Session, uid: int, uniacid: int, lib_id: int):
        """Return ``question_id`` of the first row matching ``weid``, ``user_id`` and ``lib_id``, or ``None``."""
        row = (
            db.query(self.model.question_id)
            .filter(
                self.model.weid == uniacid,
                self.model.user_id == uid,
                self.model.lib_id == lib_id,
            )
            .first()
        )
        return row[0] if row else None


class CRUDUserMember(CRUDBase[UserMember, UserMemberCreate, UserMemberUpdate]):
    """Queries against the ``UserMember`` model."""

    def get_user_member(self, db: Session, user_member_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_member_id)


class CRUDUserCollectionPraction(
    CRUDBase[UserCollectionPraction, UserCollectionPractionCreate, UserCollectionPractionUpdate]):
    """Queries against the ``UserCollectionPraction`` model."""

    def get_user_collection_praction(self, db: Session, user_collection_praction_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_collection_praction_id)

    def get_is_collect(self, db: Session, uniacid: int, uid: int, test_id: int, test_type: int):
        """Return the ``iscollect`` column of the first matching row, or ``None``."""
        # PHP reference:
        #   $iscollect = pdo_getcolumn("goouc_fullexam_user_collection_praction",
        #       ["weid" => $_W["uniacid"], "uid" => $uid, "testid" => $val["id"],
        #        "test_type" => $val["type"]], "id");
        # NOTE(review): the PHP snippet reads the "id" column while this query
        # reads ``iscollect`` - confirm which one callers expect.
        row = (
            db.query(self.model.iscollect)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.testid == test_id,
                self.model.test_type == test_type,
            )
            .first()
        )
        return row[0] if row else None

    def get_id_list(self, db: Session, uniacid: int, uid: int, istatus: int):
        """List ``{"testid", "test_type"}`` dicts for the user's rows with the given ``istatus``.

        Rows are ordered by ``id``; the list is empty when nothing matches.
        """
        rows = (
            db.query(self.model.testid, self.model.test_type)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.istatus == istatus,
            )
            .order_by(self.model.id)
            .all()
        )
        return [{"testid": row[0], "test_type": row[1]} for row in rows]


class CRUDUserGift(CRUDBase[UserGift, UserGiftCreate, UserGiftUpdate]):
    """Queries against the ``UserGift`` model."""

    # The original class declared this method twice with identical bodies;
    # a single definition is kept.
    def get_user_gift(self, db: Session, user_gift_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_gift_id)

    def get_user_gift_list(self, db: Session, uniacid: int, uid: str) -> Optional[List[Tuple[int, int, int]]]:
        """Return ``(giftid, createtime, status)`` rows for the user.

        Example result shape::

            [
                (1, 1634567890, 1),
                (2, 1634567900, 0),
            ]
        """
        return (
            db.query(self.model.giftid, self.model.createtime, self.model.status)
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .all()
        )


class CRUDUserQHigh(CRUDBase[UserQhigh, UserQHighCreate, UserQHighUpdate]):
    """Queries against the ``UserQhigh`` model."""

    def get_user_qhigh(self, db: Session, user_qhigh_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_qhigh_id)


class CRUDUserQIntensive(CRUDBase[UserQintensive, UserQIntensiveCreate, UserQIntensiveUpdate]):
    """Queries against the ``UserQintensive`` model."""

    def get_user_qintensive(self, db: Session, user_qintensive_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_qintensive_id)


class CRUDUserRead(CRUDBase[UserRead, UserReadCreate, UserReadUpdate]):
    """Queries against the ``UserRead`` model."""

    def get_user_read(self, db: Session, user_read_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_read_id)


class CRUDUserDoOtherExam(CRUDBase[UserDootherExam, UserDoOtherExamCreate, UserDoOtherExamUpdate]):
    """Queries against the ``UserDootherExam`` model."""

    def get_user_doother_exam(self, db: Session, user_doother_exam_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_doother_exam_id)

    def get_user_other_highest(self, db: Session, uid: int, uniacid: int):
        """Return the user's record with the highest ``franction``, or ``None``.

        The previous implementation passed a positional criterion to
        ``filter_by``, which only accepts keyword arguments; ``filter`` is
        used instead.
        """
        return (
            db.query(self.model)
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .order_by(self.model.franction.desc())
            .first()
        )


class CRUDUserWrongPraction(CRUDBase[UserWrongPraction, UserWrongPractionCreate, UserWrongPractionUpdate]):
    """Queries against the ``UserWrongPraction`` model."""

    @staticmethod
    def _to_test_refs(rows) -> Optional[List[Dict[str, Any]]]:
        """Convert ``(testid, test_type)`` rows to dicts; ``None`` when there are no rows."""
        if not rows:
            return None
        return [{"testid": row[0], "test_type": row[1]} for row in rows]

    def get_user_wrong_praction(self, db: Session, user_wrong_praction_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_wrong_praction_id)

    def get_err_id_list_order_by_createtime(self, db: Session, uniacid: int, uid: int):
        """List the user's ``istatus == 1`` entries ordered by ``createtime``.

        Returns a list of ``{"testid", "test_type"}`` dicts, or ``None`` when
        nothing matches.
        """
        rows = (
            db.query(self.model.testid, self.model.test_type)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.istatus == 1,
            )
            .order_by(self.model.createtime)
            .all()
        )
        return self._to_test_refs(rows)

    def get_id_list_order_by_id(self, db: Session, uniacid: int, uid: int, test_type: int):
        """List the user's ``istatus == 1`` entries ordered by ``id``.

        Returns a list of ``{"testid", "test_type"}`` dicts, or ``None`` when
        nothing matches.
        """
        # NOTE(review): ``test_type`` is accepted but not applied to the query;
        # confirm whether a ``test_type`` filter was intended.
        rows = (
            db.query(self.model.testid, self.model.test_type)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.istatus == 1,
            )
            .order_by(self.model.id)
            .all()
        )
        return self._to_test_refs(rows)

    def get_id_list_with_time(self, db: Session, uniacid: int, uid: int, start: int):
        """List the user's ``istatus == 1`` entries with ``createtime > start``, ordered by ``id``.

        Returns a list of ``{"testid", "test_type"}`` dicts, or ``None`` when
        nothing matches.
        """
        rows = (
            db.query(self.model.testid, self.model.test_type)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.istatus == 1,
                self.model.createtime > start,
            )
            .order_by(self.model.id)
            .all()
        )
        return self._to_test_refs(rows)


class CRUDUserPool(CRUDBase[UserPool, UserPoolCreate, UserPoolUpdate]):
    """Queries against the ``UserPool`` model."""

    # def get_user_pool(self, db: Session, user_pool_id: int):
    #     return self.get_by_field(db, "id", user_pool_id)

    def get_kid_id_by_paperid(self, db: Session, uniacid: int, uid: int, paperid: int):
        """Return ``id`` of the first ``istatus == 1`` row for the paper, or ``None``."""
        row = (
            db.query(self.model.id)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.paperid == paperid,
                self.model.istatus == 1,
            )
            .first()
        )
        return row[0] if row else None

    def get_user_pool(self, db: Session, uniacid: int, uid: int) -> List[Tuple[int, int]]:
        """Return ``(id, poolid)`` rows for the user's pools whose ``TestType`` is active.

        ``UserPool`` is left-joined to ``TestType`` on ``poolid == TestType.id``
        and the result is restricted to ``TestType.istatus == 1`` and
        ``TestType.status == 1``. Returns an empty list when nothing matches.
        """
        # PHP reference:
        #   pdo_fetchall("SELECT up.id,up.poolid FROM " . tablename("goouc_fullexam_user_pool") .
        #       " as up LEFT JOIN " . tablename("goouc_fullexam_test_type") .
        #       " AS tt ON up.poolid=tt.id
        #         WHERE up.weid=:weid AND up.uid=:uid AND tt.istatus = 1 AND tt.status = 1",
        #       [":weid" => $_W["uniacid"], ":uid" => $uid]);
        return (
            db.query(UserPool.id, UserPool.poolid)
            .join(TestType, UserPool.poolid == TestType.id, isouter=True)  # left join
            .filter(
                UserPool.weid == uniacid,
                UserPool.uid == uid,
                TestType.istatus == 1,
                TestType.status == 1,
            )
            .all()
        )

    def get_kid_id_by_poolid(self, db: Session, uniacid: int, uid: int, poolid: int):
        """Return ``id`` of the first ``istatus == 1`` row for the pool, or ``None``."""
        # PHP reference:
        #   $kid = pdo_getcolumn("goouc_fullexam_user_pool",
        #       ["weid" => $_W["uniacid"], "uid" => $uid, "poolid" => $pool["id"], "istatus" => 1], "id");
        row = (
            db.query(self.model.id)
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .filter(self.model.poolid == poolid)
            .filter(self.model.istatus == 1)
            .first()
        )
        return row[0] if row else None


class CRUDUserQtype(CRUDBase[UserQtype, UserQTypeCreate, UserQTypeUpdate]):
    """Queries against the ``UserQtype`` model."""

    # NOTE(review): "qype" looks like a misspelling of "qtype"; the name is
    # kept as-is because callers may depend on it.
    def get_user_qype(self, db: Session, user_qtype_id: int):
        """Look up a record by ``id`` through ``CRUDBase.get_by_field``."""
        return self.get_by_field(db, "id", user_qtype_id)

    def get_last_id_by_type_id_and_uid(self, db: Session, uniacid: int, uid: int, type_id: int, istatus: int) -> \
            Optional[int]:
        """Return ``last_id`` of the first row matching the type and ``istatus``, or ``None``."""
        # PHP reference (there ``istatus`` is fixed to 1):
        #   $last_id = pdo_fetchcolumn("SELECT last_id FROM " . tablename("goouc_fullexam_user_qtype") .
        #       " WHERE weid=:weid AND uid=:uid AND type_id=:type_id AND istatus=1 ",
        #       [":weid" => $_W["uniacid"], ":uid" => $uid, ":type_id" => $qtype]);
        row = (
            db.query(self.model.last_id)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.type_id == type_id,
                self.model.istatus == istatus,
            )
            .first()
        )
        return row[0] if row else None