DB-GPT/dbgpt/serve/agent/app/endpoints.py
明天 b124ecc10b
feat: (0.6)New UI (#1855)
Co-authored-by: 夏姜 <wenfengjiang.jwf@digital-engine.com>
Co-authored-by: aries_ckt <916701291@qq.com>
Co-authored-by: wb-lh513319 <wb-lh513319@alibaba-inc.com>
Co-authored-by: csunny <cfqsunny@163.com>
2024-08-21 17:37:45 +08:00

68 lines
2.1 KiB
Python

from typing import Optional
from fastapi import APIRouter, Query
from dbgpt.serve.agent.db.gpts_app import (
GptsApp,
GptsAppCollectionDao,
GptsAppDao,
GptsAppQuery,
)
from dbgpt.serve.core import Result
router = APIRouter()
gpts_dao = GptsAppDao()
collection_dao = GptsAppCollectionDao()
gpts_dao.init_native_apps("dbgpt")
@router.get("/v2/serve/apps")
async def app_list(
user_name: Optional[str] = Query(default=None, description="user name"),
sys_code: Optional[str] = Query(default=None, description="system code"),
is_collected: Optional[str] = Query(default=None, description="system code"),
page: int = Query(default=1, description="current page"),
page_size: int = Query(default=20, description="page size"),
):
try:
query = GptsAppQuery(
page_no=page, page_size=page_size, is_collected=is_collected
)
return Result.succ(gpts_dao.app_list(query, True))
except Exception as ex:
return Result.failed(err_code="E000X", msg=f"query app error: {ex}")
@router.get("/v2/serve/apps/{app_id}")
async def app_detail(app_id: str):
try:
return Result.succ(gpts_dao.app_detail(app_id))
except Exception as ex:
return Result.failed(err_code="E000X", msg=f"query app error: {ex}")
@router.put("/v2/serve/apps/{app_id}")
async def app_update(app_id: str, gpts_app: GptsApp):
try:
return Result.succ(gpts_dao.edit(gpts_app))
except Exception as ex:
return Result.failed(err_code="E000X", msg=f"edit app error: {ex}")
@router.post("/v2/serve/apps")
async def app_create(gpts_app: GptsApp):
try:
return Result.succ(gpts_dao.create(gpts_app))
except Exception as ex:
return Result.failed(err_code="E000X", msg=f"edit app error: {ex}")
@router.delete("/v2/serve/apps/{app_id}")
async def app_delete(app_id: str, user_code: Optional[str], sys_code: Optional[str]):
try:
gpts_dao.delete(app_id, user_code, sys_code)
return Result.succ([])
except Exception as ex:
return Result.failed(err_code="E000X", msg=f"delete app error: {ex}")