feat(crud): part3基础实现

- 为 Cdkey、Exercise、Gift、Paper、Test 等模型添加了多个新的查询方法
This commit is contained in:
jieyuu 2025-02-28 23:22:23 +08:00
parent 5d8480dd17
commit 1f91ddc1db
6 changed files with 2505 additions and 46 deletions

View File

@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, HTTPException
from typing import Dict, Any from typing import Dict, Any
from mooc.utils.wechat_client import WeChatAPI from mooc.utils.wechat_client import WeChatAPI
@ -14,6 +14,7 @@ async def get_access_token() -> Dict[str, Any]:
except Exception as e: except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
@router.post("/code2session") @router.post("/code2session")
async def code_to_session(code: str) -> Dict[str, Any]: async def code_to_session(code: str) -> Dict[str, Any]:
"""小程序登录""" """小程序登录"""
@ -23,6 +24,7 @@ async def code_to_session(code: str) -> Dict[str, Any]:
except Exception as e: except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
@router.post("/qrcode") @router.post("/qrcode")
async def generate_qrcode(scene: str) -> bytes: async def generate_qrcode(scene: str) -> bytes:
"""生成小程序码""" """生成小程序码"""
@ -32,6 +34,7 @@ async def generate_qrcode(scene: str) -> bytes:
except Exception as e: except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
@router.post("/send_template") @router.post("/send_template")
async def send_template_message(data: Dict[str, Any]) -> Dict[str, Any]: async def send_template_message(data: Dict[str, Any]) -> Dict[str, Any]:
"""发送订阅消息""" """发送订阅消息"""

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,6 @@
from typing import Optional from typing import Optional, List, Tuple, Dict, Any
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy import select, func
from mooc.crud.crud_base import CRUDBase from mooc.crud.crud_base import CRUDBase
from mooc.models.goouc_fullexam import (Advert, Banji, Banner, Category, Cdkey, CdkeyCate, Cdkeys, Exercise, Feedback, from mooc.models.goouc_fullexam import (Advert, Banji, Banner, Category, Cdkey, CdkeyCate, Cdkeys, Exercise, Feedback,
@ -47,6 +48,51 @@ class CRUDCdkey(CRUDBase[Cdkey, CdkeyCreate, CdkeyUpdate]):
def get_cdkey(self, db: Session, cdkey_id: int): def get_cdkey(self, db: Session, cdkey_id: int):
return self.get_by_field(db, "id", cdkey_id) return self.get_by_field(db, "id", cdkey_id)
# $cdkeys = pdo_fetchall(" SELECT * FROM ".tablename("goouc_fullexam_cdkey").
# " WHERE
# weid =:weid
# uid=:uid
# status =2
# display=1
# endtime > unix_timestamp(now())
# kpool > :kpool ", [":weid" = > $_W["uniacid"], "uid" = > $uid, ":kpool" = > $now_time]);
def get_cdkey_list(self, db: Session, uniacid: str, uid: int, now_time: str) -> Optional[List[Cdkey]]:
cdkeys = db.query(self.model).filter(
self.model.weid == int(uniacid),
self.model.uid == uid,
self.model.status == 2,
self.model.display == 1,
self.model.endtime > db.func.unix_timestamp(db.func.now()),
self.model.kpool > now_time
).all()
return cdkeys
def get_user_active_with_paperid(self, db: Session, uniacid: str, uid: int, now_time: str,
cdkeys_code_id_list: List[int]) -> Optional[
Cdkey]:
return (db.query(self.model).filter(
self.model.weid == int(uniacid),
self.model.uid == uid,
self.model.status == 2,
self.model.display == 1,
self.model.endtime > func.unix_timestamp(func.now()),
self.model.kpool > now_time,
self.model.cid.in_(cdkeys_code_id_list)
).first())
def get_user_active(self, db: Session, uniacid: str, uid: int, now_time: str, cdkeys_code_id_list: List[int]) -> \
Optional[
Cdkey]:
return (db.query(self.model).filter(
self.model.weid == int(uniacid),
self.model.uid == uid,
self.model.status == 2,
self.model.display == 1,
self.model.endtime > func.unix_timestamp(func.now()),
self.model.kpool > now_time,
self.model.cid.in_(cdkeys_code_id_list)
).first())
class CRUDCdkeyCate(CRUDBase[CdkeyCate, CdkeyCateCreate, CdkeyCateUpdate]): class CRUDCdkeyCate(CRUDBase[CdkeyCate, CdkeyCateCreate, CdkeyCateUpdate]):
def get_cdkey_cate(self, db: Session, cdkey_cate_id: int): def get_cdkey_cate(self, db: Session, cdkey_cate_id: int):
@ -57,11 +103,52 @@ class CRUDCdkeys(CRUDBase[Cdkeys, CdkeysCreate, CdkeysUpdate]):
def get_cdkeys(self, db: Session, cdkeys_id: int): def get_cdkeys(self, db: Session, cdkeys_id: int):
return self.get_by_field(db, "id", cdkeys_id) return self.get_by_field(db, "id", cdkeys_id)
def get_all_cdkeys_code_ids(self, db: Session, uniacid: str, paper_id: int) -> List[int]:
results = (db.query(self.model.code_id).filter(
self.model.uniacid == int(uniacid),
self.model.kpool_id == paper_id,
self.model.type == 1)
.all())
return [result[0] for result in results]
# $pools = pdo_getall("goouc_fullexam_cdkeys",
# ["weid" => $_W["uniacid"], "code_id" => $cdkey["cid"], "type" => 2], "kpool_id");
def get_all_kpool_id(self, db: Session, uniacid: str, cid: int) -> Optional[List[int]]:
pools = (db.query(self.model.kpool_id)
.filter(self.model.weid == uniacid,
self.model.code_id == cid,
self.model.type == 2
).all())
return [pool[0] for pool in pools]
def get_code_id_list_by_kpool_id_and_type(self, db: Session, uniacid: str, kpool_id: int, type: int):
results = (db.query(self.model.code_id)
.filter(self.model.weid == int(uniacid),
self.model.kpool_id == kpool_id,
self.model.type == type
).all())
# pdo_getall("goouc_fullexam_cdkeys", ["weid" = > $_W["uniacid"], "kpool_id" = > $v["id"], "type" = > 2], ["code_id"]);
return [result[0] for result in results]
class CRUDExercise(CRUDBase[Exercise, ExerciseCreate, ExerciseUpdate]): class CRUDExercise(CRUDBase[Exercise, ExerciseCreate, ExerciseUpdate]):
def get_exercise(self, db: Session, exercise_id: int): def get_exercise(self, db: Session, exercise_id: int):
return self.get_by_field(db, "id", exercise_id) return self.get_by_field(db, "id", exercise_id)
def get_testid_list(self, db: Session, uniacid: str, uid: int):
results = db.query(self.model.testid).filter(
self.model.weid == int(uniacid),
self.model.uid == uid
).all()
return [result[0] for result in results]
def get_exercise_have_do(self, db: Session, uniacid: str, uid: int, testid: int):
results = db.query(self.model.id, self.model.testid, self.model.test_type).filter(
self.model.weid == int(uniacid), self.model.uid == uid, self.model.testid == testid).first()
return results if results else None
class CRUDFeedback(CRUDBase[Feedback, FeedbackCreate, FeedbackUpdate]): class CRUDFeedback(CRUDBase[Feedback, FeedbackCreate, FeedbackUpdate]):
def get_feedback(self, db: Session, feedback_id: int): def get_feedback(self, db: Session, feedback_id: int):
@ -72,6 +159,13 @@ class CRUDGift(CRUDBase[Gift, GiftCreate, GiftUpdate]):
def get_gift(self, db: Session, gift_id: int): def get_gift(self, db: Session, gift_id: int):
return self.get_by_field(db, "id", gift_id) return self.get_by_field(db, "id", gift_id)
def get_gift_by_id(self, db: Session, gift_id: int, uniacid: str) -> Optional[Gift]:
return (db.query(self.model)
.filter(self.model.weid == int(uniacid), self.model.id == gift_id)
.first())
# $gift = pdo_fetch("SELECT * FROM ".tablename("goouc_fullexam_gift").
# " WHERE weid=:weid AND id=:gid", [":weid" = > $_W["uniacid"], ":gid" = > $val["giftid"]]);
class CRUDIndexBtn(CRUDBase[IndexBtn, IndexBtnCreate, IndexBtnUpdate]): class CRUDIndexBtn(CRUDBase[IndexBtn, IndexBtnCreate, IndexBtnUpdate]):
def get_index_btn(self, db: Session, index_btn_id: int): def get_index_btn(self, db: Session, index_btn_id: int):
@ -102,11 +196,85 @@ class CRUDPaper(CRUDBase[Paper, PaperCreate, PaperUpdate]):
def get_paper(self, db: Session, paper_id: int): def get_paper(self, db: Session, paper_id: int):
return self.get_by_field(db, "id", paper_id) return self.get_by_field(db, "id", paper_id)
def get_franction_by_paperid(self, db: Session, uniacid: str, paperid: int) -> Optional[str]:
result = (db.query(self.model.franction)
.filter(self.model.weid == int(uniacid),
self.model.istatus == 1,
self.model.status == 1,
self.model.id == paperid)
.first())
return result[0] if result else None
def get_page_list(self, db: Session, uniacid: str, page_index: int, page_size: int) -> Optional[List[Paper]]:
return db.query(self.model).filter(
self.model.weid == int(uniacid),
self.model.istatus == 1,
self.model.status == 1
).order_by(self.model.displayorder.desc()).limit(page_size).offset(
(page_index - 1) * page_size).all()
# $paper = pdo_fetch("SELECT id,title,total_franction FROM ".tablename("goouc_fullexam_paper").
# " WHERE weid=:weid AND status=1 AND istatus=1 AND id=:paperid ",
# [":weid" = > $_W["uniacid"], ":paperid" = > $paperid]);
def get_paper_by_paperid(self, db: Session, uniacid: str, paperid: int) -> Optional[Paper]:
return (db.query(self.model)
.filter(self.model.weid == int(uniacid),
self.model.istatus == 1,
self.model.status == 1,
self.model.id == paperid)
.first())
def get_total(self, db: Session, uniacid: str):
return (db.query(self.model)
.filter(self.model.weid == int(uniacid),
self.model.istatus == 1,
self.model.status == 1)
.order_by(self.model.displayorder.desc())
.count())
class CRUDPaperTest(CRUDBase[PaperTest, PaperTestCreate, PaperTestUpdate]): class CRUDPaperTest(CRUDBase[PaperTest, PaperTestCreate, PaperTestUpdate]):
def get_paper_test(self, db: Session, paper_test_id: int): def get_paper_test(self, db: Session, paper_test_id: int):
return self.get_by_field(db, "id", paper_test_id) return self.get_by_field(db, "id", paper_test_id)
def get_qnum(self, db: Session, paper_test_id: int) -> Optional[int]:
return (db.query(self.model)
.filter(self.model.paperid == paper_test_id)
.filter(self.model.istatus == 1)
.filter(self.model.status == 1)
.count())
def get_all_test_id(self, db: Session, paperid: int) -> Optional[List[int]]:
return db.query(self.model.testid).filter(self.model.paperid == paperid).all()
# pdo_fetchall("SELECT
# q.id as id,q.type,q.question,q.qimage,q.qvideo,q.qaudio,q.a_type,q.option,q.rightkey,q.analysis,q.aimage,q.analysis_audio
# tablename("goouc_fullexam_paper_test") . " AS pt JOIN " . tablename("goouc_fullexam_test") . "
# AS q ON pt.testid=q.id WHERE pt.paperid=:paperid AND q.istatus=1 ", [":paperid" => $paper["id"]]);
def get_questions_by_paperid(self, db: Session, paperid: int):
questions = (
db.query(
Test.id.label('id'),
Test.type,
Test.question,
Test.qimage,
Test.qvideo,
Test.qaudio,
Test.a_type,
Test.option,
Test.rightkey,
Test.analysis,
Test.aimage,
Test.analysis_audio
)
.join(PaperTest, Test.id == PaperTest.testid)
.filter(PaperTest.paperid == paperid,
Test.istatus == 1)
.all()
)
return questions
class CRUDPhonecode(CRUDBase[Phonecode, PhoneCodeCreate, PhoneCodeUpdate]): class CRUDPhonecode(CRUDBase[Phonecode, PhoneCodeCreate, PhoneCodeUpdate]):
def get_phonecode(self, db: Session, phonecode_id: int): def get_phonecode(self, db: Session, phonecode_id: int):
@ -127,26 +295,562 @@ class CRUDSetting(CRUDBase[Setting, SettingCreate, SettingUpdate]):
def get_setting(self, db: Session, setting_id: int): def get_setting(self, db: Session, setting_id: int):
return self.get_by_field(db, "id", setting_id) return self.get_by_field(db, "id", setting_id)
def get_setting_by_weid(self, db: Session, uniacid: str):
return self.get_by_field(db, "weid", int(uniacid))
def get_about_us(self, db: Session, uniacid: str):
result = self.get_by_field(db, "weid", int(uniacid))
# 创建一个空字典
info = {}
if result is not None:
info["id"] = result.id
info["about"] = result.about
return info
class CRUDShareRecord(CRUDBase[ShareRecord, ShareRecordCreate, ShareRecordUpdate]): class CRUDShareRecord(CRUDBase[ShareRecord, ShareRecordCreate, ShareRecordUpdate]):
def get_share_record(self, db: Session, share_record_id: int): def get_share_record(self, db: Session, share_record_id: int):
return self.get_by_field(db, "id", share_record_id) return self.get_by_field(db, "id", share_record_id)
class CRUDsonSimple(CRUDBase[SonSimple, SonSimpleCreate, SonSimpleUpdate]): class CRUDSonSimple(CRUDBase[SonSimple, SonSimpleCreate, SonSimpleUpdate]):
def get_son_simple(self, db: Session, son_simple_id: int): def get_son_simple(self, db: Session, son_simple_id: int):
return self.get_by_field(db, "id", son_simple_id) return self.get_by_field(db, "id", son_simple_id)
def get_type_list(self, db: Session, uniacid: str) -> Optional[List[SonSimple]]:
# $type_list_t = pdo_fetchall(" SELECT * FROM ".tablename("goouc_fullexam_son_simple").
# " WHERE weid = :weid ORDER BY id", [":weid" = > $_W["uniacid"]]);
results = db.query(self.model).filter(self.model.weid == int(uniacid)).order_by(self.model.id).all()
return results if results is not None else []
def get_type_list_page(self, db: Session, uniacid: str, pindex: int, psize: int) -> Optional[List[SonSimple]]:
results = db.query(self.model).filter(
self.model.weid == int(uniacid),
self.model.istatus == 1,
self.model.status == 1
).order_by(self.model.displayorder.desc()).limit(psize).offset(
(pindex - 1) * psize).all()
# $type_list_t = pdo_fetchall(" SELECT * FROM ".tablename("goouc_fullexam_son_simple").
# " WHERE weid = :weid ORDER BY id DESC LIMIT ".($pindex - 1) * $psize.
# ",". $psize, [":weid" = > $_W["uniacid"]]);
return results if results is not None else []
def get_total(self, db: Session, uniacid: str) -> Optional[List[SonSimple]]:
results = db.query(self.model).filter(self.model.weid == int(uniacid)).order_by(self.model.id.desc).all()
return results if results is not None else []
class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]): class CRUDTest(CRUDBase[Test, TestCreate, TestUpdate]):
def get_test(self, db: Session, test_id: int): def get_test(self, db: Session, test_id: int):
return self.get_by_field(db, "test_id", test_id) return self.get_by_field(db, "test_id", test_id)
def get_question(self, db: Session, uniacid: str, id: int) -> Optional[Dict[str, Any]]:
query = db.query(
self.model.id,
self.model.type,
self.model.question,
self.model.qimage,
self.model.qvideo,
self.model.qaudio,
self.model.a_type,
self.model.option,
self.model.rightkey,
self.model.analysis,
self.model.aimage,
self.model.analysis_audio
).filter(
self.model.weid == int(uniacid),
self.model.id == id,
self.model.istatus == 1
)
# 只选择指定字段
# 将result元组转化为字典返回
column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey",
"analysis", "aimage", "analysis_audio"]
# 执行查询并转换结果为字典
results = query.first()
results_dict = dict(zip(column_names, results)) if results is not None else None
return results_dict
def get_err_list_order_by_id(self, db: Session, uniacid: str, id: int) -> Optional[List[Dict]]:
query = db.query(
self.model.id,
self.model.type,
self.model.question,
self.model.qimage,
self.model.qvideo,
self.model.qaudio,
self.model.a_type,
self.model.option,
self.model.rightkey,
self.model.analysis,
self.model.aimage,
self.model.analysis_audio
).filter(
self.model.weid == int(uniacid),
self.model.id == id,
self.model.istatus == 1
)
# 只选择指定字段
# 将result元组转化为字典返回
column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey",
"analysis", "aimage", "analysis_audio"]
# 执行查询并转换结果为字典
results = query.order_by(self.model.id).all()
results_dict = [dict(zip(column_names, result)) for result in results]
return results_dict
def get_pid_by_id(self, db: Session, uniacid: str, id: int) -> Optional[int]:
result = db.query(self.model.pid).filter(
self.model.weid == int(uniacid),
self.model.istatus == 1,
self.model.id == id
).first()
return result[0] if result is not None else None
def get_test_count(self, db: Session, uniacid: str, pool_ids: List[int]) -> int:
total = db.query(self.model.id).filter(
self.model.weid == int(uniacid),
self.model.istatus == 1,
self.model.type != 0,
self.model.display == 1,
self.model.type < 9,
self.model.libraryid.in_(pool_ids)
).count()
return total if total is not None else 0
def get_test_by_id_and_type(self, db: Session, uniacid: str, type: int, testid: int) -> Optional[int]:
result = (db.query(self.model.id).filter(
self.model.weid == int(uniacid),
self.model.type == type,
self.model.id == testid)
.first())
return result[0] if result is not None else None
def mock_exam_get_question_list(self, db: Session, uniacid: str, pool: List[int], type: int, display: int,
num: int) -> Optional[List[Dict[str, Any]]]:
query = db.query(
self.model.id,
self.model.type,
self.model.question,
self.model.qimage,
self.model.qvideo,
self.model.qaudio,
self.model.a_type,
self.model.option,
self.model.rightkey,
self.model.analysis,
self.model.aimage,
self.model.analysis_audio
).filter(
self.model.weid == int(uniacid),
self.model.libraryid.in_(pool),
self.model.istatus == 1,
self.model.type == type,
self.model.display == display,
self.model.type < 8
)
# 添加动态条件
if type == 5:
query = query.filter(self.model.son_status == 1)
if num <= 0:
raise ValueError("Invalid limit value")
# 只选择指定字段
# 将result元组转化为字典返回
column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey",
"analysis", "aimage", "analysis_audio"]
# 执行查询并转换结果为字典
results = query.order_by(func.rand()).limit(num).all()
results_dict = [dict(zip(column_names, result)) for result in results]
return results_dict
def mock_exam_get_question_list_type3(self, db: Session, uniacid: str, pool: List[int], type: int, display: int,
num: int) -> Optional[List[Dict[str, Any]]]:
query = db.query(
self.model.id,
self.model.type,
self.model.question,
self.model.qimage,
self.model.qvideo,
self.model.qaudio,
self.model.a_type,
self.model.option,
self.model.rightkey,
self.model.analysis,
self.model.aimage,
self.model.analysis_audio
).filter(
self.model.weid == int(uniacid),
self.model.libraryid.in_(pool),
self.model.istatus == 1,
self.model.type == type,
self.model.display == display,
).order_by(func.rand()).limit(num).all()
if num <= 0:
raise ValueError("Invalid limit value")
# 只选择指定字段
# 将result元组转化为字典返回
column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey",
"analysis", "aimage", "analysis_audio"]
# 执行查询并转换结果为字典
results = query.order_by(func.rand()).limit(num).all()
results_dict = [dict(zip(column_names, result)) for result in results]
return results_dict
def get_qtye_question_list(self, db: Session, uniacid: str, istatus: int, type: int, display: int, lib_id: int):
query = db.query(
self.model.id,
self.model.type,
self.model.question,
self.model.qimage,
self.model.qvideo,
self.model.qaudio,
self.model.a_type,
self.model.option,
self.model.rightkey,
self.model.analysis,
self.model.aimage,
self.model.analysis_audio
).filter(
self.model.weid == int(uniacid),
self.model.istatus == istatus,
self.model.type == type,
self.model.display == display,
self.model.libraryid == lib_id
).order_by(self.model.id)
# 只选择指定字段
# 将result元组转化为字典返回
column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey",
"analysis", "aimage", "analysis_audio"]
# 执行查询并转换结果为字典
results = query.all()
results_dict = [dict(zip(column_names, result)) for result in results]
return results_dict
def get_randon_question_list(self, db: Session, uniacid: str, lib_id: int, type_max: int, istatus: int,
display: int) -> Optional[List[Dict[str, Any]]]:
query = db.query(
self.model.id,
self.model.type,
self.model.question,
self.model.qimage,
self.model.qvideo,
self.model.qaudio,
self.model.a_type,
self.model.option,
self.model.rightkey,
self.model.analysis,
self.model.aimage,
self.model.analysis_audio
).filter(
self.model.weid == int(uniacid),
self.model.istatus == istatus,
self.model.type != 0,
self.model.type < type_max,
self.model.display == display,
self.model.libraryid == lib_id
).order_by(func.rand())
# 只选择指定字段
# 将result元组转化为字典返回
column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey",
"analysis", "aimage", "analysis_audio"]
# 执行查询并转换结果为字典
results = query.all()
results_dict = [dict(zip(column_names, result)) for result in results]
return results_dict
def get_randon_question_list_batch(self, db: Session, uniacid: str, type: int, type_max: int, istatus: int,
display: int, pool: List[int], limit: int) -> Optional[List[Dict[str, Any]]]:
query = db.query(
self.model.id,
self.model.type,
self.model.question,
self.model.qimage,
self.model.qvideo,
self.model.qaudio,
self.model.a_type,
self.model.option,
self.model.rightkey,
self.model.analysis,
self.model.aimage,
self.model.analysis_audio
).filter(
self.model.weid == int(uniacid),
self.model.type < type_max,
self.model.libraryid.in_(pool),
self.model.istatus == istatus,
self.model.type == type,
self.model.display == display
).order_by(func.rand()).limit(limit)
# 只选择指定字段
# 将result元组转化为字典返回
column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey",
"analysis", "aimage", "analysis_audio"]
# 执行查询并转换结果为字典
results = query.all()
results_dict = [dict(zip(column_names, result)) for result in results]
return results_dict
def get_question_list(self, db: Session, uniacid: str, lib_id: int, istatus: int, type_max: int, display: int) -> \
Optional[List[Dict[str, Any]]]:
# 构建查询
# 构建基础查询
query = db.query(
self.model.id,
self.model.type,
self.model.question,
self.model.qimage,
self.model.qvideo,
self.model.qaudio,
self.model.a_type,
self.model.option,
self.model.rightkey,
self.model.analysis,
self.model.aimage,
self.model.analysis_audio
).filter(
self.model.weid == int(uniacid),
self.model.istatus == istatus,
self.model.type != 0,
self.model.display == display,
self.model.type < type_max,
self.model.libraryid == lib_id
).order_by(self.model.id)
# 将result元组转化为字典返回
column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey",
"analysis", "aimage", "analysis_audio"]
# 执行查询并转换结果为字典
results = query.all()
results_dict = [dict(zip(column_names, result)) for result in results]
return results_dict
def get_question_list_with_limit(self,
db: Session, uniacid: str, lib_id: int, istatus: int, type_max: int, display: int,
limit: int) -> Optional[List[Dict[str, Any]]]:
# 构建查询
# 构建基础查询
query = db.query(
self.model.id,
self.model.type,
self.model.question,
self.model.qimage,
self.model.qvideo,
self.model.qaudio,
self.model.a_type,
self.model.option,
self.model.rightkey,
self.model.analysis,
self.model.aimage,
self.model.analysis_audio
).filter(
self.model.weid == int(uniacid),
self.model.istatus == istatus,
self.model.type != 0,
self.model.type < type_max,
self.model.display == display,
self.model.libraryid == lib_id
).order_by(self.model.id).limit(limit)
# 将result元组转化为字典返回
column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey",
"analysis", "aimage", "analysis_audio"]
# 执行查询并转换结果为字典
results = query.all()
results_dict = [dict(zip(column_names, result)) for result in results]
return results_dict
def get_question_list_by_son_type(self, db: Session, uniacid: str, qtype: int, lib_id: int, istatus: int,
son_simple: int, display: int) -> \
Optional[List[Dict[str, Any]]]:
# 构建查询
# 构建基础查询
query = db.query(
self.model.id,
self.model.type,
self.model.question,
self.model.qimage,
self.model.qvideo,
self.model.qaudio,
self.model.a_type,
self.model.option,
self.model.rightkey,
self.model.analysis,
self.model.aimage,
self.model.analysis_audio
).filter(
self.model.weid == int(uniacid),
self.model.istatus == istatus,
self.model.son_simple == son_simple,
self.model.type == qtype,
self.model.display == display,
self.model.libraryid == lib_id
).order_by(self.model.id)
# 将result元组转化为字典返回
column_names = ["id", "type", "question", "qimage", "qvideo", "qaudio", "a_type", "option", "rightkey",
"analysis", "aimage", "analysis_audio"]
# 执行查询并转换结果为字典
results = query.all()
results_dict = [dict(zip(column_names, result)) for result in results]
return results_dict
def get_q_have_by_type_and_lib_id(self, db: Session, uniacid: str, type: int, lib_id: int, istatus: int,
son_status: int, display: int) -> Optional[int]:
query = (db.query(self.model.id)
.filter(self.model.weid == int(uniacid))
.filter(self.model.type == type)
.filter(self.model.istatus == istatus)
.filter(self.model.display == display)
.filter(self.model.son_status == son_status)
.filter(self.model.libraryid == lib_id)
)
return query.count()
def get_testson_by_pid(self, db: Session, uniacid: str, pid: int) -> Optional[List[Test]]:
results = db.query(self.model).filter(self.model.weid == int(uniacid), self.model.pid == pid).order_by(
self.model.id).all()
return results if results else []
class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]): class CRUDTestType(CRUDBase[TestType, TestTypeCreate, TestTypeUpdate]):
def get_test_type(self, db: Session, test_type_id: int): def get_test_type(self, db: Session, test_type_id: int):
return self.get_by_field(db, "id", test_type_id) return self.get_by_field(db, "id", test_type_id)
def get_fpool(self, db: Session, uniacid: str) -> List[int]:
results = db.query(self.model.id).filter(
self.model.weid == int(uniacid),
self.model.istatus == 1,
self.model.status == 1,
self.model.price <= 0,
self.model.pid != 0,
self.model.gpid == 0
).all()
return [result[0] for result in results] if results else []
def get_fpool_with_student_check(self, db: Session, uniacid: str) -> List[int]:
results = db.query(self.model.id).filter(
self.model.weid == int(uniacid),
self.model.istatus == 1,
self.model.status == 1,
self.model.price <= 0,
self.model.is_student != 1,
self.model.pid != 0,
self.model.gpid == 0
).all()
return [result[0] for result in results] if results else []
def get_test_type_ids(self, db: Session, uniacid: str, pid: int) -> List[int]:
results = db.query(self.model.id).filter(
self.model.weid == int(uniacid),
self.model.pid == pid,
self.model.istatus == 1,
self.model.status == 1,
self.model.gpid != 0
).all()
return [result[0] for result in results] if results else []
def get_test_type_by_special_id(self, db: Session, uniacid: str, special_id: int) -> Optional[int]:
result = db.query(self.model.id).filter(
self.model.weid == int(uniacid),
self.model.id == special_id,
self.model.istatus == 1,
self.model.status == 1
).first()
# $pid = pdo_getcolumn("goouc_fullexam_test_type", ["weid" = > $_W[
# "uniacid"], "id" = > $special_id, "istatus" = > 1, "status" = > 1], "pid");
return result[0] if result else None
def get_pool_by_id(self, db: Session, uniacid: str, pid: int) -> List[Dict[str, Any]]:
result = (db.query(self.model.id, self.model.name, self.model.price)
.filter(self.model.weid == int(uniacid))
.filter(self.model.istatus == 1)
.filter(self.model.status == 1)
.filter(self.model.id == pid).first())
# $pool = pdo_fetchall("SELECT id,name,price FROM ".tablename("goouc_fullexam_test_type").
# " WHERE weid=:weid AND istatus = 1 AND status = 1 AND id=:id ", [":weid" = > $_W["uniacid"],
# ":id" = > $pid]);
if result is not None:
# 将result从元组转化为字典返回
return {"id": result[0], "name": result[1], "price": result[2]}
else:
return None
def get_all_special_if_is_student(self, db: Session, uniacid: str, uid: int):
results = db.query(self.model.id, self.model.name).filter(
self.model.weid == int(uniacid),
self.model.istatus == 1,
self.model.is_student == 1,
self.model.pid == 0,
self.model.status == 1
).order_by(self.model.id).all()
# pdo_fetchall("SELECT `id`,`name` FROM ".tablename("goouc_fullexam_test_type").
# " WHERE weid=:weid AND istatus=1 AND pid = 0 AND status = 1 ORDER BY id ASC ",
# [":weid" = > $_W["uniacid"]]);
return [{"id": result[0], "name": result[1]} for result in results]
def get_all_special_without_is_student(self, db: Session, uniacid: str, uid: int) -> Optional[List[Dict[str, Any]]]:
results = db.query(self.model.id, self.model.name).filter(
self.model.weid == int(uniacid),
self.model.istatus == 1,
self.model.is_student == 2,
self.model.pid == 0,
self.model.status == 1
).order_by(self.model.id).all()
# $list = pdo_fetchall("SELECT `id`,`name` FROM ".tablename("goouc_fullexam_test_type").
# " WHERE weid=:weid AND istatus=1 AND is_student = 2 AND pid = 0 AND status = 1 ORDER BY id ASC ",
# [":weid" = > $_W["uniacid"]]);
return [{"id": result[0], "name": result[1]} for result in results] if results else None
def get_son_by_pid(self, db: Session, uniacid: str, pid: int, istatus: int, status: int):
results = db.query(self.model.id, self.model.name).filter(
self.model.weid == int(uniacid),
self.model.istatus == istatus,
self.model.pid == pid,
self.model.status == status
).order_by(self.model.id).all()
# pdo_fetchall("SELECT `id`,`name` FROM ".tablename("goouc_fullexam_test_type").
# " WHERE weid=:weid AND istatus=1 AND pid=:pid AND status = 1 ORDER BY id ASC ",
# [":weid" = > $_W["uniacid"], ":pid" = > $v["id"]]);
return [{"id": result[0], "name": result[1]} for result in results] if results else None
class CRUDTypeCate(CRUDBase[TypeCate, TypeCateCreate, TypeCateUpdate]): class CRUDTypeCate(CRUDBase[TypeCate, TypeCateCreate, TypeCateUpdate]):
def get_type_cate(self, db: Session, type_cate_id: int): def get_type_cate(self, db: Session, type_cate_id: int):
@ -167,6 +871,13 @@ class CRUDXueshen(CRUDBase[Xuesheng, XueshengCreate, XueshengUpdate]):
def get_xueshen(self, db: Session, xueshen_id: int): def get_xueshen(self, db: Session, xueshen_id: int):
return self.get_by_field(db, "xuesheng_id", xueshen_id) return self.get_by_field(db, "xuesheng_id", xueshen_id)
def get_xuesheng_id(self, db: Session, uniacid: str, user_name: str, user_phone: str) -> Optional[int]:
return (db.query(self.model.xuesheng_id).filter(
self.model.weid == int(uniacid),
self.model.user_name == user_name,
self.model.user_phone == user_phone)
.first())
advert = CRUDAdvert(Advert) advert = CRUDAdvert(Advert)
banji = CRUDBase(Banji) banji = CRUDBase(Banji)
@ -190,7 +901,7 @@ q_year = CRUDSchool(School)
school = CRUDSchool(School) school = CRUDSchool(School)
setting = CRUDSetting(Setting) setting = CRUDSetting(Setting)
share_record = CRUDShareRecord(ShareRecord) share_record = CRUDShareRecord(ShareRecord)
son_simple = CRUDsonSimple(SonSimple) son_simple = CRUDSonSimple(SonSimple)
test = CRUDTest(Test) test = CRUDTest(Test)
test_type = CRUDTestType(TestType) test_type = CRUDTestType(TestType)
type_cate = CRUDTypeCate(TypeCate) type_cate = CRUDTypeCate(TypeCate)

