Merge remote-tracking branch 'origin/main' into Agent_Hub_Dev

commit 936094f2ce by aries_ckt, 2023-10-18 20:34:44 +08:00
7 changed files with 128 additions and 9 deletions

View File

@@ -185,7 +185,7 @@ def run_model_controller():
     setup_logging(
         "pilot",
         logging_level=controller_params.log_level,
-        logger_filename="dbgpt_model_controller.log",
+        logger_filename=controller_params.log_file,
     )
 
     initialize_controller(host=controller_params.host, port=controller_params.port)

View File

@@ -1006,7 +1006,7 @@ def run_worker_manager(
     setup_logging(
         "pilot",
         logging_level=worker_params.log_level,
-        logger_filename="dbgpt_model_worker_manager.log",
+        logger_filename=worker_params.log_file,
     )
 
     embedded_mod = True
@@ -1019,7 +1019,7 @@ def run_worker_manager(
     system_app = SystemApp(app)
     initialize_tracer(
         system_app,
-        os.path.join(LOGDIR, "dbgpt_model_worker_manager_tracer.jsonl"),
+        os.path.join(LOGDIR, worker_params.tracer_file),
         root_operation_name="DB-GPT-WorkerManager-Entry",
     )

View File

@@ -46,6 +46,18 @@ class ModelControllerParameters(BaseParameters):
             ],
         },
     )
+    log_file: Optional[str] = field(
+        default="dbgpt_model_controller.log",
+        metadata={
+            "help": "The filename to store log",
+        },
+    )
+    tracer_file: Optional[str] = field(
+        default="dbgpt_model_controller_tracer.jsonl",
+        metadata={
+            "help": "The filename to store tracer span records",
+        },
+    )
 
 
 @dataclass
@@ -122,6 +134,18 @@ class ModelWorkerParameters(BaseModelParameters):
             ],
         },
     )
+    log_file: Optional[str] = field(
+        default="dbgpt_model_worker_manager.log",
+        metadata={
+            "help": "The filename to store log",
+        },
+    )
+    tracer_file: Optional[str] = field(
+        default="dbgpt_model_worker_manager_tracer.jsonl",
+        metadata={
+            "help": "The filename to store tracer span records",
+        },
+    )
 
 
 @dataclass
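
Both parameter classes gain the same two fields, so the log and tracer filenames become configurable instead of hard-coded. A minimal sketch of overriding them in code, assuming the classes are importable from pilot.model.parameter as in this tree (the values shown are hypothetical):

from pilot.model.parameter import ModelControllerParameters  # assumed import path

# Override the new defaults; run_model_controller() passes log_file to
# setup_logging, and tracer_file is expected to feed the controller's tracer
# output under LOGDIR in the same way the worker manager uses it.
params = ModelControllerParameters(
    log_file="controller_debug.log",       # instead of "dbgpt_model_controller.log"
    tracer_file="controller_trace.jsonl",  # instead of "dbgpt_model_controller_tracer.jsonl"
)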

View File

@@ -94,6 +94,19 @@ class WebWerverParameters(BaseParameters):
     daemon: Optional[bool] = field(
         default=False, metadata={"help": "Run Webserver in background"}
     )
+    controller_addr: Optional[str] = field(
+        default=None,
+        metadata={
+            "help": "The Model controller address to connect. If None, read model controller address from environment key `MODEL_SERVER`."
+        },
+    )
+    model_name: str = field(
+        default=None,
+        metadata={
+            "help": "The default model name to use. If None, read model name from environment key `LLM_MODEL`.",
+            "tags": "fixed",
+        },
+    )
     share: Optional[bool] = field(
         default=False,
         metadata={
@@ -122,3 +135,15 @@ class WebWerverParameters(BaseParameters):
         },
     )
     light: Optional[bool] = field(default=False, metadata={"help": "enable light mode"})
+    log_file: Optional[str] = field(
+        default="dbgpt_webserver.log",
+        metadata={
+            "help": "The filename to store log",
+        },
+    )
+    tracer_file: Optional[str] = field(
+        default="dbgpt_webserver_tracer.jsonl",
+        metadata={
+            "help": "The filename to store tracer span records",
+        },
+    )
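
The help texts describe a fallback chain for the two new webserver options: an explicit parameter wins, otherwise the value comes from the environment. A hedged sketch of that resolution order (the helper name is hypothetical; in dbgpt_server.py the same fallback goes through CFG.MODEL_SERVER and CFG.LLM_MODEL, which are assumed to mirror these environment keys):

import os

def resolve_webserver_defaults(param):
    # Hypothetical helper illustrating the documented fallback order.
    controller_addr = param.controller_addr or os.getenv("MODEL_SERVER")
    model_name = param.model_name or os.getenv("LLM_MODEL")
    return controller_addr, model_name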

View File

@@ -122,7 +122,7 @@ def initialize_app(param: WebWerverParameters = None, args: List[str] = None):
     if not param.log_level:
         param.log_level = _get_logging_level()
     setup_logging(
-        "pilot", logging_level=param.log_level, logger_filename="dbgpt_webserver.log"
+        "pilot", logging_level=param.log_level, logger_filename=param.log_file
     )
     # Before start
     system_app.before_start()
@@ -136,14 +136,16 @@ def initialize_app(param: WebWerverParameters = None, args: List[str] = None):
     model_start_listener = _create_model_start_listener(system_app)
     initialize_components(param, system_app, embedding_model_name, embedding_model_path)
 
-    model_path = LLM_MODEL_CONFIG.get(CFG.LLM_MODEL)
+    model_name = param.model_name or CFG.LLM_MODEL
+    model_path = LLM_MODEL_CONFIG.get(model_name)
+
     if not param.light:
         print("Model Unified Deployment Mode!")
         if not param.remote_embedding:
             embedding_model_name, embedding_model_path = None, None
         initialize_worker_manager_in_client(
             app=app,
-            model_name=CFG.LLM_MODEL,
+            model_name=model_name,
             model_path=model_path,
             local_port=param.port,
             embedding_model_name=embedding_model_name,
@@ -155,12 +157,13 @@ def initialize_app(param: WebWerverParameters = None, args: List[str] = None):
         CFG.NEW_SERVER_MODE = True
     else:
         # MODEL_SERVER is controller address now
+        controller_addr = param.controller_addr or CFG.MODEL_SERVER
         initialize_worker_manager_in_client(
             app=app,
-            model_name=CFG.LLM_MODEL,
+            model_name=model_name,
             model_path=model_path,
             run_locally=False,
-            controller_addr=CFG.MODEL_SERVER,
+            controller_addr=controller_addr,
             local_port=param.port,
             start_listener=model_start_listener,
             system_app=system_app,
@@ -185,7 +188,7 @@ def run_uvicorn(param: WebWerverParameters):
 
 def run_webserver(param: WebWerverParameters = None):
     if not param:
         param = _get_webserver_params()
-    initialize_tracer(system_app, os.path.join(LOGDIR, "dbgpt_webserver_tracer.jsonl"))
+    initialize_tracer(system_app, os.path.join(LOGDIR, param.tracer_file))
     with root_tracer.start_span(
         "run_webserver",

View File

@@ -1,6 +1,7 @@
 import os
 import json
 import time
+import datetime
 import threading
 import queue
 import logging
@@ -27,6 +28,13 @@ class FileSpanStorage(SpanStorage):
     def __init__(self, filename: str, batch_size=10, flush_interval=10):
         super().__init__()
         self.filename = filename
+        # Split filename into prefix and suffix
+        self.filename_prefix, self.filename_suffix = os.path.splitext(filename)
+        if not self.filename_suffix:
+            self.filename_suffix = ".log"
+        self.last_date = (
+            datetime.datetime.now().date()
+        )  # Store the current date for checking date changes
         self.queue = queue.Queue()
         self.batch_size = batch_size
         self.flush_interval = flush_interval
@@ -52,7 +60,21 @@ class FileSpanStorage(SpanStorage):
         except queue.Full:
             pass  # If the signal queue is full, it's okay. The flush thread will handle it.
 
+    def _get_dated_filename(self, date: datetime.date) -> str:
+        """Return the filename based on a specific date."""
+        date_str = date.strftime("%Y-%m-%d")
+        return f"{self.filename_prefix}_{date_str}{self.filename_suffix}"
+
+    def _roll_over_if_needed(self):
+        """Checks if a day has changed since the last write, and if so, renames the current file."""
+        current_date = datetime.datetime.now().date()
+        if current_date != self.last_date:
+            if os.path.exists(self.filename):
+                os.rename(self.filename, self._get_dated_filename(self.last_date))
+            self.last_date = current_date
+
     def _write_to_file(self):
+        self._roll_over_if_needed()
         spans_to_write = []
         while not self.queue.empty():
             spans_to_write.append(self.queue.get())
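
The rollover keeps writing to the configured filename and, on the first write after the date changes, renames the previous day's file with a date suffix built from the filename's prefix and extension. A standalone illustration of the naming scheme, using the webserver tracer default as the example filename:

import datetime
import os

filename = "dbgpt_webserver_tracer.jsonl"
prefix, suffix = os.path.splitext(filename)  # ("dbgpt_webserver_tracer", ".jsonl")
date_str = datetime.date(2023, 10, 18).strftime("%Y-%m-%d")
print(f"{prefix}_{date_str}{suffix}")        # dbgpt_webserver_tracer_2023-10-18.jsonl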

View File

@@ -4,6 +4,8 @@ import asyncio
 import json
 import tempfile
 import time
+from unittest.mock import patch
+from datetime import datetime, timedelta
 
 from pilot.utils.tracer import SpanStorage, FileSpanStorage, Span, SpanType
@@ -122,3 +124,46 @@ def test_non_existent_file(storage: SpanStorage):
     assert len(spans_in_file) == 2
     assert spans_in_file[0]["trace_id"] == "1"
     assert spans_in_file[1]["trace_id"] == "2"
+
+
+@pytest.mark.parametrize(
+    "storage", [{"batch_size": 1, "file_does_not_exist": True}], indirect=True
+)
+def test_log_rollover(storage: SpanStorage):
+    # Mock a start time just before midnight
+    mock_start_date = datetime(2023, 10, 18, 23, 59)
+
+    with patch("datetime.datetime") as mock_datetime:
+        mock_datetime.now.return_value = mock_start_date
+
+        span1 = Span("1", "a", SpanType.BASE, "b", "op1")
+        storage.append_span(span1)
+        time.sleep(0.1)
+
+        # Advance the mocked clock past midnight (new day)
+        mock_datetime.now.return_value = mock_start_date + timedelta(minutes=1)
+
+        span2 = Span("2", "c", SpanType.BASE, "d", "op2")
+        storage.append_span(span2)
+        time.sleep(0.1)
+
+    # The original filename must still exist
+    assert os.path.exists(storage.filename)
+
+    # Compute the rolled-over filename for the previous day
+    dated_filename = os.path.join(
+        os.path.dirname(storage.filename),
+        f"{os.path.basename(storage.filename).split('.')[0]}_2023-10-18.jsonl",
+    )
+    assert os.path.exists(dated_filename)
+
+    # The original file should contain only the second span
+    spans_in_original_file = read_spans_from_file(storage.filename)
+    assert len(spans_in_original_file) == 1
+    assert spans_in_original_file[0]["trace_id"] == "2"
+
+    # The rolled-over file should contain only the first span
+    spans_in_dated_file = read_spans_from_file(dated_filename)
+    assert len(spans_in_dated_file) == 1
+    assert spans_in_dated_file[0]["trace_id"] == "1"
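
The test reuses the read_spans_from_file helper defined earlier in this test module (not shown in this diff). A minimal version consistent with how it is used here might look like the following, assuming one JSON-encoded span per line as FileSpanStorage writes them:

import json

def read_spans_from_file(filename):
    # Each line of the storage file is one JSON-encoded span record.
    with open(filename, "r") as f:
        return [json.loads(line) for line in f]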