feat(core): Support custom log file name and tracer file name (#690)

**Other:** The tracer FileSpanStorage support file roll over
Aries-ckt 2023-10-18 20:34:14 +08:00 committed by GitHub
commit 05327c61c9
7 changed files with 128 additions and 9 deletions
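To make the change concrete, here is a quick usage sketch (not part of the commit; it assumes a local DB-GPT checkout where `pilot` is importable, and reuses the `Span` constructor exactly as the tests below do):

```python
# Sketch only: the filenames are now plain values you choose, and
# FileSpanStorage archives the previous day's file under a dated name on the
# first write of a new day.
from pilot.utils.tracer import FileSpanStorage, Span, SpanType

storage = FileSpanStorage("/tmp/dbgpt_demo_tracer.jsonl", batch_size=1)
# Constructor arguments mirror the tests below:
# (trace_id, span_id, span_type, parent_span_id, operation_name).
storage.append_span(Span("1", "a", SpanType.BASE, "b", "demo_op"))
# After midnight, the next append renames the current file to
# /tmp/dbgpt_demo_tracer_<YYYY-MM-DD>.jsonl and starts a fresh one.
```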

View File

@@ -185,7 +185,7 @@ def run_model_controller():
     setup_logging(
         "pilot",
         logging_level=controller_params.log_level,
-        logger_filename="dbgpt_model_controller.log",
+        logger_filename=controller_params.log_file,
     )
 
     initialize_controller(host=controller_params.host, port=controller_params.port)

View File

@@ -1006,7 +1006,7 @@ def run_worker_manager(
     setup_logging(
         "pilot",
         logging_level=worker_params.log_level,
-        logger_filename="dbgpt_model_worker_manager.log",
+        logger_filename=worker_params.log_file,
     )
 
     embedded_mod = True
@@ -1019,7 +1019,7 @@ def run_worker_manager(
     system_app = SystemApp(app)
     initialize_tracer(
         system_app,
-        os.path.join(LOGDIR, "dbgpt_model_worker_manager_tracer.jsonl"),
+        os.path.join(LOGDIR, worker_params.tracer_file),
         root_operation_name="DB-GPT-WorkerManager-Entry",
     )

View File

@@ -46,6 +46,18 @@ class ModelControllerParameters(BaseParameters):
             ],
         },
     )
+    log_file: Optional[str] = field(
+        default="dbgpt_model_controller.log",
+        metadata={
+            "help": "The filename to store log",
+        },
+    )
+    tracer_file: Optional[str] = field(
+        default="dbgpt_model_controller_tracer.jsonl",
+        metadata={
+            "help": "The filename to store tracer span records",
+        },
+    )
 
 
 @dataclass
@@ -122,6 +134,18 @@ class ModelWorkerParameters(BaseModelParameters):
             ],
         },
     )
+    log_file: Optional[str] = field(
+        default="dbgpt_model_worker_manager.log",
+        metadata={
+            "help": "The filename to store log",
+        },
+    )
+    tracer_file: Optional[str] = field(
+        default="dbgpt_model_worker_manager_tracer.jsonl",
+        metadata={
+            "help": "The filename to store tracer span records",
+        },
+    )
 
 
 @dataclass
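These dataclass fields are presumably surfaced as `--log_file` / `--tracer_file` command-line flags by the project's dataclass-driven argument parsing. As a stdlib-only sketch of that general pattern (this is not DB-GPT's actual parser):

```python
import argparse
from dataclasses import dataclass, field, fields
from typing import Optional

@dataclass
class DemoParameters:
    # Field names and defaults taken from the diff above.
    log_file: Optional[str] = field(
        default="dbgpt_model_controller.log",
        metadata={"help": "The filename to store log"},
    )
    tracer_file: Optional[str] = field(
        default="dbgpt_model_controller_tracer.jsonl",
        metadata={"help": "The filename to store tracer span records"},
    )

def parse_into(cls, argv=None):
    """Build one --flag per dataclass field, then instantiate the dataclass."""
    parser = argparse.ArgumentParser()
    for f in fields(cls):
        parser.add_argument(f"--{f.name}", default=f.default, help=f.metadata.get("help"))
    return cls(**vars(parser.parse_args(argv)))

params = parse_into(DemoParameters, ["--log_file", "custom_controller.log"])
print(params.log_file)     # custom_controller.log
print(params.tracer_file)  # dbgpt_model_controller_tracer.jsonl
```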

View File

@@ -95,6 +95,19 @@ class WebWerverParameters(BaseParameters):
     daemon: Optional[bool] = field(
         default=False, metadata={"help": "Run Webserver in background"}
     )
+    controller_addr: Optional[str] = field(
+        default=None,
+        metadata={
+            "help": "The Model controller address to connect. If None, read model controller address from environment key `MODEL_SERVER`."
+        },
+    )
+    model_name: str = field(
+        default=None,
+        metadata={
+            "help": "The default model name to use. If None, read model name from environment key `LLM_MODEL`.",
+            "tags": "fixed",
+        },
+    )
     share: Optional[bool] = field(
         default=False,
         metadata={
@@ -123,3 +136,15 @@ class WebWerverParameters(BaseParameters):
         },
     )
     light: Optional[bool] = field(default=False, metadata={"help": "enable light mode"})
+    log_file: Optional[str] = field(
+        default="dbgpt_webserver.log",
+        metadata={
+            "help": "The filename to store log",
+        },
+    )
+    tracer_file: Optional[str] = field(
+        default="dbgpt_webserver_tracer.jsonl",
+        metadata={
+            "help": "The filename to store tracer span records",
+        },
+    )

View File

@@ -119,7 +119,7 @@ def initialize_app(param: WebWerverParameters = None, args: List[str] = None):
     if not param.log_level:
         param.log_level = _get_logging_level()
     setup_logging(
-        "pilot", logging_level=param.log_level, logger_filename="dbgpt_webserver.log"
+        "pilot", logging_level=param.log_level, logger_filename=param.log_file
     )
     # Before start
     system_app.before_start()
@@ -133,14 +133,16 @@ def initialize_app(param: WebWerverParameters = None, args: List[str] = None):
     model_start_listener = _create_model_start_listener(system_app)
     initialize_components(param, system_app, embedding_model_name, embedding_model_path)
 
-    model_path = LLM_MODEL_CONFIG.get(CFG.LLM_MODEL)
+    model_name = param.model_name or CFG.LLM_MODEL
+    model_path = LLM_MODEL_CONFIG.get(model_name)
     if not param.light:
         print("Model Unified Deployment Mode!")
         if not param.remote_embedding:
             embedding_model_name, embedding_model_path = None, None
         initialize_worker_manager_in_client(
             app=app,
-            model_name=CFG.LLM_MODEL,
+            model_name=model_name,
             model_path=model_path,
             local_port=param.port,
             embedding_model_name=embedding_model_name,
@@ -152,12 +154,13 @@ def initialize_app(param: WebWerverParameters = None, args: List[str] = None):
         CFG.NEW_SERVER_MODE = True
     else:
         # MODEL_SERVER is controller address now
+        controller_addr = param.controller_addr or CFG.MODEL_SERVER
         initialize_worker_manager_in_client(
             app=app,
-            model_name=CFG.LLM_MODEL,
+            model_name=model_name,
             model_path=model_path,
             run_locally=False,
-            controller_addr=CFG.MODEL_SERVER,
+            controller_addr=controller_addr,
             local_port=param.port,
             start_listener=model_start_listener,
             system_app=system_app,
@@ -182,7 +185,7 @@ def run_uvicorn(param: WebWerverParameters):
 def run_webserver(param: WebWerverParameters = None):
     if not param:
         param = _get_webserver_params()
-    initialize_tracer(system_app, os.path.join(LOGDIR, "dbgpt_webserver_tracer.jsonl"))
+    initialize_tracer(system_app, os.path.join(LOGDIR, param.tracer_file))
     with root_tracer.start_span(
         "run_webserver",

View File

@@ -1,6 +1,7 @@
 import os
 import json
 import time
+import datetime
 import threading
 import queue
 import logging
@@ -27,6 +28,13 @@ class FileSpanStorage(SpanStorage):
     def __init__(self, filename: str, batch_size=10, flush_interval=10):
         super().__init__()
         self.filename = filename
+        # Split filename into prefix and suffix
+        self.filename_prefix, self.filename_suffix = os.path.splitext(filename)
+        if not self.filename_suffix:
+            self.filename_suffix = ".log"
+        self.last_date = (
+            datetime.datetime.now().date()
+        )  # Store the current date for checking date changes
         self.queue = queue.Queue()
         self.batch_size = batch_size
         self.flush_interval = flush_interval
@@ -52,7 +60,21 @@ class FileSpanStorage(SpanStorage):
         except queue.Full:
             pass  # If the signal queue is full, it's okay. The flush thread will handle it.
 
+    def _get_dated_filename(self, date: datetime.date) -> str:
+        """Return the filename based on a specific date."""
+        date_str = date.strftime("%Y-%m-%d")
+        return f"{self.filename_prefix}_{date_str}{self.filename_suffix}"
+
+    def _roll_over_if_needed(self):
+        """Checks if a day has changed since the last write, and if so, renames the current file."""
+        current_date = datetime.datetime.now().date()
+        if current_date != self.last_date:
+            if os.path.exists(self.filename):
+                os.rename(self.filename, self._get_dated_filename(self.last_date))
+            self.last_date = current_date
+
     def _write_to_file(self):
+        self._roll_over_if_needed()
         spans_to_write = []
         while not self.queue.empty():
             spans_to_write.append(self.queue.get())
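For readers who want to poke at the rollover scheme outside the repo, here is a standalone, stdlib-only sketch of the same logic (simplified and single-threaded; it is not the class above). One consequence of the lazy design worth noting: the rename happens on the first write of a new day, so a day with no spans produces no dated archive.

```python
import datetime
import os

class DailyRollingWriter:
    """Simplified mirror of the rollover added to FileSpanStorage."""

    def __init__(self, filename: str):
        self.filename = filename
        self.prefix, self.suffix = os.path.splitext(filename)
        if not self.suffix:
            self.suffix = ".log"
        self.last_date = datetime.datetime.now().date()

    def _dated_filename(self, date: datetime.date) -> str:
        return f"{self.prefix}_{date.strftime('%Y-%m-%d')}{self.suffix}"

    def write_line(self, line: str) -> None:
        today = datetime.datetime.now().date()
        if today != self.last_date:
            # First write of a new day: archive yesterday's file, start fresh.
            if os.path.exists(self.filename):
                os.rename(self.filename, self._dated_filename(self.last_date))
            self.last_date = today
        with open(self.filename, "a") as f:
            f.write(line + "\n")
```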

View File

@@ -4,6 +4,8 @@ import asyncio
 import json
 import tempfile
 import time
+from unittest.mock import patch
+from datetime import datetime, timedelta
 
 from pilot.utils.tracer import SpanStorage, FileSpanStorage, Span, SpanType
 
@@ -122,3 +124,46 @@ def test_non_existent_file(storage: SpanStorage):
     assert len(spans_in_file) == 2
     assert spans_in_file[0]["trace_id"] == "1"
     assert spans_in_file[1]["trace_id"] == "2"
+
+
+@pytest.mark.parametrize(
+    "storage", [{"batch_size": 1, "file_does_not_exist": True}], indirect=True
+)
+def test_log_rollover(storage: SpanStorage):
+    # Mock the start date
+    mock_start_date = datetime(2023, 10, 18, 23, 59)
+
+    with patch("datetime.datetime") as mock_datetime:
+        mock_datetime.now.return_value = mock_start_date
+        span1 = Span("1", "a", SpanType.BASE, "b", "op1")
+        storage.append_span(span1)
+        time.sleep(0.1)
+
+        # Mock the new day
+        mock_datetime.now.return_value = mock_start_date + timedelta(minutes=1)
+        span2 = Span("2", "c", SpanType.BASE, "d", "op2")
+        storage.append_span(span2)
+        time.sleep(0.1)
+
+    # The original filename must still exist
+    assert os.path.exists(storage.filename)
+
+    # Build the rolled-over filename
+    dated_filename = os.path.join(
+        os.path.dirname(storage.filename),
+        f"{os.path.basename(storage.filename).split('.')[0]}_2023-10-18.jsonl",
+    )
+    assert os.path.exists(dated_filename)
+
+    # The original file should contain only the second span
+    spans_in_original_file = read_spans_from_file(storage.filename)
+    assert len(spans_in_original_file) == 1
+    assert spans_in_original_file[0]["trace_id"] == "2"
+
+    # The rolled-over file should contain only the first span
+    spans_in_dated_file = read_spans_from_file(dated_filename)
+    assert len(spans_in_dated_file) == 1
+    assert spans_in_dated_file[0]["trace_id"] == "1"
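A note on why `patch("datetime.datetime")` in this test also freezes time inside `FileSpanStorage`: both modules do `import datetime` and resolve `datetime.datetime` at call time through the single shared module object, so replacing that attribute is visible everywhere. A minimal standalone illustration (not from the repo):

```python
from unittest.mock import patch
import datetime

real_datetime = datetime.datetime  # keep a handle to the real class

with patch("datetime.datetime") as mock_dt:
    mock_dt.now.return_value = real_datetime(2023, 10, 18, 23, 59)
    # Any module that calls datetime.datetime.now() sees the frozen time.
    print(datetime.datetime.now())  # 2023-10-18 23:59:00
```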