View File

@ -1,14 +1,16 @@
from typing import Optional from typing import Optional, Tuple, List, Dict, Any
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from mooc.crud.crud_base import CRUDBase from mooc.crud.crud_base import CRUDBase
from mooc.models.goouc_fullexam_user import ( from mooc.models.goouc_fullexam_user import (
# 导入全部 # 导入全部
User,
UserDoexam, UserDoexam,
UserDoOtherExamAnswer, UserDoOtherExamAnswer,
UserExamAnswer, UserExamAnswer,
UserSpecial, UserSpecial,
UserSpequence, UserSequence,
UserMember, UserMember,
UserCollectionPraction, UserCollectionPraction,
UserGift, UserGift,
@ -18,9 +20,12 @@ from mooc.models.goouc_fullexam_user import (
UserDootherExam, UserDootherExam,
UserWrongPraction, UserWrongPraction,
UserPool, UserPool,
UserQtype
) )
from mooc.schemas.goouc_fullexam_user import ( from mooc.schemas.goouc_fullexam_user import (
# 导入全部create和update # 导入全部create和update
UserCreate,
UserUpdate,
UserDoexamCreate, UserDoexamCreate,
UserDoexamUpdate, UserDoexamUpdate,
UserDoOtherExamAnswerCreate, UserDoOtherExamAnswerCreate,
@ -49,13 +54,57 @@ from mooc.schemas.goouc_fullexam_user import (
UserWrongPractionUpdate, UserWrongPractionUpdate,
UserPoolCreate, UserPoolCreate,
UserPoolUpdate, UserPoolUpdate,
UserQTypeCreate,
UserQTypeUpdate,
) )
from mooc.models.goouc_fullexam import (
TestType
)
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_user(self, db: Session, user_id: int):
return self.get_by_field(db, "id", user_id)
def get_user_info(self, db: Session, uid: int, uniacid: str) -> Optional[User]:
return db.query(self.model).filter(self.model.id == uid, self.model.weid == int(uniacid)).first()
def get_user_is_member(self, db: Session, uniacid: str, uid: int) -> Optional[int]:
# $is_member = pdo_getcolumn(goouc_fullexam_user,
# ["weid" = > $_W["uniacid"], "id" = > $uid],
# "ismember");
result = (db.query(self.model.ismember).
filter(self.model.weid == uniacid,
self.model.id == uid)
.first())
return result[0] if result else None
class CRUDUserDoexam(CRUDBase[UserDoexam, UserDoexamCreate, UserDoexamUpdate]): class CRUDUserDoexam(CRUDBase[UserDoexam, UserDoexamCreate, UserDoexamUpdate]):
def get_user_doexam(self, db: Session, user_doexam_id: int): def get_user_doexam(self, db: Session, user_doexam_id: int):
return self.get_by_field(db, "id", user_doexam_id) return self.get_by_field(db, "id", user_doexam_id)
def get_quanzhen_highest(self, db: Session, uniacid: str, uid: int):
result = (db.query(self.model)
.filter_by(self.model.weid == uniacid)
.filter(self.model.uid == uid)
.order_by(self.model.franction.desc())
.first())
return result[0] if result else None
def get_user_do_number(self, db: Session, uniacid: str, uid: int, examid: int):
return db.query(self.model).filter(
self.model.weid == int(uniacid),
self.model.uid == uid,
self.model.examid == examid).count()
def get_user_recordid(self, db: Session, uniacid: str, uid: int, paperid: int):
result = db.query(self.model.recordid).filter(
self.model.weid == int(uniacid),
self.model.uid == uid,
self.model.examid == paperid).order_by(self.model.createtime.desc()).limit(1).first()
return result[0] if result else None
class CRUDUserDoOtherExamAnswer( class CRUDUserDoOtherExamAnswer(
CRUDBase[UserDoOtherExamAnswer, UserDoOtherExamAnswerCreate, UserDoOtherExamAnswerUpdate]): CRUDBase[UserDoOtherExamAnswer, UserDoOtherExamAnswerCreate, UserDoOtherExamAnswerUpdate]):
@ -67,16 +116,85 @@ class CRUDUserExamAnswer(CRUDBase[UserExamAnswer, UserExamAnswerCreate, UserExam
def get_user_exam_answer(self, db: Session, user_exam_answer_id: int): def get_user_exam_answer(self, db: Session, user_exam_answer_id: int):
return self.get_by_field(db, "id", user_exam_answer_id) return self.get_by_field(db, "id", user_exam_answer_id)
def get_user_exam_answer(self, db: Session, uniacid: str, uid: int, paperid: int, recordid: int) -> Optional[
List[Dict[str, Any]]]:
query = (db.query(self.model.uid, self.model.examid, self.model.testid, self.model.test_type)
.filter(self.model.weid == int(uniacid))
.filter(self.model.uid == uid)
.filter(self.model.examid == paperid)
.filter(self.model.recordid == recordid)
.filter(self.model.test_type != 0)
.filter(self.model.isright == 0)
.order_by(self.model.testid)
)
# 将result元组转化为字典返回
column_names = ["uid", "examid", "testid", "test_type"]
# 执行查询并转换结果为字典
results = query.all()
results_dict = [dict(zip(column_names, result)) for result in results]
return results_dict
def get_exam_answer_test_id(self, db: Session, uniacid: str, uid: int) -> Optional[List[int]]:
results = (db.query(self.model.testid)
.filter(self.model.weid == int(uniacid))
.filter(self.model.uid == uid)
.order_by(self.model.testid)
.all())
return [result[0] for result in results] if results else []
def get_do_answer(self, db: Session, uniacid: str, uid: int, paperid: int, testid: int) -> Optional[UserExamAnswer]:
return (db.query(self.model).filter(
self.model.weid == int(uniacid),
self.model.uid == uid,
self.model.examid == paperid,
self.model.testid == testid)
.order_by(self.model.createtime.desc()).first())
# $do_answer = pdo_fetch(
# " SELECT * FROM ".tablename($this->t_user_exam_answer)."
# WHERE weid = :weid AND uid = :uid AND testid = :testid AND examid = :examid order by createtime DESC ", [
# ":weid" = > $_W["uniacid"], ":uid" = > $uid, ":examid" = > $paperid, "testid" = > $v["id"]]);
class CRUDUserSpecial(CRUDBase[UserSpecial, UserSpecialCreate, UserSpecialUpdate]): class CRUDUserSpecial(CRUDBase[UserSpecial, UserSpecialCreate, UserSpecialUpdate]):
def get_user_special(self, db: Session, user_special_id: int): def get_user_special(self, db: Session, user_special_id: int):
return self.get_by_field(db, "id", user_special_id) return self.get_by_field(db, "id", user_special_id)
def get_last_id_by_special_id(self, db: Session, uniacid: str, uid: int, special_id: int) -> Optional[int]:
result = (db.query(self.model.last_id)
.filter(self.model.weid == int(uniacid))
.filter(self.model.uid == uid)
.filter(self.model.special_id == special_id)
.filter(self.model.istatus == 1)
.first())
return result[0] if result else None
class CRUDUserSpequence(CRUDBase[UserSpequence, UserSpequenceCreate, UserSpequenceUpdate]): # $last_id = pdo_fetchcolumn("SELECT last_id FROM ".tablename("goouc_fullexam_user_special").
# " WHERE weid=:weid AND uid=:uid AND special_id=:special_id AND istatus=1 ",
# [":weid" = > $_W["uniacid"], ":uid" = > $uid, ":special_id" = > $special_id]);
class CRUDUserSequence(CRUDBase[UserSequence, UserSpequenceCreate, UserSpequenceUpdate]):
def get_user_spequence(self, db: Session, user_spequence_id: int): def get_user_spequence(self, db: Session, user_spequence_id: int):
return self.get_by_field(db, "id", user_spequence_id) return self.get_by_field(db, "id", user_spequence_id)
def get_last_id(self, db: Session, uid: int, uniacid: str):
result = (db.query(self.model.question_id)
.filter(self.model.weid == int(uniacid),
self.model.user_id == uid)
.first())
return result[0] if result else None
def get_last_id_by_lib_id(self, db: Session, uid: int, uniacid: str, lib_id: int):
result = (db.query(self.model.question_id)
.filter(self.model.weid == int(uniacid),
self.model.user_id == uid, self.model.lib_id == lib_id)
.first())
return result[0] if result else None
class CRUDUserMember(CRUDBase[UserMember, UserMemberCreate, UserMemberUpdate]): class CRUDUserMember(CRUDBase[UserMember, UserMemberCreate, UserMemberUpdate]):
def get_user_member(self, db: Session, user_member_id: int): def get_user_member(self, db: Session, user_member_id: int):
@ -88,11 +206,46 @@ class CRUDUserCollectionPraction(
def get_user_collection_praction(self, db: Session, user_collection_praction_id: int): def get_user_collection_praction(self, db: Session, user_collection_praction_id: int):
return self.get_by_field(db, "id", user_collection_praction_id) return self.get_by_field(db, "id", user_collection_praction_id)
def get_is_collect(self, db: Session, uniacid: str, uid: int, test_id: int, test_type: int):
result = (db.query(self.model.iscollect)
.filter(self.model.weid == int(uniacid),
self.model.uid == uid,
self.model.testid == test_id,
self.model.test_type == test_type)
.first())
return result[0] if result else None
# $iscollect = pdo_getcolumn("goouc_fullexam_user_collection_praction",
# ["weid" = > $_W["uniacid"], "uid" = > $uid, "testid" = > $val["id"], "test_type" = > $val["type"]],
# "id");
def get_id_list(self, db: Session, uniacid: str, uid: int, istatus: int):
query = (db.query(self.model.testid, self.model.test_type)
.filter(self.model.weid == int(uniacid),
self.model.uid == uid,
self.model.istatus == istatus)
.order_by(self.model.id)
)
results = query.all()
return [{"testid": result[0], "test_type": result[1]} for result in results]
class CRUDUserGift(CRUDBase[UserGift, UserGiftCreate, UserGiftUpdate]): class CRUDUserGift(CRUDBase[UserGift, UserGiftCreate, UserGiftUpdate]):
def get_user_gift(self, db: Session, user_gift_id: int): def get_user_gift(self, db: Session, user_gift_id: int):
return self.get_by_field(db, "id", user_gift_id) return self.get_by_field(db, "id", user_gift_id)
def get_user_gift_list(self, db: Session, uniacid: str, uid: str) -> Optional[List[Tuple[int, int, int]]]:
# [ giftid , createtime , status
# (1, 1634567890, 1),
# (2, 1634567900, 0)
# ]
return (db.query(self.model.giftid, self.model.createtime, self.model.status)
.filter(self.model.weid == uniacid).filter(
self.model.uid == uid).all())
def get_user_gift(self, db: Session, user_gift_id: int):
return self.get_by_field(db, "id", user_gift_id)
class CRUDUserQHigh(CRUDBase[UserQhigh, UserQHighCreate, UserQHighUpdate]): class CRUDUserQHigh(CRUDBase[UserQhigh, UserQHighCreate, UserQHighUpdate]):
def get_user_qhigh(self, db: Session, user_qhigh_id: int): def get_user_qhigh(self, db: Session, user_qhigh_id: int):
@ -113,12 +266,109 @@ class CRUDUserDoOtherExam(CRUDBase[UserDootherExam, UserDoOtherExamCreate, UserD
def get_user_doother_exam(self, db: Session, user_doother_exam_id: int): def get_user_doother_exam(self, db: Session, user_doother_exam_id: int):
return self.get_by_field(db, "id", user_doother_exam_id) return self.get_by_field(db, "id", user_doother_exam_id)
def get_user_other_highest(self, db: Session, uniacid: str, uid: int):
return (db.query(self.model)
.filter_by(self.model.weid == uniacid)
.filter(self.model.uid == uid)
.order_by(self.model.franction.desc())
.first())
class CRUDUserWrongPraction(CRUDBase[UserWrongPraction, UserWrongPractionCreate, UserWrongPractionUpdate]): class CRUDUserWrongPraction(CRUDBase[UserWrongPraction, UserWrongPractionCreate, UserWrongPractionUpdate]):
def get_user_wrong_praction(self, db: Session, user_wrong_praction_id: int): def get_user_wrong_praction(self, db: Session, user_wrong_praction_id: int):
return self.get_by_field(db, "id", user_wrong_praction_id) return self.get_by_field(db, "id", user_wrong_praction_id)
def get_err_id_list_order_by_createtime(self, db: Session, uniacid: str, uid: int):
results = (db.query(self.model.testid, self.model.test_type)
.filter(self.model.weid == int(uniacid), self.model.uid == uid, self.model.istatus == 1)
.order_by(self.model.createtime)
.all())
# 转换成字典
return [{
"testid": result[0],
"test_type": result[1]
} for result in results] if results else None
def get_id_list_order_by_id(self, db: Session, uniacid: str, uid: int, test_type: int):
results = (db.query(self.model.testid, self.model.test_type)
.filter(self.model.weid == int(uniacid), self.model.uid == uid, self.model.istatus == 1)
.order_by(self.model.id)
.all())
# 转换成字典
return [{
"testid": result[0],
"test_type": result[1]
} for result in results] if results else None
def get_id_list_with_time(self, db: Session, uniacid: str, uid: int, start: int):
results = (db.query(self.model.testid, self.model.test_type)
.filter(self.model.weid == int(uniacid), self.model.uid == uid, self.model.istatus == 1,
self.model.createtime > start)
.order_by(self.model.id)
.all())
# 转换成字典
return [{
"testid": result[0],
"test_type": result[1]
} for result in results] if results else None
class CRUDUserPool(CRUDBase[UserPool, UserPoolCreate, UserPoolUpdate]): class CRUDUserPool(CRUDBase[UserPool, UserPoolCreate, UserPoolUpdate]):
def get_user_pool(self, db: Session, user_pool_id: int): def get_user_pool(self, db: Session, user_pool_id: int):
return self.get_by_field(db, "id", user_pool_id) return self.get_by_field(db, "id", user_pool_id)
def get_kid_id_by_paperid(self, db: Session, uniacid: str, uid: int, paperid: int):
result = (db.query(self.model.id).filter(self.model.weid == uniacid, self.model.uid == uid,
self.model.paperid == paperid, self.model.istatus == 1).first())
return result[0] if result else None
def get_user_pool(self, db: Session, uniacid: str, uid: int) -> List[Tuple[int, int]]:
results = db.query(UserPool.id, UserPool.poolid).join(
# 左连接
TestType,
UserPool.poolid == TestType.id,
isouter=True
).filter(
UserPool.weid == uniacid,
UserPool.uid == uid,
TestType.istatus == 1,
TestType.status == 1
).all()
return results if results else []
# pdo_fetchall("SELECT up.id,up.poolid FROM "
# tablename("goouc_fullexam_user_pool")
# as up LEFT JOIN " . tablename("goouc_fullexam_test_type")
# AS tt ON up.poolid=tt.id
# WHERE up.weid=:weid AND up.uid=:uid AND tt.istatus = 1 AND tt.status = 1
# [":weid" => $_W["uniacid"], ":uid" => $uid]);
def get_kid_id_by_poolid(self, db: Session, uniacid: str, uid: int, poolid: int):
result = (db.query(self.model.id)
.filter(self.model.weid == int(uniacid))
.filter(self.model.uid == uid)
.filter(self.model.poolid == poolid)
.filter(self.model.istatus == 1)
.first())
return result[0] if result else None
# $kid = pdo_getcolumn("goouc_fullexam_user_pool",
# ["weid" = > $_W["uniacid"], "uid" = > $uid, "poolid" = > $pool["id"], "istatus" = > 1], "id");
class CRUDUserQtype(CRUDBase[UserQtype, UserQTypeCreate, UserQTypeUpdate]):
def get_user_qype(self, db: Session, user_qtype_id: int):
return self.get_by_field(db, "id", user_qtype_id)
def get_last_id_by_type_id_and_uid(self, db: Session, uniacid: str, uid: int, type_id: int, istatus: int) -> \
Optional[int]:
results = (db.query(self.model.last_id).filter(
self.model.weid == int(uniacid),
self.model.uid == uid,
self.model.type_id == type_id,
self.model.istatus == istatus)
.first())
# $last_id = pdo_fetchcolumn("SELECT last_id FROM ".tablename("goouc_fullexam_user_qtype").
# " WHERE weid=:weid AND uid=:uid AND type_id=:type_id AND istatus=1 ", [":weid" = > $_W[
# "uniacid"], ":uid" = > $uid, ":type_id" = > $qtype]);
return results[0] if results else None

View File

@ -369,7 +369,7 @@ class UserSpecial(Base):
orm_mode = True orm_mode = True
class UserSpequence(Base): class UserSequence(Base):
__tablename__ = 'ims_goouc_fullexam_user_spequence' __tablename__ = 'ims_goouc_fullexam_user_spequence'
__table_args__ = ( __table_args__ = (
Index('idx_weid', 'weid'), Index('idx_weid', 'weid'),

View File

@ -464,8 +464,16 @@ class GiftInDB(GiftBase):
orm_mode = True orm_mode = True
class GiftResponse(GiftInDB): class GiftResponse(BaseModel):
pass # 可以根据需要添加额外的字段或调整现有字段 # 可以根据需要添加额外的字段或调整现有字段
weid: int = Field(..., ge=0, description="站点 ID默认为0")
name: str = Field(..., max_length=100, description="礼品名称")
price: int = Field(100, ge=0, description="礼品市场价格默认为100")
coins: int = Field(500, ge=0, description="礼品所需金币默认为500")
image: str = Field(..., max_length=255, description="礼品图片路径")
about: str = Field(..., description="礼品描述")
status: str = Field(..., description="礼品状态")
createtime: Optional[int] = Field(None, description="创建时间Unix 时间戳")
# 用于批量操作的模型 # 用于批量操作的模型
@ -821,7 +829,7 @@ class PaperTestListResponse(BaseModel):
class PhoneCodeBase(BaseModel): class PhoneCodeBase(BaseModel):
weid: int = Field(..., ge=0, description="站点 ID") weid: int = Field(..., ge=0, description="站点 ID")
phone: str = Field(..., min_length=11, max_length=11, regex=r"^\d{11}$", description="手机号码") phone: str = Field(..., min_length=11, max_length=11, description="手机号码")
code: int = Field(..., ge=100000, le=999999, description="手机验证码通常为6位数字") code: int = Field(..., ge=100000, le=999999, description="手机验证码通常为6位数字")
createtime: int = Field(..., description="创建时间Unix 时间戳") createtime: int = Field(..., description="创建时间Unix 时间戳")