mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-10-23 10:20:01 +00:00
feat(core): Upgrade pydantic to 2.x (#1428)
This commit is contained in:
@@ -1,9 +1,14 @@
|
||||
"""FastAPI utilities."""
|
||||
|
||||
from typing import Any, Callable, Dict
|
||||
import importlib.metadata as metadata
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.routing import APIRouter
|
||||
|
||||
_FASTAPI_VERSION = metadata.version("fastapi")
|
||||
|
||||
|
||||
class PriorityAPIRouter(APIRouter):
|
||||
"""A router with priority.
|
||||
@@ -41,3 +46,85 @@ class PriorityAPIRouter(APIRouter):
|
||||
return self.route_priority.get(route.path, 0)
|
||||
|
||||
self.routes.sort(key=my_func, reverse=True)
|
||||
|
||||
|
||||
_HAS_STARTUP = False
|
||||
_HAS_SHUTDOWN = False
|
||||
_GLOBAL_STARTUP_HANDLERS: List[Callable] = []
|
||||
|
||||
_GLOBAL_SHUTDOWN_HANDLERS: List[Callable] = []
|
||||
|
||||
|
||||
def register_event_handler(app: FastAPI, event: str, handler: Callable):
|
||||
"""Register an event handler.
|
||||
|
||||
Args:
|
||||
app (FastAPI): The FastAPI app.
|
||||
event (str): The event type.
|
||||
handler (Callable): The handler function.
|
||||
|
||||
"""
|
||||
if _FASTAPI_VERSION >= "0.109.1":
|
||||
# https://fastapi.tiangolo.com/release-notes/#01091
|
||||
if event == "startup":
|
||||
if _HAS_STARTUP:
|
||||
raise ValueError(
|
||||
"FastAPI app already started. Cannot add startup handler."
|
||||
)
|
||||
_GLOBAL_STARTUP_HANDLERS.append(handler)
|
||||
elif event == "shutdown":
|
||||
if _HAS_SHUTDOWN:
|
||||
raise ValueError(
|
||||
"FastAPI app already shutdown. Cannot add shutdown handler."
|
||||
)
|
||||
_GLOBAL_SHUTDOWN_HANDLERS.append(handler)
|
||||
else:
|
||||
raise ValueError(f"Invalid event: {event}")
|
||||
else:
|
||||
if event == "startup":
|
||||
app.add_event_handler("startup", handler)
|
||||
elif event == "shutdown":
|
||||
app.add_event_handler("shutdown", handler)
|
||||
else:
|
||||
raise ValueError(f"Invalid event: {event}")
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
# Trigger the startup event.
|
||||
global _HAS_STARTUP, _HAS_SHUTDOWN
|
||||
for handler in _GLOBAL_STARTUP_HANDLERS:
|
||||
await handler()
|
||||
_HAS_STARTUP = True
|
||||
yield
|
||||
# Trigger the shutdown event.
|
||||
for handler in _GLOBAL_SHUTDOWN_HANDLERS:
|
||||
await handler()
|
||||
_HAS_SHUTDOWN = True
|
||||
|
||||
|
||||
def create_app(*args, **kwargs) -> FastAPI:
|
||||
"""Create a FastAPI app."""
|
||||
_sp = None
|
||||
if _FASTAPI_VERSION >= "0.109.1":
|
||||
if "lifespan" not in kwargs:
|
||||
kwargs["lifespan"] = lifespan
|
||||
_sp = kwargs["lifespan"]
|
||||
app = FastAPI(*args, **kwargs)
|
||||
if _sp:
|
||||
app.__dbgpt_custom_lifespan = _sp
|
||||
return app
|
||||
|
||||
|
||||
def replace_router(app: FastAPI, router: Optional[APIRouter] = None):
|
||||
"""Replace the router of the FastAPI app."""
|
||||
if not router:
|
||||
router = PriorityAPIRouter()
|
||||
if _FASTAPI_VERSION >= "0.109.1":
|
||||
if hasattr(app, "__dbgpt_custom_lifespan"):
|
||||
_sp = getattr(app, "__dbgpt_custom_lifespan")
|
||||
router.lifespan_context = _sp
|
||||
|
||||
app.router = router
|
||||
app.setup()
|
||||
return app
|
||||
|
Reference in New Issue
Block a user