diff --git a/.env.template b/.env.template index 0b660b575..1f653f231 100644 --- a/.env.template +++ b/.env.template @@ -235,4 +235,10 @@ SUMMARY_CONFIG=FAST # FATAL, ERROR, WARNING, WARNING, INFO, DEBUG, NOTSET DBGPT_LOG_LEVEL=INFO # LOG dir, default: ./logs -#DBGPT_LOG_DIR= \ No newline at end of file +#DBGPT_LOG_DIR= + + +#*******************************************************************# +#** API_KEYS **# +#*******************************************************************# +# API_KEYS=dbgpt \ No newline at end of file diff --git a/dbgpt/_private/config.py b/dbgpt/_private/config.py index e573da7ad..893388b34 100644 --- a/dbgpt/_private/config.py +++ b/dbgpt/_private/config.py @@ -286,6 +286,8 @@ class Config(metaclass=Singleton): self.MODEL_CACHE_STORAGE_DISK_DIR: Optional[str] = os.getenv( "MODEL_CACHE_STORAGE_DISK_DIR" ) + # global dbgpt api key + self.API_KEYS = os.getenv("API_KEYS", None) @property def local_db_manager(self) -> "ConnectorManager": diff --git a/dbgpt/app/dbgpt_server.py b/dbgpt/app/dbgpt_server.py index df6cdb9d4..c78c82f9d 100644 --- a/dbgpt/app/dbgpt_server.py +++ b/dbgpt/app/dbgpt_server.py @@ -89,13 +89,17 @@ def mount_routers(app: FastAPI): router as api_editor_route_v1, ) from dbgpt.app.openapi.api_v1.feedback.api_fb_v1 import router as api_fb_v1 + from dbgpt.app.openapi.api_v2 import router as api_v2 from dbgpt.serve.agent.app.controller import router as gpts_v1 + from dbgpt.serve.agent.app.endpoints import router as app_v2 app.include_router(api_v1, prefix="/api", tags=["Chat"]) + app.include_router(api_v2, prefix="/api", tags=["ChatV2"]) app.include_router(api_editor_route_v1, prefix="/api", tags=["Editor"]) app.include_router(llm_manage_api, prefix="/api", tags=["LLM Manage"]) app.include_router(api_fb_v1, prefix="/api", tags=["FeedBack"]) app.include_router(gpts_v1, prefix="/api", tags=["GptsApp"]) + app.include_router(app_v2, prefix="/api", tags=["App"]) app.include_router(knowledge_router, tags=["Knowledge"]) diff --git 
a/dbgpt/app/initialization/db_model_initialization.py b/dbgpt/app/initialization/db_model_initialization.py index f2243dfc1..4d6353bcc 100644 --- a/dbgpt/app/initialization/db_model_initialization.py +++ b/dbgpt/app/initialization/db_model_initialization.py @@ -2,12 +2,12 @@ """ from dbgpt.app.knowledge.chunk_db import DocumentChunkEntity from dbgpt.app.knowledge.document_db import KnowledgeDocumentEntity -from dbgpt.app.knowledge.space_db import KnowledgeSpaceEntity from dbgpt.app.openapi.api_v1.feedback.feed_back_db import ChatFeedBackEntity from dbgpt.datasource.manages.connect_config_db import ConnectConfigEntity from dbgpt.serve.agent.db.my_plugin_db import MyPluginEntity from dbgpt.serve.agent.db.plugin_hub_db import PluginHubEntity from dbgpt.serve.prompt.models.models import ServeEntity as PromptManageEntity +from dbgpt.serve.rag.models.models import KnowledgeSpaceEntity from dbgpt.storage.chat_history.chat_history_db import ( ChatHistoryEntity, ChatHistoryMessageEntity, diff --git a/dbgpt/app/initialization/serve_initialization.py b/dbgpt/app/initialization/serve_initialization.py index 5d8f06709..4e757fca5 100644 --- a/dbgpt/app/initialization/serve_initialization.py +++ b/dbgpt/app/initialization/serve_initialization.py @@ -5,6 +5,8 @@ from dbgpt.component import SystemApp def register_serve_apps(system_app: SystemApp, cfg: Config): """Register serve apps""" system_app.config.set("dbgpt.app.global.language", cfg.LANGUAGE) + if cfg.API_KEYS: + system_app.config.set("dbgpt.app.global.api_keys", cfg.API_KEYS) # ################################ Prompt Serve Register Begin ###################################### from dbgpt.serve.prompt.serve import ( @@ -42,4 +44,12 @@ def register_serve_apps(system_app: SystemApp, cfg: Config): # Register serve app system_app.register(FlowServe) + + from dbgpt.serve.rag.serve import ( + SERVE_CONFIG_KEY_PREFIX as RAG_SERVE_CONFIG_KEY_PREFIX, + ) + from dbgpt.serve.rag.serve import Serve as RagServe + + # Register serve app + 
system_app.register(RagServe) # ################################ AWEL Flow Serve Register End ######################################## diff --git a/dbgpt/app/knowledge/api.py b/dbgpt/app/knowledge/api.py index 875dbcb18..a891c46ea 100644 --- a/dbgpt/app/knowledge/api.py +++ b/dbgpt/app/knowledge/api.py @@ -4,7 +4,7 @@ import shutil import tempfile from typing import List -from fastapi import APIRouter, File, Form, UploadFile +from fastapi import APIRouter, Depends, File, Form, UploadFile from dbgpt._private.config import Config from dbgpt.app.knowledge.request.request import ( @@ -16,7 +16,6 @@ from dbgpt.app.knowledge.request.request import ( KnowledgeDocumentRequest, KnowledgeQueryRequest, KnowledgeSpaceRequest, - KnowledgeSyncRequest, SpaceArgumentRequest, ) from dbgpt.app.knowledge.request.response import KnowledgeQueryResponse @@ -31,6 +30,8 @@ from dbgpt.rag.embedding.embedding_factory import EmbeddingFactory from dbgpt.rag.knowledge.base import ChunkStrategy from dbgpt.rag.knowledge.factory import KnowledgeFactory from dbgpt.rag.retriever.embedding import EmbeddingRetriever +from dbgpt.serve.rag.api.schemas import KnowledgeSyncRequest +from dbgpt.serve.rag.service.service import Service from dbgpt.storage.vector_store.base import VectorStoreConfig from dbgpt.storage.vector_store.connector import VectorStoreConnector from dbgpt.util.tracer import SpanType, root_tracer @@ -44,6 +45,11 @@ router = APIRouter() knowledge_space_service = KnowledgeService() +def get_rag_service() -> Service: + """Get Rag Service.""" + return Service.get_instance(CFG.SYSTEM_APP) + + @router.post("/knowledge/space/add") def space_add(request: KnowledgeSpaceRequest): print(f"/space/add params: {request}") @@ -226,12 +232,20 @@ def document_sync(space_name: str, request: DocumentSyncRequest): @router.post("/knowledge/{space_name}/document/sync_batch") -def batch_document_sync(space_name: str, request: List[KnowledgeSyncRequest]): +def batch_document_sync( + space_name: str, + request: 
List[KnowledgeSyncRequest], + service: Service = Depends(get_rag_service), +): logger.info(f"Received params: {space_name}, {request}") try: - doc_ids = knowledge_space_service.batch_document_sync( - space_name=space_name, sync_requests=request - ) + space = service.get({"name": space_name}) + for sync_request in request: + sync_request.space_id = space.id + doc_ids = service.sync_document(requests=request) + # doc_ids = service.sync_document( + # space_name=space_name, sync_requests=request + # ) return Result.succ({"tasks": doc_ids}) except Exception as e: return Result.failed(code="E000X", msg=f"document sync error {e}") diff --git a/dbgpt/app/knowledge/document_db.py b/dbgpt/app/knowledge/document_db.py index 7e08d0733..1165b7296 100644 --- a/dbgpt/app/knowledge/document_db.py +++ b/dbgpt/app/knowledge/document_db.py @@ -1,9 +1,11 @@ from datetime import datetime -from typing import List +from typing import Any, Dict, List, Union from sqlalchemy import Column, DateTime, Integer, String, Text, func from dbgpt._private.config import Config +from dbgpt.serve.conversation.api.schemas import ServeRequest +from dbgpt.serve.rag.api.schemas import DocumentServeRequest, DocumentServeResponse from dbgpt.storage.metadata import BaseDao, Model CFG = Config() @@ -218,3 +220,70 @@ class KnowledgeDocumentDao(BaseDao): knowledge_documents.delete() session.commit() session.close() + + def from_request( + self, request: Union[ServeRequest, Dict[str, Any]] + ) -> KnowledgeDocumentEntity: + """Convert the request to an entity + + Args: + request (Union[ServeRequest, Dict[str, Any]]): The request + + Returns: + T: The entity + """ + request_dict = ( + request.dict() if isinstance(request, DocumentServeRequest) else request + ) + entity = KnowledgeDocumentEntity(**request_dict) + return entity + + def to_request(self, entity: KnowledgeDocumentEntity) -> DocumentServeResponse: + """Convert the entity to a request + + Args: + entity (T): The entity + + Returns: + REQ: The request + 
""" + return DocumentServeResponse( + id=entity.id, + doc_name=entity.doc_name, + doc_type=entity.doc_type, + space=entity.space, + chunk_size=entity.chunk_size, + status=entity.status, + last_sync=entity.last_sync, + content=entity.content, + result=entity.result, + vector_ids=entity.vector_ids, + summary=entity.summary, + gmt_created=entity.gmt_created, + gmt_modified=entity.gmt_modified, + ) + + def to_response(self, entity: KnowledgeDocumentEntity) -> DocumentServeResponse: + """Convert the entity to a response + + Args: + entity (T): The entity + + Returns: + REQ: The request + """ + return DocumentServeResponse( + id=entity.id, + doc_name=entity.doc_name, + doc_type=entity.doc_type, + space=entity.space, + chunk_size=entity.chunk_size, + status=entity.status, + last_sync=entity.last_sync, + content=entity.content, + result=entity.result, + vector_ids=entity.vector_ids, + summary=entity.summary, + gmt_created=entity.gmt_created, + gmt_modified=entity.gmt_modified, + ) diff --git a/dbgpt/app/knowledge/request/request.py b/dbgpt/app/knowledge/request/request.py index fbd6a697c..f15d3e7f2 100644 --- a/dbgpt/app/knowledge/request/request.py +++ b/dbgpt/app/knowledge/request/request.py @@ -17,6 +17,8 @@ class KnowledgeQueryRequest(BaseModel): class KnowledgeSpaceRequest(BaseModel): """name: knowledge space name""" + """vector_type: vector type""" + id: int = None name: str = None """vector_type: vector type""" vector_type: str = None @@ -37,9 +39,6 @@ class KnowledgeDocumentRequest(BaseModel): """content: content""" source: str = None - """text_chunk_size: text_chunk_size""" - # text_chunk_size: int - class DocumentQueryRequest(BaseModel): """doc_name: doc path""" @@ -80,18 +79,18 @@ class DocumentSyncRequest(BaseModel): chunk_overlap: Optional[int] = None -class KnowledgeSyncRequest(BaseModel): - """Sync request""" - - """doc_ids: doc ids""" - doc_id: int - - """model_name: model name""" - model_name: Optional[str] = None - - """chunk_parameters: chunk parameters 
- """ - chunk_parameters: ChunkParameters +# class KnowledgeSyncRequest(BaseModel): +# """Sync request""" +# +# """doc_ids: doc ids""" +# doc_id: int +# +# """model_name: model name""" +# model_name: Optional[str] = None +# +# """chunk_parameters: chunk parameters +# """ +# chunk_parameters: ChunkParameters class ChunkQueryRequest(BaseModel): diff --git a/dbgpt/app/knowledge/service.py b/dbgpt/app/knowledge/service.py index c6bf61fc8..5b59bc803 100644 --- a/dbgpt/app/knowledge/service.py +++ b/dbgpt/app/knowledge/service.py @@ -1,7 +1,6 @@ import json import logging from datetime import datetime -from enum import Enum from typing import List from dbgpt._private.config import Config @@ -17,7 +16,6 @@ from dbgpt.app.knowledge.request.request import ( DocumentSyncRequest, KnowledgeDocumentRequest, KnowledgeSpaceRequest, - KnowledgeSyncRequest, SpaceArgumentRequest, ) from dbgpt.app.knowledge.request.response import ( @@ -25,7 +23,6 @@ from dbgpt.app.knowledge.request.response import ( DocumentQueryResponse, SpaceQueryResponse, ) -from dbgpt.app.knowledge.space_db import KnowledgeSpaceDao, KnowledgeSpaceEntity from dbgpt.component import ComponentType from dbgpt.configs.model_config import EMBEDDING_MODEL_CONFIG from dbgpt.core import Chunk @@ -38,8 +35,11 @@ from dbgpt.rag.text_splitter.text_splitter import ( RecursiveCharacterTextSplitter, SpacyTextSplitter, ) +from dbgpt.serve.rag.api.schemas import KnowledgeSyncRequest from dbgpt.serve.rag.assembler.embedding import EmbeddingAssembler from dbgpt.serve.rag.assembler.summary import SummaryAssembler +from dbgpt.serve.rag.models.models import KnowledgeSpaceDao, KnowledgeSpaceEntity +from dbgpt.serve.rag.service.service import Service, SyncStatus from dbgpt.storage.vector_store.base import VectorStoreConfig from dbgpt.storage.vector_store.connector import VectorStoreConnector from dbgpt.util.executor_utils import ExecutorFactory, blocking_func_to_async @@ -53,13 +53,6 @@ logger = logging.getLogger(__name__) CFG = 
Config() -class SyncStatus(Enum): - TODO = "TODO" - FAILED = "FAILED" - RUNNING = "RUNNING" - FINISHED = "FINISHED" - - # default summary max iteration call with llm. DEFAULT_SUMMARY_MAX_ITERATION = 5 # default summary concurrency call with llm. @@ -88,8 +81,8 @@ class KnowledgeService: spaces = knowledge_space_dao.get_knowledge_space(query) if len(spaces) > 0: raise Exception(f"space name:{request.name} have already named") - knowledge_space_dao.create_knowledge_space(request) - return True + space_id = knowledge_space_dao.create_knowledge_space(request) + return space_id def create_knowledge_document(self, space, request: KnowledgeDocumentRequest): """create knowledge document @@ -199,7 +192,9 @@ class KnowledgeService: return res def batch_document_sync( - self, space_name, sync_requests: List[KnowledgeSyncRequest] + self, + space_name, + sync_requests: List[KnowledgeSyncRequest], ) -> List[int]: """batch sync knowledge document chunk into vector store Args: diff --git a/dbgpt/app/knowledge/space_db.py b/dbgpt/app/knowledge/space_db.py index bc283db1d..933a2b57b 100644 --- a/dbgpt/app/knowledge/space_db.py +++ b/dbgpt/app/knowledge/space_db.py @@ -1,93 +1,93 @@ -from datetime import datetime - -from sqlalchemy import Column, DateTime, Integer, String, Text - -from dbgpt._private.config import Config -from dbgpt.app.knowledge.request.request import KnowledgeSpaceRequest -from dbgpt.storage.metadata import BaseDao, Model - -CFG = Config() - - -class KnowledgeSpaceEntity(Model): - __tablename__ = "knowledge_space" - id = Column(Integer, primary_key=True) - name = Column(String(100)) - vector_type = Column(String(100)) - desc = Column(String(100)) - owner = Column(String(100)) - context = Column(Text) - gmt_created = Column(DateTime) - gmt_modified = Column(DateTime) - - def __repr__(self): - return f"KnowledgeSpaceEntity(id={self.id}, name='{self.name}', vector_type='{self.vector_type}', desc='{self.desc}', owner='{self.owner}' context='{self.context}', 
gmt_created='{self.gmt_created}', gmt_modified='{self.gmt_modified}')" - - -class KnowledgeSpaceDao(BaseDao): - def create_knowledge_space(self, space: KnowledgeSpaceRequest): - session = self.get_raw_session() - knowledge_space = KnowledgeSpaceEntity( - name=space.name, - vector_type=CFG.VECTOR_STORE_TYPE, - desc=space.desc, - owner=space.owner, - gmt_created=datetime.now(), - gmt_modified=datetime.now(), - ) - session.add(knowledge_space) - session.commit() - session.close() - - def get_knowledge_space(self, query: KnowledgeSpaceEntity): - session = self.get_raw_session() - knowledge_spaces = session.query(KnowledgeSpaceEntity) - if query.id is not None: - knowledge_spaces = knowledge_spaces.filter( - KnowledgeSpaceEntity.id == query.id - ) - if query.name is not None: - knowledge_spaces = knowledge_spaces.filter( - KnowledgeSpaceEntity.name == query.name - ) - if query.vector_type is not None: - knowledge_spaces = knowledge_spaces.filter( - KnowledgeSpaceEntity.vector_type == query.vector_type - ) - if query.desc is not None: - knowledge_spaces = knowledge_spaces.filter( - KnowledgeSpaceEntity.desc == query.desc - ) - if query.owner is not None: - knowledge_spaces = knowledge_spaces.filter( - KnowledgeSpaceEntity.owner == query.owner - ) - if query.gmt_created is not None: - knowledge_spaces = knowledge_spaces.filter( - KnowledgeSpaceEntity.gmt_created == query.gmt_created - ) - if query.gmt_modified is not None: - knowledge_spaces = knowledge_spaces.filter( - KnowledgeSpaceEntity.gmt_modified == query.gmt_modified - ) - - knowledge_spaces = knowledge_spaces.order_by( - KnowledgeSpaceEntity.gmt_created.desc() - ) - result = knowledge_spaces.all() - session.close() - return result - - def update_knowledge_space(self, space: KnowledgeSpaceEntity): - session = self.get_raw_session() - session.merge(space) - session.commit() - session.close() - return True - - def delete_knowledge_space(self, space: KnowledgeSpaceEntity): - session = self.get_raw_session() - if 
space: - session.delete(space) - session.commit() - session.close() +# from datetime import datetime +# +# from sqlalchemy import Column, DateTime, Integer, String, Text +# +# from dbgpt._private.config import Config +# from dbgpt.app.knowledge.request.request import KnowledgeSpaceRequest +# from dbgpt.storage.metadata import BaseDao, Model +# +# CFG = Config() +# +# +# class KnowledgeSpaceEntity(Model): +# __tablename__ = "knowledge_space" +# id = Column(Integer, primary_key=True) +# name = Column(String(100)) +# vector_type = Column(String(100)) +# desc = Column(String(100)) +# owner = Column(String(100)) +# context = Column(Text) +# gmt_created = Column(DateTime) +# gmt_modified = Column(DateTime) +# +# def __repr__(self): +# return f"KnowledgeSpaceEntity(id={self.id}, name='{self.name}', vector_type='{self.vector_type}', desc='{self.desc}', owner='{self.owner}' context='{self.context}', gmt_created='{self.gmt_created}', gmt_modified='{self.gmt_modified}')" +# +# +# class KnowledgeSpaceDao(BaseDao): +# def create_knowledge_space(self, space: KnowledgeSpaceRequest): +# session = self.get_raw_session() +# knowledge_space = KnowledgeSpaceEntity( +# name=space.name, +# vector_type=CFG.VECTOR_STORE_TYPE, +# desc=space.desc, +# owner=space.owner, +# gmt_created=datetime.now(), +# gmt_modified=datetime.now(), +# ) +# session.add(knowledge_space) +# session.commit() +# session.close() +# +# def get_knowledge_space(self, query: KnowledgeSpaceEntity): +# session = self.get_raw_session() +# knowledge_spaces = session.query(KnowledgeSpaceEntity) +# if query.id is not None: +# knowledge_spaces = knowledge_spaces.filter( +# KnowledgeSpaceEntity.id == query.id +# ) +# if query.name is not None: +# knowledge_spaces = knowledge_spaces.filter( +# KnowledgeSpaceEntity.name == query.name +# ) +# if query.vector_type is not None: +# knowledge_spaces = knowledge_spaces.filter( +# KnowledgeSpaceEntity.vector_type == query.vector_type +# ) +# if query.desc is not None: +# 
knowledge_spaces = knowledge_spaces.filter( +# KnowledgeSpaceEntity.desc == query.desc +# ) +# if query.owner is not None: +# knowledge_spaces = knowledge_spaces.filter( +# KnowledgeSpaceEntity.owner == query.owner +# ) +# if query.gmt_created is not None: +# knowledge_spaces = knowledge_spaces.filter( +# KnowledgeSpaceEntity.gmt_created == query.gmt_created +# ) +# if query.gmt_modified is not None: +# knowledge_spaces = knowledge_spaces.filter( +# KnowledgeSpaceEntity.gmt_modified == query.gmt_modified +# ) +# +# knowledge_spaces = knowledge_spaces.order_by( +# KnowledgeSpaceEntity.gmt_created.desc() +# ) +# result = knowledge_spaces.all() +# session.close() +# return result +# +# def update_knowledge_space(self, space: KnowledgeSpaceEntity): +# session = self.get_raw_session() +# session.merge(space) +# session.commit() +# session.close() +# return True +# +# def delete_knowledge_space(self, space: KnowledgeSpaceEntity): +# session = self.get_raw_session() +# if space: +# session.delete(space) +# session.commit() +# session.close() diff --git a/dbgpt/app/openapi/api_v2.py b/dbgpt/app/openapi/api_v2.py new file mode 100644 index 000000000..794f2c62e --- /dev/null +++ b/dbgpt/app/openapi/api_v2.py @@ -0,0 +1,345 @@ +import json +import re +import time +import uuid +from typing import Optional + +from fastapi import APIRouter, Body, Depends, HTTPException +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from fastchat.protocol.api_protocol import ( + ChatCompletionResponse, + ChatCompletionResponseChoice, + ChatCompletionResponseStreamChoice, + ChatCompletionStreamResponse, + ChatMessage, + DeltaMessage, + UsageInfo, +) +from starlette.responses import StreamingResponse + +from dbgpt.app.openapi.api_v1.api_v1 import ( + CHAT_FACTORY, + __new_conversation, + get_chat_flow, + get_chat_instance, + get_executor, + stream_generator, +) +from dbgpt.app.scene import BaseChat, ChatScene +from dbgpt.client.schemas import ChatCompletionRequestBody +from 
dbgpt.component import logger +from dbgpt.core.awel import CommonLLMHttpRequestBody, CommonLLMHTTPRequestContext +from dbgpt.model.cluster.apiserver.api import APISettings +from dbgpt.serve.agent.agents.controller import multi_agents +from dbgpt.serve.flow.api.endpoints import get_service +from dbgpt.serve.flow.service.service import Service as FlowService +from dbgpt.util.executor_utils import blocking_func_to_async +from dbgpt.util.tracer import SpanType, root_tracer + +router = APIRouter() +api_settings = APISettings() +get_bearer_token = HTTPBearer(auto_error=False) + + +async def check_api_key( + auth: Optional[HTTPAuthorizationCredentials] = Depends(get_bearer_token), + service=Depends(get_service), +) -> Optional[str]: + """Check the api key + Args: + auth (Optional[HTTPAuthorizationCredentials]): The bearer token. + service (Service): The flow service. + """ + if service.config.api_keys: + api_keys = [key.strip() for key in service.config.api_keys.split(",")] + if auth is None or (token := auth.credentials) not in api_keys: + raise HTTPException( + status_code=401, + detail={ + "error": { + "message": "", + "type": "invalid_request_error", + "param": None, + "code": "invalid_api_key", + } + }, + ) + return token + else: + return None + + +@router.post("/v2/chat/completions", dependencies=[Depends(check_api_key)]) +async def chat_completions( + request: ChatCompletionRequestBody = Body(), + flow_service: FlowService = Depends(get_chat_flow), +): + """Chat V2 completions + Args: + request (ChatCompletionRequestBody): The chat request. + flow_service (FlowService): The flow service. + Raises: + HTTPException: If the request is invalid. 
+ """ + logger.info( + f"chat_completions:{request.chat_mode},{request.chat_param},{request.model}" + ) + headers = { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "Transfer-Encoding": "chunked", + } + # check chat request + check_chat_request(request) + if request.conv_uid is None: + request.conv_uid = str(uuid.uuid4()) + if request.chat_mode == "chat_app": + if request.stream is False: + raise HTTPException( + status_code=400, + detail={ + "error": { + "message": "chat app now not support no stream", + "type": "invalid_request_error", + "param": None, + "code": "invalid_request_error", + } + }, + ) + return StreamingResponse( + chat_app_stream_wrapper( + request=request, + ), + headers=headers, + media_type="text/event-stream", + ) + elif request.chat_mode == ChatScene.ChatFlow.value(): + # flow_ctx = CommonLLMHTTPRequestContext( + # conv_uid=request.conv_uid, + # chat_mode=request.chat_mode, + # user_name=request.user_name, + # sys_code=request.sys_code, + # ) + # flow_req = CommonLLMHttpRequestBody( + # model=request.model, + # messages=request.chat_param, + # stream=True, + # context=flow_ctx, + # ) + return StreamingResponse( + chat_flow_stream_wrapper(request), + headers=headers, + media_type="text/event-stream", + ) + elif ( + request.chat_mode is None + or request.chat_mode == ChatScene.ChatKnowledge.value() + ): + with root_tracer.start_span( + "get_chat_instance", span_type=SpanType.CHAT, metadata=request.dict() + ): + chat: BaseChat = await get_chat_instance(request) + + if not request.stream: + return await no_stream_wrapper(request, chat) + else: + return StreamingResponse( + stream_generator(chat, request.incremental, request.model), + headers=headers, + media_type="text/plain", + ) + else: + raise HTTPException( + status_code=400, + detail={ + "error": { + "message": "chat mode now only support chat_normal, chat_app, chat_flow, chat_knowledge", + "type": "invalid_request_error", + "param": 
None, + "code": "invalid_chat_mode", + } + }, + ) + + +async def get_chat_instance(dialogue: ChatCompletionRequestBody = Body()) -> BaseChat: + """ + Get chat instance + Args: + dialogue (OpenAPIChatCompletionRequest): The chat request. + """ + logger.info(f"get_chat_instance:{dialogue}") + if not dialogue.chat_mode: + dialogue.chat_mode = ChatScene.ChatNormal.value() + if not dialogue.conv_uid: + conv_vo = __new_conversation( + dialogue.chat_mode, dialogue.user_name, dialogue.sys_code + ) + dialogue.conv_uid = conv_vo.conv_uid + + if not ChatScene.is_valid_mode(dialogue.chat_mode): + raise StopAsyncIteration(f"Unsupported Chat Mode,{dialogue.chat_mode}!") + + chat_param = { + "chat_session_id": dialogue.conv_uid, + "user_name": dialogue.user_name, + "sys_code": dialogue.sys_code, + "current_user_input": dialogue.messages, + "select_param": dialogue.chat_param, + "model_name": dialogue.model, + } + chat: BaseChat = await blocking_func_to_async( + get_executor(), + CHAT_FACTORY.get_implementation, + dialogue.chat_mode, + **{"chat_param": chat_param}, + ) + return chat + + +async def no_stream_wrapper( + request: ChatCompletionRequestBody, chat: BaseChat +) -> ChatCompletionResponse: + """ + no stream wrapper + Args: + request (OpenAPIChatCompletionRequest): request + chat (BaseChat): chat + """ + with root_tracer.start_span("no_stream_generator"): + response = await chat.nostream_call() + msg = response.replace("\ufffd", "") + choice_data = ChatCompletionResponseChoice( + index=0, + message=ChatMessage(role="assistant", content=msg), + ) + usage = UsageInfo() + return ChatCompletionResponse( + id=request.conv_uid, choices=[choice_data], model=request.model, usage=usage + ) + + +async def chat_app_stream_wrapper(request: ChatCompletionRequestBody = None): + """chat app stream + Args: + request (OpenAPIChatCompletionRequest): request + token (APIToken): token + """ + async for output in multi_agents.app_agent_chat( + conv_uid=request.conv_uid, + 
gpts_name=request.chat_param, + user_query=request.messages, + user_code=request.user_name, + sys_code=request.sys_code, + ): + match = re.search(r"data:\s*({.*})", output) + if match: + json_str = match.group(1) + vis = json.loads(json_str) + vis_content = vis.get("vis", None) + if vis_content != "[DONE]": + choice_data = ChatCompletionResponseStreamChoice( + index=0, + delta=DeltaMessage(role="assistant", content=vis.get("vis", None)), + ) + chunk = ChatCompletionStreamResponse( + id=request.conv_uid, + choices=[choice_data], + model=request.model, + created=int(time.time()), + ) + content = ( + f"data: {chunk.json(exclude_unset=True, ensure_ascii=False)}\n\n" + ) + yield content + yield "data: [DONE]\n\n" + + +async def chat_flow_stream_wrapper( + request: ChatCompletionRequestBody = None, +): + """chat app stream + Args: + request (OpenAPIChatCompletionRequest): request + token (APIToken): token + """ + flow_service = get_chat_flow() + flow_ctx = CommonLLMHTTPRequestContext( + conv_uid=request.conv_uid, + chat_mode=request.chat_mode, + user_name=request.user_name, + sys_code=request.sys_code, + ) + flow_req = CommonLLMHttpRequestBody( + model=request.model, + messages=request.chat_param, + stream=True, + context=flow_ctx, + ) + async for output in flow_service.chat_flow(request.chat_param, flow_req): + if output.startswith("data: [DONE]"): + yield output + if output.startswith("data:"): + output = output[len("data: ") :] + choice_data = ChatCompletionResponseStreamChoice( + index=0, + delta=DeltaMessage(role="assistant", content=output), + ) + chunk = ChatCompletionStreamResponse( + id=request.conv_uid, + choices=[choice_data], + model=request.model, + created=int(time.time()), + ) + chat_completion_response = ( + f"data: {chunk.json(exclude_unset=True, ensure_ascii=False)}\n\n" + ) + yield chat_completion_response + + +def check_chat_request(request: ChatCompletionRequestBody = Body()): + """ + Check the chat request + Args: + request 
(ChatCompletionRequestBody): The chat request. + Raises: + HTTPException: If the request is invalid. + """ + if request.chat_mode and request.chat_mode != ChatScene.ChatNormal.value(): + if request.chat_param is None: + raise HTTPException( + status_code=400, + detail={ + "error": { + "message": "chart param is None", + "type": "invalid_request_error", + "param": None, + "code": "invalid_chat_param", + } + }, + ) + if request.model is None: + raise HTTPException( + status_code=400, + detail={ + "error": { + "message": "model is None", + "type": "invalid_request_error", + "param": None, + "code": "invalid_model", + } + }, + ) + if request.messages is None: + raise HTTPException( + status_code=400, + detail={ + "error": { + "message": "messages is None", + "type": "invalid_request_error", + "param": None, + "code": "invalid_messages", + } + }, + ) diff --git a/dbgpt/client/__init__.py b/dbgpt/client/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dbgpt/client/app.py b/dbgpt/client/app.py new file mode 100644 index 000000000..8dca987b3 --- /dev/null +++ b/dbgpt/client/app.py @@ -0,0 +1,18 @@ +from dbgpt.client.client import Client + + +async def get_app(client: Client, app_id: str): + """Get an app. + Args: + client (Client): The dbgpt client. + app_id (str): The app id. + """ + return await client.get("/apps/" + app_id) + + +async def list_app(client: Client): + """List apps. + Args: + client (Client): The dbgpt client. 
+ """ + return await client.get("/apps") diff --git a/dbgpt/client/client.py b/dbgpt/client/client.py new file mode 100644 index 000000000..f8c0bbfab --- /dev/null +++ b/dbgpt/client/client.py @@ -0,0 +1,349 @@ +import json +from typing import Any, AsyncGenerator, List, Optional, Union +from urllib.parse import urlparse + +import httpx +from fastchat.protocol.api_protocol import ChatCompletionResponse + +from dbgpt.app.openapi.api_view_model import ChatCompletionStreamResponse +from dbgpt.client.schemas import ChatCompletionRequestBody + +CLIENT_API_PATH = "/api" +CLIENT_SERVE_PATH = "/serve" + + +class ClientException(Exception): + """ClientException is raised when an error occurs in the client.""" + + def __init__(self, status=None, reason=None, http_resp=None): + """ + Args: + status: Optional[int], the HTTP status code. + reason: Optional[str], the reason for the exception. + http_resp: Optional[httpx.Response], the HTTP response object. + """ + reason = json.loads(reason) + if http_resp: + self.status = http_resp.status_code + self.reason = http_resp.content + self.body = http_resp.content + self.headers = None + else: + self.status = status + self.reason = reason + self.body = None + self.headers = None + + def __str__(self): + """Custom error messages for exception""" + error_message = "({0})\n" "Reason: {1}\n".format(self.status, self.reason) + if self.headers: + error_message += "HTTP response headers: {0}\n".format(self.headers) + + if self.body: + error_message += "HTTP response body: {0}\n".format(self.body) + + return error_message + + +class Client(object): + def __init__( + self, + api_base: Optional[str] = "http://localhost:5000", + api_key: Optional[str] = None, + version: Optional[str] = "v2", + timeout: Optional[httpx._types.TimeoutTypes] = 120, + ): + """ + Args: + api_base: Optional[str], a full URL for the DB-GPT API. Defaults to the http://localhost:5000. + api_key: Optional[str], The dbgpt api key to use for authentication. Defaults to None. 
+ timeout: Optional[httpx._types.TimeoutTypes]: The timeout to use. Defaults to None. + In most cases, pass in a float number to specify the timeout in seconds. + Returns: + None + Raise: ClientException + + Examples: + -------- + .. code-block:: python + + from dbgpt.client.client import Client + + DBGPT_API_BASE = "http://localhost:5000" + DBGPT_API_KEY = "dbgpt" + client = Client(api_base=DBGPT_API_BASE, api_key=DBGPT_API_KEY) + client.chat(model="chatgpt_proxyllm", messages="Hello?") + """ + if is_valid_url(api_base): + self._api_url = api_base.rstrip("/") + else: + raise ValueError(f"api url {api_base} does not exist or is not accessible.") + self._api_key = api_key + self._version = version + self._api_url = api_base + CLIENT_API_PATH + "/" + version + self._timeout = timeout + headers = {"Authorization": f"Bearer {self._api_key}"} if self._api_key else {} + self._http_client = httpx.AsyncClient( + headers=headers, timeout=timeout if timeout else httpx.Timeout(None) + ) + + async def chat( + self, + model: str, + messages: Union[str, List[str]], + temperature: Optional[float] = None, + max_new_tokens: Optional[int] = None, + chat_mode: Optional[str] = None, + chat_param: Optional[str] = None, + conv_uid: Optional[str] = None, + user_name: Optional[str] = None, + sys_code: Optional[str] = None, + span_id: Optional[str] = None, + incremental: bool = True, + enable_vis: bool = True, + ) -> ChatCompletionResponse: + """ + Chat Completion. + Args: + model: str, The model name. + messages: Union[str, List[str]], The user input messages. + temperature: Optional[float], What sampling temperature to use, between 0 and 2. Higher values like 0.8 will make the output more random, while lower values like 0.2 will make it more focused and deterministic. + max_new_tokens: Optional[int], The maximum number of tokens that can be generated in the chat completion. + chat_mode: Optional[str], The chat mode. + chat_param: Optional[str], The chat param of chat mode. 
+ conv_uid: Optional[str], The conversation id of the model inference. + user_name: Optional[str], The user name of the model inference. + sys_code: Optional[str], The system code of the model inference. + span_id: Optional[str], The span id of the model inference. + incremental: bool, Used to control whether the content is returned incrementally or in full each time. If this parameter is not provided, the default is full return. + enable_vis: bool, Response content whether to output vis label. + Returns: + ChatCompletionResponse: The chat completion response. + Examples: + -------- + .. code-block:: python + + from dbgpt.client.client import Client + + DBGPT_API_BASE = "http://localhost:5000" + DBGPT_API_KEY = "dbgpt" + client = Client(api_base=DBGPT_API_BASE, api_key=DBGPT_API_KEY) + res = await client.chat(model="chatgpt_proxyllm", messages="Hello?") + """ + request = ChatCompletionRequestBody( + model=model, + messages=messages, + stream=False, + temperature=temperature, + max_new_tokens=max_new_tokens, + chat_mode=chat_mode, + chat_param=chat_param, + conv_uid=conv_uid, + user_name=user_name, + sys_code=sys_code, + span_id=span_id, + incremental=incremental, + enable_vis=enable_vis, + ) + response = await self._http_client.post( + self._api_url + "/chat/completions", json=request.dict() + ) + if response.status_code == 200: + json_data = json.loads(response.text) + chat_completion_response = ChatCompletionResponse(**json_data) + return chat_completion_response + else: + return json.loads(response.content) + + async def chat_stream( + self, + model: str, + messages: Union[str, List[str]], + temperature: Optional[float] = None, + max_new_tokens: Optional[int] = None, + chat_mode: Optional[str] = None, + chat_param: Optional[str] = None, + conv_uid: Optional[str] = None, + user_name: Optional[str] = None, + sys_code: Optional[str] = None, + span_id: Optional[str] = None, + incremental: bool = True, + enable_vis: bool = True, + ) -> 
AsyncGenerator[ChatCompletionStreamResponse, None]: + """ + Chat Stream Completion. + Args: + model: str, The model name. + messages: Union[str, List[str]], The user input messages. + temperature: Optional[float], What sampling temperature to use, between 0 and 2. Higher values like 0.8 will make the output more random, while lower values like 0.2 will make it more focused and deterministic. + max_new_tokens: Optional[int], The maximum number of tokens that can be generated in the chat completion. + chat_mode: Optional[str], The chat mode. + chat_param: Optional[str], The chat param of chat mode. + conv_uid: Optional[str], The conversation id of the model inference. + user_name: Optional[str], The user name of the model inference. + sys_code: Optional[str], The system code of the model inference. + span_id: Optional[str], The span id of the model inference. + incremental: bool, Used to control whether the content is returned incrementally or in full each time. If this parameter is not provided, the default is full return. + enable_vis: bool, Response content whether to output vis label. + Returns: + ChatCompletionStreamResponse: The chat completion response. + + Examples: + -------- + .. 
code-block:: python + + from dbgpt.client.client import Client + + DBGPT_API_BASE = "http://localhost:5000" + DBGPT_API_KEY = "dbgpt" + client = Client(api_base=DBGPT_API_BASE, api_key=DBGPT_API_KEY) + res = await client.chat_stream(model="chatgpt_proxyllm", messages="Hello?") + """ + request = ChatCompletionRequestBody( + model=model, + messages=messages, + stream=True, + temperature=temperature, + max_new_tokens=max_new_tokens, + chat_mode=chat_mode, + chat_param=chat_param, + conv_uid=conv_uid, + user_name=user_name, + sys_code=sys_code, + span_id=span_id, + incremental=incremental, + enable_vis=enable_vis, + ) + async with self._http_client.stream( + method="POST", + url=self._api_url + "/chat/completions", + json=request.dict(), + headers={}, + ) as response: + if response.status_code == 200: + async for line in response.aiter_lines(): + try: + if line == "data: [DONE]\n": + break + if line.startswith("data:"): + json_data = json.loads(line[len("data: ") :]) + chat_completion_response = ChatCompletionStreamResponse( + **json_data + ) + yield chat_completion_response + except Exception as e: + yield f"data:[SERVER_ERROR]{str(e)}\n\n" + + else: + try: + error = await response.aread() + yield json.loads(error) + except Exception as e: + yield f"data:[SERVER_ERROR]{str(e)}\n\n" + + async def get(self, path: str, *args): + """ + Get method. + Args: + path: str, The path to get. + args: Any, The arguments to pass to the get method. + """ + try: + response = await self._http_client.get( + self._api_url + CLIENT_SERVE_PATH + path, + *args, + ) + return response + finally: + await self._http_client.aclose() + + async def post(self, path: str, args): + """ + Post method. + Args: + path: str, The path to post. 
+ args: Any, The arguments to pass to the post + """ + try: + return await self._http_client.post( + self._api_url + CLIENT_SERVE_PATH + path, + json=args, + ) + finally: + await self._http_client.aclose() + + async def post_param(self, path: str, args): + """ + Post method. + Args: + path: str, The path to post. + args: Any, The arguments to pass to the post + """ + try: + return await self._http_client.post( + self._api_url + CLIENT_SERVE_PATH + path, + params=args, + ) + finally: + await self._http_client.aclose() + + async def patch(self, path: str, *args): + """ + Patch method. + Args: + path: str, The path to patch. + args: Any, The arguments to pass to the patch. + """ + return await self._http_client.patch(self._api_url + CLIENT_SERVE_PATH + path, *args) + + async def put(self, path: str, args): + """ + Put method. + Args: + path: str, The path to put. + args: Any, The arguments to pass to the put. + """ + try: + return await self._http_client.put( + self._api_url + CLIENT_SERVE_PATH + path, json=args + ) + finally: + await self._http_client.aclose() + + async def delete(self, path: str, *args): + """ + Delete method. + Args: + path: str, The path to delete. + args: Any, The arguments to pass to the delete. + """ + try: + return await self._http_client.delete( + self._api_url + CLIENT_SERVE_PATH + path, *args + ) + finally: + await self._http_client.aclose() + + async def head(self, path: str, *args): + """ + Head method. + Args: + path: str, The path to head. + args: Any, The arguments to pass to the head + """ + return await self._http_client.head(self._api_url + path, *args) + + +def is_valid_url(api_url: Any) -> bool: + """ + Check if the given URL is valid. + Args: + api_url: Any, The URL to check. + Returns: + bool: True if the URL is valid, False otherwise.
+ """ + if not isinstance(api_url, str): + return False + parsed = urlparse(api_url) + return parsed.scheme != "" and parsed.netloc != "" diff --git a/dbgpt/client/flow.py b/dbgpt/client/flow.py new file mode 100644 index 000000000..7f0cd34d3 --- /dev/null +++ b/dbgpt/client/flow.py @@ -0,0 +1,49 @@ +from dbgpt.client.client import Client +from dbgpt.core.awel.flow.flow_factory import FlowPanel + + +async def create_flow(client: Client, flow: FlowPanel): + """Create a new flow. + Args: + client (Client): The dbgpt client. + flow (FlowPanel): The flow panel. + """ + return await client.post("/awel/flows", flow.dict()) + + +async def update_flow(client: Client, flow: FlowPanel): + """Update a flow. + Args: + client (Client): The dbgpt client. + flow (FlowPanel): The flow panel. + """ + return await client.put("/awel/flows", flow.dict()) + + +async def delete_flow(client: Client, flow_id: str): + """ + Delete a flow. + Args: + client (Client): The dbgpt client. + flow_id (str): The flow id. + """ + return await client.delete("/awel/flows/" + flow_id) + + +async def get_flow(client: Client, flow_id: str): + """ + Get a flow. + Args: + client (Client): The dbgpt client. + flow_id (str): The flow id. + """ + return await client.get("/awel/flows/" + flow_id) + + +async def list_flow(client: Client): + """ + List flows. + Args: + client (Client): The dbgpt client. + """ + return await client.get("/awel/flows") diff --git a/dbgpt/client/knowledge.py b/dbgpt/client/knowledge.py new file mode 100644 index 000000000..7adf40192 --- /dev/null +++ b/dbgpt/client/knowledge.py @@ -0,0 +1,93 @@ +import json + +from dbgpt.client.client import Client +from dbgpt.client.schemas import DocumentModel, SpaceModel, SyncModel + + +async def create_space(client: Client, app_model: SpaceModel): + """Create a new space. + Args: + client (Client): The dbgpt client. + app_model (SpaceModel): The app model.
+ """ + return await client.post("/knowledge/spaces", app_model.dict()) + + +async def update_space(client: Client, app_model: SpaceModel): + """Update a document. + Args: + client (Client): The dbgpt client. + app_model (SpaceModel): The app model. + """ + return await client.put("/knowledge/spaces", app_model.dict()) + + +async def delete_space(client: Client, space_id: str): + """Delete a space. + Args: + client (Client): The dbgpt client. + app_id (str): The app id. + """ + return await client.delete("/knowledge/spaces/" + space_id) + + +async def get_space(client: Client, space_id: str): + """Get a document. + Args: + client (Client): The dbgpt client. + app_id (str): The app id. + """ + return await client.get("/knowledge/spaces/" + space_id) + + +async def list_space(client: Client): + """List apps. + Args: + client (Client): The dbgpt client. + """ + return await client.get("/knowledge/spaces") + + +async def create_document(client: Client, doc_model: DocumentModel): + """Create a new space. + Args: + client (Client): The dbgpt client. + doc_model (SpaceModel): The document model. + """ + return await client.post_param("/knowledge/documents", doc_model.dict()) + + +async def delete_document(client: Client, document_id: str): + """Delete a document. + Args: + client (Client): The dbgpt client. + app_id (str): The app id. + """ + return await client.delete("/knowledge/documents/" + document_id) + + +async def get_document(client: Client, document_id: str): + """Get a document. + Args: + client (Client): The dbgpt client. + app_id (str): The app id. + """ + return await client.get("/knowledge/documents/" + document_id) + + +async def list_document(client: Client): + """List documents. + Args: + client (Client): The dbgpt client. + """ + return await client.get("/knowledge/documents") + + +async def sync_document(client: Client, sync_model: SyncModel): + """sync document. + Args: + client (Client): The dbgpt client. 
+ """ + return await client.post( + "/knowledge/documents/sync", [json.loads(sync_model.json())] + ) diff --git a/dbgpt/client/schemas.py b/dbgpt/client/schemas.py new file mode 100644 index 000000000..1b97c5d1f --- /dev/null +++ b/dbgpt/client/schemas.py @@ -0,0 +1,206 @@ +from datetime import datetime +from typing import Dict, List, Optional, Union + +from fastapi import File, UploadFile +from pydantic import BaseModel, Field + +from dbgpt.agent.resource.resource_api import AgentResource +from dbgpt.rag.chunk_manager import ChunkParameters + + +class ChatCompletionRequestBody(BaseModel): + """ChatCompletion LLM http request body.""" + + model: str = Field( + ..., description="The model name", examples=["gpt-3.5-turbo", "proxyllm"] + ) + messages: Union[str, List[str]] = Field( + ..., description="User input messages", examples=["Hello", "How are you?"] + ) + stream: bool = Field(default=False, description="Whether return stream") + + temperature: Optional[float] = Field( + default=None, + description="What sampling temperature to use, between 0 and 2. 
Higher values " + "like 0.8 will make the output more random, while lower values like 0.2 will " + "make it more focused and deterministic.", + ) + max_new_tokens: Optional[int] = Field( + default=None, + description="The maximum number of tokens that can be generated in the chat " + "completion.", + ) + conv_uid: Optional[str] = Field( + default=None, description="The conversation id of the model inference" + ) + span_id: Optional[str] = Field( + default=None, description="The span id of the model inference" + ) + chat_mode: Optional[str] = Field( + default="chat_normal", + description="The chat mode", + examples=["chat_awel_flow", "chat_normal"], + ) + chat_param: Optional[str] = Field( + default=None, + description="The chat param of chat mode", + ) + user_name: Optional[str] = Field( + default=None, description="The user name of the model inference" + ) + sys_code: Optional[str] = Field( + default=None, description="The system code of the model inference" + ) + incremental: bool = Field( + default=True, + description="Used to control whether the content is returned incrementally or in full each time. 
If this parameter is not provided, the default is full return.", + ) + enable_vis: bool = Field( + default=True, description="response content whether to output vis label" + ) + + +class SpaceModel(BaseModel): + """name: knowledge space name""" + + """vector_type: vector type""" + id: int = Field(None, description="The space id") + name: str = Field(None, description="The space name") + """vector_type: vector type""" + vector_type: str = Field(None, description="The vector type") + """desc: description""" + desc: str = Field(None, description="The description") + """owner: owner""" + owner: str = Field(None, description="The owner") + + +class AppDetailModel(BaseModel): + app_code: Optional[str] = Field(None, title="app code") + app_name: Optional[str] = Field(None, title="app name") + agent_name: Optional[str] = Field(None, title="agent name") + node_id: Optional[str] = Field(None, title="node id") + resources: Optional[list[AgentResource]] = Field(None, title="resources") + prompt_template: Optional[str] = Field(None, title="prompt template") + llm_strategy: Optional[str] = Field(None, title="llm strategy") + llm_strategy_value: Optional[str] = Field(None, title="llm strategy value") + created_at: datetime = datetime.now() + updated_at: datetime = datetime.now() + + +class AwelTeamModel(BaseModel): + dag_id: str = Field( + ..., + description="The unique id of dag", + examples=["flow_dag_testflow_66d8e9d6-f32e-4540-a5bd-ea0648145d0e"], + ) + uid: str = Field( + default=None, + description="The unique id of flow", + examples=["66d8e9d6-f32e-4540-a5bd-ea0648145d0e"], + ) + name: Optional[str] = Field( + default=None, + description="The name of dag", + ) + label: Optional[str] = Field( + default=None, + description="The label of dag", + ) + version: Optional[str] = Field( + default=None, + description="The version of dag", + ) + description: Optional[str] = Field( + default=None, + description="The description of dag", + ) + editable: bool = Field( + default=False, +
description="is the dag is editable", + examples=[True, False], + ) + state: Optional[str] = Field( + default=None, + description="The state of dag", + ) + user_name: Optional[str] = Field( + default=None, + description="The owner of current dag", + ) + sys_code: Optional[str] = Field( + default=None, + description="The system code of current dag", + ) + flow_category: Optional[str] = Field( + default="common", + description="The flow category of current dag", + ) + + +class AppModel(BaseModel): + app_code: Optional[str] = Field(None, title="app code") + app_name: Optional[str] = Field(None, title="app name") + app_describe: Optional[str] = Field(None, title="app describe") + team_mode: Optional[str] = Field(None, title="team mode") + language: Optional[str] = Field("en", title="language") + team_context: Optional[Union[str, AwelTeamModel]] = Field( + None, title="team context" + ) + user_code: Optional[str] = Field(None, title="user code") + sys_code: Optional[str] = Field(None, title="sys code") + is_collected: Optional[str] = Field(None, title="is collected") + icon: Optional[str] = Field(None, title="icon") + created_at: datetime = datetime.now() + updated_at: datetime = datetime.now() + details: List[AppDetailModel] = Field([], title="app details") + + +class SpaceModel(BaseModel): + name: str = Field( + default=None, + description="knowledge space name", + ) + vector_type: str = Field( + default=None, + description="vector type", + ) + desc: str = Field( + default=None, + description="space description", + ) + owner: str = Field( + default=None, + description="space owner", + ) + + +class DocumentModel(BaseModel): + id: int = Field(None, description="The doc id") + doc_name: str = Field(None, description="doc name") + """doc_type: document type""" + doc_type: str = Field(None, description="The doc type") + """content: description""" + content: str = Field(None, description="content") + """doc file""" + doc_file: UploadFile = Field(File(None), description="doc 
file") + """doc_source: doc source""" + doc_source: str = Field(None, description="doc source") + """doc_source: doc source""" + space_id: str = Field(None, description="space_id") + + +class SyncModel(BaseModel): + """Sync model""" + + """doc_id: doc id""" + doc_id: str = Field(None, description="The doc id") + + """space id""" + space_id: str = Field(None, description="The space id") + + """model_name: model name""" + model_name: Optional[str] = Field(None, description="model name") + + """chunk_parameters: chunk parameters + """ + chunk_parameters: ChunkParameters = Field(None, description="chunk parameters") diff --git a/dbgpt/client/tests/__init__.py b/dbgpt/client/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dbgpt/serve/agent/app/endpoints.py b/dbgpt/serve/agent/app/endpoints.py new file mode 100644 index 000000000..8fb7934b8 --- /dev/null +++ b/dbgpt/serve/agent/app/endpoints.py @@ -0,0 +1,65 @@ +from typing import Optional + +from fastapi import APIRouter, Query + +from dbgpt.serve.agent.db.gpts_app import ( + GptsApp, + GptsAppCollectionDao, + GptsAppDao, + GptsAppQuery, +) +from dbgpt.serve.core import Result + +router = APIRouter() +gpts_dao = GptsAppDao() +collection_dao = GptsAppCollectionDao() + + +@router.get("/v2/serve/apps") +async def app_list( + user_name: Optional[str] = Query(default=None, description="user name"), + sys_code: Optional[str] = Query(default=None, description="system code"), + is_collected: Optional[str] = Query(default=None, description="system code"), + page: int = Query(default=1, description="current page"), + page_size: int = Query(default=20, description="page size"), +): + try: + query = GptsAppQuery( + page_no=page, page_size=page_size, is_collected=is_collected + ) + return Result.succ(gpts_dao.app_list(query, True)) + except Exception as ex: + return Result.failed(err_code="E000X", msg=f"query app error: {ex}") + + +@router.get("/v2/serve/apps/{app_id}") +async def app_detail(app_id: 
str): + try: + return Result.succ(gpts_dao.app_detail(app_id)) + except Exception as ex: + return Result.failed(err_code="E000X", msg=f"query app error: {ex}") + + +@router.put("/v2/serve/apps/{app_id}") +async def app_update(app_id: str, gpts_app: GptsApp): + try: + return Result.succ(gpts_dao.edit(gpts_app)) + except Exception as ex: + return Result.failed(err_code="E000X", msg=f"edit app error: {ex}") + + +@router.post("/v2/serve/apps") +async def app_create(gpts_app: GptsApp): + try: + return Result.succ(gpts_dao.create(gpts_app)) + except Exception as ex: + return Result.failed(err_code="E000X", msg=f"edit app error: {ex}") + + +@router.delete("/v2/serve/apps/{app_id}") +async def app_delete(app_id: str, user_code: Optional[str], sys_code: Optional[str]): + try: + gpts_dao.delete(app_id, user_code, sys_code) + return Result.succ([]) + except Exception as ex: + return Result.failed(err_code="E000X", msg=f"delete app error: {ex}") diff --git a/dbgpt/serve/conversation/api/endpoints.py b/dbgpt/serve/conversation/api/endpoints.py index 59b155aaf..00b7f08cf 100644 --- a/dbgpt/serve/conversation/api/endpoints.py +++ b/dbgpt/serve/conversation/api/endpoints.py @@ -2,7 +2,7 @@ import uuid from functools import cache from typing import List, Optional -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query, Request from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBearer from dbgpt.component import SystemApp @@ -45,6 +45,7 @@ def _parse_api_keys(api_keys: str) -> List[str]: async def check_api_key( auth: Optional[HTTPAuthorizationCredentials] = Depends(get_bearer_token), + request: Request = None, service: Service = Depends(get_service), ) -> Optional[str]: """Check the api key @@ -63,6 +64,9 @@ async def check_api_key( assert res.status_code == 200 """ + if request.url.path.startswith(f"/api/v1"): + return None + if service.config.api_keys: api_keys = 
_parse_api_keys(service.config.api_keys) if auth is None or (token := auth.credentials) not in api_keys: diff --git a/dbgpt/serve/core/config.py b/dbgpt/serve/core/config.py index 9b78c44a2..306c778b3 100644 --- a/dbgpt/serve/core/config.py +++ b/dbgpt/serve/core/config.py @@ -16,7 +16,16 @@ class BaseServeConfig(BaseParameters): config (AppConfig): Application configuration config_prefix (str): Configuration prefix """ + global_prefix = "dbgpt.app.global." + global_dict = config.get_all_by_prefix(global_prefix) config_dict = config.get_all_by_prefix(config_prefix) # remove prefix - config_dict = {k[len(config_prefix) :]: v for k, v in config_dict.items()} + config_dict = { + k[len(config_prefix) :]: v + for k, v in config_dict.items() + if k.startswith(config_prefix) + } + for k, v in global_dict.items(): + if k not in config_dict and k[len(global_prefix) :] in cls().__dict__: + config_dict[k[len(global_prefix) :]] = v return cls(**config_dict) diff --git a/dbgpt/serve/core/schemas.py b/dbgpt/serve/core/schemas.py index 57d85503e..5add79f30 100644 --- a/dbgpt/serve/core/schemas.py +++ b/dbgpt/serve/core/schemas.py @@ -69,11 +69,11 @@ async def validation_exception_handler( async def http_exception_handler(request: Request, exc: HTTPException): res = Result.failed( - msg=exc.detail, - err_code="E0002", + msg=str(exc.detail), + err_code=str(exc.status_code), ) logger.error(f"http_exception_handler catch HTTPException: {res}") - return JSONResponse(status_code=400, content=res.dict()) + return JSONResponse(status_code=exc.status_code, content=res.dict()) async def common_exception_handler(request: Request, exc: Exception) -> JSONResponse: diff --git a/dbgpt/serve/core/serve.py b/dbgpt/serve/core/serve.py index ff0203b65..2280a2136 100644 --- a/dbgpt/serve/core/serve.py +++ b/dbgpt/serve/core/serve.py @@ -18,7 +18,7 @@ class BaseServe(BaseComponent, ABC): def __init__( self, system_app: SystemApp, - api_prefix: str, + api_prefix: str | List[str], api_tags: List[str], 
db_url_or_db: Union[str, URL, DatabaseManager] = None, try_create_tables: Optional[bool] = False, diff --git a/dbgpt/serve/flow/api/endpoints.py b/dbgpt/serve/flow/api/endpoints.py index 8082f4984..2b2d8b723 100644 --- a/dbgpt/serve/flow/api/endpoints.py +++ b/dbgpt/serve/flow/api/endpoints.py @@ -1,7 +1,7 @@ from functools import cache from typing import List, Optional, Union -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query, Request from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBearer from dbgpt.component import SystemApp @@ -9,7 +9,7 @@ from dbgpt.core.awel.flow import ResourceMetadata, ViewMetadata from dbgpt.serve.core import Result from dbgpt.util import PaginationResult -from ..config import APP_NAME, SERVE_APP_NAME, SERVE_SERVICE_COMPONENT_NAME, ServeConfig +from ..config import APP_NAME, SERVE_SERVICE_COMPONENT_NAME, ServeConfig from ..service.service import Service from .schemas import ServeRequest, ServerResponse @@ -45,6 +45,7 @@ def _parse_api_keys(api_keys: str) -> List[str]: async def check_api_key( auth: Optional[HTTPAuthorizationCredentials] = Depends(get_bearer_token), + request: Request = None, service: Service = Depends(get_service), ) -> Optional[str]: """Check the api key @@ -63,6 +64,10 @@ async def check_api_key( assert res.status_code == 200 """ + if request.url.path.startswith(f"/api/v1"): + return None + + # for api_version in serve.serve_versions(): if service.config.api_keys: api_keys = _parse_api_keys(service.config.api_keys) if auth is None or (token := auth.credentials) not in api_keys: diff --git a/dbgpt/serve/flow/serve.py b/dbgpt/serve/flow/serve.py index e661928e2..126841e57 100644 --- a/dbgpt/serve/flow/serve.py +++ b/dbgpt/serve/flow/serve.py @@ -27,11 +27,13 @@ class Serve(BaseServe): def __init__( self, system_app: SystemApp, - api_prefix: Optional[str] = f"/api/v1/serve/awel", + api_prefix: Optional[List[str]] = None, api_tags: 
Optional[List[str]] = None, db_url_or_db: Union[str, URL, DatabaseManager] = None, try_create_tables: Optional[bool] = False, ): + if api_prefix is None: + api_prefix = [f"/api/v1/serve/awel", "/api/v2/serve/awel"] if api_tags is None: api_tags = [SERVE_APP_NAME_HUMP] super().__init__( @@ -43,9 +45,10 @@ class Serve(BaseServe): if self._app_has_initiated: return self._system_app = system_app - self._system_app.app.include_router( - router, prefix=self._api_prefix, tags=self._api_tags - ) + for prefix in self._api_prefix: + self._system_app.app.include_router( + router, prefix=prefix, tags=self._api_tags + ) init_endpoints(self._system_app) self._app_has_initiated = True diff --git a/dbgpt/serve/prompt/api/endpoints.py b/dbgpt/serve/prompt/api/endpoints.py index f78d0843c..2da890eb8 100644 --- a/dbgpt/serve/prompt/api/endpoints.py +++ b/dbgpt/serve/prompt/api/endpoints.py @@ -1,7 +1,7 @@ from functools import cache from typing import List, Optional -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query, Request from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBearer from dbgpt.component import SystemApp @@ -44,6 +44,7 @@ def _parse_api_keys(api_keys: str) -> List[str]: async def check_api_key( auth: Optional[HTTPAuthorizationCredentials] = Depends(get_bearer_token), + request: Request = None, service: Service = Depends(get_service), ) -> Optional[str]: """Check the api key @@ -62,6 +63,9 @@ async def check_api_key( assert res.status_code == 200 """ + if request.url.path.startswith(f"/api/v1"): + return None + if service.config.api_keys: api_keys = _parse_api_keys(service.config.api_keys) if auth is None or (token := auth.credentials) not in api_keys: diff --git a/dbgpt/serve/rag/api/endpoints.py b/dbgpt/serve/rag/api/endpoints.py new file mode 100644 index 000000000..392e70ab9 --- /dev/null +++ b/dbgpt/serve/rag/api/endpoints.py @@ -0,0 +1,300 @@ +from functools import cache 
+from typing import List, Optional + +from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile +from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBearer + +from dbgpt.component import SystemApp +from dbgpt.serve.core import Result +from dbgpt.serve.rag.api.schemas import ( + DocumentServeRequest, + DocumentServeResponse, + KnowledgeSyncRequest, + SpaceServeRequest, + SpaceServeResponse, +) +from dbgpt.serve.rag.config import SERVE_SERVICE_COMPONENT_NAME +from dbgpt.serve.rag.service.service import Service +from dbgpt.util import PaginationResult + +router = APIRouter() + +# Add your API endpoints here + +global_system_app: Optional[SystemApp] = None + + +def get_service() -> Service: + """Get the service instance""" + return global_system_app.get_component(SERVE_SERVICE_COMPONENT_NAME, Service) + + +get_bearer_token = HTTPBearer(auto_error=False) + + +@cache +def _parse_api_keys(api_keys: str) -> List[str]: + """Parse the string api keys to a list + + Args: + api_keys (str): The string api keys + + Returns: + List[str]: The list of api keys + """ + if not api_keys: + return [] + return [key.strip() for key in api_keys.split(",")] + + +async def check_api_key( + auth: Optional[HTTPAuthorizationCredentials] = Depends(get_bearer_token), + service: Service = Depends(get_service), +) -> Optional[str]: + """Check the api key + + If the api key is not set, allow all. + + Your can pass the token in you request header like this: + + .. 
code-block:: python + + import requests + + client_api_key = "your_api_key" + headers = {"Authorization": "Bearer " + client_api_key} + res = requests.get("http://test/hello", headers=headers) + assert res.status_code == 200 + + """ + if service.config.api_keys: + api_keys = _parse_api_keys(service.config.api_keys) + if auth is None or (token := auth.credentials) not in api_keys: + raise HTTPException( + status_code=401, + detail={ + "error": { + "message": "", + "type": "invalid_request_error", + "param": None, + "code": "invalid_api_key", + } + }, + ) + return token + else: + # api_keys not set; allow all + return None + + +@router.get("/health", dependencies=[Depends(check_api_key)]) +async def health(): + """Health check endpoint""" + return {"status": "ok"} + + +@router.get("/test_auth", dependencies=[Depends(check_api_key)]) +async def test_auth(): + """Test auth endpoint""" + return {"status": "ok"} + + +@router.post("/spaces", dependencies=[Depends(check_api_key)]) +async def create( + request: SpaceServeRequest, service: Service = Depends(get_service) +) -> Result: + """Create a new Space entity + + Args: + request (SpaceServeRequest): The request + service (Service): The service + Returns: + ServerResponse: The response + """ + return Result.succ(service.create_space(request)) + + +@router.put("/spaces", dependencies=[Depends(check_api_key)]) +async def update( + request: SpaceServeRequest, service: Service = Depends(get_service) +) -> Result: + """Update a Space entity + + Args: + request (SpaceServeRequest): The request + service (Service): The service + Returns: + ServerResponse: The response + """ + return Result.succ(service.update_space(request)) + + +@router.delete( + "/spaces/{space_id}", + response_model=Result[None], + dependencies=[Depends(check_api_key)], +) +async def delete( + space_id: str, service: Service = Depends(get_service) +) -> Result[None]: + """Delete a Space entity + + Args: + request (SpaceServeRequest): The request + service 
(Service): The service + Returns: + ServerResponse: The response + """ + return Result.succ(service.delete(space_id)) + + +@router.get( + "/spaces/{space_id}", + dependencies=[Depends(check_api_key)], + response_model=Result[List], +) +async def query( + space_id: str, service: Service = Depends(get_service) +) -> Result[List[SpaceServeResponse]]: + """Query Space entities + + Args: + request (SpaceServeRequest): The request + service (Service): The service + Returns: + List[ServeResponse]: The response + """ + request = {"id": space_id} + return Result.succ(service.get(request)) + + +@router.get( + "/spaces", + dependencies=[Depends(check_api_key)], + response_model=Result[PaginationResult[SpaceServeResponse]], +) +async def query_page( + page: int = Query(default=1, description="current page"), + page_size: int = Query(default=20, description="page size"), + service: Service = Depends(get_service), +) -> Result[PaginationResult[SpaceServeResponse]]: + """Query Space entities + + Args: + page (int): The page number + page_size (int): The page size + service (Service): The service + Returns: + ServerResponse: The response + """ + return Result.succ(service.get_list_by_page({}, page, page_size)) + + +@router.post("/documents", dependencies=[Depends(check_api_key)]) +async def create_document( + doc_name: str = Form(...), + doc_type: str = Form(...), + space_id: str = Form(...), + content: Optional[str] = Form(None), + doc_file: Optional[UploadFile] = File(None), + service: Service = Depends(get_service), +) -> Result: + """Create a new Document entity + + Args: + request (SpaceServeRequest): The request + service (Service): The service + Returns: + ServerResponse: The response + """ + request = DocumentServeRequest( + doc_name=doc_name, + doc_type=doc_type, + content=content, + doc_file=doc_file, + space_id=space_id, + ) + return Result.succ(await service.create_document(request)) + + +@router.get( + "/documents/{document_id}", + 
dependencies=[Depends(check_api_key)], + response_model=Result[List], +) +async def query( + document_id: str, service: Service = Depends(get_service) +) -> Result[List[SpaceServeResponse]]: + """Query Space entities + + Args: + request (SpaceServeRequest): The request + service (Service): The service + Returns: + List[ServeResponse]: The response + """ + request = {"id": document_id} + return Result.succ(service.get_document(request)) + + +@router.get( + "/documents", + dependencies=[Depends(check_api_key)], + response_model=Result[PaginationResult[SpaceServeResponse]], +) +async def query_page( + page: int = Query(default=1, description="current page"), + page_size: int = Query(default=20, description="page size"), + service: Service = Depends(get_service), +) -> Result[PaginationResult[DocumentServeResponse]]: + """Query Space entities + + Args: + page (int): The page number + page_size (int): The page size + service (Service): The service + Returns: + ServerResponse: The response + """ + return Result.succ(service.get_document_list({}, page, page_size)) + + +@router.post("/documents/sync", dependencies=[Depends(check_api_key)]) +async def sync_documents( + requests: List[KnowledgeSyncRequest], service: Service = Depends(get_service) +) -> Result: + """Create a new Document entity + + Args: + request (SpaceServeRequest): The request + service (Service): The service + Returns: + ServerResponse: The response + """ + return Result.succ(service.sync_document(requests)) + + +@router.delete( + "/documents/{document_id}", + dependencies=[Depends(check_api_key)], + response_model=Result[None], +) +async def delete_document( + document_id: str, service: Service = Depends(get_service) +) -> Result[None]: + """Delete a Space entity + + Args: + request (SpaceServeRequest): The request + service (Service): The service + Returns: + ServerResponse: The response + """ + return Result.succ(service.delete_document(document_id)) + + +def init_endpoints(system_app: SystemApp) -> 
None: + """Initialize the endpoints""" + global global_system_app + system_app.register(Service) + global_system_app = system_app diff --git a/dbgpt/serve/rag/api/schemas.py b/dbgpt/serve/rag/api/schemas.py new file mode 100644 index 000000000..0ffb986d6 --- /dev/null +++ b/dbgpt/serve/rag/api/schemas.py @@ -0,0 +1,93 @@ +from typing import Optional + +from fastapi import File, UploadFile +from pydantic import BaseModel, Field + +from dbgpt.rag.chunk_manager import ChunkParameters + +from ..config import SERVE_APP_NAME_HUMP + + +class SpaceServeRequest(BaseModel): + """name: knowledge space name""" + + """vector_type: vector type""" + id: Optional[int] = Field(None, description="The space id") + name: str = Field(None, description="The space name") + """vector_type: vector type""" + vector_type: str = Field(None, description="The vector type") + """desc: description""" + desc: str = Field(None, description="The description") + """owner: owner""" + owner: str = Field(None, description="The owner") + + +class DocumentServeRequest(BaseModel): + id: int = Field(None, description="The doc id") + doc_name: str = Field(None, description="doc name") + """doc_type: document type""" + doc_type: str = Field(None, description="The doc type") + """content: description""" + content: str = Field(None, description="content") + """doc file""" + doc_file: UploadFile = File(...) 
+ """doc_source: doc source""" + doc_source: str = None + """doc_source: doc source""" + space_id: str = None + + +class DocumentServeResponse(BaseModel): + id: int = Field(None, description="The doc id") + doc_name: str = Field(None, description="doc type") + """vector_type: vector type""" + doc_type: str = Field(None, description="The doc content") + """desc: description""" + content: str = Field(None, description="content") + """vector ids""" + vector_ids: str = Field(None, description="vector ids") + """doc_source: doc source""" + doc_source: str = None + """doc_source: doc source""" + space: str = None + + +class KnowledgeSyncRequest(BaseModel): + """Sync request""" + + """doc_ids: doc ids""" + doc_id: int = Field(None, description="The doc id") + + """space id""" + space_id: str = Field(None, description="space id") + + """model_name: model name""" + model_name: Optional[str] = Field(None, description="model name") + + """chunk_parameters: chunk parameters + """ + chunk_parameters: ChunkParameters = Field(None, description="chunk parameters") + + +class SpaceServeResponse(BaseModel): + """Flow response model""" + + """name: knowledge space name""" + + """vector_type: vector type""" + id: int = Field(None, description="The space id") + name: str = Field(None, description="The space name") + """vector_type: vector type""" + vector_type: str = Field(None, description="The vector type") + """desc: description""" + desc: str = Field(None, description="The description") + """context: argument context""" + context: str = Field(None, description="The context") + """owner: owner""" + owner: str = Field(None, description="The owner") + """sys code""" + sys_code: str = Field(None, description="The sys code") + + # TODO define your own fields here + class Config: + title = f"ServerResponse for {SERVE_APP_NAME_HUMP}" diff --git a/dbgpt/serve/rag/config.py b/dbgpt/serve/rag/config.py new file mode 100644 index 000000000..c0f10ab23 --- /dev/null +++ 
b/dbgpt/serve/rag/config.py @@ -0,0 +1,28 @@ +from dataclasses import dataclass, field +from typing import Optional + +from dbgpt.serve.core import BaseServeConfig + +APP_NAME = "rag" +SERVE_APP_NAME = "dbgpt_rag" +SERVE_APP_NAME_HUMP = "dbgpt_rag" +SERVE_CONFIG_KEY_PREFIX = "dbgpt_rag" +SERVE_SERVICE_COMPONENT_NAME = f"{SERVE_APP_NAME}_service" + + +@dataclass +class ServeConfig(BaseServeConfig): + """Parameters for the serve command""" + + api_keys: Optional[str] = field( + default=None, metadata={"help": "API keys for the endpoint, if None, allow all"} + ) + + default_user: Optional[str] = field( + default=None, + metadata={"help": "Default user name for prompt"}, + ) + default_sys_code: Optional[str] = field( + default=None, + metadata={"help": "Default system code for prompt"}, + ) diff --git a/dbgpt/serve/rag/dependencies.py b/dbgpt/serve/rag/dependencies.py new file mode 100644 index 000000000..8598ecd97 --- /dev/null +++ b/dbgpt/serve/rag/dependencies.py @@ -0,0 +1 @@ +# Define your dependencies here diff --git a/dbgpt/serve/rag/serve.py b/dbgpt/serve/rag/serve.py new file mode 100644 index 000000000..305364614 --- /dev/null +++ b/dbgpt/serve/rag/serve.py @@ -0,0 +1,62 @@ +import logging +from typing import List, Optional, Union + +from sqlalchemy import URL + +from dbgpt.component import SystemApp +from dbgpt.serve.core import BaseServe +from dbgpt.storage.metadata import DatabaseManager + +from .api.endpoints import init_endpoints, router +from .config import ( + APP_NAME, + SERVE_APP_NAME, + SERVE_APP_NAME_HUMP, + SERVE_CONFIG_KEY_PREFIX, +) + +logger = logging.getLogger(__name__) + + +class Serve(BaseServe): + """Serve component for DB-GPT""" + + name = SERVE_APP_NAME + + def __init__( + self, + system_app: SystemApp, + api_prefix: Optional[str] = f"/api/v2/serve/knowledge", + api_tags: Optional[List[str]] = None, + db_url_or_db: Union[str, URL, DatabaseManager] = None, + try_create_tables: Optional[bool] = False, + ): + if api_tags is None: + api_tags 
= [SERVE_APP_NAME_HUMP] + super().__init__( + system_app, api_prefix, api_tags, db_url_or_db, try_create_tables + ) + self._db_manager: Optional[DatabaseManager] = None + + def init_app(self, system_app: SystemApp): + if self._app_has_initiated: + return + self._system_app = system_app + self._system_app.app.include_router( + router, prefix=self._api_prefix, tags=self._api_tags + ) + init_endpoints(self._system_app) + self._app_has_initiated = True + + def on_init(self): + """Called when init the application. + + You can do some initialization here. You can't get other components here because they may be not initialized yet + """ + # import your own module here to ensure the module is loaded before the application starts + from .models.models import KnowledgeSpaceEntity + + def before_start(self): + """Called before the start of the application.""" + # TODO: Your code here + self._db_manager = self.create_or_get_db_manager() diff --git a/dbgpt/serve/rag/service/__init__.py b/dbgpt/serve/rag/service/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dbgpt/serve/rag/service/service.py b/dbgpt/serve/rag/service/service.py new file mode 100644 index 000000000..9959a0e83 --- /dev/null +++ b/dbgpt/serve/rag/service/service.py @@ -0,0 +1,522 @@ +import json +import logging +import os +import shutil +import tempfile +from datetime import datetime +from enum import Enum +from typing import List, Optional + +from fastapi import HTTPException + +from dbgpt._private.config import Config +from dbgpt.app.knowledge.chunk_db import DocumentChunkDao, DocumentChunkEntity +from dbgpt.app.knowledge.document_db import ( + KnowledgeDocumentDao, + KnowledgeDocumentEntity, +) +from dbgpt.app.knowledge.request.request import KnowledgeSpaceRequest +from dbgpt.component import ComponentType, SystemApp +from dbgpt.configs.model_config import ( + EMBEDDING_MODEL_CONFIG, + KNOWLEDGE_UPLOAD_ROOT_PATH, +) +from dbgpt.core.awel.dag.dag_manager import DAGManager +from 
dbgpt.rag.chunk import Chunk +from dbgpt.rag.chunk_manager import ChunkParameters +from dbgpt.rag.embedding import EmbeddingFactory +from dbgpt.rag.knowledge import ChunkStrategy, KnowledgeFactory, KnowledgeType +from dbgpt.serve.core import BaseService +from dbgpt.storage.metadata import BaseDao +from dbgpt.storage.metadata._base_dao import QUERY_SPEC +from dbgpt.storage.vector_store.base import VectorStoreConfig +from dbgpt.storage.vector_store.connector import VectorStoreConnector +from dbgpt.util.dbgpts.loader import DBGPTsLoader +from dbgpt.util.executor_utils import ExecutorFactory +from dbgpt.util.pagination_utils import PaginationResult +from dbgpt.util.tracer import root_tracer, trace + +from ..api.schemas import ( + DocumentServeRequest, + DocumentServeResponse, + KnowledgeSyncRequest, + SpaceServeRequest, + SpaceServeResponse, +) +from ..assembler.embedding import EmbeddingAssembler +from ..config import SERVE_CONFIG_KEY_PREFIX, SERVE_SERVICE_COMPONENT_NAME, ServeConfig +from ..models.models import KnowledgeSpaceDao, KnowledgeSpaceEntity + +logger = logging.getLogger(__name__) +CFG = Config() + + +class SyncStatus(Enum): + TODO = "TODO" + FAILED = "FAILED" + RUNNING = "RUNNING" + FINISHED = "FINISHED" + + +class Service(BaseService[KnowledgeSpaceEntity, SpaceServeRequest, SpaceServeResponse]): + """The service class for Flow""" + + name = SERVE_SERVICE_COMPONENT_NAME + + def __init__( + self, + system_app: SystemApp, + dao: Optional[KnowledgeSpaceDao] = None, + document_dao: Optional[KnowledgeDocumentDao] = None, + chunk_dao: Optional[DocumentChunkDao] = None, + ): + self._system_app = None + self._dao: KnowledgeSpaceDao = dao + self._document_dao: KnowledgeDocumentDao = document_dao + self._chunk_dao: DocumentChunkDao = chunk_dao + self._dag_manager: Optional[DAGManager] = None + self._dbgpts_loader: Optional[DBGPTsLoader] = None + + super().__init__(system_app) + + def init_app(self, system_app: SystemApp) -> None: + """Initialize the service + + Args: 
+ system_app (SystemApp): The system app + """ + self._serve_config = ServeConfig.from_app_config( + system_app.config, SERVE_CONFIG_KEY_PREFIX + ) + self._dao = self._dao or KnowledgeSpaceDao() + self._document_dao = self._document_dao or KnowledgeDocumentDao() + self._chunk_dao = self._chunk_dao or DocumentChunkDao() + self._system_app = system_app + + def before_start(self): + """Execute before the application starts""" + + def after_start(self): + """Execute after the application starts""" + + @property + def dao( + self, + ) -> BaseDao[KnowledgeSpaceEntity, SpaceServeRequest, SpaceServeResponse]: + """Returns the internal DAO.""" + return self._dao + + @property + def config(self) -> ServeConfig: + """Returns the internal ServeConfig.""" + return self._serve_config + + def create_space(self, request: SpaceServeRequest) -> SpaceServeResponse: + """Create a new Space entity + + Args: + request (KnowledgeSpaceRequest): The request + + Returns: + SpaceServeResponse: The response + """ + space = self.get(request) + if space is not None: + raise HTTPException( + status_code=400, + detail=f"space name:{request.name} have already named", + ) + return self._dao.create_knowledge_space(request) + + def update_space(self, request: SpaceServeRequest) -> SpaceServeResponse: + """Create a new Space entity + + Args: + request (KnowledgeSpaceRequest): The request + + Returns: + SpaceServeResponse: The response + """ + spaces = self._dao.get_knowledge_space( + KnowledgeSpaceEntity(id=request.id, name=request.name) + ) + if len(spaces) == 0: + raise HTTPException( + status_code=400, + detail=f"no space name named {request.name}", + ) + space = spaces[0] + query_request = {"id": space.id} + update_obj = self._dao.update(query_request, update_request=request) + return update_obj + + async def create_document( + self, request: DocumentServeRequest + ) -> SpaceServeResponse: + """Create a new document entity + + Args: + request (KnowledgeSpaceRequest): The request + + Returns: + 
SpaceServeResponse: The response + """ + space = self.get({"id": request.space_id}) + if space is None: + raise Exception(f"space id:{request.space_id} not found") + query = KnowledgeDocumentEntity(doc_name=request.doc_name, space=space.name) + documents = self._document_dao.get_knowledge_documents(query) + if len(documents) > 0: + raise Exception(f"document name:{request.doc_name} have already named") + if request.doc_file and request.doc_type == KnowledgeType.DOCUMENT.name: + doc_file = request.doc_file + if not os.path.exists(os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space.name)): + os.makedirs(os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space.name)) + tmp_fd, tmp_path = tempfile.mkstemp( + dir=os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space.name) + ) + with os.fdopen(tmp_fd, "wb") as tmp: + tmp.write(await request.doc_file.read()) + shutil.move( + tmp_path, + os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space.name, doc_file.filename), + ) + request.content = os.path.join( + KNOWLEDGE_UPLOAD_ROOT_PATH, space.name, doc_file.filename + ) + document = KnowledgeDocumentEntity( + doc_name=request.doc_name, + doc_type=request.doc_type, + space=space.name, + chunk_size=0, + status=SyncStatus.TODO.name, + last_sync=datetime.now(), + content=request.content, + result="", + ) + doc_id = self._document_dao.create_knowledge_document(document) + if doc_id is None: + raise Exception(f"create document failed, {request.doc_name}") + return doc_id + + def sync_document(self, requests: List[KnowledgeSyncRequest]) -> List: + """Create a new document entity + + Args: + request (KnowledgeSpaceRequest): The request + + Returns: + SpaceServeResponse: The response + """ + doc_ids = [] + for sync_request in requests: + space_id = sync_request.space_id + docs = self._document_dao.documents_by_ids([sync_request.doc_id]) + if len(docs) == 0: + raise Exception( + f"there are document called, doc_id: {sync_request.doc_id}" + ) + doc = docs[0] + if ( + doc.status == SyncStatus.RUNNING.name + or 
doc.status == SyncStatus.FINISHED.name + ): + raise Exception( + f" doc:{doc.doc_name} status is {doc.status}, can not sync" + ) + chunk_parameters = sync_request.chunk_parameters + if chunk_parameters.chunk_strategy != ChunkStrategy.CHUNK_BY_SIZE.name: + space_context = self.get_space_context(space_id) + chunk_parameters.chunk_size = ( + CFG.KNOWLEDGE_CHUNK_SIZE + if space_context is None + else int(space_context["embedding"]["chunk_size"]) + ) + chunk_parameters.chunk_overlap = ( + CFG.KNOWLEDGE_CHUNK_OVERLAP + if space_context is None + else int(space_context["embedding"]["chunk_overlap"]) + ) + self._sync_knowledge_document(space_id, doc, chunk_parameters) + doc_ids.append(doc.id) + return doc_ids + + def get(self, request: QUERY_SPEC) -> Optional[SpaceServeResponse]: + """Get a Flow entity + + Args: + request (SpaceServeRequest): The request + + Returns: + SpaceServeResponse: The response + """ + # TODO: implement your own logic here + # Build the query request from the request + query_request = request + return self._dao.get_one(query_request) + + def get_document(self, request: QUERY_SPEC) -> Optional[SpaceServeResponse]: + """Get a Flow entity + + Args: + request (SpaceServeRequest): The request + + Returns: + SpaceServeResponse: The response + """ + # TODO: implement your own logic here + # Build the query request from the request + query_request = request + return self._document_dao.get_one(query_request) + + def delete(self, space_id: str) -> Optional[SpaceServeResponse]: + """Delete a Flow entity + + Args: + uid (str): The uid + + Returns: + SpaceServeResponse: The data after deletion + """ + + # TODO: implement your own logic here + # Build the query request from the request + query_request = {"id": space_id} + space = self.get(query_request) + if space is None: + raise HTTPException(status_code=400, detail=f"Space {space_id} not found") + config = VectorStoreConfig(name=space.name) + vector_store_connector = VectorStoreConnector( + 
vector_store_type=CFG.VECTOR_STORE_TYPE, + vector_store_config=config, + ) + # delete vectors + vector_store_connector.delete_vector_name(space.name) + document_query = KnowledgeDocumentEntity(space=space.name) + # delete chunks + documents = self._document_dao.get_documents(document_query) + for document in documents: + self._chunk_dao.raw_delete(document.id) + # delete documents + self._document_dao.raw_delete(document_query) + # delete space + self._dao.delete(query_request) + return space + + def delete_document(self, document_id: str) -> Optional[DocumentServeResponse]: + """Delete a Flow entity + + Args: + uid (str): The uid + + Returns: + SpaceServeResponse: The data after deletion + """ + + query_request = {"id": document_id} + docuemnt = self._document_dao.get_one(query_request) + if docuemnt is None: + raise Exception(f"there are no or more than one document {document_id}") + vector_ids = docuemnt.vector_ids + if vector_ids is not None: + config = VectorStoreConfig(name=docuemnt.space) + vector_store_connector = VectorStoreConnector( + vector_store_type=CFG.VECTOR_STORE_TYPE, + vector_store_config=config, + ) + # delete vector by ids + vector_store_connector.delete_by_ids(vector_ids) + # delete chunks + self._chunk_dao.raw_delete(docuemnt.id) + # delete document + self._document_dao.raw_delete(docuemnt) + return docuemnt + + def get_list(self, request: SpaceServeRequest) -> List[SpaceServeResponse]: + """Get a list of Flow entities + + Args: + request (SpaceServeRequest): The request + + Returns: + List[SpaceServeResponse]: The response + """ + # TODO: implement your own logic here + # Build the query request from the request + query_request = request + return self.dao.get_list(query_request) + + def get_list_by_page( + self, request: QUERY_SPEC, page: int, page_size: int + ) -> PaginationResult[SpaceServeResponse]: + """Get a list of Flow entities by page + + Args: + request (SpaceServeRequest): The request + page (int): The page number + page_size 
(int): The page size + + Returns: + List[SpaceServeResponse]: The response + """ + return self.dao.get_list_page(request, page, page_size) + + def get_document_list( + self, request: QUERY_SPEC, page: int, page_size: int + ) -> PaginationResult[SpaceServeResponse]: + """Get a list of Flow entities by page + + Args: + request (SpaceServeRequest): The request + page (int): The page number + page_size (int): The page size + + Returns: + List[SpaceServeResponse]: The response + """ + return self._document_dao.get_list_page(request, page, page_size) + + def _batch_document_sync( + self, space_id, sync_requests: List[KnowledgeSyncRequest] + ) -> List[int]: + """batch sync knowledge document chunk into vector store + Args: + - space: Knowledge Space Name + - sync_requests: List[KnowledgeSyncRequest] + Returns: + - List[int]: document ids + """ + doc_ids = [] + for sync_request in sync_requests: + docs = self._document_dao.documents_by_ids([sync_request.doc_id]) + if len(docs) == 0: + raise Exception( + f"there are document called, doc_id: {sync_request.doc_id}" + ) + doc = docs[0] + if ( + doc.status == SyncStatus.RUNNING.name + or doc.status == SyncStatus.FINISHED.name + ): + raise Exception( + f" doc:{doc.doc_name} status is {doc.status}, can not sync" + ) + chunk_parameters = sync_request.chunk_parameters + if chunk_parameters.chunk_strategy != ChunkStrategy.CHUNK_BY_SIZE.name: + space_context = self.get_space_context(space_id) + chunk_parameters.chunk_size = ( + CFG.KNOWLEDGE_CHUNK_SIZE + if space_context is None + else int(space_context["embedding"]["chunk_size"]) + ) + chunk_parameters.chunk_overlap = ( + CFG.KNOWLEDGE_CHUNK_OVERLAP + if space_context is None + else int(space_context["embedding"]["chunk_overlap"]) + ) + self._sync_knowledge_document(space_id, doc, chunk_parameters) + doc_ids.append(doc.id) + return doc_ids + + def _sync_knowledge_document( + self, + space_id, + doc: KnowledgeDocumentEntity, + chunk_parameters: ChunkParameters, + ) -> List[Chunk]: + 
"""sync knowledge document chunk into vector store""" + embedding_factory = CFG.SYSTEM_APP.get_component( + "embedding_factory", EmbeddingFactory + ) + embedding_fn = embedding_factory.create( + model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL] + ) + from dbgpt.storage.vector_store.base import VectorStoreConfig + + space = self.get({"id": space_id}) + config = VectorStoreConfig( + name=space.name, + embedding_fn=embedding_fn, + max_chunks_once_load=CFG.KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD, + ) + vector_store_connector = VectorStoreConnector( + vector_store_type=CFG.VECTOR_STORE_TYPE, + vector_store_config=config, + ) + knowledge = KnowledgeFactory.create( + datasource=doc.content, + knowledge_type=KnowledgeType.get_by_value(doc.doc_type), + ) + assembler = EmbeddingAssembler.load_from_knowledge( + knowledge=knowledge, + chunk_parameters=chunk_parameters, + vector_store_connector=vector_store_connector, + ) + chunk_docs = assembler.get_chunks() + doc.status = SyncStatus.RUNNING.name + doc.chunk_size = len(chunk_docs) + doc.gmt_modified = datetime.now() + self._document_dao.update_knowledge_document(doc) + executor = CFG.SYSTEM_APP.get_component( + ComponentType.EXECUTOR_DEFAULT, ExecutorFactory + ).create() + executor.submit(self.async_doc_embedding, assembler, chunk_docs, doc) + logger.info(f"begin save document chunks, doc:{doc.doc_name}") + return chunk_docs + + @trace("async_doc_embedding") + def async_doc_embedding(self, assembler, chunk_docs, doc): + """async document embedding into vector db + Args: + - client: EmbeddingEngine Client + - chunk_docs: List[Document] + - doc: KnowledgeDocumentEntity + """ + + logger.info( + f"async doc embedding sync, doc:{doc.doc_name}, chunks length is {len(chunk_docs)}, begin embedding to vector store-{CFG.VECTOR_STORE_TYPE}" + ) + try: + with root_tracer.start_span( + "app.knowledge.assembler.persist", + metadata={"doc": doc.doc_name, "chunks": len(chunk_docs)}, + ): + vector_ids = assembler.persist() + doc.status = 
SyncStatus.FINISHED.name + doc.result = "document embedding success" + if vector_ids is not None: + doc.vector_ids = ",".join(vector_ids) + logger.info(f"async document embedding, success:{doc.doc_name}") + # save chunk details + chunk_entities = [ + DocumentChunkEntity( + doc_name=doc.doc_name, + doc_type=doc.doc_type, + document_id=doc.id, + content=chunk_doc.content, + meta_info=str(chunk_doc.metadata), + gmt_created=datetime.now(), + gmt_modified=datetime.now(), + ) + for chunk_doc in chunk_docs + ] + self._chunk_dao.create_documents_chunks(chunk_entities) + except Exception as e: + doc.status = SyncStatus.FAILED.name + doc.result = "document embedding failed" + str(e) + logger.error(f"document embedding, failed:{doc.doc_name}, {str(e)}") + return self._document_dao.update_knowledge_document(doc) + + def get_space_context(self, space_id): + """get space contect + Args: + - space_name: space name + """ + space = self.get({"id": space_id}) + if space is None: + raise Exception( + f"have not found {space_id} space or found more than one space called {space_id}" + ) + if space.context is not None: + return json.loads(space.context) + return None diff --git a/dbgpt/serve/rag/tests/__init__.py b/dbgpt/serve/rag/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/client/__init__.py b/examples/client/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/client/app_crud_example.py b/examples/client/app_crud_example.py new file mode 100644 index 000000000..8687c8ee0 --- /dev/null +++ b/examples/client/app_crud_example.py @@ -0,0 +1,35 @@ +import asyncio + +from dbgpt.client.app import list_app +from dbgpt.client.client import Client + +""" +Client: Simple App CRUD example + + This example demonstrates how to use the dbgpt client to get, list apps. + Example: + .. code-block:: python + + DBGPT_API_KEY = "dbgpt" + client = Client(api_key=DBGPT_API_KEY) + # 1. List all apps + res = await list_app(client) + # 2. 
Get an app + res = await get_app( + client, app_id="bf1c7561-13fc-4fe0-bf5d-c22e724766a8" + ) + +""" + + +async def main(): + # initialize client + + DBGPT_API_KEY = "dbgpt" + client = Client(api_key=DBGPT_API_KEY) + res = await list_app(client) + print(res.json()) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/client/client_chat_example.py b/examples/client/client_chat_example.py new file mode 100644 index 000000000..5ae7009f8 --- /dev/null +++ b/examples/client/client_chat_example.py @@ -0,0 +1,73 @@ +import asyncio + +from dbgpt.client.client import Client + +""" +Client: Simple Chat example + + This example demonstrates how to use the dbgpt client to chat with the chatgpt model. + + Example: + .. code-block:: python + + DBGPT_API_KEY = "dbgpt" + # chat with stream + client = Client(api_key=DBGPT_API_KEY) + + # 1. chat normal + async for data in client.chat_stream( + model="chatgpt_proxyllm", + messages="hello", + ): + print(data.dict()) + + # chat with no stream + res = await client.chat(model="chatgpt_proxyllm", messages="Hello?") + print(res.json()) + + # 2. chat with app + async for data in client.chat_stream( + model="chatgpt_proxyllm", + chat_mode="chat_app", + chat_param="${app_code}", + messages="hello", + ): + print(data.dict()) + + # 3. chat with knowledge + async for data in client.chat_stream( + model="chatgpt_proxyllm", + chat_mode="chat_knowledge", + chat_param="${space_name}", + messages="hello", + ): + print(data.dict()) + + # 4. 
chat with flow + async for data in client.chat_stream( + model="chatgpt_proxyllm", + chat_mode="chat_flow", + chat_param="${flow_id}", + messages="hello", + ): + print(data.dict()) +""" + + +async def main(): + # initialize client + DBGPT_API_KEY = "dbgpt" + client = Client(api_key=DBGPT_API_KEY) + + async for data in client.chat_stream( + model="chatgpt_proxyllm", + messages="hello", + ): + print(data) + + # res = await client.chat(model="chatgpt_proxyllm" ,messages="hello") + # print(res) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/client/flow_crud_example.py b/examples/client/flow_crud_example.py new file mode 100644 index 000000000..8fee72196 --- /dev/null +++ b/examples/client/flow_crud_example.py @@ -0,0 +1,48 @@ +import asyncio + +from dbgpt.client.app import list_app +from dbgpt.client.client import Client +from dbgpt.client.flow import list_flow + +""" +Client: Simple Flow CRUD example + + This example demonstrates how to use the dbgpt client to create, get, update, and delete flows. + Example: + .. code-block:: python + + DBGPT_API_KEY = "dbgpt" + client = Client(api_key=DBGPT_API_KEY) + # 1. Create a flow + res = await create_flow( + client, + FlowPanel(name="test_flow", desc="for client flow", owner="dbgpt"), + ) + # 2. Update a flow + res = await update_flow( + client, + FlowPanel(name="test_flow", desc="for client flow333", owner="dbgpt"), + ) + # 3. Delete a flow + res = await delete_flow( + client, flow_id="bf1c7561-13fc-4fe0-bf5d-c22e724766a8" + ) + # 4. Get a flow + res = await get_flow(client, flow_id="bf1c7561-13fc-4fe0-bf5d-c22e724766a8") + # 5. 
List all flows + res = await list_flow(client) + +""" + + +async def main(): + # initialize client + + DBGPT_API_KEY = "dbgpt" + client = Client(api_key=DBGPT_API_KEY) + res = await list_flow(client) + print(res.json()) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/client/knowledge_crud_example.py b/examples/client/knowledge_crud_example.py new file mode 100644 index 000000000..e9ff7ea9f --- /dev/null +++ b/examples/client/knowledge_crud_example.py @@ -0,0 +1,109 @@ +import asyncio + +from dbgpt.client.client import Client +from dbgpt.client.knowledge import list_space + +"""Client: Simple Knowledge CRUD example + + This example demonstrates how to use the dbgpt client to create, get, update, and delete knowledge spaces and documents. + Example: + .. code-block:: python + + DBGPT_API_KEY = "dbgpt" + client = Client(api_key=DBGPT_API_KEY) + # 1. Create a space + res = await create_space( + client, + SpaceModel( + name="test_space", + vector_type="Chroma", + desc="for client space", + owner="dbgpt", + ), + ) + # 2. Update a space + res = await update_space( + client, + SpaceModel( + name="test_space", + vector_type="Chroma", + desc="for client space333", + owner="dbgpt", + ), + ) + # 3. Delete a space + res = await delete_space(client, space_id="37") + # 4. Get a space + res = await get_space(client, space_id="5") + # 5. List all spaces + res = await list_space(client) + # 6. Create a document + res = await create_document( + client, + DocumentModel( + space_id="5", + doc_name="test_doc", + doc_type="TEXT", + doc_content="test content", + doc_source="", + ), + ) + # 7. Sync a document + res = await sync_document( + client, + sync_model=SyncModel( + doc_id="153", + space_id="40", + model_name="text2vec", + chunk_parameters=ChunkParameters(chunk_strategy="Automatic"), + ), + ) + # 8. Get a document + res = await get_document(client, "52") + # 9. List all documents + res = await list_document(client) + # 10. 
Delete a document + res = await delete_document(client, "150") +""" + + +async def main(): + + DBGPT_API_KEY = "dbgpt" + client = Client(api_key=DBGPT_API_KEY) + + # list all spaces + res = await list_space(client) + print(res) + + # get space + # res = await get_space(client, space_id='5') + + # create space + # res = await create_space(client, SpaceModel(name="test_space", vector_type="Chroma", desc="for client space", owner="dbgpt")) + + # update space + # res = await update_space(client, SpaceModel(name="test_space", vector_type="Chroma", desc="for client space333", owner="dbgpt")) + + # delete space + # res = await delete_space(client, space_id='37') + + # list all documents + # res = await list_document(client) + + # get document + # res = await get_document(client, "52") + + # delete document + # res = await delete_document(client, "150") + + # create document + # res = await create_document(client, DocumentModel(space_id="5", doc_name="test_doc", doc_type="test", doc_content="test content" + # , doc_file=('your_file_name', open('{your_file_path}', 'rb')))) + + # sync document + # res = await sync_document(client, sync_model=SyncModel(doc_id="153", space_id="40", model_name="text2vec", chunk_parameters=ChunkParameters(chunk_strategy="Automatic"))) + + +if __name__ == "__main__": + asyncio.run(main())