1.统一CRUD操作

2.完成登录部分接口
3.暂时挂载本地图片链接作为头像存储
This commit is contained in:
?..濡.. 2025-03-04 20:36:52 +08:00
parent 5d8480dd17
commit 8451ad034c
95 changed files with 8378 additions and 1453 deletions

3
.gitignore vendored
View File

@ -46,3 +46,6 @@ ENV/
.idea/
*.swp
.DS_Store
server-www/
wxapp/
logs/

6355
exam.sql Normal file

File diff suppressed because one or more lines are too long

31
main.py Executable file → Normal file
View File

@ -1,9 +1,13 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from fastapi.staticfiles import StaticFiles
import os
from pathlib import Path
from mooc.db.database import init_db
from mooc.api.v1.api import api_router
from mooc.core.config import settings
from mooc.core.logger import setup_logging, get_logger
app = FastAPI(
title="ExamService",
@ -11,6 +15,10 @@ app = FastAPI(
version="1.0.0"
)
# 初始化日志系统
setup_logging(app)
logger = get_logger(__name__)
# CORS设置
app.add_middleware(
CORSMiddleware,
@ -22,12 +30,29 @@ app.add_middleware(
# 初始化数据库
init_db()
logger.info("Database initialized")
# 确保上传目录存在
upload_dir = settings.UPLOAD_IMG_DIR
if not os.path.isabs(upload_dir):
# 获取当前工作目录
base_dir = os.path.dirname(os.path.abspath(__file__))
upload_dir = os.path.join(base_dir, upload_dir.lstrip('/'))
os.makedirs(upload_dir, exist_ok=True)
logger.info(f"Static files directory: {upload_dir}")
# 挂载静态文件目录
# 注意:这里的'/attachment'必须与你在URL中使用的路径一致
app.mount("/attachment", StaticFiles(directory=upload_dir), name="attachment")
app.include_router(api_router, prefix=settings.API_V1_STR)
logger.info(f"API routes registered with prefix: {settings.API_V1_STR}")
@app.get("/")
async def root():
return {"message": "Welcome to ExamService API"}
return {"message": f"Welcome to {settings.PROJECT_NAME} API"}
if __name__ == '__main__':
uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True, workers=1)
logger.info("Starting server...")
uvicorn.run('main:app', host='0.0.0.0', port=2333, reload=True, workers=1)

View File

@ -0,0 +1,18 @@
from fastapi import APIRouter, Depends
from mooc.utils.network_check import diagnose_wechat_api
system_router = APIRouter()
@system_router.get("/diagnose")
async def diagnose_system():
"""系统诊断接口"""
# 诊断微信API连接
wechat_results = await diagnose_wechat_api()
return {
"code": 0,
"msg": "诊断完成",
"data": {
"wechat_api": wechat_results
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,18 @@
from pydantic_settings import BaseSettings
from typing import Optional
from typing import Optional, Dict, Any, List
import os
import logging
# 创建一个基本的配置日志记录器
config_logger = logging.getLogger("config")
if not config_logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
config_logger.addHandler(handler)
config_logger.setLevel(logging.INFO)
class Settings(BaseSettings):
PROJECT_NAME: str = "ExamService"
@ -14,7 +27,7 @@ class Settings(BaseSettings):
MYSQL_DATABASE: str = "mooc"
SQLALCHEMY_DATABASE_URI: Optional[str] = None
SQLALCHEMY_ECHO: bool = True
SQLALCHEMY_ECHO: bool = False
SECRET_KEY: str = "your-secret-key-here"
ALGORITHM: str = "HS256"
@ -23,13 +36,63 @@ class Settings(BaseSettings):
WECHAT_APPID: str = ""
WECHAT_APPSECRET: str = ""
WECHAT_UNIACID: int = 1
# 日志配置
LOG_DIR: str = "logs"
LOG_NAME: str = "exam_service"
LOG_LEVEL: int = logging.DEBUG
LOG_MAX_SIZE: int = 10 * 1024 * 1024 # 10MB
LOG_BACKUP_COUNT: int = 5
ENVIRONMENT: str = "development" # development, testing, production
BASE_URL: str = "http://127.0.0.1:2333"
UPLOAD_IMG_DIR: str = "upload/images"
ATTACH_URL: str = "/attachment/" # 用于构建返回的文件URL
def __init__(self, **kwargs):
super().__init__(**kwargs)
# 设置数据库URI
self.SQLALCHEMY_DATABASE_URI = (
f"mysql+pymysql://{self.MYSQL_USER}:{self.MYSQL_PASSWORD}"
f"@{self.MYSQL_HOST}:{self.MYSQL_PORT}/{self.MYSQL_DATABASE}"
)
# 确保上传目录为绝对路径并存在
self._setup_upload_directory()
def _setup_upload_directory(self):
"""确保上传目录为绝对路径且存在"""
# 首先记录初始值,便于诊断
config_logger.info(f"初始上传目录配置: {self.UPLOAD_IMG_DIR}")
# 确保没有前导斜杠,避免被误判为绝对路径
if self.UPLOAD_IMG_DIR.startswith('/'):
self.UPLOAD_IMG_DIR = self.UPLOAD_IMG_DIR[1:]
config_logger.info(f"移除前导斜杠: {self.UPLOAD_IMG_DIR}")
# 规范化路径分隔符
self.UPLOAD_IMG_DIR = os.path.normpath(self.UPLOAD_IMG_DIR)
# 现在正确判断并处理绝对路径
if not os.path.isabs(self.UPLOAD_IMG_DIR):
# 获取当前工作目录的绝对路径
current_dir = os.path.abspath(os.getcwd())
config_logger.info(f"当前工作目录: {current_dir}")
# 安全地拼接路径
self.UPLOAD_IMG_DIR = os.path.join(current_dir, self.UPLOAD_IMG_DIR)
config_logger.info(f"转换上传目录为绝对路径: {self.UPLOAD_IMG_DIR}")
# 确保目录存在
if not os.path.exists(self.UPLOAD_IMG_DIR):
os.makedirs(self.UPLOAD_IMG_DIR, exist_ok=True)
config_logger.info(f"创建上传目录: {self.UPLOAD_IMG_DIR}")
else:
config_logger.info(f"上传目录已存在: {self.UPLOAD_IMG_DIR}")
# 记录最终的上传目录路径
config_logger.info(f"上传文件将保存到: {self.UPLOAD_IMG_DIR}")
class Config:
env_file = ".env"

66
mooc/core/exceptions.py Normal file
View File

@ -0,0 +1,66 @@
from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException
from typing import Union, Dict, Any
from mooc.core.response import ResponseFactory
from mooc.core.logger import get_logger
logger = get_logger(__name__)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""处理请求参数验证错误"""
logger.warning(f"Request validation error: {exc.errors()}")
# 格式化错误信息
error_details = []
for error in exc.errors():
error_details.append({
"loc": error.get("loc", []),
"msg": error.get("msg", ""),
"type": error.get("type", "")
})
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=ResponseFactory.failed_validation(
msg="请求参数验证失败",
errors=error_details
)
)
async def http_exception_handler(request: Request, exc: HTTPException):
"""处理HTTP异常"""
logger.warning(f"HTTP exception: {exc.status_code} - {exc.detail}")
# 根据状态码选择适当的响应生成方法
if exc.status_code == status.HTTP_404_NOT_FOUND:
content = ResponseFactory.not_found(msg=str(exc.detail))
elif exc.status_code == status.HTTP_401_UNAUTHORIZED:
content = ResponseFactory.unauthorized(msg=str(exc.detail))
elif exc.status_code == status.HTTP_403_FORBIDDEN:
content = ResponseFactory.forbidden(msg=str(exc.detail))
else:
content = ResponseFactory.error(
code=exc.status_code,
msg=str(exc.detail)
)
return JSONResponse(
status_code=exc.status_code,
content=content,
headers=exc.headers or {}
)
async def generic_exception_handler(request: Request, exc: Exception):
"""处理所有其他未捕获的异常"""
logger.exception(f"Unhandled exception: {str(exc)}")
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=ResponseFactory.server_error(
msg="服务器内部错误",
data={"detail": str(exc)} if not isinstance(exc, AssertionError) else None
)
)

201
mooc/core/logger.py Normal file
View File

@ -0,0 +1,201 @@
import os
import logging
import sys
from datetime import datetime
from pathlib import Path
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
import json
from typing import Dict, Any, Optional
import colorlog # 导入colorlog库
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.responses import JSONResponse
from mooc.core.config import settings
# 创建日志目录
log_dir = Path(settings.LOG_DIR)
if not log_dir.exists():
log_dir.mkdir(parents=True)
# 日志格式
LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s'
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
# ANSI颜色代码
COLORS = {
'DEBUG': '\033[36m', # 青色
'INFO': '\033[32m', # 绿色
'WARNING': '\033[33m', # 黄色
'ERROR': '\033[31m', # 红色
'CRITICAL': '\033[41m\033[37m', # 白字红底
'RESET': '\033[0m' # 重置
}
class ColoredFormatter(logging.Formatter):
def format(self, record):
levelname = record.levelname
if levelname in COLORS:
record.levelname = f"{COLORS[levelname]}{levelname}{COLORS['RESET']}"
return super().format(record)
class SQLFilter(logging.Filter):
"""过滤掉 SQLAlchemy 查询日志"""
def filter(self, record):
# 如果是 sqlalchemy.engine 的日志且包含 SELECT 等查询语句,则过滤掉
if record.name.startswith('sqlalchemy.engine'):
msg = record.getMessage()
return not any(keyword in msg for keyword in ['SELECT', 'INSERT', 'UPDATE', 'DELETE'])
return True
# 获取根日志记录器
root_logger = logging.getLogger()
root_logger.setLevel(settings.LOG_LEVEL)
# 清除现有处理器
if root_logger.handlers:
root_logger.handlers.clear()
# 添加彩色控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
color_formatter = ColoredFormatter(LOGGING_FORMAT, DATE_FORMAT)
console_handler.setFormatter(color_formatter)
console_handler.setLevel(settings.LOG_LEVEL)
root_logger.addHandler(console_handler)
# 添加文件处理器 - 按大小轮转
file_handler = RotatingFileHandler(
filename=os.path.join(settings.LOG_DIR, f"{settings.LOG_NAME}.log"),
maxBytes=settings.LOG_MAX_SIZE,
backupCount=settings.LOG_BACKUP_COUNT,
encoding='utf-8'
)
file_handler.setFormatter(logging.Formatter(LOGGING_FORMAT, DATE_FORMAT))
file_handler.setLevel(settings.LOG_LEVEL)
root_logger.addHandler(file_handler)
# 添加文件处理器 - 错误日志单独存储
error_file_handler = RotatingFileHandler(
filename=os.path.join(settings.LOG_DIR, f"{settings.LOG_NAME}_error.log"),
maxBytes=settings.LOG_MAX_SIZE,
backupCount=settings.LOG_BACKUP_COUNT,
encoding='utf-8'
)
error_file_handler.setFormatter(logging.Formatter(LOGGING_FORMAT, DATE_FORMAT))
error_file_handler.setLevel(logging.ERROR)
root_logger.addHandler(error_file_handler)
# 如果是生产环境,添加每日轮转的文件处理器
if settings.ENVIRONMENT == "production":
daily_handler = TimedRotatingFileHandler(
filename=os.path.join(settings.LOG_DIR, f"{settings.LOG_NAME}_daily.log"),
when='midnight',
interval=1,
backupCount=30,
encoding='utf-8'
)
daily_handler.setFormatter(logging.Formatter(LOGGING_FORMAT, DATE_FORMAT))
daily_handler.setLevel(settings.LOG_LEVEL)
root_logger.addHandler(daily_handler)
# 为第三方库设置更高的日志级别,减少日志噪音
for logger_name in ['uvicorn', 'uvicorn.error', 'uvicorn.access', 'sqlalchemy','httpcore','httpx','urllib3','python_multipart',"sqlalchemy.engine"]:
logging.getLogger(logger_name).setLevel(logging.WARNING)
# 创建获取日志记录器的函数
def get_logger(name: str) -> logging.Logger:
"""获取指定名称的日志记录器"""
return logging.getLogger(name)
# 请求/响应日志记录中间件
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
# 生成请求ID
request_id = f"{datetime.now().strftime('%Y%m%dT%H%M%S')}-{os.urandom(3).hex()}"
# 记录请求信息
logger = get_logger("api.request")
# 提取并安全处理请求体
try:
body = await request.body()
body_str = body.decode('utf-8') if body else ""
# 对敏感信息进行脱敏
if 'password' in body_str or 'token' in body_str:
try:
body_json = json.loads(body_str)
if 'password' in body_json:
body_json['password'] = '*****'
if 'token' in body_json:
body_json['token'] = '*****'
body_str = json.dumps(body_json)
except:
# 如果不是JSON格式简单替换敏感信息
body_str = body_str.replace('"password":"', '"password":"*****')
body_str = body_str.replace('"token":"', '"token":"*****')
except Exception as e:
body_str = f"[Error reading body: {str(e)}]"
# 记录请求日志
request_log = {
"request_id": request_id,
"method": request.method,
"url": str(request.url),
"headers": dict(request.headers),
"client_ip": request.client.host if request.client else None,
"body": body_str[:1000] if body_str else None, # 限制大小
}
logger.info(f"Request: {json.dumps(request_log)}")
# 将请求ID传递给请求状态
request.state.request_id = request_id
try:
# 处理请求
response = await call_next(request)
# 记录响应信息
response_log = {
"request_id": request_id,
"status_code": response.status_code,
"headers": dict(response.headers),
"processing_time": None # 目前未实现处理时间计算
}
logger.info(f"Response: {json.dumps(response_log)}")
return response
except Exception as e:
# 记录未捕获的异常
error_logger = get_logger("api.error")
error_logger.exception(f"Request {request_id} failed with unhandled exception: {str(e)}")
# 返回错误响应
return JSONResponse(
status_code=500,
content={
"code": 500,
"msg": "Internal server error",
"request_id": request_id,
"data": None
}
)
# 初始化日志系统
def setup_logging(app: FastAPI) -> None:
"""初始化应用的日志系统"""
# 添加日志中间件
app.add_middleware(LoggingMiddleware)
# 设置 SQLAlchemy 日志级别
logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)
# 添加 SQL 过滤器
sql_filter = SQLFilter()
for handler in logging.getLogger().handlers:
handler.addFilter(sql_filter)
# 记录应用启动信息
logger = get_logger("app")
logger.info(f"Application {app.title} v{app.version} starting in {settings.ENVIRONMENT} environment")
logger.info(f"Log level: {settings.LOG_LEVEL}")

129
mooc/core/response.py Normal file
View File

@ -0,0 +1,129 @@
from typing import Any, Dict, Generic, Optional, TypeVar, Union
from pydantic import BaseModel, Field
from datetime import datetime
import uuid
from fastapi.responses import JSONResponse
from fastapi import status
# 定义泛型类型变量
T = TypeVar('T')
class ResponseModel(BaseModel, Generic[T]):
"""统一API响应模型"""
code: int = Field(0, description="状态码0表示成功非0表示错误")
message: str = Field("success", description="响应消息")
data: Optional[T] = Field(None, description="响应数据")
request_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8], description="请求ID")
timestamp: datetime = Field(default_factory=datetime.now, description="响应时间戳")
class Config:
arbitrary_types_allowed = True
json_encoders = {
datetime: lambda dt: dt.isoformat()
}
class ResponseFactory:
"""响应工厂类,用于生成标准格式的响应"""
@staticmethod
def success(data: Any = None, message: str = "success") -> Dict[str, Any]:
"""生成成功响应"""
return ResponseModel(
code=200,
message=message,
data=data
).model_dump(exclude_none=True)
@staticmethod
def error(code: int = 1, message: str = "error", data: Any = None) -> Dict[str, Any]:
"""生成错误响应"""
return ResponseModel(
code=code,
message=message,
data=data
).model_dump(exclude_none=True)
@staticmethod
def failed_validation(message: str = "数据验证失败", errors: Any = None) -> Dict[str, Any]:
"""生成验证失败响应"""
return ResponseModel(
code=400,
message=message,
data=errors
).model_dump(exclude_none=True)
@staticmethod
def not_found(message: str = "资源不存在", data: Any = None) -> Dict[str, Any]:
"""生成资源不存在响应"""
return ResponseModel(
code=404,
message=message,
data=data
).model_dump(exclude_none=True)
@staticmethod
def unauthorized(message: str = "未授权访问", data: Any = None) -> Dict[str, Any]:
"""生成未授权响应"""
return ResponseModel(
code=401,
message=message,
data=data
).model_dump(exclude_none=True)
@staticmethod
def forbidden(message: str = "禁止访问", data: Any = None) -> Dict[str, Any]:
"""生成禁止访问响应"""
return ResponseModel(
code=403,
message=message,
data=data
).model_dump(exclude_none=True)
@staticmethod
def server_error(message: str = "服务器内部错误", data: Any = None) -> Dict[str, Any]:
"""生成服务器错误响应"""
return ResponseModel(
code=500,
message=message,
data=data
).model_dump(exclude_none=True)
@staticmethod
def json_response(
data: Any = None,
message: str = "success",
code: int = 0,
status_code: int = status.HTTP_200_OK
) -> JSONResponse:
"""生成JSONResponse对象"""
content = ResponseModel(
code=code,
message=message,
data=data
).model_dump(exclude_none=True)
return JSONResponse(
content=content,
status_code=status_code
)
class Result:
"""统一API响应工具类"""
@staticmethod
def success(data: Any = None, message: str = "success") -> Dict[str, Any]:
"""返回成功响应"""
return {
"code": 0,
"message": message,
"data": data
}
@staticmethod
def error(code: int = 1, message: str = "error", data: Any = None) -> Dict[str, Any]:
"""返回错误响应"""
return {
"code": code,
"message": message,
"data": data
}

View File

@ -0,0 +1 @@

View File

@ -20,6 +20,9 @@ class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def get(self, db: Session, id: Any) -> Optional[ModelType]:
"""通过主键获取记录"""
# 检查模型是否有id字段如果没有则使用acid
if hasattr(self.model, 'id'):
return db.query(self.model).filter(self.model.id == id).first()
return db.query(self.model).filter(self.model.acid == id).first()
def get_by_field(
@ -37,27 +40,44 @@ class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
"""
return db.query(self.model).filter(getattr(self.model, field) == value).first()
def get_multi_by_field(
def get_by_fields(
self,
db: Session,
field: str,
value: Any,
filters: Dict[str, Any]
) -> Optional[ModelType]:
"""
通过多个字段AND条件查询获取单条记录
Args:
db: 数据库会话
filters: 字段名和字段值的字典 {"field1": value1, "field2": value2}
"""
query = db.query(self.model)
for field, value in filters.items():
query = query.filter(getattr(self.model, field) == value)
return query.first()
def get_multi_by_fields(
self,
db: Session,
filters: Dict[str, Any],
*,
skip: int = 0,
limit: int = 100
) -> List[ModelType]:
"""
通过任意字段获取多条记录
通过多个字段AND条件查询获取多条记录
Args:
db: 数据库会话
field: 字段名
value: 字段值
filters: 字段名和字段值的字典 {"field1": value1, "field2": value2}
skip: 跳过记录数
limit: 返回记录数限制
"""
return db.query(self.model).filter(
getattr(self.model, field) == value
).offset(skip).limit(limit).all()
query = db.query(self.model)
for field, value in filters.items():
query = query.filter(getattr(self.model, field) == value)
return query.offset(skip).limit(limit).all()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100

View File

@ -1,32 +1,12 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.coupon import CouponLocation
from mooc.schemas.coupon import CouponLocationCreate, CouponLocationUpdate
def get_coupon_location(db: Session, coupon_location_id: int):
return db.query(models.CouponLocation).filter(models.CouponLocation.id == coupon_location_id).first()
class CRUDCouponLocation(CRUDBase[CouponLocation, CouponLocationCreate, CouponLocationUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[CouponLocation]:
return self.get_by_field(db, "id", id)
def get_coupon_locations(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.CouponLocation).offset(skip).limit(limit).all()
def create_coupon_location(db: Session, coupon_location: schemas.CouponLocationCreate):
db_coupon_location = models.CouponLocation(**coupon_location.dict())
db.add(db_coupon_location)
db.commit()
db.refresh(db_coupon_location)
return db_coupon_location
def update_coupon_location(db: Session, coupon_location_id: int, coupon_location: schemas.CouponLocationUpdate):
db_coupon_location = db.query(models.CouponLocation).filter(models.CouponLocation.id == coupon_location_id).first()
if db_coupon_location:
for var, value in vars(coupon_location).items():
setattr(db_coupon_location, var, value) if value else None
db.add(db_coupon_location)
db.commit()
db.refresh(db_coupon_location)
return db_coupon_location
def delete_coupon_location(db: Session, coupon_location_id: int):
db_coupon_location = db.query(models.CouponLocation).filter(models.CouponLocation.id == coupon_location_id).first()
if db_coupon_location:
db.delete(db_coupon_location)
db.commit()
return db_coupon_location
# 创建实例
coupon_location = CRUDCouponLocation(CouponLocation)

View File

@ -1,32 +1,12 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.cover import CoverReply
from mooc.schemas.cover import CoverReplyCreate, CoverReplyUpdate
def get_cover_reply(db: Session, cover_reply_id: int):
return db.query(models.CoverReply).filter(models.CoverReply.id == cover_reply_id).first()
class CRUDCoverReply(CRUDBase[CoverReply, CoverReplyCreate, CoverReplyUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[CoverReply]:
return self.get_by_field(db, "id", id)
def get_cover_replies(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.CoverReply).offset(skip).limit(limit).all()
def create_cover_reply(db: Session, cover_reply: schemas.CoverReplyCreate):
db_cover_reply = models.CoverReply(**cover_reply.dict())
db.add(db_cover_reply)
db.commit()
db.refresh(db_cover_reply)
return db_cover_reply
def update_cover_reply(db: Session, cover_reply_id: int, cover_reply: schemas.CoverReplyUpdate):
db_cover_reply = db.query(models.CoverReply).filter(models.CoverReply.id == cover_reply_id).first()
if db_cover_reply:
for var, value in vars(cover_reply).items():
setattr(db_cover_reply, var, value) if value else None
db.add(db_cover_reply)
db.commit()
db.refresh(db_cover_reply)
return db_cover_reply
def delete_cover_reply(db: Session, cover_reply_id: int):
db_cover_reply = db.query(models.CoverReply).filter(models.CoverReply.id == cover_reply_id).first()
if db_cover_reply:
db.delete(db_cover_reply)
db.commit()
return db_cover_reply
# 创建实例
cover_reply = CRUDCoverReply(CoverReply)

View File

@ -1,32 +1,12 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.custom import CustomReply
from mooc.schemas.custom import CustomReplyCreate, CustomReplyUpdate
def get_custom_reply(db: Session, custom_reply_id: int):
return db.query(models.CustomReply).filter(models.CustomReply.id == custom_reply_id).first()
class CRUDCustomReply(CRUDBase[CustomReply, CustomReplyCreate, CustomReplyUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[CustomReply]:
return self.get_by_field(db, "id", id)
def get_custom_replies(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.CustomReply).offset(skip).limit(limit).all()
def create_custom_reply(db: Session, custom_reply: schemas.CustomReplyCreate):
db_custom_reply = models.CustomReply(**custom_reply.dict())
db.add(db_custom_reply)
db.commit()
db.refresh(db_custom_reply)
return db_custom_reply
def update_custom_reply(db: Session, custom_reply_id: int, custom_reply: schemas.CustomReplyUpdate):
db_custom_reply = db.query(models.CustomReply).filter(models.CustomReply.id == custom_reply_id).first()
if db_custom_reply:
for var, value in vars(custom_reply).items():
setattr(db_custom_reply, var, value) if value else None
db.add(db_custom_reply)
db.commit()
db.refresh(db_custom_reply)
return db_custom_reply
def delete_custom_reply(db: Session, custom_reply_id: int):
db_custom_reply = db.query(models.CustomReply).filter(models.CustomReply.id == custom_reply_id).first()
if db_custom_reply:
db.delete(db_custom_reply)
db.commit()
return db_custom_reply
# 创建实例
custom_reply = CRUDCustomReply(CustomReply)

View File

@ -111,7 +111,12 @@ class CRUDPaperTest(CRUDBase[PaperTest, PaperTestCreate, PaperTestUpdate]):
class CRUDPhonecode(CRUDBase[Phonecode, PhoneCodeCreate, PhoneCodeUpdate]):
def get_phonecode(self, db: Session, phonecode_id: int):
return self.get_by_field(db, "id", phonecode_id)
def get_latest_by_phone(self, db: Session, phone: str, weid: int):
"""获取指定手机号的最新验证码记录"""
return db.query(self.model).filter(
self.model.phone == phone,
self.model.weid == weid
).order_by(self.model.createtime.desc()).first()
class CRUDQYear(CRUDBase[QYear, QYearCreate, QYearUpdate]):
def get_qyear(self, db: Session, qyear_id: int):

View File

@ -4,6 +4,7 @@ from mooc.crud.crud_base import CRUDBase
from mooc.models.goouc_fullexam_user import (
# 导入全部
FullExamUser,
UserDoexam,
UserDoOtherExamAnswer,
UserExamAnswer,
@ -21,6 +22,8 @@ from mooc.models.goouc_fullexam_user import (
)
from mooc.schemas.goouc_fullexam_user import (
# 导入全部create和update
FullExamUserCreate,
FullExamUserUpdate,
UserDoexamCreate,
UserDoexamUpdate,
UserDoOtherExamAnswerCreate,
@ -82,6 +85,14 @@ class CRUDUserMember(CRUDBase[UserMember, UserMemberCreate, UserMemberUpdate]):
def get_user_member(self, db: Session, user_member_id: int):
return self.get_by_field(db, "id", user_member_id)
def get_by_openid_and_weid(self, db: Session, openid: str, weid: int, istatus: int = 1):
"""通过openid和weid获取用户"""
return db.query(self.model).filter(
self.model.openid == openid,
self.model.weid == weid,
self.model.istatus == istatus
).first()
class CRUDUserCollectionPraction(
CRUDBase[UserCollectionPraction, UserCollectionPractionCreate, UserCollectionPractionUpdate]):
@ -122,3 +133,42 @@ class CRUDUserWrongPraction(CRUDBase[UserWrongPraction, UserWrongPractionCreate,
class CRUDUserPool(CRUDBase[UserPool, UserPoolCreate, UserPoolUpdate]):
def get_user_pool(self, db: Session, user_pool_id: int):
return self.get_by_field(db, "id", user_pool_id)
class CRUDFullExamUser(CRUDBase[FullExamUser, FullExamUserCreate, FullExamUserUpdate]):
def get_full_exam_user(self, db: Session, full_exam_user_id: int):
return self.get_by_field(db, "id", full_exam_user_id)
def get_by_openid_and_weid(self, db: Session, openid: str, weid: int, istatus: int = 1):
"""通过openid和weid获取用户"""
return db.query(self.model).filter(
self.model.openid == openid,
self.model.weid == weid,
self.model.istatus == istatus
).first()
def get_by_id_and_weid(self, db: Session, id: int, weid: int, istatus: int = 1):
"""通过id和weid获取用户"""
return db.query(self.model).filter(
self.model.id == id,
self.model.weid == weid,
self.model.istatus == istatus
).first()
# 创建实例
user_doexam = CRUDUserDoexam(UserDoexam)
user_doother_exam_answer = CRUDUserDoOtherExamAnswer(UserDoOtherExamAnswer)
user_exam_answer = CRUDUserExamAnswer(UserExamAnswer)
user_special = CRUDUserSpecial(UserSpecial)
user_spequence = CRUDUserSpequence(UserSpequence)
user_member = CRUDUserMember(UserMember)
user_collection_praction = CRUDUserCollectionPraction(UserCollectionPraction)
user_gift = CRUDUserGift(UserGift)
user_qhigh = CRUDUserQHigh(UserQhigh)
user_qintensive = CRUDUserQIntensive(UserQintensive)
user_read = CRUDUserRead(UserRead)
user_doother_exam = CRUDUserDoOtherExam(UserDootherExam)
user_wrong_praction = CRUDUserWrongPraction(UserWrongPraction)
user_pool = CRUDUserPool(UserPool)
full_exam_user = CRUDFullExamUser(FullExamUser)

View File

@ -1,32 +1,12 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.images import ImagesReply
from mooc.schemas.images import ImagesReplyCreate, ImagesReplyUpdate
def get_images_reply(db: Session, images_reply_id: int):
return db.query(models.ImagesReply).filter(models.ImagesReply.id == images_reply_id).first()
class CRUDImagesReply(CRUDBase[ImagesReply, ImagesReplyCreate, ImagesReplyUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[ImagesReply]:
return self.get_by_field(db, "id", id)
def get_images_replies(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.ImagesReply).offset(skip).limit(limit).all()
def create_images_reply(db: Session, images_reply: schemas.ImagesReplyCreate):
db_images_reply = models.ImagesReply(**images_reply.dict())
db.add(db_images_reply)
db.commit()
db.refresh(db_images_reply)
return db_images_reply
def update_images_reply(db: Session, images_reply_id: int, images_reply: schemas.ImagesReplyUpdate):
db_images_reply = db.query(models.ImagesReply).filter(models.ImagesReply.id == images_reply_id).first()
if db_images_reply:
for var, value in vars(images_reply).items():
setattr(db_images_reply, var, value) if value else None
db.add(db_images_reply)
db.commit()
db.refresh(db_images_reply)
return db_images_reply
def delete_images_reply(db: Session, images_reply_id: int):
db_images_reply = db.query(models.ImagesReply).filter(models.ImagesReply.id == images_reply_id).first()
if db_images_reply:
db.delete(db_images_reply)
db.commit()
return db_images_reply
# 创建实例
images_reply = CRUDImagesReply(ImagesReply)

View File

@ -1,494 +1,109 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
def get_mc_cash_record(db: Session, mc_cash_record_id: int):
return db.query(models.McCashRecord).filter(models.McCashRecord.id == mc_cash_record_id).first()
def get_mc_cash_records(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.McCashRecord).offset(skip).limit(limit).all()
def create_mc_cash_record(db: Session, mc_cash_record: schemas.McCashRecordCreate):
db_mc_cash_record = models.McCashRecord(**mc_cash_record.dict())
db.add(db_mc_cash_record)
db.commit()
db.refresh(db_mc_cash_record)
return db_mc_cash_record
def get_mc_chats_record(db: Session, mc_chats_record_id: int):
return db.query(models.McChatsRecord).filter(models.McChatsRecord.id == mc_chats_record_id).first()
def get_mc_chats_records(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.McChatsRecord).offset(skip).limit(limit).all()
def create_mc_chats_record(db: Session, mc_chats_record: schemas.McChatsRecordCreate):
db_mc_chats_record = models.McChatsRecord(**mc_chats_record.dict())
db.add(db_mc_chats_record)
db.commit()
db.refresh(db_mc_chats_record)
return db_mc_chats_record
def update_mc_chats_record(db: Session, mc_chats_record_id: int, mc_chats_record: schemas.McChatsRecordUpdate):
db_mc_chats_record = db.query(models.McChatsRecord).filter(models.McChatsRecord.id == mc_chats_record_id).first()
if db_mc_chats_record:
for var, value in vars(mc_chats_record).items():
setattr(db_mc_chats_record, var, value) if value else None
db.add(db_mc_chats_record)
db.commit()
db.refresh(db_mc_chats_record)
return db_mc_chats_record
def delete_mc_chats_record(db: Session, mc_chats_record_id: int):
db_mc_chats_record = db.query(models.McChatsRecord).filter(models.McChatsRecord.id == mc_chats_record_id).first()
if db_mc_chats_record:
db.delete(db_mc_chats_record)
db.commit()
return db_mc_chats_record
def update_mc_cash_record(db: Session, mc_cash_record_id: int, mc_cash_record: schemas.McCashRecordUpdate):
db_mc_cash_record = db.query(models.McCashRecord).filter(models.McCashRecord.id == mc_cash_record_id).first()
if db_mc_cash_record:
for var, value in vars(mc_cash_record).items():
setattr(db_mc_cash_record, var, value) if value else None
db.add(db_mc_cash_record)
db.commit()
db.refresh(db_mc_cash_record)
return db_mc_cash_record
def delete_mc_cash_record(db: Session, mc_cash_record_id: int):
db_mc_cash_record = db.query(models.McCashRecord).filter(models.McCashRecord.id == mc_cash_record_id).first()
if db_mc_cash_record:
db.delete(db_mc_cash_record)
db.commit()
return db_mc_cash_record
# McMappingFans CRUD
def get_mc_mapping_fans(db: Session, fanid: int):
return db.query(models.McMappingFans).filter(models.McMappingFans.fanid == fanid).first()
def get_mc_mapping_fanss(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.McMappingFans).offset(skip).limit(limit).all()
def create_mc_mapping_fans(db: Session, mc_mapping_fans: schemas.McMappingFansCreate):
db_mc_mapping_fans = models.McMappingFans(**mc_mapping_fans.dict())
db.add(db_mc_mapping_fans)
db.commit()
db.refresh(db_mc_mapping_fans)
return db_mc_mapping_fans
def update_mc_mapping_fans(db: Session, fanid: int, mc_mapping_fans: schemas.McMappingFansUpdate):
db_mc_mapping_fans = db.query(models.McMappingFans).filter(models.McMappingFans.fanid == fanid).first()
if db_mc_mapping_fans:
for var, value in vars(mc_mapping_fans).items():
setattr(db_mc_mapping_fans, var, value) if value else None
db.add(db_mc_mapping_fans)
db.commit()
db.refresh(db_mc_mapping_fans)
return db_mc_mapping_fans
def delete_mc_mapping_fans(db: Session, fanid: int):
db_mc_mapping_fans = db.query(models.McMappingFans).filter(models.McMappingFans.fanid == fanid).first()
if db_mc_mapping_fans:
db.delete(db_mc_mapping_fans)
db.commit()
return db_mc_mapping_fans
# McMassRecord CRUD
def get_mc_mass_record(db: Session, id: int):
return db.query(models.McMassRecord).filter(models.McMassRecord.id == id).first()
def get_mc_mass_records(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.McMassRecord).offset(skip).limit(limit).all()
def create_mc_mass_record(db: Session, mc_mass_record: schemas.McMassRecordCreate):
db_mc_mass_record = models.McMassRecord(**mc_mass_record.dict())
db.add(db_mc_mass_record)
db.commit()
db.refresh(db_mc_mass_record)
return db_mc_mass_record
def update_mc_mass_record(db: Session, id: int, mc_mass_record: schemas.McMassRecordUpdate):
db_mc_mass_record = db.query(models.McMassRecord).filter(models.McMassRecord.id == id).first()
if db_mc_mass_record:
for var, value in vars(mc_mass_record).items():
setattr(db_mc_mass_record, var, value) if value else None
db.add(db_mc_mass_record)
db.commit()
db.refresh(db_mc_mass_record)
return db_mc_mass_record
def delete_mc_mass_record(db: Session, id: int):
db_mc_mass_record = db.query(models.McMassRecord).filter(models.McMassRecord.id == id).first()
if db_mc_mass_record:
db.delete(db_mc_mass_record)
db.commit()
return db_mc_mass_record
# McCreditsRecharge CRUD
def get_mc_credits_recharge(db: Session, mc_credits_recharge_id: int):
return db.query(models.McCreditsRecharge).filter(models.McCreditsRecharge.id == mc_credits_recharge_id).first()
def get_mc_credits_recharges(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.McCreditsRecharge).offset(skip).limit(limit).all()
def create_mc_credits_recharge(db: Session, mc_credits_recharge: schemas.McCreditsRechargeCreate):
db_mc_credits_recharge = models.McCreditsRecharge(**mc_credits_recharge.dict())
db.add(db_mc_credits_recharge)
db.commit()
db.refresh(db_mc_credits_recharge)
return db_mc_credits_recharge
def update_mc_credits_recharge(db: Session, mc_credits_recharge_id: int, mc_credits_recharge: schemas.McCreditsRechargeUpdate):
db_mc_credits_recharge = db.query(models.McCreditsRecharge).filter(models.McCreditsRecharge.id == mc_credits_recharge_id).first()
if db_mc_credits_recharge:
for var, value in vars(mc_credits_recharge).items():
setattr(db_mc_credits_recharge, var, value) if value else None
db.add(db_mc_credits_recharge)
db.commit()
db.refresh(db_mc_credits_recharge)
return db_mc_credits_recharge
def delete_mc_credits_recharge(db: Session, mc_credits_recharge_id: int):
db_mc_credits_recharge = db.query(models.McCreditsRecharge).filter(models.McCreditsRecharge.id == mc_credits_recharge_id).first()
if db_mc_credits_recharge:
db.delete(db_mc_credits_recharge)
db.commit()
return db_mc_credits_recharge
# McCreditsRecord CRUD
def get_mc_credits_record(db: Session, mc_credits_record_id: int):
return db.query(models.McCreditsRecord).filter(models.McCreditsRecord.id == mc_credits_record_id).first()
def get_mc_credits_records(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.McCreditsRecord).offset(skip).limit(limit).all()
def create_mc_credits_record(db: Session, mc_credits_record: schemas.McCreditsRecordCreate):
db_mc_credits_record = models.McCreditsRecord(**mc_credits_record.dict())
db.add(db_mc_credits_record)
db.commit()
db.refresh(db_mc_credits_record)
return db_mc_credits_record
def update_mc_credits_record(db: Session, mc_credits_record_id: int, mc_credits_record: schemas.McCreditsRecordUpdate):
db_mc_credits_record = db.query(models.McCreditsRecord).filter(models.McCreditsRecord.id == mc_credits_record_id).first()
if db_mc_credits_record:
for var, value in vars(mc_credits_record).items():
setattr(db_mc_credits_record, var, value) if value else None
db.add(db_mc_credits_record)
db.commit()
db.refresh(db_mc_credits_record)
return db_mc_credits_record
def delete_mc_credits_record(db: Session, mc_credits_record_id: int):
db_mc_credits_record = db.query(models.McCreditsRecord).filter(models.McCreditsRecord.id == mc_credits_record_id).first()
if db_mc_credits_record:
db.delete(db_mc_credits_record)
db.commit()
return db_mc_credits_record
# MCFansGroups CRUD
def get_mc_fans_group(db: Session, mc_fans_group_id: int):
return db.query(models.MCFansGroups).filter(models.MCFansGroups.id == mc_fans_group_id).first()
def get_mc_fans_groups(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.MCFansGroups).offset(skip).limit(limit).all()
def create_mc_fans_group(db: Session, mc_fans_group: schemas.MCFansGroupsCreate):
db_mc_fans_group = models.MCFansGroups(**mc_fans_group.dict())
db.add(db_mc_fans_group)
db.commit()
db.refresh(db_mc_fans_group)
return db_mc_fans_group
def update_mc_fans_group(db: Session, mc_fans_group_id: int, mc_fans_group: schemas.MCFansGroupsUpdate):
db_mc_fans_group = db.query(models.MCFansGroups).filter(models.MCFansGroups.id == mc_fans_group_id).first()
if db_mc_fans_group:
for var, value in vars(mc_fans_group).items():
setattr(db_mc_fans_group, var, value) if value else None
db.add(db_mc_fans_group)
db.commit()
db.refresh(db_mc_fans_group)
return db_mc_fans_group
def delete_mc_fans_group(db: Session, mc_fans_group_id: int):
db_mc_fans_group = db.query(models.MCFansGroups).filter(models.MCFansGroups.id == mc_fans_group_id).first()
if db_mc_fans_group:
db.delete(db_mc_fans_group)
db.commit()
return db_mc_fans_group
# MCFansTag CRUD
def get_mc_fans_tag(db: Session, mc_fans_tag_id: int):
return db.query(models.MCFansTag).filter(models.MCFansTag.id == mc_fans_tag_id).first()
def get_mc_fans_tags(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.MCFansTag).offset(skip).limit(limit).all()
def create_mc_fans_tag(db: Session, mc_fans_tag: schemas.MCFansTagCreate):
db_mc_fans_tag = models.MCFansTag(**mc_fans_tag.dict())
db.add(db_mc_fans_tag)
db.commit()
db.refresh(db_mc_fans_tag)
return db_mc_fans_tag
def update_mc_fans_tag(db: Session, mc_fans_tag_id: int, mc_fans_tag: schemas.MCFansTagUpdate):
db_mc_fans_tag = db.query(models.MCFansTag).filter(models.MCFansTag.id == mc_fans_tag_id).first()
if db_mc_fans_tag:
for var, value in vars(mc_fans_tag).items():
setattr(db_mc_fans_tag, var, value) if value else None
db.add(db_mc_fans_tag)
db.commit()
db.refresh(db_mc_fans_tag)
return db_mc_fans_tag
def delete_mc_fans_tag(db: Session, mc_fans_tag_id: int):
db_mc_fans_tag = db.query(models.MCFansTag).filter(models.MCFansTag.id == mc_fans_tag_id).first()
if db_mc_fans_tag:
db.delete(db_mc_fans_tag)
db.commit()
return db_mc_fans_tag
# MCFansTagMapping CRUD
def get_mc_fans_tag_mapping(db: Session, mc_fans_tag_mapping_id: int):
return db.query(models.MCFansTagMapping).filter(models.MCFansTagMapping.id == mc_fans_tag_mapping_id).first()
def get_mc_fans_tag_mappings(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.MCFansTagMapping).offset(skip).limit(limit).all()
def create_mc_fans_tag_mapping(db: Session, mc_fans_tag_mapping: schemas.MCFansTagMappingCreate):
db_mc_fans_tag_mapping = models.MCFansTagMapping(**mc_fans_tag_mapping.dict())
db.add(db_mc_fans_tag_mapping)
db.commit()
db.refresh(db_mc_fans_tag_mapping)
return db_mc_fans_tag_mapping
def update_mc_fans_tag_mapping(db: Session, mc_fans_tag_mapping_id: int, mc_fans_tag_mapping: schemas.MCFansTagMappingUpdate):
db_mc_fans_tag_mapping = db.query(models.MCFansTagMapping).filter(models.MCFansTagMapping.id == mc_fans_tag_mapping_id).first()
if db_mc_fans_tag_mapping:
for var, value in vars(mc_fans_tag_mapping).items():
setattr(db_mc_fans_tag_mapping, var, value) if value else None
db.add(db_mc_fans_tag_mapping)
db.commit()
db.refresh(db_mc_fans_tag_mapping)
return db_mc_fans_tag_mapping
def delete_mc_fans_tag_mapping(db: Session, mc_fans_tag_mapping_id: int):
db_mc_fans_tag_mapping = db.query(models.MCFansTagMapping).filter(models.MCFansTagMapping.id == mc_fans_tag_mapping_id).first()
if db_mc_fans_tag_mapping:
db.delete(db_mc_fans_tag_mapping)
db.commit()
return db_mc_fans_tag_mapping
# MCGroups CRUD
def get_mc_group(db: Session, mc_group_id: int):
return db.query(models.MCGroups).filter(models.MCGroups.groupid == mc_group_id).first()
def get_mc_groups(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.MCGroups).offset(skip).limit(limit).all()
def create_mc_group(db: Session, mc_group: schemas.MCGroupsCreate):
db_mc_group = models.MCGroups(**mc_group.dict())
db.add(db_mc_group)
db.commit()
db.refresh(db_mc_group)
return db_mc_group
def update_mc_group(db: Session, mc_group_id: int, mc_group: schemas.MCGroupsUpdate):
db_mc_group = db.query(models.MCGroups).filter(models.MCGroups.groupid == mc_group_id).first()
if db_mc_group:
for var, value in vars(mc_group).items():
setattr(db_mc_group, var, value) if value else None
db.add(db_mc_group)
db.commit()
db.refresh(db_mc_group)
return db_mc_group
def delete_mc_group(db: Session, mc_group_id: int):
db_mc_group = db.query(models.MCGroups).filter(models.MCGroups.groupid == mc_group_id).first()
if db_mc_group:
db.delete(db_mc_group)
db.commit()
return db_mc_group
# MCHandsel CRUD
def get_mc_handsel(db: Session, mc_handsel_id: int):
return db.query(models.MCHandsel).filter(models.MCHandsel.id == mc_handsel_id).first()
def get_mc_handsels(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.MCHandsel).offset(skip).limit(limit).all()
def create_mc_handsel(db: Session, mc_handsel: schemas.MCHandselCreate):
db_mc_handsel = models.MCHandsel(**mc_handsel.dict())
db.add(db_mc_handsel)
db.commit()
db.refresh(db_mc_handsel)
return db_mc_handsel
def update_mc_handsel(db: Session, mc_handsel_id: int, mc_handsel: schemas.MCHandselUpdate):
db_mc_handsel = db.query(models.MCHandsel).filter(models.MCHandsel.id == mc_handsel_id).first()
if db_mc_handsel:
for var, value in vars(mc_handsel).items():
setattr(db_mc_handsel, var, value) if value else None
db.add(db_mc_handsel)
db.commit()
db.refresh(db_mc_handsel)
return db_mc_handsel
def delete_mc_handsel(db: Session, mc_handsel_id: int):
db_mc_handsel = db.query(models.MCHandsel).filter(models.MCHandsel.id == mc_handsel_id).first()
if db_mc_handsel:
db.delete(db_mc_handsel)
db.commit()
return db_mc_handsel
# McMembers CRUD
def get_mc_member(db: Session, uid: int):
return db.query(models.McMembers).filter(models.McMembers.uid == uid).first()
def get_mc_members(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.McMembers).offset(skip).limit(limit).all()
def create_mc_member(db: Session, mc_member: schemas.McMembersCreate):
db_mc_member = models.McMembers(**mc_member.dict())
db.add(db_mc_member)
db.commit()
db.refresh(db_mc_member)
return db_mc_member
def update_mc_member(db: Session, uid: int, mc_member: schemas.McMembersUpdate):
db_mc_member = db.query(models.McMembers).filter(models.McMembers.uid == uid).first()
if db_mc_member:
for var, value in vars(mc_member).items():
setattr(db_mc_member, var, value) if value else None
db.add(db_mc_member)
db.commit()
db.refresh(db_mc_member)
return db_mc_member
def delete_mc_member(db: Session, uid: int):
db_mc_member = db.query(models.McMembers).filter(models.McMembers.uid == uid).first()
if db_mc_member:
db.delete(db_mc_member)
db.commit()
return db_mc_member
# McMemberAddress CRUD
def get_mc_member_address(db: Session, id: int):
return db.query(models.McMemberAddress).filter(models.McMemberAddress.id == id).first()
def get_mc_member_addresses(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.McMemberAddress).offset(skip).limit(limit).all()
def create_mc_member_address(db: Session, mc_member_address: schemas.McMemberAddressCreate):
db_mc_member_address = models.McMemberAddress(**mc_member_address.dict())
db.add(db_mc_member_address)
db.commit()
db.refresh(db_mc_member_address)
return db_mc_member_address
def update_mc_member_address(db: Session, id: int, mc_member_address: schemas.McMemberAddressUpdate):
db_mc_member_address = db.query(models.McMemberAddress).filter(models.McMemberAddress.id == id).first()
if db_mc_member_address:
for var, value in vars(mc_member_address).items():
setattr(db_mc_member_address, var, value) if value else None
db.add(db_mc_member_address)
db.commit()
db.refresh(db_mc_member_address)
return db_mc_member_address
def delete_mc_member_address(db: Session, id: int):
db_mc_member_address = db.query(models.McMemberAddress).filter(models.McMemberAddress.id == id).first()
if db_mc_member_address:
db.delete(db_mc_member_address)
db.commit()
return db_mc_member_address
# McMemberFields CRUD
def get_mc_member_fields(db: Session, id: int):
return db.query(models.McMemberFields).filter(models.McMemberFields.id == id).first()
def get_mc_member_fieldss(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.McMemberFields).offset(skip).limit(limit).all()
def create_mc_member_fields(db: Session, mc_member_fields: schemas.McMemberFieldsCreate):
db_mc_member_fields = models.McMemberFields(**mc_member_fields.dict())
db.add(db_mc_member_fields)
db.commit()
db.refresh(db_mc_member_fields)
return db_mc_member_fields
def update_mc_member_fields(db: Session, id: int, mc_member_fields: schemas.McMemberFieldsUpdate):
db_mc_member_fields = db.query(models.McMemberFields).filter(models.McMemberFields.id == id).first()
if db_mc_member_fields:
for var, value in vars(mc_member_fields).items():
setattr(db_mc_member_fields, var, value) if value else None
db.add(db_mc_member_fields)
db.commit()
db.refresh(db_mc_member_fields)
return db_mc_member_fields
def delete_mc_member_fields(db: Session, id: int):
db_mc_member_fields = db.query(models.McMemberFields).filter(models.McMemberFields.id == id).first()
if db_mc_member_fields:
db.delete(db_mc_member_fields)
db.commit()
return db_mc_member_fields
# McMemberProperty CRUD
def get_mc_member_property(db: Session, id: int):
return db.query(models.McMemberProperty).filter(models.McMemberProperty.id == id).first()
def get_mc_member_properties(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.McMemberProperty).offset(skip).limit(limit).all()
def create_mc_member_property(db: Session, mc_member_property: schemas.McMemberPropertyCreate):
db_mc_member_property = models.McMemberProperty(**mc_member_property.dict())
db.add(db_mc_member_property)
db.commit()
db.refresh(db_mc_member_property)
return db_mc_member_property
def update_mc_member_property(db: Session, id: int, mc_member_property: schemas.McMemberPropertyUpdate):
db_mc_member_property = db.query(models.McMemberProperty).filter(models.McMemberProperty.id == id).first()
if db_mc_member_property:
for var, value in vars(mc_member_property).items():
setattr(db_mc_member_property, var, value) if value else None
db.add(db_mc_member_property)
db.commit()
db.refresh(db_mc_member_property)
return db_mc_member_property
def delete_mc_member_property(db: Session, id: int):
db_mc_member_property = db.query(models.McMemberProperty).filter(models.McMemberProperty.id == id).first()
if db_mc_member_property:
db.delete(db_mc_member_property)
db.commit()
return db_mc_member_property
# McOauthFans CRUD
def get_mc_oauth_fans(db: Session, id: int):
return db.query(models.McOauthFans).filter(models.McOauthFans.id == id).first()
def get_mc_oauth_fanss(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.McOauthFans).offset(skip).limit(limit).all()
def create_mc_oauth_fans(db: Session, mc_oauth_fans: schemas.McOauthFansCreate):
db_mc_oauth_fans = models.McOauthFans(**mc_oauth_fans.dict())
db.add(db_mc_oauth_fans)
db.commit()
db.refresh(db_mc_oauth_fans)
return db_mc_oauth_fans
def update_mc_oauth_fans(db: Session, id: int, mc_oauth_fans: schemas.McOauthFansUpdate):
db_mc_oauth_fans = db.query(models.McOauthFans).filter(models.McOauthFans.id == id).first()
if db_mc_oauth_fans:
for var, value in vars(mc_oauth_fans).items():
setattr(db_mc_oauth_fans, var, value) if value else None
db.add(db_mc_oauth_fans)
db.commit()
db.refresh(db_mc_oauth_fans)
return db_mc_oauth_fans
def delete_mc_oauth_fans(db: Session, id: int):
db_mc_oauth_fans = db.query(models.McOauthFans).filter(models.McOauthFans.id == id).first()
if db_mc_oauth_fans:
db.delete(db_mc_oauth_fans)
db.commit()
return db_mc_oauth_fans
from mooc.crud.crud_base import CRUDBase
from mooc.models.mc import (
McCashRecord, McChatsRecord, McMappingFans, McMassRecord,
McCreditsRecharge, McCreditsRecord, MCFansGroups, MCFansTag,
MCFansTagMapping, MCGroups, MCHandsel, McMembers,
McMemberAddress, McMemberFields, McMemberProperty, McOauthFans
)
from mooc.schemas.mc import (
McCashRecordCreate, McCashRecordUpdate,
McChatsRecordCreate, McChatsRecordUpdate,
McMappingFansCreate, McMappingFansUpdate,
McMassRecordCreate, McMassRecordUpdate,
McCreditsRechargeCreate, McCreditsRechargeUpdate,
McCreditsRecordCreate, McCreditsRecordUpdate,
MCFansGroupsCreate, MCFansGroupsUpdate,
MCFansTagCreate, MCFansTagUpdate,
MCFansTagMappingCreate, MCFansTagMappingUpdate,
MCGroupsCreate, MCGroupsUpdate,
MCHandselCreate, MCHandselUpdate,
McMembersCreate, McMembersUpdate,
McMemberAddressCreate, McMemberAddressUpdate,
McMemberFieldsCreate, McMemberFieldsUpdate,
McMemberPropertyCreate, McMemberPropertyUpdate,
McOauthFansCreate, McOauthFansUpdate
)
class CRUDMcCashRecord(CRUDBase[McCashRecord, McCashRecordCreate, McCashRecordUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[McCashRecord]:
return self.get_by_field(db, "id", id)
class CRUDMcChatsRecord(CRUDBase[McChatsRecord, McChatsRecordCreate, McChatsRecordUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[McChatsRecord]:
return self.get_by_field(db, "id", id)
class CRUDMcMappingFans(CRUDBase[McMappingFans, McMappingFansCreate, McMappingFansUpdate]):
def get_by_fanid(self, db: Session, *, fanid: int) -> Optional[McMappingFans]:
return self.get_by_field(db, "fanid", fanid)
class CRUDMcMassRecord(CRUDBase[McMassRecord, McMassRecordCreate, McMassRecordUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[McMassRecord]:
return self.get_by_field(db, "id", id)
class CRUDMcCreditsRecharge(CRUDBase[McCreditsRecharge, McCreditsRechargeCreate, McCreditsRechargeUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[McCreditsRecharge]:
return self.get_by_field(db, "id", id)
class CRUDMcCreditsRecord(CRUDBase[McCreditsRecord, McCreditsRecordCreate, McCreditsRecordUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[McCreditsRecord]:
return self.get_by_field(db, "id", id)
class CRUDMCFansGroups(CRUDBase[MCFansGroups, MCFansGroupsCreate, MCFansGroupsUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[MCFansGroups]:
return self.get_by_field(db, "id", id)
class CRUDMCFansTag(CRUDBase[MCFansTag, MCFansTagCreate, MCFansTagUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[MCFansTag]:
return self.get_by_field(db, "id", id)
class CRUDMCFansTagMapping(CRUDBase[MCFansTagMapping, MCFansTagMappingCreate, MCFansTagMappingUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[MCFansTagMapping]:
return self.get_by_field(db, "id", id)
class CRUDMCGroups(CRUDBase[MCGroups, MCGroupsCreate, MCGroupsUpdate]):
def get_by_groupid(self, db: Session, *, groupid: int) -> Optional[MCGroups]:
return self.get_by_field(db, "groupid", groupid)
class CRUDMCHandsel(CRUDBase[MCHandsel, MCHandselCreate, MCHandselUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[MCHandsel]:
return self.get_by_field(db, "id", id)
class CRUDMcMembers(CRUDBase[McMembers, McMembersCreate, McMembersUpdate]):
def get_by_uid(self, db: Session, *, uid: int) -> Optional[McMembers]:
return self.get_by_field(db, "uid", uid)
class CRUDMcMemberAddress(CRUDBase[McMemberAddress, McMemberAddressCreate, McMemberAddressUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[McMemberAddress]:
return self.get_by_field(db, "id", id)
class CRUDMcMemberFields(CRUDBase[McMemberFields, McMemberFieldsCreate, McMemberFieldsUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[McMemberFields]:
return self.get_by_field(db, "id", id)
class CRUDMcMemberProperty(CRUDBase[McMemberProperty, McMemberPropertyCreate, McMemberPropertyUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[McMemberProperty]:
return self.get_by_field(db, "id", id)
class CRUDMcOauthFans(CRUDBase[McOauthFans, McOauthFansCreate, McOauthFansUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[McOauthFans]:
return self.get_by_field(db, "id", id)
# 创建实例
mc_cash_record = CRUDMcCashRecord(McCashRecord)
mc_chats_record = CRUDMcChatsRecord(McChatsRecord)
mc_mapping_fans = CRUDMcMappingFans(McMappingFans)
mc_mass_record = CRUDMcMassRecord(McMassRecord)
mc_credits_recharge = CRUDMcCreditsRecharge(McCreditsRecharge)
mc_credits_record = CRUDMcCreditsRecord(McCreditsRecord)
mc_fans_groups = CRUDMCFansGroups(MCFansGroups)
mc_fans_tag = CRUDMCFansTag(MCFansTag)
mc_fans_tag_mapping = CRUDMCFansTagMapping(MCFansTagMapping)
mc_groups = CRUDMCGroups(MCGroups)
mc_handsel = CRUDMCHandsel(MCHandsel)
mc_members = CRUDMcMembers(McMembers)
mc_member_address = CRUDMcMemberAddress(McMemberAddress)
mc_member_fields = CRUDMcMemberFields(McMemberFields)
mc_member_property = CRUDMcMemberProperty(McMemberProperty)
mc_oauth_fans = CRUDMcOauthFans(McOauthFans)

View File

@ -1,32 +1,12 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.menu import MenuEvent
from mooc.schemas.menu import MenuEventCreate, MenuEventUpdate
def get_menu_event(db: Session, menu_event_id: int):
return db.query(models.MenuEvent).filter(models.MenuEvent.id == menu_event_id).first()
class CRUDMenuEvent(CRUDBase[MenuEvent, MenuEventCreate, MenuEventUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[MenuEvent]:
return self.get_by_field(db, "id", id)
def get_menu_events(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.MenuEvent).offset(skip).limit(limit).all()
def create_menu_event(db: Session, menu_event: schemas.MenuEventCreate):
db_menu_event = models.MenuEvent(**menu_event.dict())
db.add(db_menu_event)
db.commit()
db.refresh(db_menu_event)
return db_menu_event
def update_menu_event(db: Session, menu_event_id: int, menu_event: schemas.MenuEventUpdate):
db_menu_event = db.query(models.MenuEvent).filter(models.MenuEvent.id == menu_event_id).first()
if db_menu_event:
for var, value in vars(menu_event).items():
setattr(db_menu_event, var, value) if value else None
db.add(db_menu_event)
db.commit()
db.refresh(db_menu_event)
return db_menu_event
def delete_menu_event(db: Session, menu_event_id: int):
db_menu_event = db.query(models.MenuEvent).filter(models.MenuEvent.id == menu_event_id).first()
if db_menu_event:
db.delete(db_menu_event)
db.commit()
return db_menu_event
# 创建实例
menu_event = CRUDMenuEvent(MenuEvent)

View File

@ -1,32 +1,12 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.message import MessageNoticeLog
from mooc.schemas.message import MessageNoticeLogCreate, MessageNoticeLogUpdate
def get_message_notice_log(db: Session, message_notice_log_id: int):
return db.query(models.MessageNoticeLog).filter(models.MessageNoticeLog.id == message_notice_log_id).first()
class CRUDMessageNoticeLog(CRUDBase[MessageNoticeLog, MessageNoticeLogCreate, MessageNoticeLogUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[MessageNoticeLog]:
return self.get_by_field(db, "id", id)
def get_message_notice_logs(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.MessageNoticeLog).offset(skip).limit(limit).all()
def create_message_notice_log(db: Session, message_notice_log: schemas.MessageNoticeLogCreate):
db_message_notice_log = models.MessageNoticeLog(**message_notice_log.dict())
db.add(db_message_notice_log)
db.commit()
db.refresh(db_message_notice_log)
return db_message_notice_log
def update_message_notice_log(db: Session, message_notice_log_id: int, message_notice_log: schemas.MessageNoticeLogUpdate):
db_message_notice_log = db.query(models.MessageNoticeLog).filter(models.MessageNoticeLog.id == message_notice_log_id).first()
if db_message_notice_log:
for var, value in vars(message_notice_log).items():
setattr(db_message_notice_log, var, value) if value else None
db.add(db_message_notice_log)
db.commit()
db.refresh(db_message_notice_log)
return db_message_notice_log
def delete_message_notice_log(db: Session, message_notice_log_id: int):
db_message_notice_log = db.query(models.MessageNoticeLog).filter(models.MessageNoticeLog.id == message_notice_log_id).first()
if db_message_notice_log:
db.delete(db_message_notice_log)
db.commit()
return db_message_notice_log
# 创建实例
message_notice_log = CRUDMessageNoticeLog(MessageNoticeLog)

View File

@ -1,32 +1,13 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.mobile import Mobilenumber
from mooc.schemas.mobile import MobilenumberCreate, MobilenumberUpdate
def get_mobilenumber(db: Session, mobilenumber_id: int):
return db.query(models.Mobilenumber).filter(models.Mobilenumber.id == mobilenumber_id).first()
class CRUDMobilenumber(CRUDBase[Mobilenumber, MobilenumberCreate, MobilenumberUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[Mobilenumber]:
return self.get_by_field(db, "id", id)
def get_mobilenumbers(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Mobilenumber).offset(skip).limit(limit).all()
# 创建实例
mobilenumber = CRUDMobilenumber(Mobilenumber)
def create_mobilenumber(db: Session, mobilenumber: schemas.MobilenumberCreate):
db_mobilenumber = models.Mobilenumber(**mobilenumber.dict())
db.add(db_mobilenumber)
db.commit()
db.refresh(db_mobilenumber)
return db_mobilenumber
def update_mobilenumber(db: Session, mobilenumber_id: int, mobilenumber: schemas.MobilenumberUpdate):
db_mobilenumber = db.query(models.Mobilenumber).filter(models.Mobilenumber.id == mobilenumber_id).first()
if db_mobilenumber:
for var, value in vars(mobilenumber).items():
setattr(db_mobilenumber, var, value) if value is not None else None
db.add(db_mobilenumber)
db.commit()
db.refresh(db_mobilenumber)
return db_mobilenumber
def delete_mobilenumber(db: Session, mobilenumber_id: int):
db_mobilenumber = db.query(models.Mobilenumber).filter(models.Mobilenumber.id == mobilenumber_id).first()
if db_mobilenumber:
db.delete(db_mobilenumber)
db.commit()
return db_mobilenumber

View File

@ -1,248 +1,61 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.module import (
IMSModulesBindings, IMSModulesCloud, IMSModulesIgnore,
IMSModulesPlugin, IMSModulesPluginRank, IMSModulesRank,
IMSModulesRecycle, Modules
)
from mooc.schemas.module import (
IMSModulesBindingsCreate, IMSModulesBindingsUpdate,
IMSModulesCloudCreate, IMSModulesCloudUpdate,
IMSModulesIgnoreCreate, IMSModulesIgnoreUpdate,
IMSModulesPluginCreate, IMSModulesPluginUpdate,
IMSModulesPluginRankCreate, IMSModulesPluginRankUpdate,
IMSModulesRankCreate, IMSModulesRankUpdate,
IMSModulesRecycleCreate, IMSModulesRecycleUpdate,
ModulesCreate, ModulesUpdate
)
def get_ims_modules_bindings(db: Session, eid: int):
return db.query(models.IMSModulesBindings).filter(models.IMSModulesBindings.eid == eid).first()
class CRUDIMSModulesBindings(CRUDBase[IMSModulesBindings, IMSModulesBindingsCreate, IMSModulesBindingsUpdate]):
def get_by_eid(self, db: Session, *, eid: int) -> Optional[IMSModulesBindings]:
return self.get_by_field(db, "eid", eid)
def get_ims_modules_bindings_all(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.IMSModulesBindings).offset(skip).limit(limit).all()
class CRUDIMSModulesCloud(CRUDBase[IMSModulesCloud, IMSModulesCloudCreate, IMSModulesCloudUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[IMSModulesCloud]:
return self.get_by_field(db, "id", id)
def create_ims_modules_bindings(db: Session, ims_modules_bindings: schemas.IMSModulesBindingsCreate):
db_ims_modules_bindings = models.IMSModulesBindings(**ims_modules_bindings.dict())
db.add(db_ims_modules_bindings)
db.commit()
db.refresh(db_ims_modules_bindings)
return db_ims_modules_bindings
class CRUDIMSModulesIgnore(CRUDBase[IMSModulesIgnore, IMSModulesIgnoreCreate, IMSModulesIgnoreUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[IMSModulesIgnore]:
return self.get_by_field(db, "id", id)
def update_ims_modules_bindings(db: Session, eid: int, ims_modules_bindings: schemas.IMSModulesBindingsCreate):
db_ims_modules_bindings = db.query(models.IMSModulesBindings).filter(models.IMSModulesBindings.eid == eid).first()
if db_ims_modules_bindings:
for var, value in vars(ims_modules_bindings).items():
setattr(db_ims_modules_bindings, var, value) if value else None
db.add(db_ims_modules_bindings)
db.commit()
db.refresh(db_ims_modules_bindings)
return db_ims_modules_bindings
class CRUDIMSModulesPlugin(CRUDBase[IMSModulesPlugin, IMSModulesPluginCreate, IMSModulesPluginUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[IMSModulesPlugin]:
return self.get_by_field(db, "id", id)
def delete_ims_modules_bindings(db: Session, eid: int):
db_ims_modules_bindings = db.query(models.IMSModulesBindings).filter(models.IMSModulesBindings.eid == eid).first()
if db_ims_modules_bindings:
db.delete(db_ims_modules_bindings)
db.commit()
return db_ims_modules_bindings
class CRUDIMSModulesPluginRank(CRUDBase[IMSModulesPluginRank, IMSModulesPluginRankCreate, IMSModulesPluginRankUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[IMSModulesPluginRank]:
return self.get_by_field(db, "id", id)
def get_ims_modules_cloud(db: Session, id: int):
return db.query(models.IMSModulesCloud).filter(models.IMSModulesCloud.id == id).first()
class CRUDIMSModulesRank(CRUDBase[IMSModulesRank, IMSModulesRankCreate, IMSModulesRankUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[IMSModulesRank]:
return self.get_by_field(db, "id", id)
def get_ims_modules_cloud_all(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.IMSModulesCloud).offset(skip).limit(limit).all()
class CRUDIMSModulesRecycle(CRUDBase[IMSModulesRecycle, IMSModulesRecycleCreate, IMSModulesRecycleUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[IMSModulesRecycle]:
return self.get_by_field(db, "id", id)
def create_ims_modules_cloud(db: Session, ims_modules_cloud: schemas.IMSModulesCloudCreate):
db_ims_modules_cloud = models.IMSModulesCloud(**ims_modules_cloud.dict())
db.add(db_ims_modules_cloud)
db.commit()
db.refresh(db_ims_modules_cloud)
return db_ims_modules_cloud
class CRUDModules(CRUDBase[Modules, ModulesCreate, ModulesUpdate]):
def get_by_mid(self, db: Session, *, mid: int) -> Optional[Modules]:
return self.get_by_field(db, "mid", mid)
def update_ims_modules_cloud(db: Session, id: int, ims_modules_cloud: schemas.IMSModulesCloudCreate):
db_ims_modules_cloud = db.query(models.IMSModulesCloud).filter(models.IMSModulesCloud.id == id).first()
if db_ims_modules_cloud:
for var, value in vars(ims_modules_cloud).items():
setattr(db_ims_modules_cloud, var, value) if value else None
db.add(db_ims_modules_cloud)
db.commit()
db.refresh(db_ims_modules_cloud)
return db_ims_modules_cloud
# 创建实例
ims_modules_bindings = CRUDIMSModulesBindings(IMSModulesBindings)
ims_modules_cloud = CRUDIMSModulesCloud(IMSModulesCloud)
ims_modules_ignore = CRUDIMSModulesIgnore(IMSModulesIgnore)
ims_modules_plugin = CRUDIMSModulesPlugin(IMSModulesPlugin)
ims_modules_plugin_rank = CRUDIMSModulesPluginRank(IMSModulesPluginRank)
ims_modules_rank = CRUDIMSModulesRank(IMSModulesRank)
ims_modules_recycle = CRUDIMSModulesRecycle(IMSModulesRecycle)
modules = CRUDModules(Modules)
def delete_ims_modules_cloud(db: Session, id: int):
db_ims_modules_cloud = db.query(models.IMSModulesCloud).filter(models.IMSModulesCloud.id == id).first()
if db_ims_modules_cloud:
db.delete(db_ims_modules_cloud)
db.commit()
return db_ims_modules_cloud
def get_ims_modules_ignore(db: Session, id: int):
return db.query(models.IMSModulesIgnore).filter(models.IMSModulesIgnore.id == id).first()
def get_ims_modules_ignore_all(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.IMSModulesIgnore).offset(skip).limit(limit).all()
def create_ims_modules_ignore(db: Session, ims_modules_ignore: schemas.IMSModulesIgnoreCreate):
db_ims_modules_ignore = models.IMSModulesIgnore(**ims_modules_ignore.dict())
db.add(db_ims_modules_ignore)
db.commit()
db.refresh(db_ims_modules_ignore)
return db_ims_modules_ignore
def update_ims_modules_ignore(db: Session, id: int, ims_modules_ignore: schemas.IMSModulesIgnoreCreate):
db_ims_modules_ignore = db.query(models.IMSModulesIgnore).filter(models.IMSModulesIgnore.id == id).first()
if db_ims_modules_ignore:
for var, value in vars(ims_modules_ignore).items():
setattr(db_ims_modules_ignore, var, value) if value is not None else None
db.add(db_ims_modules_ignore)
db.commit()
db.refresh(db_ims_modules_ignore)
return db_ims_modules_ignore
def delete_ims_modules_ignore(db: Session, id: int):
db_ims_modules_ignore = db.query(models.IMSModulesIgnore).filter(models.IMSModulesIgnore.id == id).first()
if db_ims_modules_ignore:
db.delete(db_ims_modules_ignore)
db.commit()
return db_ims_modules_ignore
def get_ims_modules_plugin(db: Session, id: int):
return db.query(models.IMSModulesPlugin).filter(models.IMSModulesPlugin.id == id).first()
def get_ims_modules_plugin_all(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.IMSModulesPlugin).offset(skip).limit(limit).all()
def create_ims_modules_plugin(db: Session, ims_modules_plugin: schemas.IMSModulesPluginCreate):
db_ims_modules_plugin = models.IMSModulesPlugin(**ims_modules_plugin.dict())
db.add(db_ims_modules_plugin)
db.commit()
db.refresh(db_ims_modules_plugin)
return db_ims_modules_plugin
def update_ims_modules_plugin(db: Session, id: int, ims_modules_plugin: schemas.IMSModulesPluginCreate):
db_ims_modules_plugin = db.query(models.IMSModulesPlugin).filter(models.IMSModulesPlugin.id == id).first()
if db_ims_modules_plugin:
for var, value in vars(ims_modules_plugin).items():
setattr(db_ims_modules_plugin, var, value) if value is not None else None
db.add(db_ims_modules_plugin)
db.commit()
db.refresh(db_ims_modules_plugin)
return db_ims_modules_plugin
def delete_ims_modules_plugin(db: Session, id: int):
db_ims_modules_plugin = db.query(models.IMSModulesPlugin).filter(models.IMSModulesPlugin.id == id).first()
if db_ims_modules_plugin:
db.delete(db_ims_modules_plugin)
db.commit()
return db_ims_modules_plugin
def get_ims_modules_plugin_rank(db: Session, id: int):
return db.query(models.IMSModulesPluginRank).filter(models.IMSModulesPluginRank.id == id).first()
def get_ims_modules_plugin_rank_all(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.IMSModulesPluginRank).offset(skip).limit(limit).all()
def create_ims_modules_plugin_rank(db: Session, ims_modules_plugin_rank: schemas.IMSModulesPluginRankCreate):
db_ims_modules_plugin_rank = models.IMSModulesPluginRank(**ims_modules_plugin_rank.dict())
db.add(db_ims_modules_plugin_rank)
db.commit()
db.refresh(db_ims_modules_plugin_rank)
return db_ims_modules_plugin_rank
def update_ims_modules_plugin_rank(db: Session, id: int, ims_modules_plugin_rank: schemas.IMSModulesPluginRankCreate):
db_ims_modules_plugin_rank = db.query(models.IMSModulesPluginRank).filter(models.IMSModulesPluginRank.id == id).first()
if db_ims_modules_plugin_rank:
for var, value in vars(ims_modules_plugin_rank).items():
setattr(db_ims_modules_plugin_rank, var, value) if value is not None else None
db.add(db_ims_modules_plugin_rank)
db.commit()
db.refresh(db_ims_modules_plugin_rank)
return db_ims_modules_plugin_rank
def delete_ims_modules_plugin_rank(db: Session, id: int):
db_ims_modules_plugin_rank = db.query(models.IMSModulesPluginRank).filter(models.IMSModulesPluginRank.id == id).first()
if db_ims_modules_plugin_rank:
db.delete(db_ims_modules_plugin_rank)
db.commit()
return db_ims_modules_plugin_rank
def get_ims_modules_rank(db: Session, id: int):
return db.query(models.IMSModulesRank).filter(models.IMSModulesRank.id == id).first()
def get_ims_modules_rank_all(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.IMSModulesRank).offset(skip).limit(limit).all()
def create_ims_modules_rank(db: Session, ims_modules_rank: schemas.IMSModulesRankCreate):
db_ims_modules_rank = models.IMSModulesRank(**ims_modules_rank.dict())
db.add(db_ims_modules_rank)
db.commit()
db.refresh(db_ims_modules_rank)
return db_ims_modules_rank
def update_ims_modules_rank(db: Session, id: int, ims_modules_rank: schemas.IMSModulesRankCreate):
db_ims_modules_rank = db.query(models.IMSModulesRank).filter(models.IMSModulesRank.id == id).first()
if db_ims_modules_rank:
for var, value in vars(ims_modules_rank).items():
setattr(db_ims_modules_rank, var, value) if value is not None else None
db.add(db_ims_modules_rank)
db.commit()
db.refresh(db_ims_modules_rank)
return db_ims_modules_rank
def delete_ims_modules_rank(db: Session, id: int):
db_ims_modules_rank = db.query(models.IMSModulesRank).filter(models.IMSModulesRank.id == id).first()
if db_ims_modules_rank:
db.delete(db_ims_modules_rank)
db.commit()
return db_ims_modules_rank
def get_ims_modules_recycle(db: Session, id: int):
return db.query(models.IMSModulesRecycle).filter(models.IMSModulesRecycle.id == id).first()
def get_ims_modules_recycle_all(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.IMSModulesRecycle).offset(skip).limit(limit).all()
def create_ims_modules_recycle(db: Session, ims_modules_recycle: schemas.IMSModulesRecycleCreate):
db_ims_modules_recycle = models.IMSModulesRecycle(**ims_modules_recycle.dict())
db.add(db_ims_modules_recycle)
db.commit()
db.refresh(db_ims_modules_recycle)
return db_ims_modules_recycle
def update_ims_modules_recycle(db: Session, id: int, ims_modules_recycle: schemas.IMSModulesRecycleCreate):
db_ims_modules_recycle = db.query(models.IMSModulesRecycle).filter(models.IMSModulesRecycle.id == id).first()
if db_ims_modules_recycle:
for var, value in vars(ims_modules_recycle).items():
setattr(db_ims_modules_recycle, var, value) if value is not None else None
db.add(db_ims_modules_recycle)
db.commit()
db.refresh(db_ims_modules_recycle)
return db_ims_modules_recycle
def delete_ims_modules_recycle(db: Session, id: int):
db_ims_modules_recycle = db.query(models.IMSModulesRecycle).filter(models.IMSModulesRecycle.id == id).first()
if db_ims_modules_recycle:
db.delete(db_ims_modules_recycle)
db.commit()
return db_ims_modules_recycle
# Modules CRUD
def get_module(db: Session, mid: int):
return db.query(models.Modules).filter(models.Modules.mid == mid).first()
def get_modules(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Modules).offset(skip).limit(limit).all()
def create_module(db: Session, module: schemas.ModulesCreate):
db_module = models.Modules(**module.dict())
db.add(db_module)
db.commit()
db.refresh(db_module)
return db_module
def update_module(db: Session, mid: int, module: schemas.ModulesUpdate):
db_module = db.query(models.Modules).filter(models.Modules.mid == mid).first()
if db_module:
for var, value in vars(module).items():
setattr(db_module, var, value) if value is not None else None
db.add(db_module)
db.commit()
db.refresh(db_module)
return db_module
def delete_module(db: Session, mid: int):
db_module = db.query(models.Modules).filter(models.Modules.mid == mid).first()
if db_module:
db.delete(db_module)
db.commit()
return db_module

View File

@ -1,31 +1,13 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.music import MusicReply
from mooc.schemas.music import MusicReplyCreate, MusicReplyUpdate
def get_music_reply(db: Session, music_reply_id: int):
return db.query(models.MusicReply).filter(models.MusicReply.Id == music_reply_id).first()
class CRUDMusicReply(CRUDBase[MusicReply, MusicReplyCreate, MusicReplyUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[MusicReply]:
return self.get_by_field(db, "Id", id) # 注意这里使用 "Id" 而不是 "id"
def get_music_replies(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.MusicReply).offset(skip).limit(limit).all()
# 创建实例
music_reply = CRUDMusicReply(MusicReply)
def create_music_reply(db: Session, music_reply: schemas.MusicReplyCreate):
db_music_reply = models.MusicReply(**music_reply.dict())
db.add(db_music_reply)
db.commit()
db.refresh(db_music_reply)
return db_music_reply
def update_music_reply(db: Session, music_reply_id: int, music_reply: schemas.MusicReplyCreate):
db_music_reply = db.query(models.MusicReply).filter(models.MusicReply.Id == music_reply_id).first()
if db_music_reply:
for key, value in music_reply.dict().items():
setattr(db_music_reply, key, value)
db.commit()
db.refresh(db_music_reply)
return db_music_reply
def delete_music_reply(db: Session, music_reply_id: int):
db_music_reply = db.query(models.MusicReply).filter(models.MusicReply.Id == music_reply_id).first()
if db_music_reply:
db.delete(db_music_reply)
db.commit()
return db_music_reply

View File

@ -1,5 +1,15 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.news import NewsReply
from mooc.schemas.news import NewsReplyCreate, NewsReplyUpdate
class CRUDNewsReply(CRUDBase[NewsReply, NewsReplyCreate, NewsReplyUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[NewsReply]:
return self.get_by_field(db, "Id", id) # 注意这里使用 "Id" 而不是 "id"
# 创建实例
news_reply = CRUDNewsReply(NewsReply)
def get_news_reply(db: Session, news_reply_id: int):
return db.query(models.NewsReply).filter(models.NewsReply.Id == news_reply_id).first()

View File

@ -1,31 +1,12 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.phoneapp import PhoneappVersions
from mooc.schemas.phoneapp import PhoneappVersionsCreate, PhoneappVersionsUpdate
def get_phoneapp_version(db: Session, phoneapp_version_id: int):
return db.query(models.PhoneappVersions).filter(models.PhoneappVersions.Id == phoneapp_version_id).first()
class CRUDPhoneappVersions(CRUDBase[PhoneappVersions, PhoneappVersionsCreate, PhoneappVersionsUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[PhoneappVersions]:
return self.get_by_field(db, "Id", id) # 注意这里使用 "Id" 而不是 "id"
def get_phoneapp_versions(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.PhoneappVersions).offset(skip).limit(limit).all()
def create_phoneapp_version(db: Session, phoneapp_version: schemas.PhoneappVersionsCreate):
db_phoneapp_version = models.PhoneappVersions(**phoneapp_version.dict())
db.add(db_phoneapp_version)
db.commit()
db.refresh(db_phoneapp_version)
return db_phoneapp_version
def update_phoneapp_version(db: Session, phoneapp_version_id: int, phoneapp_version: schemas.PhoneappVersionsCreate):
db_phoneapp_version = db.query(models.PhoneappVersions).filter(models.PhoneappVersions.Id == phoneapp_version_id).first()
if db_phoneapp_version:
for key, value in phoneapp_version.dict().items():
setattr(db_phoneapp_version, key, value)
db.commit()
db.refresh(db_phoneapp_version)
return db_phoneapp_version
def delete_phoneapp_version(db: Session, phoneapp_version_id: int):
db_phoneapp_version = db.query(models.PhoneappVersions).filter(models.PhoneappVersions.Id == phoneapp_version_id).first()
if db_phoneapp_version:
db.delete(db_phoneapp_version)
db.commit()
return db_phoneapp_version
# 创建实例
phoneapp_versions = CRUDPhoneappVersions(PhoneappVersions)

View File

@ -1,31 +1,12 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.profile import ProfileFields
from mooc.schemas.profile import ProfileFieldsCreate, ProfileFieldsUpdate
def get_profile_field(db: Session, profile_field_id: int):
return db.query(models.ProfileFields).filter(models.ProfileFields.Id == profile_field_id).first()
class CRUDProfileFields(CRUDBase[ProfileFields, ProfileFieldsCreate, ProfileFieldsUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[ProfileFields]:
return self.get_by_field(db, "Id", id) # 注意这里使用 "Id" 而不是 "id"
def get_profile_fields(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.ProfileFields).offset(skip).limit(limit).all()
def create_profile_field(db: Session, profile_field: schemas.ProfileFieldsCreate):
db_profile_field = models.ProfileFields(**profile_field.dict())
db.add(db_profile_field)
db.commit()
db.refresh(db_profile_field)
return db_profile_field
def update_profile_field(db: Session, profile_field_id: int, profile_field: schemas.ProfileFieldsCreate):
db_profile_field = db.query(models.ProfileFields).filter(models.ProfileFields.Id == profile_field_id).first()
if db_profile_field:
for key, value in profile_field.dict().items():
setattr(db_profile_field, key, value)
db.commit()
db.refresh(db_profile_field)
return db_profile_field
def delete_profile_field(db: Session, profile_field_id: int):
db_profile_field = db.query(models.ProfileFields).filter(models.ProfileFields.Id == profile_field_id).first()
if db_profile_field:
db.delete(db_profile_field)
db.commit()
return db_profile_field
# 创建实例
profile_fields = CRUDProfileFields(ProfileFields)

View File

@ -1,31 +1,12 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.qrcode import Qrcode
from mooc.schemas.qrcode import QrcodeCreate, QrcodeUpdate
def get_qrcode(db: Session, qrcode_id: int):
return db.query(models.Qrcode).filter(models.Qrcode.Id == qrcode_id).first()
class CRUDQrcode(CRUDBase[Qrcode, QrcodeCreate, QrcodeUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[Qrcode]:
return self.get_by_field(db, "Id", id) # 注意这里使用 "Id" 而不是 "id"
def get_qrcodes(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Qrcode).offset(skip).limit(limit).all()
def create_qrcode(db: Session, qrcode: schemas.QrcodeCreate):
db_qrcode = models.Qrcode(**qrcode.dict())
db.add(db_qrcode)
db.commit()
db.refresh(db_qrcode)
return db_qrcode
def update_qrcode(db: Session, qrcode_id: int, qrcode: schemas.QrcodeCreate):
db_qrcode = db.query(models.Qrcode).filter(models.Qrcode.Id == qrcode_id).first()
if db_qrcode:
for key, value in qrcode.dict().items():
setattr(db_qrcode, key, value)
db.commit()
db.refresh(db_qrcode)
return db_qrcode
def delete_qrcode(db: Session, qrcode_id: int):
db_qrcode = db.query(models.Qrcode).filter(models.Qrcode.Id == qrcode_id).first()
if db_qrcode:
db.delete(db_qrcode)
db.commit()
return db_qrcode
# 创建实例
qrcode = CRUDQrcode(Qrcode)

View File

@ -1,31 +1,13 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.qrcode import QrcodeStat
from mooc.schemas.qrcode import QrcodeStatCreate, QrcodeStatUpdate
def get_qrcode_stat(db: Session, qrcode_stat_id: int):
return db.query(models.QrcodeStat).filter(models.QrcodeStat.Id == qrcode_stat_id).first()
class CRUDQrcodeStat(CRUDBase[QrcodeStat, QrcodeStatCreate, QrcodeStatUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[QrcodeStat]:
return self.get_by_field(db, "Id", id) # 注意这里使用 "Id" 而不是 "id"
def get_qrcode_stats(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.QrcodeStat).offset(skip).limit(limit).all()
# 创建实例
qrcode_stat = CRUDQrcodeStat(QrcodeStat)
def create_qrcode_stat(db: Session, qrcode_stat: schemas.QrcodeStatCreate):
db_qrcode_stat = models.QrcodeStat(**qrcode_stat.dict())
db.add(db_qrcode_stat)
db.commit()
db.refresh(db_qrcode_stat)
return db_qrcode_stat
def update_qrcode_stat(db: Session, qrcode_stat_id: int, qrcode_stat: schemas.QrcodeStatCreate):
db_qrcode_stat = db.query(models.QrcodeStat).filter(models.QrcodeStat.Id == qrcode_stat_id).first()
if db_qrcode_stat:
for key, value in qrcode_stat.dict().items():
setattr(db_qrcode_stat, key, value)
db.commit()
db.refresh(db_qrcode_stat)
return db_qrcode_stat
def delete_qrcode_stat(db: Session, qrcode_stat_id: int):
db_qrcode_stat = db.query(models.QrcodeStat).filter(models.QrcodeStat.Id == qrcode_stat_id).first()
if db_qrcode_stat:
db.delete(db_qrcode_stat)
db.commit()
return db_qrcode_stat

View File

@ -1,31 +1,13 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.rule import Rule
from mooc.schemas.rule import RuleCreate, RuleUpdate
def get_rule(db: Session, rule_id: int):
return db.query(models.Rule).filter(models.Rule.Id == rule_id).first()
class CRUDRule(CRUDBase[Rule, RuleCreate, RuleUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[Rule]:
return self.get_by_field(db, "Id", id) # 注意这里使用 "Id" 而不是 "id"
def get_rules(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Rule).offset(skip).limit(limit).all()
# 创建实例
rule = CRUDRule(Rule)
def create_rule(db: Session, rule: schemas.RuleCreate):
db_rule = models.Rule(**rule.dict())
db.add(db_rule)
db.commit()
db.refresh(db_rule)
return db_rule
def update_rule(db: Session, rule_id: int, rule: schemas.RuleCreate):
db_rule = db.query(models.Rule).filter(models.Rule.Id == rule_id).first()
if db_rule:
for key, value in rule.dict().items():
setattr(db_rule, key, value)
db.commit()
db.refresh(db_rule)
return db_rule
def delete_rule(db: Session, rule_id: int):
db_rule = db.query(models.Rule).filter(models.Rule.Id == rule_id).first()
if db_rule:
db.delete(db_rule)
db.commit()
return db_rule

View File

@ -1,31 +1,12 @@
from typing import Optional
from sqlalchemy.orm import Session
from . import models, schemas
from mooc.crud.crud_base import CRUDBase
from mooc.models.rule import RuleKeyword
from mooc.schemas.rule import RuleKeywordCreate, RuleKeywordUpdate
def get_rule_keyword(db: Session, rule_keyword_id: int):
return db.query(models.RuleKeyword).filter(models.RuleKeyword.Id == rule_keyword_id).first()
class CRUDRuleKeyword(CRUDBase[RuleKeyword, RuleKeywordCreate, RuleKeywordUpdate]):
def get_by_id(self, db: Session, *, id: int) -> Optional[RuleKeyword]:
return self.get_by_field(db, "Id", id) # 注意这里使用 "Id" 而不是 "id"
def get_rule_keywords(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.RuleKeyword).offset(skip).limit(limit).all()
def create_rule_keyword(db: Session, rule_keyword: schemas.RuleKeywordCreate):
db_rule_keyword = models.RuleKeyword(**rule_keyword.dict())
db.add(db_rule_keyword)
db.commit()
db.refresh(db_rule_keyword)
return db_rule_keyword
def update_rule_keyword(db: Session, rule_keyword_id: int, rule_keyword: schemas.RuleKeywordCreate):
db_rule_keyword = db.query(models.RuleKeyword).filter(models.RuleKeyword.Id == rule_keyword_id).first()
if db_rule_keyword:
for key, value in rule_keyword.dict().items():
setattr(db_rule_keyword, key, value)
db.commit()
db.refresh(db_rule_keyword)
return db_rule_keyword
def delete_rule_keyword(db: Session, rule_keyword_id: int):
db_rule_keyword = db.query(models.RuleKeyword).filter(models.RuleKeyword.Id == rule_keyword_id).first()
if db_rule_keyword:
db.delete(db_rule_keyword)
db.commit()
return db_rule_keyword
# 创建实例
rule_keyword = CRUDRuleKeyword(RuleKeyword)

27
mooc/main.py Normal file
View File

@ -0,0 +1,27 @@
from fastapi import FastAPI
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException
from mooc.core.exceptions import (
validation_exception_handler,
http_exception_handler,
generic_exception_handler
)
# 应用的其他代码...
def create_app() -> FastAPI:
app = FastAPI(
title=settings.PROJECT_NAME,
version=settings.VERSION,
# 其他设置...
)
# 注册异常处理器
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(HTTPException, http_exception_handler)
app.add_exception_handler(Exception, generic_exception_handler)
# 应用的其他设置...
return app

View File

@ -17,7 +17,7 @@ class Advert(Base):
createtime = Column(Integer, nullable=True, comment='创建时间')
class Config:
orm_mode = True
from_attributes = True
class Banji(Base):
@ -49,7 +49,7 @@ class Banji(Base):
status = Column(Integer, nullable=False, server_default=text("'0'"))
class Config:
orm_mode = True
from_attributes = True
class Banner(Base):
@ -70,7 +70,7 @@ class Banner(Base):
istatus = Column(Integer, nullable=False, server_default=text("'1'"), comment='是否删除的标识')
class Config:
orm_mode = True
from_attributes = True
class Category(Base):
@ -91,7 +91,7 @@ class Category(Base):
istatus = Column(Integer, nullable=False, server_default=text("'1'"), comment='是否删除的标识')
class Config:
orm_mode = True
from_attributes = True
class Cdkey(Base):
@ -116,7 +116,7 @@ class Cdkey(Base):
endtime = Column(Integer, nullable=True)
class Config:
orm_mode = True
from_attributes = True
class CdkeyCate(Base):
@ -138,7 +138,7 @@ class CdkeyCate(Base):
is_delete = Column(Integer, nullable=False, server_default=text("'1'"), comment='状态 1正常 2删除')
class Config:
orm_mode = True
from_attributes = True
class Cdkeys(Base):
@ -156,7 +156,7 @@ class Cdkeys(Base):
type = Column(Integer, nullable=False, server_default=text("'0'"), comment='绑定类型 1 试卷 2题库')
class Config:
orm_mode = True
from_attributes = True
class Exercise(Base):
@ -177,7 +177,7 @@ class Exercise(Base):
createtime = Column(Integer, nullable=True, comment='创建时间')
class Config:
orm_mode = True
from_attributes = True
class Feedback(Base):
@ -197,7 +197,7 @@ class Feedback(Base):
createtime = Column(Integer, nullable=True, comment='时间')
class Config:
orm_mode = True
from_attributes = True
class Gift(Base):
@ -215,7 +215,7 @@ class Gift(Base):
about = Column(Text, nullable=False)
class Config:
orm_mode = True
from_attributes = True
class IndexBtn(Base):
@ -236,7 +236,7 @@ class IndexBtn(Base):
sort = Column(Integer, nullable=False, server_default=text("'0'"), comment='排序')
class Config:
orm_mode = True
from_attributes = True
class Knowledge(Base):
@ -258,7 +258,7 @@ class Knowledge(Base):
istatus = Column(Integer, nullable=False, server_default=text("'1'"), comment='是否删除的标识')
class Config:
orm_mode = True
from_attributes = True
class KnowledgeCate(Base):
@ -279,7 +279,7 @@ class KnowledgeCate(Base):
price = Column(DECIMAL(10, 2), nullable=True, server_default=text("'0.00'"))
class Config:
orm_mode = True
from_attributes = True
class Notice(Base):
@ -306,7 +306,7 @@ class Notice(Base):
article_type = Column(Integer, nullable=True, comment='文章的类型1文章2视频')
class Config:
orm_mode = True
from_attributes = True
class Order(Base):
@ -335,7 +335,7 @@ class Order(Base):
istatus = Column(Integer, nullable=False, server_default=text("'1'"), comment='订单删除标识')
class Config:
orm_mode = True
from_attributes = True
class Paper(Base):
@ -364,7 +364,7 @@ class Paper(Base):
displayorder = Column(Integer, nullable=True, server_default=text("'0'"), comment='显示顺序')
class Config:
orm_mode = True
from_attributes = True
class PaperTest(Base):
@ -384,7 +384,7 @@ class PaperTest(Base):
istatus = Column(Integer, nullable=False, server_default=text("'1'"), comment='是否删除的标识')
class Config:
orm_mode = True
from_attributes = True
class Phonecode(Base):
@ -401,7 +401,7 @@ class Phonecode(Base):
createtime = Column(Integer, nullable=False)
class Config:
orm_mode = True
from_attributes = True
class QYear(Base):
@ -420,7 +420,7 @@ class QYear(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='是否删除的标识')
class Config:
orm_mode = True
from_attributes = True
class School(Base):
@ -451,7 +451,7 @@ class School(Base):
add_time = Column(Integer, nullable=True, server_default=text("'0'"))
class Config:
orm_mode = True
from_attributes = True
class Setting(Base):
@ -521,7 +521,7 @@ class Setting(Base):
freeknowledgenum = Column(Integer, nullable=True, server_default=text("'0'"), comment='知识点体验章数')
class Config:
orm_mode = True
from_attributes = True
class ShareRecord(Base):
@ -539,7 +539,7 @@ class ShareRecord(Base):
createtime = Column(Integer, nullable=True, comment='创建时间')
class Config:
orm_mode = True
from_attributes = True
class SonSimple(Base):
@ -550,7 +550,7 @@ class SonSimple(Base):
son_title = Column(String(255), nullable=False)
class Config:
orm_mode = True
from_attributes = True
class Test(Base):
@ -590,7 +590,7 @@ class Test(Base):
display = Column(Integer, nullable=True, server_default=text("'1'"), comment='1-显示2-不显示')
class Config:
orm_mode = True
from_attributes = True
class TestType(Base):
@ -613,7 +613,7 @@ class TestType(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='是否删除的标识')
class Config:
orm_mode = True
from_attributes = True
class TypeCate(Base):
@ -632,7 +632,7 @@ class TypeCate(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='是否删除的标识')
class Config:
orm_mode = True
from_attributes = True
class Watermark(Base):
__tablename__ = 'ims_goouc_fullexam_watermark'
@ -652,7 +652,7 @@ class Watermark(Base):
vertical = Column(Integer, nullable=False, comment='垂直距离')
class Config:
orm_mode = True
from_attributes = True
class Wxtpl(Base):
@ -673,7 +673,7 @@ class Wxtpl(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='删除状态')
class Config:
orm_mode = True
from_attributes = True
class Xuesheng(Base):
@ -750,4 +750,4 @@ class Xuesheng(Base):
uid8 = Column(Integer, nullable=True, server_default=text("'0'"))
class Config:
orm_mode = True
from_attributes = True

View File

@ -4,7 +4,7 @@ from sqlalchemy.dialects.mysql import DECIMAL
from mooc.db.database import Base
class User(Base):
class FullExamUser(Base):
__tablename__ = 'ims_goouc_fullexam_user'
__table_args__ = (
Index('idx_openid', 'openid'),
@ -50,7 +50,7 @@ class User(Base):
count_day = Column(Integer, nullable=True, server_default=text("'0'"), comment='累计天数')
class Config:
orm_mode = True
from_attributes = True
class UserCollectionPraction(Base):
@ -73,7 +73,7 @@ class UserCollectionPraction(Base):
iscollect = Column(Integer, nullable=True, server_default=text("'2'"))
class Config:
orm_mode = True
from_attributes = True
class UserDoexam(Base):
@ -98,7 +98,7 @@ class UserDoexam(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='删除状态')
class Config:
orm_mode = True
from_attributes = True
class UserDootherExam(Base):
@ -121,7 +121,7 @@ class UserDootherExam(Base):
type = Column(Integer, nullable=True, comment='类型2-优先未做3-智能考试')
class Config:
orm_mode = True
from_attributes = True
class UserDoOtherExamAnswer(Base):
@ -146,7 +146,7 @@ class UserDoOtherExamAnswer(Base):
type = Column(Integer, nullable=True, comment='类型2-优先未做3-智能考试')
class Config:
orm_mode = True
from_attributes = True
class UserExamAnswer(Base):
@ -175,7 +175,7 @@ class UserExamAnswer(Base):
simple_score = Column(Integer, nullable=True, comment='简答题评分')
class Config:
orm_mode = True
from_attributes = True
class UserFormid(Base):
@ -194,7 +194,7 @@ class UserFormid(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='删除状态')
class Config:
orm_mode = True
from_attributes = True
class UserGift(Base):
@ -216,7 +216,7 @@ class UserGift(Base):
consignee_address = Column(String(255), nullable=True, comment='收货人地址')
class Config:
orm_mode = True
from_attributes = True
class UserKnowledgeCate(Base):
@ -234,7 +234,7 @@ class UserKnowledgeCate(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='是否删除的标识')
class Config:
orm_mode = True
from_attributes = True
class UserMember(Base):
@ -255,7 +255,7 @@ class UserMember(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='删除状态')
class Config:
orm_mode = True
from_attributes = True
class UserPool(Base):
@ -274,7 +274,7 @@ class UserPool(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='是否删除的标识')
class Config:
orm_mode = True
from_attributes = True
class UserQhigh(Base):
@ -292,7 +292,7 @@ class UserQhigh(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='删除状态')
class Config:
orm_mode = True
from_attributes = True
class UserQintensive(Base):
@ -310,7 +310,7 @@ class UserQintensive(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='删除状态')
class Config:
orm_mode = True
from_attributes = True
class UserQtype(Base):
@ -329,7 +329,7 @@ class UserQtype(Base):
createtime = Column(Integer, nullable=True, comment='时间')
class Config:
orm_mode = True
from_attributes = True
class UserRead(Base):
@ -347,7 +347,7 @@ class UserRead(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='删除状态')
class Config:
orm_mode = True
from_attributes = True
class UserSpecial(Base):
@ -366,7 +366,7 @@ class UserSpecial(Base):
createtime = Column(Integer, nullable=True, comment='时间')
class Config:
orm_mode = True
from_attributes = True
class UserSpequence(Base):
@ -385,7 +385,7 @@ class UserSpequence(Base):
istatus = Column(Integer, nullable=True, server_default=text("'1'"), comment='删除状态')
class Config:
orm_mode = True
from_attributes = True
class UserWrongPraction(Base):
@ -409,6 +409,6 @@ class UserWrongPraction(Base):
iscollect = Column(Integer, nullable=True, server_default=text("'2'"))
class Config:
orm_mode = True
from_attributes = True

View File

@ -35,4 +35,4 @@ class CouponLocation(CouponLocationBase):
id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -21,4 +21,4 @@ class CoverReply(CoverReplyBase):
id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -17,4 +17,4 @@ class CustomReply(CustomReplyBase):
id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -13,7 +13,7 @@ class AdvertBase(BaseModel):
createtime: Optional[int] = Field(None, description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class AdvertCreate(AdvertBase):
@ -73,7 +73,7 @@ class BanjiBase(BaseModel):
css: str
class Config:
orm_mode = True
from_attributes = True
class BanjiCreate(BanjiBase):
@ -111,7 +111,7 @@ class BanjiInDB(BanjiBase):
banji_id: int
class Config:
orm_mode = True
from_attributes = True
# 轮播图
@ -126,7 +126,7 @@ class BannerBase(BaseModel):
istatus: Optional[int] = Field(1, ge=1, le=2, description="是否删除的标识")
class Config:
orm_mode = True
from_attributes = True
class BannerCreate(BannerBase):
@ -148,7 +148,7 @@ class BannerInDB(BannerBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class BannerResponse(BannerInDB):
@ -169,7 +169,7 @@ class CategoryBase(BaseModel):
istatus: Optional[int] = Field(1, ge=1, le=2, description="是否删除的标识")
class Config:
orm_mode = True
from_attributes = True
class CategoryCreate(CategoryBase):
@ -186,7 +186,7 @@ class CategoryUpdate(CategoryBase):
createtime: Optional[int] = None
class Config:
orm_mode = True
from_attributes = True
class CategoryInDB(CategoryBase):
@ -195,7 +195,7 @@ class CategoryInDB(CategoryBase):
createtime: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class CategoryResponse(CategoryInDB):
@ -222,7 +222,7 @@ class CdkeyBase(BaseModel):
endtime: Optional[int] = Field(None, description="结束时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class CdkeyCreate(CdkeyBase):
@ -247,7 +247,7 @@ class CdkeyInDB(CdkeyBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class CdkeyResponse(CdkeyInDB):
@ -271,7 +271,7 @@ class CdkeyCateBase(BaseModel):
papers: str = Field(..., max_length=2000, description="试卷 IDs")
class Config:
orm_mode = True
from_attributes = True
class CdkeyCateCreate(CdkeyCateBase):
@ -294,7 +294,7 @@ class CdkeyCateInDB(CdkeyCateBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class CdkeyCateResponse(CdkeyCateInDB):
@ -313,7 +313,7 @@ class CdkeysBase(BaseModel):
type: int = Field(0, ge=0, le=2, description="绑定类型 1 试卷 2 题库")
class Config:
orm_mode = True
from_attributes = True
class CdkeysCreate(CdkeysBase):
@ -331,7 +331,7 @@ class CdkeysInDB(CdkeysBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class CdkeysResponse(CdkeysInDB):
@ -355,7 +355,7 @@ class ExerciseBase(BaseModel):
isright: int = Field(..., ge=0, le=1, description="是否正确 1正确 0错误")
class Config:
orm_mode = True
from_attributes = True
class ExerciseCreate(ExerciseBase):
@ -377,7 +377,7 @@ class ExerciseInDB(ExerciseBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class ExerciseResponse(ExerciseInDB):
@ -399,7 +399,7 @@ class FeedbackBase(BaseModel):
createtime: Optional[int] = Field(None, description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class FeedbackCreate(FeedbackBase):
@ -420,7 +420,7 @@ class FeedbackInDB(FeedbackBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class FeedbackResponse(FeedbackInDB):
@ -441,7 +441,7 @@ class GiftBase(BaseModel):
about: str = Field(..., description="礼品描述")
class Config:
orm_mode = True
from_attributes = True
class GiftCreate(GiftBase):
@ -461,7 +461,7 @@ class GiftInDB(GiftBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class GiftResponse(GiftInDB):
@ -484,7 +484,7 @@ class IndexBtnBase(BaseModel):
library_id: int = Field(..., ge=0, description="关联的题库或模块 ID")
class Config:
orm_mode = True
from_attributes = True
class IndexBtnCreate(IndexBtnBase):
@ -506,7 +506,7 @@ class IndexBtnInDB(IndexBtnBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class IndexBtnResponse(IndexBtnInDB):
@ -530,7 +530,7 @@ class KnowledgeBase(BaseModel):
istatus: Optional[int] = Field(1, ge=1, le=2, description="删除标识 1正常 2删除默认为1")
class Config:
orm_mode = True
from_attributes = True
class KnowledgeCreate(KnowledgeBase):
@ -553,7 +553,7 @@ class KnowledgeInDB(KnowledgeBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class KnowledgeResponse(KnowledgeInDB):
@ -576,7 +576,7 @@ class KnowledgeCateBase(BaseModel):
create_time: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class KnowledgeCateCreate(KnowledgeCateBase):
@ -598,7 +598,7 @@ class KnowledgeCateInDB(KnowledgeCateBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class KnowledgeCateResponse(KnowledgeCateInDB):
@ -626,7 +626,7 @@ class NoticeBase(BaseModel):
article_type: Optional[int] = Field(None, ge=1, le=2, description="文章类型 1文章 2视频")
class Config:
orm_mode = True
from_attributes = True
class NoticeCreate(NoticeBase):
@ -653,7 +653,7 @@ class NoticeInDB(NoticeBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class NoticeResponse(NoticeInDB):
@ -682,7 +682,7 @@ class OrderBase(BaseModel):
istatus: Optional[int] = Field(1, ge=1, le=2, description="订单删除标识 1正常 2删除默认为1")
class Config:
orm_mode = True
from_attributes = True
class OrderCreate(OrderBase):
@ -710,7 +710,7 @@ class OrderInDB(OrderBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class OrderResponse(OrderInDB):
@ -738,7 +738,7 @@ class PaperBase(BaseModel):
is_repeat: int = Field(2, ge=1, le=2, description="重复答题 1可以 2否默认为2")
class Config:
orm_mode = True
from_attributes = True
class PaperCreate(PaperBase):
@ -765,7 +765,7 @@ class PaperInDB(PaperBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class PaperResponse(PaperInDB):
@ -787,7 +787,7 @@ class PaperTestBase(BaseModel):
createtime: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class PaperTestCreate(PaperTestBase):
@ -807,7 +807,7 @@ class PaperTestInDB(PaperTestBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class PaperTestResponse(PaperTestInDB):
@ -821,12 +821,12 @@ class PaperTestListResponse(BaseModel):
class PhoneCodeBase(BaseModel):
weid: int = Field(..., ge=0, description="站点 ID")
phone: str = Field(..., min_length=11, max_length=11, regex=r"^\d{11}$", description="手机号码")
phone: str = Field(..., min_length=11, max_length=11, pattern=r"^\d{11}$", description="手机号码")
code: int = Field(..., ge=100000, le=999999, description="手机验证码通常为6位数字")
createtime: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class PhoneCodeCreate(PhoneCodeBase):
@ -844,7 +844,7 @@ class PhoneCodeInDB(PhoneCodeBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class PhoneCodeResponse(PhoneCodeInDB):
@ -864,7 +864,7 @@ class QYearBase(BaseModel):
createtime: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class QYearCreate(QYearBase):
@ -883,7 +883,7 @@ class QYearInDB(QYearBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class QYearResponse(QYearInDB):
@ -919,7 +919,7 @@ class SchoolBase(BaseModel):
add_time: Optional[int] = Field(0, ge=0, description="添加时间默认为0")
class Config:
orm_mode = True
from_attributes = True
class SchoolCreate(SchoolBase):
@ -934,7 +934,7 @@ class SchoolInDB(SchoolBase):
school_id: int
class Config:
orm_mode = True
from_attributes = True
class SchoolResponse(SchoolInDB):
@ -1009,7 +1009,7 @@ class SettingBase(BaseModel):
customer_service: str = Field(..., max_length=255, description="客服二维码")
class Config:
orm_mode = True
from_attributes = True
class SettingCreate(SettingBase):
@ -1083,7 +1083,7 @@ class SettingInDB(SettingBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class SettingResponse(SettingInDB):
@ -1104,7 +1104,7 @@ class ShareRecordBase(BaseModel):
createtime: Optional[int] = Field(None, description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class ShareRecordCreate(ShareRecordBase):
@ -1124,7 +1124,7 @@ class ShareRecordInDB(ShareRecordBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class ShareRecordResponse(ShareRecordInDB):
@ -1141,7 +1141,7 @@ class SonSimpleBase(BaseModel):
son_title: str = Field(..., max_length=255, description="子标题")
class Config:
orm_mode = True
from_attributes = True
class SonSimpleCreate(SonSimpleBase):
@ -1157,7 +1157,7 @@ class SonSimpleInDB(SonSimpleBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class SonSimpleResponse(SonSimpleInDB):
@ -1198,7 +1198,7 @@ class TestBase(BaseModel):
son_simple: int = Field(0, ge=0, description="子题简化")
class Config:
orm_mode = True
from_attributes = True
class TestCreate(TestBase):
@ -1245,7 +1245,7 @@ class TestInDB(TestBase):
return super().from_orm(obj)
class Config:
orm_mode = True
from_attributes = True
class TestResponse(TestInDB):
@ -1270,7 +1270,7 @@ class TestTypeBase(BaseModel):
display_order: int = Field(0, description="显示顺序默认为0")
class Config:
orm_mode = True
from_attributes = True
class TestTypeCreate(TestTypeBase):
@ -1301,7 +1301,7 @@ class TestTypeInDB(TestTypeBase):
return super().from_orm(obj)
class Config:
orm_mode = True
from_attributes = True
class TestTypeResponse(TestTypeInDB):
@ -1321,7 +1321,7 @@ class TypeCateBase(BaseModel):
createtime: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class TypeCateCreate(TypeCateBase):
@ -1347,7 +1347,7 @@ class TypeCateInDB(TypeCateBase):
return super().from_orm(obj)
class Config:
orm_mode = True
from_attributes = True
class TypeCateResponse(TypeCateInDB):
@ -1377,7 +1377,7 @@ class WatermarkBase(BaseModel):
vertical: int
class Config:
orm_mode = True
from_attributes = True
class WatermarkCreate(WatermarkBase):
@ -1402,7 +1402,7 @@ class WatermarkInDB(WatermarkBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class WatermarkResponse(WatermarkInDB):
@ -1424,7 +1424,7 @@ class WxTplBase(BaseModel):
createtime: int
class Config:
orm_mode = True
from_attributes = True
class WxTplCreate(WxTplBase):
@ -1446,7 +1446,7 @@ class WxTplInDB(WxTplBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class WxTplResponse(WxTplInDB):
@ -1528,7 +1528,7 @@ class XueshengBase(BaseModel):
css: str = Field(..., max_length=500)
class Config:
orm_mode = True
from_attributes = True
class XueshengCreate(XueshengBase):
@ -1611,7 +1611,7 @@ class XueshengInDB(XueshengBase):
xuesheng_id: int
class Config:
orm_mode = True
from_attributes = True
class XueshengResponse(XueshengInDB):

View File

@ -1,11 +1,13 @@
from pydantic import BaseModel, Field
from typing import Optional, List
from typing import Optional, List, Union
from datetime import datetime
from decimal import Decimal
from pydantic import field_validator
class UserBase(BaseModel):
weid: int = Field(..., ge=0, description="站点 ID")
class FullExamUserBase(BaseModel):
id: Optional[int] = Field(None, ge=0, description="用户ID")
weid: Union[int, str] = Field(..., description="站点 ID")
openid: str = Field(..., max_length=255, description="用户标识")
unionid: Optional[str] = Field(None, max_length=255, description="联合用户标识")
nickname: Optional[str] = Field(None, max_length=255, description="用户昵称 可保存特殊符号")
@ -37,18 +39,22 @@ class UserBase(BaseModel):
grade: Optional[str] = Field(None, max_length=255, description="年级")
count_day: Optional[int] = Field(0, ge=0, description="累计天数默认为0")
is_band: int = Field(0, ge=0, le=1, description="是否绑定微信 1是0否默认为0")
h5_openid: str = Field(..., max_length=255, description="H5 用户标识")
h5_openid: Optional[str] = Field("", max_length=255, description="H5 用户标识")
class Config:
orm_mode = True
from_attributes = True
class UserCreate(UserBase):
pass # 如果创建时需要额外字段或默认值不同,可以在这里添加
class FullExamUserCreate(FullExamUserBase):
@field_validator('h5_openid', mode='before')
def set_h5_openid(cls, v, values):
if not v and 'openid' in values:
return values['openid']
return v or ""
class UserUpdate(UserBase):
weid: Optional[int] = None
class FullExamUserUpdate(FullExamUserBase):
weid: Optional[Union[int, str]] = None
openid: Optional[str] = None
unionid: Optional[str] = None
nickname: Optional[str] = None
@ -83,7 +89,7 @@ class UserUpdate(UserBase):
h5_openid: Optional[str] = None
class UserInDB(UserBase):
class FullExamUserInDB(FullExamUserBase):
id: int
last_login_time: Optional[datetime] = Field(None, description="最近一次登录时间")
createtime: Optional[datetime] = Field(None, description="创建时间")
@ -94,19 +100,19 @@ class UserInDB(UserBase):
obj.last_login_time = datetime.fromtimestamp(obj.last_login_time)
if isinstance(obj.createtime, int):
obj.createtime = datetime.fromtimestamp(obj.createtime)
return super().from_orm(obj)
return super().model_validate(obj)
class Config:
orm_mode = True
from_attributes = True
class UserResponse(UserInDB):
class FullExamUserResponse(FullExamUserInDB):
pass # 可以根据需要添加额外的字段或调整现有字段
# 用于批量操作的模型
class UserListResponse(BaseModel):
data: List[UserResponse]
class FullExamUserListResponse(BaseModel):
data: List[FullExamUserResponse]
class UserCollectionPractionBase(BaseModel):
@ -119,7 +125,7 @@ class UserCollectionPractionBase(BaseModel):
iscollect: Optional[int] = Field(2, ge=0, le=2, description="是否收藏 1是 2不是默认为2")
class Config:
orm_mode = True
from_attributes = True
class UserCollectionPractionCreate(UserCollectionPractionBase):
@ -147,7 +153,7 @@ class UserCollectionPractionInDB(UserCollectionPractionBase):
return super().from_orm(obj)
class Config:
orm_mode = True
from_attributes = True
class UserCollectionPractionResponse(UserCollectionPractionInDB):
@ -172,7 +178,7 @@ class UserDoexamBase(BaseModel):
evaluation: int = Field(2, ge=1, le=3, description="评阅状态 1已评2未评默认2 3批改中")
class Config:
orm_mode = True
from_attributes = True
class UserDoexamCreate(UserDoexamBase):
@ -203,7 +209,7 @@ class UserDoexamInDB(UserDoexamBase):
return super().from_orm(obj)
class Config:
orm_mode = True
from_attributes = True
class UserDoexamResponse(UserDoexamInDB):
@ -227,7 +233,7 @@ class UserDoOtherExamBase(BaseModel):
type: Optional[int] = Field(None, ge=2, le=3, description="类型 2-优先未做 3-智能考试")
class Config:
orm_mode = True
from_attributes = True
class UserDoOtherExamCreate(UserDoOtherExamBase):
@ -257,7 +263,7 @@ class UserDoOtherExamInDB(UserDoOtherExamBase):
return super().from_orm(obj)
class Config:
orm_mode = True
from_attributes = True
class UserDoOtherExamResponse(UserDoOtherExamInDB):
@ -284,7 +290,7 @@ class UserDoOtherExamAnswerBase(BaseModel):
type: Optional[int] = Field(None, ge=2, le=3, description="类型 2-优先未做 3-智能考试")
class Config:
orm_mode = True
from_attributes = True
class UserDoOtherExamAnswerCreate(UserDoOtherExamAnswerBase):
@ -317,7 +323,7 @@ class UserDoOtherExamAnswerInDB(UserDoOtherExamAnswerBase):
return super().from_orm(obj)
class Config:
orm_mode = True
from_attributes = True
class UserDoOtherExamAnswerResponse(UserDoOtherExamAnswerInDB):
@ -348,7 +354,7 @@ class UserExamAnswerBase(BaseModel):
simple_evaluation: int = Field(2, ge=1, le=3, description="简答题评阅状态 1已批改默认2未3批改中")
class Config:
orm_mode = True
from_attributes = True
class UserExamAnswerCreate(UserExamAnswerBase):
@ -385,7 +391,7 @@ class UserExamAnswerInDB(UserExamAnswerBase):
return super().from_orm(obj)
class Config:
orm_mode = True
from_attributes = True
class UserExamAnswerResponse(UserExamAnswerInDB):
@ -405,7 +411,7 @@ class UserFormidBase(BaseModel):
createtime: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class UserFormidCreate(UserFormidBase):
@ -431,7 +437,7 @@ class UserFormidInDB(UserFormidBase):
return super().from_orm(obj)
class Config:
orm_mode = True
from_attributes = True
class UserFormidResponse(UserFormidInDB):
@ -455,7 +461,7 @@ class UserGiftBase(BaseModel):
consignee_address: Optional[str] = Field(None, max_length=255, description="收货人地址")
class Config:
orm_mode = True
from_attributes = True
class UserGiftCreate(UserGiftBase):
@ -488,7 +494,7 @@ class UserGiftInDB(UserGiftBase):
return super().from_orm(obj)
class Config:
orm_mode = True
from_attributes = True
class UserGiftResponse(UserGiftInDB):
@ -508,7 +514,7 @@ class UserKnowledgeCateBase(BaseModel):
istatus: Optional[int] = Field(1, ge=0, le=1, description="是否删除的标识默认为1")
class Config:
orm_mode = True
from_attributes = True
class UserKnowledgeCateCreate(UserKnowledgeCateBase):
@ -553,7 +559,7 @@ class UserMemberBase(BaseModel):
createtime: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class UserMemberCreate(UserMemberBase):
@ -599,7 +605,7 @@ class UserPoolBase(BaseModel):
paperid: int = Field(0, ge=0, description="试卷id")
class Config:
orm_mode = True
from_attributes = True
class UserPoolCreate(UserPoolBase):
@ -642,7 +648,7 @@ class UserQHighBase(BaseModel):
create_time: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class UserQHighCreate(UserQHighBase):
@ -684,7 +690,7 @@ class UserQIntensiveBase(BaseModel):
create_time: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class UserQIntensiveCreate(UserQIntensiveBase):
@ -727,7 +733,7 @@ class UserQTypeBase(BaseModel):
createtime: Optional[int] = Field(None, description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class UserQTypeCreate(UserQTypeBase):
@ -770,7 +776,7 @@ class UserReadBase(BaseModel):
createtime: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class UserReadCreate(UserReadBase):
@ -813,7 +819,7 @@ class UserSpecialBase(BaseModel):
createtime: Optional[int] = Field(None, description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class UserSpecialCreate(UserSpecialBase):
@ -857,7 +863,7 @@ class UserSpequenceBase(BaseModel):
create_time: int = Field(..., description="创建时间Unix 时间戳")
class Config:
orm_mode = True
from_attributes = True
class UserSpequenceCreate(UserSpequenceBase):
@ -903,7 +909,7 @@ class UserWrongPractionBase(BaseModel):
iscollect: Optional[int] = Field(2, ge=0, le=2, description="是否收藏默认为2")
class Config:
orm_mode = True
from_attributes = True
class UserWrongPractionCreate(UserWrongPractionBase):

View File

@ -17,4 +17,4 @@ class ImagesReply(ImagesReplyBase):
id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -23,7 +23,7 @@ class McCreditsRecharge(McCreditsRechargeBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class McCreditsRecordBase(BaseModel):
uid: int
@ -49,7 +49,7 @@ class McCreditsRecord(McCreditsRecordBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class MCFansGroupsBase(BaseModel):
uniacid: int
@ -66,7 +66,7 @@ class MCFansGroups(MCFansGroupsBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class MCFansTagBase(BaseModel):
uniacid: int | None = None
@ -99,7 +99,7 @@ class MCFansTag(MCFansTagBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class McChatsRecordBase(BaseModel):
uniacid: int
acid: int
@ -119,7 +119,7 @@ class McChatsRecord(McChatsRecordBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class MCFansTagMappingBase(BaseModel):
fanid: int
@ -135,7 +135,7 @@ class MCFansTagMapping(MCFansTagMappingBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class MCGroupsBase(BaseModel):
uniacid: int
@ -153,7 +153,7 @@ class MCGroups(MCGroupsBase):
groupid: int
class Config:
orm_mode = True
from_attributes = True
class MCHandselBase(BaseModel):
uniacid: int
@ -175,7 +175,7 @@ class MCHandsel(MCHandselBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class McMembersBase(BaseModel):
uniacid: int
@ -242,7 +242,7 @@ class McMembers(McMembersBase):
uid: int
class Config:
orm_mode = True
from_attributes = True
class McMemberAddressBase(BaseModel):
uniacid: int
@ -266,7 +266,7 @@ class McMemberAddress(McMemberAddressBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class McMemberFieldsBase(BaseModel):
uniacid: int
@ -285,7 +285,7 @@ class McMemberFields(McMemberFieldsBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class McMemberPropertyBase(BaseModel):
uniacid: int
@ -301,7 +301,7 @@ class McMemberProperty(McMemberPropertyBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class McOauthFansBase(BaseModel):
oauth_openid: str
@ -319,4 +319,4 @@ class McOauthFans(McOauthFansBase):
id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -18,4 +18,4 @@ class MenuEvent(MenuEventBase):
id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -21,4 +21,4 @@ class MessageNoticeLog(MessageNoticeLogBase):
id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -21,7 +21,7 @@ class IMSModulesBindings(IMSModulesBindingsBase):
eid: int
class Config:
orm_mode = True
from_attributes = True
class IMSModulesCloudBase(BaseModel):
name: str
@ -57,7 +57,7 @@ class IMSModulesCloud(IMSModulesCloudBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class ModulesBase(BaseModel):
name: str
@ -103,7 +103,7 @@ class Modules(ModulesBase):
mid: int
class Config:
orm_mode = True
from_attributes = True
class IMSModulesIgnoreBase(BaseModel):
@ -117,7 +117,7 @@ class IMSModulesIgnore(IMSModulesIgnoreBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class IMSModulesPluginBase(BaseModel):
@ -131,7 +131,7 @@ class IMSModulesPlugin(IMSModulesPluginBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class IMSModulesPluginRankBase(BaseModel):
@ -148,7 +148,7 @@ class IMSModulesPluginRank(IMSModulesPluginRankBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class IMSModulesRankBase(BaseModel):
@ -164,7 +164,7 @@ class IMSModulesRank(IMSModulesRankBase):
id: int
class Config:
orm_mode = True
from_attributes = True
class IMSModulesRecycleBase(BaseModel):
@ -187,4 +187,4 @@ class IMSModulesRecycle(IMSModulesRecycleBase):
id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -14,4 +14,4 @@ class MusicReply(MusicReplyBase):
Id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -21,4 +21,4 @@ class NewsReply(NewsReplyBase):
Id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -14,4 +14,4 @@ class PhoneappVersions(PhoneappVersionsBase):
Id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -44,7 +44,7 @@ class Qrcode(QrcodeBase):
Id: int
class Config:
orm_mode = True
from_attributes = True
class QrcodeStatBase(BaseModel):
Uniacid: int
@ -64,4 +64,4 @@ class QrcodeStat(QrcodeStatBase):
Id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -15,4 +15,4 @@ class Rule(RuleBase):
Id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -16,4 +16,4 @@ class RuleKeyword(RuleKeywordBase):
Id: int
class Config:
orm_mode = True
from_attributes = True

View File

@ -0,0 +1,47 @@
import asyncio
import socket
import logging
from typing import Dict, List, Tuple
logger = logging.getLogger(__name__)
async def check_connection(host: str, port: int = 443) -> Tuple[bool, str]:
"""检查与指定主机的连接"""
try:
# 尝试创建套接字连接
future = asyncio.open_connection(host, port)
reader, writer = await asyncio.wait_for(future, timeout=5)
# 关闭连接
writer.close()
await writer.wait_closed()
return True, f"成功连接到 {host}:{port}"
except asyncio.TimeoutError:
return False, f"连接到 {host}:{port} 超时"
except socket.gaierror:
return False, f"无法解析主机名 {host}"
except Exception as e:
return False, f"连接到 {host}:{port} 失败: {str(e)}"
async def diagnose_wechat_api() -> Dict[str, str]:
"""诊断微信API连接问题"""
results = {}
# 检查几个微信API域名
domains = [
("api.weixin.qq.com", 443),
("mp.weixin.qq.com", 443)
]
for domain, port in domains:
success, message = await check_connection(domain, port)
results[domain] = message
return results
# 使用示例
# async def run_diagnostic():
# results = await diagnose_wechat_api()
# for domain, result in results.items():
# print(f"{domain}: {result}")

View File

@ -2,22 +2,81 @@ import json
import httpx
from typing import Optional, Dict, Any, Union
from mooc.core.config import settings
import asyncio
import time
import logging
import os
logger = logging.getLogger(__name__)
class WeChatAPI:
def __init__(self, appid: str = None, appsecret: str = None):
self.appid = appid or settings.WECHAT_APPID
self.appsecret = appsecret or settings.WECHAT_APPSECRET
self._access_token = ""
self.timeout = 10.0 # 设置10秒超时
self.max_retries = 3 # 最大重试次数
async def _get(self, url: str) -> Dict[str, Any]:
async with httpx.AsyncClient() as client:
response = await client.get(url, verify=False)
"""带重试功能的GET请求"""
retry_count = 0
last_error = None
while retry_count < self.max_retries:
try:
logger.info(f"请求微信API: {url}")
async with httpx.AsyncClient(verify=False, timeout=self.timeout) as client:
response = await client.get(url)
logger.info(f"微信API响应: {response.status_code}")
return response.json()
except (httpx.ConnectTimeout, httpx.ReadTimeout) as e:
retry_count += 1
last_error = e
logger.warning(f"请求微信API超时{retry_count}次重试: {str(e)}")
if retry_count < self.max_retries:
# 等待一段时间再重试,使用指数退避策略
await asyncio.sleep(1 * (2 ** (retry_count - 1)))
# 所有重试都失败
logger.error(f"请求微信API失败已重试{self.max_retries}次: {str(last_error)}")
# 如果开发模式已启用,则返回模拟数据
if hasattr(settings, 'DEV_MODE') and settings.DEV_MODE:
# 返回模拟数据
logger.info("使用模拟数据替代微信API响应")
if 'jscode2session' in url:
return {
"openid": f"test_openid_{int(time.time())}",
"session_key": "test_session_key",
"unionid": f"test_unionid_{int(time.time())}"
}
elif 'token' in url:
return {"access_token": "test_access_token", "expires_in": 7200}
# 如果没有开发模式或者不是可以模拟的API则抛出异常
raise last_error
async def _post(self, url: str, data: Dict[str, Any]) -> Dict[str, Any]:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data, verify=False)
"""带重试功能的POST请求"""
retry_count = 0
last_error = None
while retry_count < self.max_retries:
try:
logger.info(f"POST请求微信API: {url}")
async with httpx.AsyncClient(verify=False, timeout=self.timeout) as client:
response = await client.post(url, json=data)
logger.info(f"微信API响应: {response.status_code}")
return response.json()
except (httpx.ConnectTimeout, httpx.ReadTimeout) as e:
retry_count += 1
last_error = e
logger.warning(f"POST请求微信API超时{retry_count}次重试: {str(e)}")
if retry_count < self.max_retries:
await asyncio.sleep(1 * (2 ** (retry_count - 1)))
logger.error(f"POST请求微信API失败已重试{self.max_retries}次: {str(last_error)}")
raise last_error
async def get_access_token(self) -> str:
"""获取access token"""
@ -33,7 +92,7 @@ class WeChatAPI:
url = f"https://api.weixin.qq.com/sns/jscode2session?appid={self.appid}&secret={self.appsecret}&js_code={code}&grant_type=authorization_code"
result = await self._get(url)
if "errcode" in result and result["errcode"] != 0:
raise Exception(f"Code2Session failed: {result['errmsg']}")
raise Exception(f"Code2Session failed: {result['errmsg']}code:{code}")
return result
async def get_unlimited_qrcode(self, scene: str, access_token: str = None) -> bytes:
@ -133,14 +192,33 @@ class WeChatAPI:
"""生成并保存小程序码到文件"""
if not access_token:
access_token = await self.get_access_token()
url = f"https://api.weixin.qq.com/wxa/getwxacodeunlimit?access_token={access_token}"
data = {"scene": f"uid={uid}"}
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data, verify=False)
retry_count = 0
last_error = None
while retry_count < self.max_retries:
try:
async with httpx.AsyncClient(verify=False, timeout=self.timeout) as client:
response = await client.post(url, json=data)
if response.status_code == 200:
# 确保目录存在
os.makedirs(os.path.dirname(f"{path}{filename}"), exist_ok=True)
full_path = f"{path}{filename}"
with open(full_path, 'wb') as f:
f.write(response.content)
return True
except Exception as e:
retry_count += 1
last_error = e
logger.warning(f"生成二维码失败,第{retry_count}次重试: {str(e)}")
if retry_count < self.max_retries:
await asyncio.sleep(1 * (2 ** (retry_count - 1)))
logger.error(f"生成二维码失败,已重试{self.max_retries}次: {str(last_error)}")
return False
wx_api = WeChatAPI()

View File

@ -18,3 +18,4 @@ pymysql>=1.1.1
cryptography>=42.0.5
mysqlclient>=2.2.6
pydantic-settings>=2.7.0
colorlog>=6.9.0

Binary file not shown.

After

Width:  |  Height:  |  Size: 441 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 441 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 441 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 359 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 359 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 359 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 441 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 441 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 441 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 359 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 441 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 441 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 359 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 441 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 441 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 359 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 568 KiB

1
小程序端代码示例 Normal file
View File

@ -0,0 +1 @@