From 24e6d16a095c9e526a240ce7bc648d3cfe676426 Mon Sep 17 00:00:00 2001 From: yihong0618 Date: Fri, 22 Sep 2023 16:47:34 +0800 Subject: [PATCH 1/4] fix: proxyllm do not need controller_addr --- pilot/model/cluster/worker/manager.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/pilot/model/cluster/worker/manager.py b/pilot/model/cluster/worker/manager.py index 49c582a07..2f84a3216 100644 --- a/pilot/model/cluster/worker/manager.py +++ b/pilot/model/cluster/worker/manager.py @@ -1,17 +1,18 @@ import asyncio import itertools import json -import os -import sys -import random -import time import logging +import os +import random +import sys +import time from concurrent.futures import ThreadPoolExecutor from dataclasses import asdict from typing import Awaitable, Callable, Dict, Iterator, List, Optional from fastapi import APIRouter, FastAPI from fastapi.responses import StreamingResponse + from pilot.component import SystemApp from pilot.model.base import ( ModelInstance, @@ -20,16 +21,16 @@ from pilot.model.base import ( WorkerApplyType, WorkerSupportedModel, ) -from pilot.model.cluster.registry import ModelRegistry -from pilot.model.llm_utils import list_supported_models -from pilot.model.parameter import ModelParameters, ModelWorkerParameters, WorkerType -from pilot.model.cluster.worker_base import ModelWorker +from pilot.model.cluster.base import * from pilot.model.cluster.manager_base import ( WorkerManager, - WorkerRunData, WorkerManagerFactory, + WorkerRunData, ) -from pilot.model.cluster.base import * +from pilot.model.cluster.registry import ModelRegistry +from pilot.model.cluster.worker_base import ModelWorker +from pilot.model.llm_utils import list_supported_models +from pilot.model.parameter import ModelParameters, ModelWorkerParameters, WorkerType from pilot.utils.parameter_utils import ( EnvArgumentParser, ParameterDescription, @@ -638,7 +639,7 @@ def _setup_fastapi(worker_params: ModelWorkerParameters, 
app=None): router as controller_router, ) - if not worker_params.controller_addr: + if not worker_params.controller_addr and worker_params.model_name != "proxyllm": worker_params.controller_addr = f"http://127.0.0.1:{worker_params.port}" logger.info( f"Run WorkerManager with standalone mode, controller_addr: {worker_params.controller_addr}" From 76fdf52b0ae6ee896492bba9968eb706a2678d3f Mon Sep 17 00:00:00 2001 From: yihong0618 Date: Sat, 23 Sep 2023 14:46:26 +0800 Subject: [PATCH 2/4] fix: another way --- pilot/model/cluster/worker/manager.py | 6 +++++- pilot/model/parameter.py | 6 ++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pilot/model/cluster/worker/manager.py b/pilot/model/cluster/worker/manager.py index 2f84a3216..da34c314a 100644 --- a/pilot/model/cluster/worker/manager.py +++ b/pilot/model/cluster/worker/manager.py @@ -639,7 +639,11 @@ def _setup_fastapi(worker_params: ModelWorkerParameters, app=None): router as controller_router, ) - if not worker_params.controller_addr and worker_params.model_name != "proxyllm": + if not worker_params.controller_addr: + # if we have http_proxy or https_proxy in env, the server can not start + # so set it to empty here + os.environ["http_proxy"] = "" + os.environ["https_proxy"] = "" worker_params.controller_addr = f"http://127.0.0.1:{worker_params.port}" logger.info( f"Run WorkerManager with standalone mode, controller_addr: {worker_params.controller_addr}" diff --git a/pilot/model/parameter.py b/pilot/model/parameter.py index c7917a95f..6fd8ec5ab 100644 --- a/pilot/model/parameter.py +++ b/pilot/model/parameter.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +import os from dataclasses import dataclass, field from enum import Enum from typing import Dict, Optional @@ -246,6 +247,11 @@ class ProxyModelParameters(BaseModelParameters): proxy_api_key: str = field( metadata={"tags": "privacy", "help": "The api key of current proxy LLM"}, ) + http_proxy: Optional[str] = field( + 
default=os.environ.get("http_proxy") or os.environ.get("https_proxy"), + metadata={"help": "The http or https proxy to use openai"}, + ) + proxyllm_backend: Optional[str] = field( default=None, metadata={ From 3d5e0f4028b25743bcd095a7d46a1c44cb1fef5a Mon Sep 17 00:00:00 2001 From: FangYin Cheng Date: Mon, 25 Sep 2023 17:33:51 +0800 Subject: [PATCH 3/4] fix(core): Use thread pool to replace native threading.Thread --- pilot/component.py | 1 + pilot/configs/config.py | 1 + .../connections/manages/connection_manager.py | 24 +++++++++++------ pilot/server/component_configs.py | 4 +++ .../server/knowledge/_cli/knowledge_client.py | 7 +++-- pilot/server/knowledge/service.py | 16 +++++++++--- pilot/utils/executor_utils.py | 26 +++++++++++++++++++ 7 files changed, 63 insertions(+), 16 deletions(-) create mode 100644 pilot/utils/executor_utils.py diff --git a/pilot/component.py b/pilot/component.py index 7c865479d..3179fa696 100644 --- a/pilot/component.py +++ b/pilot/component.py @@ -46,6 +46,7 @@ class ComponentType(str, Enum): WORKER_MANAGER = "dbgpt_worker_manager" WORKER_MANAGER_FACTORY = "dbgpt_worker_manager_factory" MODEL_CONTROLLER = "dbgpt_model_controller" + EXECUTOR_DEFAULT = "dbgpt_thread_pool_default" class BaseComponent(LifeCycle, ABC): diff --git a/pilot/configs/config.py b/pilot/configs/config.py index 8bd6c4007..b2af3ec3a 100644 --- a/pilot/configs/config.py +++ b/pilot/configs/config.py @@ -141,6 +141,7 @@ class Config(metaclass=Singleton): self.LOCAL_DB_PORT = int(os.getenv("LOCAL_DB_PORT", 3306)) self.LOCAL_DB_USER = os.getenv("LOCAL_DB_USER", "root") self.LOCAL_DB_PASSWORD = os.getenv("LOCAL_DB_PASSWORD", "aa123456") + self.LOCAL_DB_POOL_SIZE = int(os.getenv("LOCAL_DB_POOL_SIZE", 10)) self.LOCAL_DB_MANAGE = None diff --git a/pilot/connections/manages/connection_manager.py b/pilot/connections/manages/connection_manager.py index 534cd36f0..9806b531f 100644 --- a/pilot/connections/manages/connection_manager.py +++ 
b/pilot/connections/manages/connection_manager.py @@ -4,7 +4,9 @@ import asyncio from pilot.configs.config import Config from pilot.connections.manages.connect_storage_duckdb import DuckdbConnectConfig from pilot.common.schema import DBType -from pilot.component import SystemApp +from pilot.component import SystemApp, ComponentType +from pilot.utils.executor_utils import ExecutorFactory + from pilot.connections.rdbms.conn_mysql import MySQLConnect from pilot.connections.base import BaseConnect @@ -76,7 +78,11 @@ class ConnectManager: + CFG.LOCAL_DB_HOST + ":" + str(CFG.LOCAL_DB_PORT), - engine_args={"pool_size": 10, "pool_recycle": 3600, "echo": True}, + engine_args={ + "pool_size": CFG.LOCAL_DB_POOL_SIZE, + "pool_recycle": 3600, + "echo": True, + }, ) # default_mysql = MySQLConnect.from_uri( # "mysql+pymysql://" @@ -208,13 +214,15 @@ class ConnectManager: db_info.comment, ) # async embedding - thread = threading.Thread( - target=self.db_summary_client.db_summary_embedding( - db_info.db_name, db_info.db_type - ) + executor = CFG.SYSTEM_APP.get_component( + ComponentType.EXECUTOR_DEFAULT, ExecutorFactory + ).create() + executor.submit( + self.db_summary_client.db_summary_embedding, + db_info.db_name, + db_info.db_type, ) - thread.start() except Exception as e: - raise ValueError("Add db connect info error!" + str(e)) + raise ValueError("Add db connect info error!" 
+ str(e)) return True diff --git a/pilot/server/component_configs.py b/pilot/server/component_configs.py index d2dded0a1..71ef797d9 100644 --- a/pilot/server/component_configs.py +++ b/pilot/server/component_configs.py @@ -4,6 +4,7 @@ import logging from typing import TYPE_CHECKING, Any, Type from pilot.component import ComponentType, SystemApp +from pilot.utils.executor_utils import DefaultExecutorFactory from pilot.embedding_engine.embedding_factory import EmbeddingFactory from pilot.server.base import WebWerverParameters @@ -22,6 +23,9 @@ def initialize_components( ): from pilot.model.cluster.controller.controller import controller + # Register global default executor factory first + system_app.register(DefaultExecutorFactory) + system_app.register_instance(controller) _initialize_embedding_model( diff --git a/pilot/server/knowledge/_cli/knowledge_client.py b/pilot/server/knowledge/_cli/knowledge_client.py index 902b3388a..4cfeea339 100644 --- a/pilot/server/knowledge/_cli/knowledge_client.py +++ b/pilot/server/knowledge/_cli/knowledge_client.py @@ -124,12 +124,13 @@ def knowledge_init( def upload(filename: str): try: logger.info(f"Begin upload document: {filename} to {space.name}") - return client.document_upload( + doc_id = client.document_upload( space.name, filename, KnowledgeType.DOCUMENT.value, filename ) + client.document_sync(space.name, DocumentSyncRequest(doc_ids=[doc_id])) except Exception as e: if skip_wrong_doc: - logger.warn(f"Warning: {str(e)}") + logger.warn(f"Upload {filename} to {space.name} failed: {str(e)}") else: raise e @@ -144,5 +145,3 @@ def knowledge_init( if not doc_ids: logger.warn("Warning: no document to sync") return - logger.info(f"Begin sync document: {doc_ids}") - client.document_sync(space.name, DocumentSyncRequest(doc_ids=doc_ids)) diff --git a/pilot/server/knowledge/service.py b/pilot/server/knowledge/service.py index fc07040c7..d574ffdfd 100644 --- a/pilot/server/knowledge/service.py +++ b/pilot/server/knowledge/service.py @@ 
-9,6 +9,9 @@ from pilot.configs.model_config import ( EMBEDDING_MODEL_CONFIG, KNOWLEDGE_UPLOAD_ROOT_PATH, ) +from pilot.component import ComponentType +from pilot.utils.executor_utils import ExecutorFactory + from pilot.logs import logger from pilot.server.knowledge.chunk_db import ( DocumentChunkEntity, @@ -227,10 +230,15 @@ class KnowledgeService: doc.gmt_modified = datetime.now() knowledge_document_dao.update_knowledge_document(doc) # async doc embeddings - thread = threading.Thread( - target=self.async_doc_embedding, args=(client, chunk_docs, doc) - ) - thread.start() + # thread = threading.Thread( + # target=self.async_doc_embedding, args=(client, chunk_docs, doc) + # ) + # thread.start() + executor = CFG.SYSTEM_APP.get_component( + ComponentType.EXECUTOR_DEFAULT, ExecutorFactory + ).create() + executor.submit(self.async_doc_embedding, client, chunk_docs, doc) + logger.info(f"begin save document chunks, doc:{doc.doc_name}") # save chunk details chunk_entities = [ diff --git a/pilot/utils/executor_utils.py b/pilot/utils/executor_utils.py new file mode 100644 index 000000000..dbc3dac81 --- /dev/null +++ b/pilot/utils/executor_utils.py @@ -0,0 +1,26 @@ +from abc import ABC, abstractmethod +from concurrent.futures import Executor, ThreadPoolExecutor + +from pilot.component import BaseComponent, ComponentType, SystemApp + + +class ExecutorFactory(BaseComponent, ABC): + name = ComponentType.EXECUTOR_DEFAULT.value + + @abstractmethod + def create(self) -> "Executor": + """Create executor""" + + +class DefaultExecutorFactory(ExecutorFactory): + def __init__(self, system_app: SystemApp | None = None, max_workers=None): + super().__init__(system_app) + self._executor = ThreadPoolExecutor( + max_workers=max_workers, thread_name_prefix=self.name + ) + + def init_app(self, system_app: SystemApp): + pass + + def create(self) -> Executor: + return self._executor From 6183fa08d2d20e6e1454a8c9bf06f0ede7ca5610 Mon Sep 17 00:00:00 2001 From: FangYin Cheng Date: Mon, 25 Sep 2023 
17:38:21 +0800 Subject: [PATCH 4/4] chore: modify colorama==0.4.6 --- setup.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 64ae3e8b4..8fe47a614 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,6 @@ from enum import Enum import urllib.request from urllib.parse import urlparse, quote import re -from pip._internal.utils.appdirs import user_cache_dir import shutil from setuptools import find_packages @@ -67,6 +66,9 @@ def cache_package(package_url: str, package_name: str, is_windows: bool = False) safe_url, parsed_url = encode_url(package_url) if BUILD_NO_CACHE: return safe_url + + from pip._internal.utils.appdirs import user_cache_dir + filename = os.path.basename(parsed_url) cache_dir = os.path.join(user_cache_dir("pip"), "http", "wheels", package_name) os.makedirs(cache_dir, exist_ok=True) @@ -279,7 +281,7 @@ def core_requires(): "importlib-resources==5.12.0", "psutil==5.9.4", "python-dotenv==1.0.0", - "colorama", + "colorama==0.4.6", "prettytable", "cachetools", ]