diff --git a/pilot/model/cluster/controller/controller.py b/pilot/model/cluster/controller/controller.py index 826ffef03..1ec3965dc 100644 --- a/pilot/model/cluster/controller/controller.py +++ b/pilot/model/cluster/controller/controller.py @@ -185,7 +185,7 @@ def run_model_controller(): setup_logging( "pilot", logging_level=controller_params.log_level, - logger_filename="dbgpt_model_controller.log", + logger_filename=controller_params.log_file, ) initialize_controller(host=controller_params.host, port=controller_params.port) diff --git a/pilot/model/cluster/worker/manager.py b/pilot/model/cluster/worker/manager.py index 815f07bf2..cc5ef97d6 100644 --- a/pilot/model/cluster/worker/manager.py +++ b/pilot/model/cluster/worker/manager.py @@ -1006,7 +1006,7 @@ def run_worker_manager( setup_logging( "pilot", logging_level=worker_params.log_level, - logger_filename="dbgpt_model_worker_manager.log", + logger_filename=worker_params.log_file, ) embedded_mod = True @@ -1019,7 +1019,7 @@ def run_worker_manager( system_app = SystemApp(app) initialize_tracer( system_app, - os.path.join(LOGDIR, "dbgpt_model_worker_manager_tracer.jsonl"), + os.path.join(LOGDIR, worker_params.tracer_file), root_operation_name="DB-GPT-WorkerManager-Entry", ) diff --git a/pilot/model/parameter.py b/pilot/model/parameter.py index 0ad048c24..ba0000435 100644 --- a/pilot/model/parameter.py +++ b/pilot/model/parameter.py @@ -46,6 +46,18 @@ class ModelControllerParameters(BaseParameters): ], }, ) + log_file: Optional[str] = field( + default="dbgpt_model_controller.log", + metadata={ + "help": "The filename to store log", + }, + ) + tracer_file: Optional[str] = field( + default="dbgpt_model_controller_tracer.jsonl", + metadata={ + "help": "The filename to store tracer span records", + }, + ) @dataclass @@ -122,6 +134,18 @@ class ModelWorkerParameters(BaseModelParameters): ], }, ) + log_file: Optional[str] = field( + default="dbgpt_model_worker_manager.log", + metadata={ + "help": "The filename to store log", + }, + ) + tracer_file: Optional[str] = field( + default="dbgpt_model_worker_manager_tracer.jsonl", + metadata={ + "help": "The filename to store tracer span records", + }, + ) @dataclass diff --git a/pilot/server/base.py b/pilot/server/base.py index 3b2d7010b..d34b14b28 100644 --- a/pilot/server/base.py +++ b/pilot/server/base.py @@ -95,6 +95,19 @@ class WebWerverParameters(BaseParameters): daemon: Optional[bool] = field( default=False, metadata={"help": "Run Webserver in background"} ) + controller_addr: Optional[str] = field( + default=None, + metadata={ + "help": "The Model controller address to connect. If None, read model controller address from environment key `MODEL_SERVER`." + }, + ) + model_name: str = field( + default=None, + metadata={ + "help": "The default model name to use. If None, read model name from environment key `LLM_MODEL`.", + "tags": "fixed", + }, + ) share: Optional[bool] = field( default=False, metadata={ @@ -123,3 +136,15 @@ class WebWerverParameters(BaseParameters): }, ) light: Optional[bool] = field(default=False, metadata={"help": "enable light mode"}) + log_file: Optional[str] = field( + default="dbgpt_webserver.log", + metadata={ + "help": "The filename to store log", + }, + ) + tracer_file: Optional[str] = field( + default="dbgpt_webserver_tracer.jsonl", + metadata={ + "help": "The filename to store tracer span records", + }, + ) diff --git a/pilot/server/dbgpt_server.py b/pilot/server/dbgpt_server.py index 2b35eaf10..6762fd32a 100644 --- a/pilot/server/dbgpt_server.py +++ b/pilot/server/dbgpt_server.py @@ -119,7 +119,7 @@ def initialize_app(param: WebWerverParameters = None, args: List[str] = None): if not param.log_level: param.log_level = _get_logging_level() setup_logging( - "pilot", logging_level=param.log_level, logger_filename="dbgpt_webserver.log" + "pilot", logging_level=param.log_level, logger_filename=param.log_file ) # Before start system_app.before_start() @@ -133,14 +133,16 @@ def initialize_app(param: WebWerverParameters = None, args: List[str] = None): model_start_listener = _create_model_start_listener(system_app) initialize_components(param, system_app, embedding_model_name, embedding_model_path) - model_path = LLM_MODEL_CONFIG.get(CFG.LLM_MODEL) + model_name = param.model_name or CFG.LLM_MODEL + + model_path = LLM_MODEL_CONFIG.get(model_name) if not param.light: print("Model Unified Deployment Mode!") if not param.remote_embedding: embedding_model_name, embedding_model_path = None, None initialize_worker_manager_in_client( app=app, - model_name=CFG.LLM_MODEL, + model_name=model_name, model_path=model_path, local_port=param.port, embedding_model_name=embedding_model_name, @@ -152,12 +154,13 @@ def initialize_app(param: WebWerverParameters = None, args: List[str] = None): CFG.NEW_SERVER_MODE = True else: # MODEL_SERVER is controller address now + controller_addr = param.controller_addr or CFG.MODEL_SERVER initialize_worker_manager_in_client( app=app, - model_name=CFG.LLM_MODEL, + model_name=model_name, model_path=model_path, run_locally=False, - controller_addr=CFG.MODEL_SERVER, + controller_addr=controller_addr, local_port=param.port, start_listener=model_start_listener, system_app=system_app, @@ -182,7 +185,7 @@ def run_uvicorn(param: WebWerverParameters): def run_webserver(param: WebWerverParameters = None): if not param: param = _get_webserver_params() - initialize_tracer(system_app, os.path.join(LOGDIR, "dbgpt_webserver_tracer.jsonl")) + initialize_tracer(system_app, os.path.join(LOGDIR, param.tracer_file)) with root_tracer.start_span( "run_webserver", diff --git a/pilot/utils/tracer/span_storage.py b/pilot/utils/tracer/span_storage.py index 914aa0126..3070fb834 100644 --- a/pilot/utils/tracer/span_storage.py +++ b/pilot/utils/tracer/span_storage.py @@ -1,6 +1,7 @@ import os import json import time +import datetime import threading import queue import logging @@ -27,6 +28,13 @@ class FileSpanStorage(SpanStorage): def __init__(self, filename: str, batch_size=10, flush_interval=10): super().__init__() self.filename = filename + # Split filename into prefix and suffix + self.filename_prefix, self.filename_suffix = os.path.splitext(filename) + if not self.filename_suffix: + self.filename_suffix = ".log" + self.last_date = ( + datetime.datetime.now().date() + ) # Store the current date for checking date changes self.queue = queue.Queue() self.batch_size = batch_size self.flush_interval = flush_interval @@ -52,7 +60,21 @@ class FileSpanStorage(SpanStorage): except queue.Full: pass # If the signal queue is full, it's okay. The flush thread will handle it. + def _get_dated_filename(self, date: datetime.date) -> str: + """Return the filename based on a specific date.""" + date_str = date.strftime("%Y-%m-%d") + return f"{self.filename_prefix}_{date_str}{self.filename_suffix}" + + def _roll_over_if_needed(self): + """Checks if a day has changed since the last write, and if so, renames the current file.""" + current_date = datetime.datetime.now().date() + if current_date != self.last_date: + if os.path.exists(self.filename): + os.rename(self.filename, self._get_dated_filename(self.last_date)) + self.last_date = current_date + def _write_to_file(self): + self._roll_over_if_needed() spans_to_write = [] while not self.queue.empty(): spans_to_write.append(self.queue.get()) diff --git a/pilot/utils/tracer/tests/test_span_storage.py b/pilot/utils/tracer/tests/test_span_storage.py index 0c63992a6..9ca727995 100644 --- a/pilot/utils/tracer/tests/test_span_storage.py +++ b/pilot/utils/tracer/tests/test_span_storage.py @@ -4,6 +4,8 @@ import asyncio import json import tempfile import time +from unittest.mock import patch +from datetime import datetime, timedelta from pilot.utils.tracer import SpanStorage, FileSpanStorage, Span, SpanType @@ -122,3 +124,46 @@ def test_non_existent_file(storage: SpanStorage): assert len(spans_in_file) == 2 assert spans_in_file[0]["trace_id"] == "1" assert spans_in_file[1]["trace_id"] == "2" + + +@pytest.mark.parametrize( + "storage", [{"batch_size": 1, "file_does_not_exist": True}], indirect=True +) +def test_log_rollover(storage: SpanStorage): + # mock start date + mock_start_date = datetime(2023, 10, 18, 23, 59) + + with patch("datetime.datetime") as mock_datetime: + mock_datetime.now.return_value = mock_start_date + + span1 = Span("1", "a", SpanType.BASE, "b", "op1") + storage.append_span(span1) + time.sleep(0.1) + + # mock new day + mock_datetime.now.return_value = mock_start_date + timedelta(minutes=1) + + span2 = Span("2", "c", SpanType.BASE, "d", "op2") + storage.append_span(span2) + time.sleep(0.1) + + # origin filename need exists + assert os.path.exists(storage.filename) + + # get roll over filename + dated_filename = os.path.join( + os.path.dirname(storage.filename), + f"{os.path.basename(storage.filename).split('.')[0]}_2023-10-18.jsonl", + ) + + assert os.path.exists(dated_filename) + + # check origin filename just include the second span + spans_in_original_file = read_spans_from_file(storage.filename) + assert len(spans_in_original_file) == 1 + assert spans_in_original_file[0]["trace_id"] == "2" + + # check the roll over filename just include the first span + spans_in_dated_file = read_spans_from_file(dated_filename) + assert len(spans_in_dated_file) == 1 + assert spans_in_dated_file[0]["trace_id"] == "1"