添加接口测试

This commit is contained in:
烟雨如花 2025-01-15 20:26:58 +08:00
parent 12318114f5
commit 84eb746bab
4 changed files with 290 additions and 6 deletions

View File

@ -1,5 +1,5 @@
from fastapi import APIRouter
from mooc.api.v1.endpoints import admin, wechat, account, uni_account
from mooc.api.v1.endpoints import admin, wechat, account, uni_account, wxapp
api_router = APIRouter()
@ -31,4 +31,10 @@ api_router.include_router(
uni_account.uni_account_router,
prefix="/uni-accounts",
tags=["uni-accounts"]
)
api_router.include_router(
wxapp.wxapp_router,
prefix="/wxapp",
tags=["wxapp"]
)

View File

@ -0,0 +1,278 @@
from fastapi import APIRouter, Depends, Request, Form, Body
from typing import Optional, Dict, Any
from pydantic import BaseModel
from sqlalchemy.orm import Session
from mooc.db.database import get_db
from mooc.crud.crud_goouc_fullexam_user import (
CRUDUserDoexam,
CRUDUserExamAnswer,
CRUDUserWrongPraction,
CRUDUserCollectionPraction
)
from mooc.models.goouc_fullexam_user import (
UserDoexam,
UserExamAnswer,
UserWrongPraction,
UserCollectionPraction
)
wxapp_router = APIRouter()
class WxappRequest(BaseModel):
uid: Optional[str] = None
op: Optional[str] = None
m: Optional[str] = None
data: Dict[str, Any] = {}
@wxapp_router.post("/index")
async def handle_wxapp_request(
request: Request,
i: str,
t: Optional[str] = None,
v: Optional[str] = None,
c: Optional[str] = "entry",
a: Optional[str] = "wxapp",
do: Optional[str] = None,
db: Session = Depends(get_db)
):
# 获取表单数据
try:
form_data = await request.form()
print("Form data:", form_data)
# 将表单数据转换为字典
data = dict(form_data)
except Exception as e:
print("Error reading form data:", e)
# 如果没有表单数据尝试读取JSON
try:
data = await request.json()
print("JSON data:", data)
except Exception as e:
print("Error reading JSON:", e)
data = {}
print("Final data:", data)
print("Query params:", request.query_params)
# 初始化CRUD操作类传入对应的模型
user_doexam = CRUDUserDoexam(UserDoexam)
user_exam_answer = CRUDUserExamAnswer(UserExamAnswer)
user_wrong_praction = CRUDUserWrongPraction(UserWrongPraction)
user_collection = CRUDUserCollectionPraction(UserCollectionPraction)
# 根据do参数处理不同的业务逻辑
if do == "Setuserinfo":
return await handle_user_info(WxappRequest(**data), db)
elif do == "ExamOperation":
return await handle_exam_operation(WxappRequest(**data), db, user_doexam, user_exam_answer)
elif do == "Collection":
return await handle_collection(WxappRequest(**data), db, user_collection)
elif do == "WrongQuestion":
return await handle_wrong_question(WxappRequest(**data), db, user_wrong_praction)
elif do == "TotalqNum":
# 添加新的处理逻辑
return {"code": 0, "data": {"total": 100}, "msg": "success"}
elif do == "Index":
# 添加首页处理逻辑
return {"code": 0, "data": {}, "msg": "success"}
elif do == "Advert":
# 添加广告处理逻辑
return {"code": 0, "data": [], "msg": "success"}
return {"code": 404, "msg": "接口未找到"}
async def handle_user_info(data: WxappRequest, db: Session):
"""处理用户信息相关操作"""
operations = {
"getinfo": get_user_info,
"update": update_user_info,
"bind": bind_user,
"unbind": unbind_user,
}
operation = operations.get(data.op)
if not operation:
return {
"code": 1,
"msg": f"Unsupported operation: {data.op}"
}
try:
result = await operation(data.uid, data.data, db)
return {
"code": 0,
"data": result,
"msg": "success"
}
except Exception as e:
return {
"code": 1,
"msg": str(e)
}
async def handle_exam_operation(
data: WxappRequest,
db: Session,
user_doexam: CRUDUserDoexam,
user_exam_answer: CRUDUserExamAnswer
):
"""处理考试相关操作"""
operations = {
"submit": submit_exam,
"get_history": get_exam_history,
"get_detail": get_exam_detail,
}
operation = operations.get(data.op)
if not operation:
return {
"code": 1,
"msg": f"Unsupported operation: {data.op}"
}
try:
result = await operation(data.uid, data.data, db, user_doexam, user_exam_answer)
return {
"code": 0,
"data": result,
"msg": "success"
}
except Exception as e:
return {
"code": 1,
"msg": str(e)
}
async def handle_collection(
data: WxappRequest,
db: Session,
user_collection: CRUDUserCollectionPraction
):
"""处理收藏相关操作"""
operations = {
"add": add_collection,
"remove": remove_collection,
"list": list_collections,
}
operation = operations.get(data.op)
if not operation:
return {
"code": 1,
"msg": f"Unsupported operation: {data.op}"
}
try:
result = await operation(data.uid, data.data, db, user_collection)
return {
"code": 0,
"data": result,
"msg": "success"
}
except Exception as e:
return {
"code": 1,
"msg": str(e)
}
async def handle_wrong_question(
data: WxappRequest,
db: Session,
user_wrong_praction: CRUDUserWrongPraction
):
"""处理错题相关操作"""
operations = {
"add": add_wrong_question,
"remove": remove_wrong_question,
"list": list_wrong_questions,
}
operation = operations.get(data.op)
if not operation:
return {
"code": 1,
"msg": f"Unsupported operation: {data.op}"
}
try:
result = await operation(data.uid, data.data, db, user_wrong_praction)
return {
"code": 0,
"data": result,
"msg": "success"
}
except Exception as e:
return {
"code": 1,
"msg": str(e)
}
# 具体的操作函数实现...
# 用户信息相关
async def get_user_info(uid: str, data: Dict[str, Any], db: Session):
# 实现获取用户信息的逻辑
print("get_user_info", uid, data, db)
pass
async def update_user_info(uid: str, data: Dict[str, Any], db: Session):
# 实现更新用户信息的逻辑
print("update_user_info", uid, data, db)
pass
async def bind_user(uid: str, data: Dict[str, Any], db: Session):
# 实现绑定用户的逻辑
print("bind_user", uid, data, db)
pass
async def unbind_user(uid: str, data: Dict[str, Any], db: Session):
# 实现解绑用户的逻辑
print("unbind_user", uid, data, db)
pass
# 考试相关
async def submit_exam(uid: str, data: Dict[str, Any], db: Session, user_doexam, user_exam_answer):
# 实现提交考试的逻辑
print("submit_exam", uid, data, db, user_doexam, user_exam_answer)
pass
async def get_exam_history(uid: str, data: Dict[str, Any], db: Session, user_doexam, user_exam_answer):
# 实现获取考试历史的逻辑
print("get_exam_history", uid, data, db, user_doexam, user_exam_answer)
pass
async def get_exam_detail(uid: str, data: Dict[str, Any], db: Session, user_doexam, user_exam_answer):
# 实现获取考试详情的逻辑
print("get_exam_detail", uid, data, db, user_doexam, user_exam_answer)
pass
# 收藏相关
async def add_collection(uid: str, data: Dict[str, Any], db: Session, user_collection):
# 实现添加收藏的逻辑
print("add_collection", uid, data, db, user_collection)
pass
async def remove_collection(uid: str, data: Dict[str, Any], db: Session, user_collection):
# 实现移除收藏的逻辑
print("remove_collection", uid, data, db, user_collection)
pass
async def list_collections(uid: str, data: Dict[str, Any], db: Session, user_collection):
# 实现列出收藏的逻辑
print("list_collections", uid, data, db, user_collection)
pass
# 错题相关
async def add_wrong_question(uid: str, data: Dict[str, Any], db: Session, user_wrong_praction):
# 实现添加错题的逻辑
print("add_wrong_question", uid, data, db, user_wrong_praction)
pass
async def remove_wrong_question(uid: str, data: Dict[str, Any], db: Session, user_wrong_praction):
# 实现移除错题的逻辑
print("remove_wrong_question", uid, data, db, user_wrong_praction)
pass
async def list_wrong_questions(uid: str, data: Dict[str, Any], db: Session, user_wrong_praction):
# 实现列出错题的逻辑
print("list_wrong_questions", uid, data, db, user_wrong_praction)
pass

