375 lines
16 KiB
Python
375 lines
16 KiB
Python
from typing import Optional, Tuple, List, Dict, Any
|
|
|
|
from sqlalchemy.orm import Session
|
|
from mooc.crud.crud_base import CRUDBase
|
|
|
|
from mooc.models.goouc_fullexam_user import (
|
|
# Import all models
|
|
User,
|
|
UserDoexam,
|
|
UserDoOtherExamAnswer,
|
|
UserExamAnswer,
|
|
UserSpecial,
|
|
UserSequence,
|
|
UserMember,
|
|
UserCollectionPraction,
|
|
UserGift,
|
|
UserQhigh,
|
|
UserQintensive,
|
|
UserRead,
|
|
UserDootherExam,
|
|
UserWrongPraction,
|
|
UserPool,
|
|
UserQtype
|
|
)
|
|
from mooc.schemas.goouc_fullexam_user import (
|
|
# Import all Create and Update schemas
|
|
UserCreate,
|
|
UserUpdate,
|
|
UserDoexamCreate,
|
|
UserDoexamUpdate,
|
|
UserDoOtherExamAnswerCreate,
|
|
UserDoOtherExamAnswerUpdate,
|
|
UserExamAnswerCreate,
|
|
UserExamAnswerUpdate,
|
|
UserSpecialCreate,
|
|
UserSpecialUpdate,
|
|
UserSpequenceCreate,
|
|
UserSpequenceUpdate,
|
|
UserMemberCreate,
|
|
UserMemberUpdate,
|
|
UserCollectionPractionCreate,
|
|
UserCollectionPractionUpdate,
|
|
UserGiftCreate,
|
|
UserGiftUpdate,
|
|
UserQHighCreate,
|
|
UserQHighUpdate,
|
|
UserQIntensiveCreate,
|
|
UserQIntensiveUpdate,
|
|
UserReadCreate,
|
|
UserReadUpdate,
|
|
UserDoOtherExamCreate,
|
|
UserDoOtherExamUpdate,
|
|
UserWrongPractionCreate,
|
|
UserWrongPractionUpdate,
|
|
UserPoolCreate,
|
|
UserPoolUpdate,
|
|
UserQTypeCreate,
|
|
UserQTypeUpdate,
|
|
)
|
|
from mooc.models.goouc_fullexam import (
|
|
TestType
|
|
)
|
|
|
|
|
|
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
    """Query helpers for the ``User`` model."""

    def get_user(self, db: Session, user_id: int):
        """Look up a user by ``id`` through ``get_by_field``."""
        return self.get_by_field(db, "id", user_id)

    def get_user_info(self, db: Session, uid: int, uniacid: int) -> Optional[User]:
        """Return the first user with ``id == uid`` and ``weid == uniacid``, or ``None``."""
        matching = db.query(self.model).filter(
            self.model.id == uid,
            self.model.weid == uniacid,
        )
        return matching.first()

    def get_user_is_member(self, db: Session, uniacid: int, uid: int) -> Optional[int]:
        """Return the ``ismember`` column of the first matching user.

        Rows are matched on ``weid == uniacid`` and ``id == uid``. ``None`` is
        returned when no row matches.

        Ported from the PHP call:
            $is_member = pdo_getcolumn(goouc_fullexam_user,
                ["weid" => $_W["uniacid"], "id" => $uid], "ismember");
        """
        row = (
            db.query(self.model.ismember)
            .filter(self.model.weid == uniacid, self.model.id == uid)
            .first()
        )
        if not row:
            return None
        return row[0]
|
|
|
|
|
|
class CRUDUserDoexam(CRUDBase[UserDoexam, UserDoexamCreate, UserDoexamUpdate]):
    """Query helpers for the ``UserDoexam`` model."""

    def get_user_doexam(self, db: Session, user_doexam_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        return self.get_by_field(db, "id", user_doexam_id)

    def get_quanzhen_highest(self, db: Session, uniacid: int, uid: int):
        """Return the user's record with the highest ``franction``, or ``None``.

        Rows are matched on ``weid == uniacid`` and ``uid == uid`` and sorted by
        ``franction`` descending; the first one is returned as a model instance.
        """
        # ``filter_by`` only accepts keyword arguments, so SQL expressions must
        # go through ``filter``. The query selects whole entities, so the record
        # itself is returned rather than indexed like a column tuple.
        return (
            db.query(self.model)
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .order_by(self.model.franction.desc())
            .first()
        )

    def get_user_do_number(self, db: Session, uniacid: int, uid: int, examid: int):
        """Count the records matching ``weid``, ``uid`` and ``examid``."""
        return (
            db.query(self.model)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.examid == examid,
            )
            .count()
        )

    def get_user_recordid(self, db: Session, uniacid: int, uid: int, paperid: int):
        """Return ``recordid`` of the newest matching record, or ``None``.

        Rows are matched on ``weid == uniacid``, ``uid == uid`` and
        ``examid == paperid``, newest ``createtime`` first.
        """
        row = (
            db.query(self.model.recordid)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.examid == paperid,
            )
            .order_by(self.model.createtime.desc())
            .limit(1)
            .first()
        )
        return row[0] if row else None
