import json class Request: def __init__(self, data): self.method = data.get('httpMethod', "POST") self.path = data.get('path', '/') self.headers = data.get('headers', {}) self.raw = data.get('body','{}') self.query_params = data.get('queryStringParameters', {}) self.path_params = data.get('pathParameters', {}) self.request_context = data.get('requestContext', {}) # 尝试解析body为JSON try: self.body = json.loads(self.raw) except json.JSONDecodeError: self.body = {} class Response: def __init__(self, body=None, status_code=200): self.body = body or {} self.status_code = status_code def to_dict(self): return { 'code': self.status_code, 'data': self.body } class Router: def __init__(self): self.routes = {} def route(self, path, methods=None): """路由装饰器,用于注册处理函数""" if methods is None: methods = ['GET'] def decorator(func): for method in methods: key = f"{method}:{path}" self.routes[key] = func return func return decorator def handle_request(self, event): """处理HTTP请求,根据路径和方法找到对应的处理函数""" request = Request(event) # 查找路由 route_key = f"{request.method}:{request.path}" _handler = self.routes.get(route_key) if not _handler: return Response( f'No handler found for {request.path} with method {request.method}' , status_code=404).to_dict() try: # 调用处理函数 response = _handler(request) # 如果处理函数返回的是字典,包装成Response对象 if isinstance(response, dict): return Response(response).to_dict() # 如果处理函数返回的是Response对象,直接转为字典 if isinstance(response, Response): return response.to_dict() # 默认返回成功 return Response(response).to_dict() except Exception as e: # 错误处理 return Response({ 'error': 'Internal Server Error', 'message': str(e) }, status_code=500).to_dict()