54 lines
1.7 KiB
Python
54 lines
1.7 KiB
Python
from typing import Dict
|
|
|
|
from fastapi import HTTPException
|
|
|
|
from api.base import app
|
|
from extension.standard import db_manager
|
|
|
|
|
|
@app.get("/api/schemas", response_model=Dict)
async def get_schemas(page: int = 1, per_page: int = 5):
    """Return one page of schemas together with pagination metadata.

    Args:
        page: Page number to fetch; must be >= 1.
        per_page: Number of items per page; must be >= 1.

    Returns:
        A dict with two keys: ``items``, a list where each row returned by
        ``db_manager.schemas.get_all`` is converted to a plain dict, and
        ``pagination``, which carries ``total``, ``page``, ``per_page`` and
        ``total_pages`` copied from that same result.

    Raises:
        HTTPException: 400 when ``page`` or ``per_page`` is less than 1.
    """
    # Reject non-positive values before they reach the data layer: a zero or
    # negative page/page size is not a meaningful page request.
    if page < 1 or per_page < 1:
        raise HTTPException(
            status_code=400,
            detail='page and per_page must be positive integers',
        )

    result = db_manager.schemas.get_all(page=page, per_page=per_page)
    return {
        # Rows are converted so the response contains plain dicts only.
        'items': [dict(row) for row in result['items']],
        'pagination': {
            'total': result['total'],
            'page': result['page'],
            'per_page': result['per_page'],
            'total_pages': result['total_pages'],
        },
    }
|
|
|
|
|
|
@app.post("/api/schemas", response_model=Dict)
async def create_schema(data: Dict):
    """Create a schema from the request body and return its identifier.

    Args:
        data: Request body. ``content`` is required and must be truthy;
            ``name`` is optional and defaults to an empty string.

    Returns:
        ``{'id': <value returned by db_manager.schemas.create>}``.

    Raises:
        HTTPException: 400 when ``content`` is missing or empty.
    """
    body = data.get('content')
    if body:
        new_id = db_manager.schemas.create(
            name=data.get('name', ''),
            content=body,
        )
        return {'id': new_id}

    # No usable content was supplied, so nothing is created.
    raise HTTPException(status_code=400, detail='内容不能为空')
|
|
|
|
|
|
@app.get("/api/schemas/{schema_id}", response_model=Dict)
async def get_schema(schema_id: int):
    """Fetch a single schema by id.

    Args:
        schema_id: Identifier taken from the URL path.

    Returns:
        The record returned by ``db_manager.schemas.get_by_id`` converted to
        a plain dict.

    Raises:
        HTTPException: 404 when the lookup returns a falsy value.
    """
    record = db_manager.schemas.get_by_id(schema_id)
    if not record:
        raise HTTPException(status_code=404, detail='Schema not found')
    return dict(record)
|
|
|
|
|
|
@app.put("/api/schemas/{schema_id}", response_model=Dict)
async def update_schema(schema_id: int, data: Dict):
    """Update the name and content of an existing schema.

    Args:
        schema_id: Identifier taken from the URL path.
        data: Request body. ``content`` is required and must be truthy;
            ``name`` is optional and defaults to an empty string.

    Returns:
        ``{'message': 'Schema updated'}`` once the update call has been made.

    Raises:
        HTTPException: 400 when ``content`` is missing or empty.
        HTTPException: 404 when no schema exists for ``schema_id``.
    """
    content = data.get('content')
    if not content:
        raise HTTPException(status_code=400, detail='内容不能为空')

    # Do not report success for an id that does not exist. The single-item
    # GET endpoint treats a falsy get_by_id result as "not found"; the same
    # check and the same 404 are used here.
    if not db_manager.schemas.get_by_id(schema_id):
        raise HTTPException(status_code=404, detail='Schema not found')

    db_manager.schemas.update(
        schema_id,
        name=data.get('name', ''),
        content=content,
    )
    return {'message': 'Schema updated'}
|
|
|
|
|
|
@app.delete("/api/schemas/{schema_id}", response_model=Dict)
async def delete_schema(schema_id: int):
    """Delete the schema identified by ``schema_id``.

    Args:
        schema_id: Identifier taken from the URL path.

    Returns:
        ``{'message': 'Schema deleted'}``. This handler performs no
        existence check of its own before or after the delete call.
    """
    db_manager.schemas.delete(schema_id)
    confirmation = {'message': 'Schema deleted'}
    return confirmation
|