View File

@ -11,7 +11,7 @@ class UserBase(BaseModel):
nickname: Optional[str] = Field(None, max_length=255, description="用户昵称 可保存特殊符号")
headimg: Optional[str] = Field(None, max_length=255, description="用户头像")
name: Optional[str] = Field(None, max_length=50, description="用户姓名")
phone: Optional[str] = Field(None, max_length=11, description="手机号", regex=r"^\d{11}$")
phone: Optional[str] = Field(None, max_length=11, description="手机号", pattern=r"^\d{11}$")
gradeid: Optional[int] = Field(None, ge=0, description="选择年级ID")
classid: Optional[int] = Field(None, ge=0, description="参加班级的ID")
groupid: Optional[int] = Field(None, ge=0, description="群组ID")

View File

@ -5,14 +5,14 @@ redis>=4.0.2
python-dotenv>=0.19.0
sqlalchemy>=2.0.36
fastapi>=0.115.6
pyjwt>=2.10.1
pyjwt>=2.9.0
requests>=2.32.3
pydantic>=2.10.4
passlib>=1.7.4
uvicorn>=0.34.0
uvicorn>=0.33.0
pytest>=7.4.4
Pillow>=11.0.0
numpy>=2.2.1
Pillow>=10.4.0
numpy>=1.24.4
python-multipart>=0.0.20
pymysql>=1.1.1
cryptography>=42.0.5