|
|
|
|
|
|
class CRUDUserDoOtherExamAnswer(
    CRUDBase[
        UserDoOtherExamAnswer,
        UserDoOtherExamAnswerCreate,
        UserDoOtherExamAnswerUpdate,
    ]
):
    """Query helpers for the ``UserDoOtherExamAnswer`` model."""

    def get_user_doother_exam_answer(self, db: Session, user_doother_exam_answer_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        record_id = user_doother_exam_answer_id
        return self.get_by_field(db, "id", record_id)
|
|
|
|
|
|
class CRUDUserExamAnswer(CRUDBase[UserExamAnswer, UserExamAnswerCreate, UserExamAnswerUpdate]):
    """Query helpers for the ``UserExamAnswer`` model."""

    def get_user_exam_answer_by_id(self, db: Session, user_exam_answer_id: int):
        """Look up a record by ``id`` through ``get_by_field``.

        This lookup used to be declared as ``get_user_exam_answer`` and was
        silently replaced by the five-argument method of the same name defined
        after it, so it could never be called. It now has a distinct name.
        """
        return self.get_by_field(db, "id", user_exam_answer_id)

    def get_user_exam_answer(
        self, db: Session, uniacid: int, uid: int, paperid: int, recordid: int
    ) -> Optional[List[Dict[str, Any]]]:
        """Return the matching answers with ``isright == 0`` as a list of dicts.

        Rows are matched on ``weid``, ``uid``, ``examid`` (*paperid*) and
        ``recordid``, keeping only those with ``test_type != 0`` and
        ``isright == 0``, ordered by ``testid``. Each dict has the keys
        ``uid``, ``examid``, ``testid`` and ``test_type``.
        """
        query = (
            db.query(
                self.model.uid,
                self.model.examid,
                self.model.testid,
                self.model.test_type,
            )
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .filter(self.model.examid == paperid)
            .filter(self.model.recordid == recordid)
            .filter(self.model.test_type != 0)
            .filter(self.model.isright == 0)
            .order_by(self.model.testid)
        )

        # Keys follow the order of the selected columns above.
        column_names = ["uid", "examid", "testid", "test_type"]
        return [dict(zip(column_names, row)) for row in query.all()]

    def get_exam_answer_test_id(self, db: Session, uniacid: int, uid: int) -> Optional[List[int]]:
        """Return the ``testid`` values for ``weid``/``uid``, ordered by ``testid``.

        An empty list is returned when nothing matches.
        """
        rows = (
            db.query(self.model.testid)
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .order_by(self.model.testid)
            .all()
        )
        return [row[0] for row in rows]

    def get_do_answer(
        self, db: Session, uniacid: int, uid: int, paperid: int, testid: int
    ) -> Optional[UserExamAnswer]:
        """Return the newest answer record for one test of one paper, or ``None``.

        Ported from the PHP query:
            $do_answer = pdo_fetch(
                " SELECT * FROM ".tablename($this->t_user_exam_answer)."
                WHERE weid = :weid AND uid = :uid AND testid = :testid
                AND examid = :examid order by createtime DESC ",
                [":weid" => $_W["uniacid"], ":uid" => $uid,
                 ":examid" => $paperid, "testid" => $v["id"]]);
        """
        return (
            db.query(self.model)
            .filter(
                self.model.weid == uniacid,
                self.model.uid == uid,
                self.model.examid == paperid,
                self.model.testid == testid,
            )
            .order_by(self.model.createtime.desc())
            .first()
        )
|
|
|
|
|
|
class CRUDUserSpecial(CRUDBase[UserSpecial, UserSpecialCreate, UserSpecialUpdate]):
    """Query helpers for the ``UserSpecial`` model."""

    def get_user_special(self, db: Session, user_special_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        return self.get_by_field(db, "id", user_special_id)

    def get_last_id_by_special_id(
        self, db: Session, uniacid: int, uid: int, special_id: int
    ) -> Optional[int]:
        """Return ``last_id`` of the first matching row with ``istatus == 1``, or ``None``.

        Ported from the PHP query:
            $last_id = pdo_fetchcolumn("SELECT last_id FROM "
                .tablename("goouc_fullexam_user_special").
                " WHERE weid=:weid AND uid=:uid AND special_id=:special_id AND istatus=1 ",
                [":weid" => $_W["uniacid"], ":uid" => $uid, ":special_id" => $special_id]);
        """
        model = self.model
        lookup = (
            db.query(model.last_id)
            .filter(model.weid == uniacid)
            .filter(model.uid == uid)
            .filter(model.special_id == special_id)
            .filter(model.istatus == 1)
        )
        row = lookup.first()
        if not row:
            return None
        return row[0]
|
|
|
|
|
|
class CRUDUserSequence(CRUDBase[UserSequence, UserSpequenceCreate, UserSpequenceUpdate]):
    """Query helpers for the ``UserSequence`` model."""

    def get_user_spequence(self, db: Session, user_spequence_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        return self.get_by_field(db, "id", user_spequence_id)

    def get_last_id(self, db: Session, uid: int, uniacid: int):
        """Return ``question_id`` of the first row for ``weid``/``user_id``, or ``None``.

        No ordering is applied to the query.
        """
        model = self.model
        row = (
            db.query(model.question_id)
            .filter(model.weid == uniacid, model.user_id == uid)
            .first()
        )
        if not row:
            return None
        return row[0]

    def get_last_id_by_lib_id(self, db: Session, uid: int, uniacid: int, lib_id: int):
        """Return ``question_id`` of the first row for ``weid``/``user_id``/``lib_id``, or ``None``.

        No ordering is applied to the query.
        """
        model = self.model
        row = (
            db.query(model.question_id)
            .filter(
                model.weid == uniacid,
                model.user_id == uid,
                model.lib_id == lib_id,
            )
            .first()
        )
        if not row:
            return None
        return row[0]
|
|
|
|
|
|
class CRUDUserMember(CRUDBase[UserMember, UserMemberCreate, UserMemberUpdate]):
    """Query helpers for the ``UserMember`` model."""

    def get_user_member(self, db: Session, user_member_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        record_id = user_member_id
        return self.get_by_field(db, "id", record_id)
|
|
|
|
|
|
class CRUDUserCollectionPraction(
    CRUDBase[
        UserCollectionPraction,
        UserCollectionPractionCreate,
        UserCollectionPractionUpdate,
    ]
):
    """Query helpers for the ``UserCollectionPraction`` model."""

    def get_user_collection_praction(self, db: Session, user_collection_praction_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        return self.get_by_field(db, "id", user_collection_praction_id)

    def get_is_collect(self, db: Session, uniacid: int, uid: int, test_id: int, test_type: int):
        """Return ``iscollect`` of the first matching row, or ``None``.

        Rows are matched on ``weid``, ``uid``, ``testid`` and ``test_type``.

        Ported from the PHP call:
            $iscollect = pdo_getcolumn("goouc_fullexam_user_collection_praction",
                ["weid" => $_W["uniacid"], "uid" => $uid,
                 "testid" => $val["id"], "test_type" => $val["type"]],
                "id");

        NOTE(review): the PHP call reads the ``id`` column while this method
        reads ``iscollect`` -- confirm which one callers expect.
        """
        model = self.model
        row = (
            db.query(model.iscollect)
            .filter(
                model.weid == uniacid,
                model.uid == uid,
                model.testid == test_id,
                model.test_type == test_type,
            )
            .first()
        )
        if not row:
            return None
        return row[0]

    def get_id_list(self, db: Session, uniacid: int, uid: int, istatus: int):
        """Return ``testid``/``test_type`` dicts for ``weid``/``uid``/``istatus``, ordered by ``id``."""
        model = self.model
        rows = (
            db.query(model.testid, model.test_type)
            .filter(
                model.weid == uniacid,
                model.uid == uid,
                model.istatus == istatus,
            )
            .order_by(model.id)
            .all()
        )
        return [{"testid": row[0], "test_type": row[1]} for row in rows]
|
|
|
|
|
|
class CRUDUserGift(CRUDBase[UserGift, UserGiftCreate, UserGiftUpdate]):
    """Query helpers for the ``UserGift`` model."""

    # ``get_user_gift`` was declared twice with identical bodies; only one
    # definition is kept.
    def get_user_gift(self, db: Session, user_gift_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        return self.get_by_field(db, "id", user_gift_id)

    def get_user_gift_list(
        self, db: Session, uniacid: int, uid: str
    ) -> Optional[List[Tuple[int, int, int]]]:
        """Return ``(giftid, createtime, status)`` rows for ``weid``/``uid``.

        Example result shape:
            [
                (1, 1634567890, 1),
                (2, 1634567900, 0),
            ]
        """
        return (
            db.query(self.model.giftid, self.model.createtime, self.model.status)
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .all()
        )
|
|
|
|
|
|
class CRUDUserQHigh(CRUDBase[UserQhigh, UserQHighCreate, UserQHighUpdate]):
    """Query helpers for the ``UserQhigh`` model."""

    def get_user_qhigh(self, db: Session, user_qhigh_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        record_id = user_qhigh_id
        return self.get_by_field(db, "id", record_id)
|
|
|
|
|
|
class CRUDUserQIntensive(CRUDBase[UserQintensive, UserQIntensiveCreate, UserQIntensiveUpdate]):
    """Query helpers for the ``UserQintensive`` model."""

    def get_user_qintensive(self, db: Session, user_qintensive_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        record_id = user_qintensive_id
        return self.get_by_field(db, "id", record_id)
|
|
|
|
|
|
class CRUDUserRead(CRUDBase[UserRead, UserReadCreate, UserReadUpdate]):
    """Query helpers for the ``UserRead`` model."""

    def get_user_read(self, db: Session, user_read_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        record_id = user_read_id
        return self.get_by_field(db, "id", record_id)
|
|
|
|
|
|
class CRUDUserDoOtherExam(CRUDBase[UserDootherExam, UserDoOtherExamCreate, UserDoOtherExamUpdate]):
    """Query helpers for the ``UserDootherExam`` model."""

    def get_user_doother_exam(self, db: Session, user_doother_exam_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        return self.get_by_field(db, "id", user_doother_exam_id)

    def get_user_other_highest(self, db: Session, uid: int, uniacid: int):
        """Return the user's record with the highest ``franction``, or ``None``.

        Rows are matched on ``weid == uniacid`` and ``uid == uid`` and sorted by
        ``franction`` descending; the first one is returned as a model instance.
        """
        # ``filter_by`` only accepts keyword arguments; a SQL expression such as
        # ``model.weid == uniacid`` has to be passed to ``filter`` instead.
        return (
            db.query(self.model)
            .filter(self.model.weid == uniacid)
            .filter(self.model.uid == uid)
            .order_by(self.model.franction.desc())
            .first()
        )
|
|
|
|
|
|
class CRUDUserWrongPraction(CRUDBase[UserWrongPraction, UserWrongPractionCreate, UserWrongPractionUpdate]):
    """Query helpers for the ``UserWrongPraction`` model."""

    @staticmethod
    def _rows_to_dicts(rows):
        """Turn ``(testid, test_type)`` rows into dicts; ``None`` when *rows* is empty."""
        if not rows:
            return None
        return [{"testid": testid, "test_type": kind} for testid, kind in rows]

    def get_user_wrong_praction(self, db: Session, user_wrong_praction_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        return self.get_by_field(db, "id", user_wrong_praction_id)

    def get_err_id_list_order_by_createtime(self, db: Session, uniacid: int, uid: int):
        """Return ``testid``/``test_type`` dicts with ``istatus == 1``, ordered by ``createtime``.

        ``None`` is returned when nothing matches.
        """
        model = self.model
        rows = (
            db.query(model.testid, model.test_type)
            .filter(model.weid == uniacid, model.uid == uid, model.istatus == 1)
            .order_by(model.createtime)
            .all()
        )
        return self._rows_to_dicts(rows)

    def get_id_list_order_by_id(self, db: Session, uniacid: int, uid: int, test_type: int):
        """Return ``testid``/``test_type`` dicts with ``istatus == 1``, ordered by ``id``.

        ``None`` is returned when nothing matches.

        NOTE(review): ``test_type`` is accepted but not used in the query --
        confirm whether the result was meant to be filtered by it.
        """
        model = self.model
        rows = (
            db.query(model.testid, model.test_type)
            .filter(model.weid == uniacid, model.uid == uid, model.istatus == 1)
            .order_by(model.id)
            .all()
        )
        return self._rows_to_dicts(rows)

    def get_id_list_with_time(self, db: Session, uniacid: int, uid: int, start: int):
        """Return ``testid``/``test_type`` dicts with ``istatus == 1`` and ``createtime > start``.

        Rows are ordered by ``id``. ``None`` is returned when nothing matches.
        """
        model = self.model
        rows = (
            db.query(model.testid, model.test_type)
            .filter(
                model.weid == uniacid,
                model.uid == uid,
                model.istatus == 1,
                model.createtime > start,
            )
            .order_by(model.id)
            .all()
        )
        return self._rows_to_dicts(rows)
|
|
|
|
|
|
class CRUDUserPool(CRUDBase[UserPool, UserPoolCreate, UserPoolUpdate]):
    """Query helpers for the ``UserPool`` model."""

    # NOTE: no by-id lookup named ``get_user_pool`` is defined here; that name
    # is used by the join query further down.

    def get_kid_id_by_paperid(self, db: Session, uniacid: int, uid: int, paperid: int):
        """Return ``id`` of the first row for ``weid``/``uid``/``paperid`` with ``istatus == 1``, or ``None``."""
        model = self.model
        row = (
            db.query(model.id)
            .filter(
                model.weid == uniacid,
                model.uid == uid,
                model.paperid == paperid,
                model.istatus == 1,
            )
            .first()
        )
        if not row:
            return None
        return row[0]

    def get_user_pool(self, db: Session, uniacid: int, uid: int) -> List[Tuple[int, int]]:
        """Return ``(id, poolid)`` rows of the user's pools with an active test type.

        ``UserPool`` is left-outer-joined to ``TestType`` on
        ``UserPool.poolid == TestType.id`` and filtered on
        ``TestType.istatus == 1`` and ``TestType.status == 1``.

        Ported from the PHP query:
            pdo_fetchall("SELECT up.id,up.poolid FROM "
                tablename("goouc_fullexam_user_pool")
                as up LEFT JOIN " . tablename("goouc_fullexam_test_type")
                AS tt ON up.poolid=tt.id
                WHERE up.weid=:weid AND up.uid=:uid AND tt.istatus = 1 AND tt.status = 1
                [":weid" => $_W["uniacid"], ":uid" => $uid]);
        """
        joined = db.query(UserPool.id, UserPool.poolid).join(
            TestType,
            UserPool.poolid == TestType.id,
            isouter=True,  # left join
        )
        rows = joined.filter(
            UserPool.weid == uniacid,
            UserPool.uid == uid,
            TestType.istatus == 1,
            TestType.status == 1,
        ).all()
        return rows or []

    def get_kid_id_by_poolid(self, db: Session, uniacid: int, uid: int, poolid: int):
        """Return ``id`` of the first row for ``weid``/``uid``/``poolid`` with ``istatus == 1``, or ``None``.

        Ported from the PHP call:
            $kid = pdo_getcolumn("goouc_fullexam_user_pool",
                ["weid" => $_W["uniacid"], "uid" => $uid,
                 "poolid" => $pool["id"], "istatus" => 1], "id");
        """
        model = self.model
        row = (
            db.query(model.id)
            .filter(model.weid == uniacid)
            .filter(model.uid == uid)
            .filter(model.poolid == poolid)
            .filter(model.istatus == 1)
            .first()
        )
        if not row:
            return None
        return row[0]
|
|
|
|
|
|
class CRUDUserQtype(CRUDBase[UserQtype, UserQTypeCreate, UserQTypeUpdate]):
    """Query helpers for the ``UserQtype`` model."""

    def get_user_qype(self, db: Session, user_qtype_id: int):
        """Look up a record by ``id`` through ``get_by_field``."""
        return self.get_by_field(db, "id", user_qtype_id)

    def get_last_id_by_type_id_and_uid(
        self, db: Session, uniacid: int, uid: int, type_id: int, istatus: int
    ) -> Optional[int]:
        """Return ``last_id`` of the first matching row, or ``None``.

        Rows are matched on ``weid``, ``uid``, ``type_id`` and ``istatus``.

        Ported from the PHP query (which hard-codes ``istatus=1``):
            $last_id = pdo_fetchcolumn("SELECT last_id FROM "
                .tablename("goouc_fullexam_user_qtype").
                " WHERE weid=:weid AND uid=:uid AND type_id=:type_id AND istatus=1 ",
                [":weid" => $_W["uniacid"], ":uid" => $uid, ":type_id" => $qtype]);
        """
        model = self.model
        row = (
            db.query(model.last_id)
            .filter(
                model.weid == uniacid,
                model.uid == uid,
                model.type_id == type_id,
                model.istatus == istatus,
            )
            .first()
        )
        if not row:
            return None
        return row[0]
|