From 5accaa44beeb2f560ba19d7f4f2ac562b4a9b4f3 Mon Sep 17 00:00:00 2001
From: FangYin Cheng
Date: Mon, 6 Nov 2023 18:56:25 +0800
Subject: [PATCH] feat(core): Support custom trace storage

---
Review notes (text between the "---" line and the diffstat is ignored by
git-am and does not become part of the commit).

Summary of the change, as visible in this diff:
* New SpanStorageContainer (span_storage.py) queues spans and hands them in
  batches to every storage registered through append_storage(). A daemon
  thread wakes up when batch_size spans are queued or flush_interval seconds
  have passed, then submits one append_span_batch() call per storage to an
  executor (a ThreadPoolExecutor unless one is passed in). Failures are
  logged, not raised.
* FileSpanStorage loses its batch_size/flush_interval parameters and its own
  flush thread; append_span()/append_span_batch() now write to the file
  directly.
* SpanStorage gains append_span_batch(), which by default loops over
  append_span().
* A tracer_storage_cls option is added to the controller, API server, worker
  and web server parameters. initialize_tracer() resolves it with
  import_from_checked_string() and appends an instance to the container,
  next to the FileSpanStorage.

Changes made during review (line counts of every hunk are unchanged):
* SpanStorageContainer._flush_to_storages: the batch handed to
  append_and_ignore_error is annotated List[Span] (was List[SpanStorage]).
* SpanStorageContainer._flush_to_storages: logger.warning() replaces the
  deprecated logger.warn(), matching FileSpanStorage._write_to_file.
* SpanStorageContainer.append_storage: docstring typo "sotrage" fixed.

Caveat: this copy was restored from a version whose line breaks and
indentation had been collapsed. Hunk line counts were re-checked against the
@@ headers and the diffstat, but the indentation of context lines is
reconstructed (least certain: the manager.py hunk). Verify against the tree,
or apply with `git apply --ignore-whitespace`.

 pilot/model/cluster/worker/manager.py         |   1 +
 pilot/model/parameter.py                      |  18 +++
 pilot/server/base.py                          |   6 +
 pilot/server/dbgpt_server.py                  |   6 +-
 pilot/utils/tracer/__init__.py                |   7 +-
 pilot/utils/tracer/base.py                    |   7 +-
 pilot/utils/tracer/span_storage.py            | 117 ++++++++++++------
 pilot/utils/tracer/tests/test_span_storage.py |  95 +++++++-------
 pilot/utils/tracer/tracer_impl.py             |  17 ++-
 9 files changed, 189 insertions(+), 85 deletions(-)

diff --git a/pilot/model/cluster/worker/manager.py b/pilot/model/cluster/worker/manager.py
index 2dcfb086e..d67519f59 100644
--- a/pilot/model/cluster/worker/manager.py
+++ b/pilot/model/cluster/worker/manager.py
@@ -1021,6 +1021,7 @@ def run_worker_manager(
             system_app,
             os.path.join(LOGDIR, worker_params.tracer_file),
             root_operation_name="DB-GPT-WorkerManager-Entry",
+            tracer_storage_cls=worker_params.tracer_storage_cls,
         )
 
         _start_local_worker(worker_manager, worker_params)
diff --git a/pilot/model/parameter.py b/pilot/model/parameter.py
index e21de1c42..79558e02c 100644
--- a/pilot/model/parameter.py
+++ b/pilot/model/parameter.py
@@ -88,6 +88,12 @@ class ModelControllerParameters(BaseParameters):
             "help": "The filename to store tracer span records",
         },
     )
+    tracer_storage_cls: Optional[str] = field(
+        default=None,
+        metadata={
+            "help": "The storage class to storage tracer span records",
+        },
+    )
 
 
 @dataclass
@@ -138,6 +144,12 @@ class ModelAPIServerParameters(BaseParameters):
             "help": "The filename to store tracer span records",
         },
     )
+    tracer_storage_cls: Optional[str] = field(
+        default=None,
+        metadata={
+            "help": "The storage class to storage tracer span records",
+        },
+    )
 
 
 @dataclass
@@ -226,6 +238,12 @@ class ModelWorkerParameters(BaseModelParameters):
             "help": "The filename to store tracer span records",
         },
     )
+    tracer_storage_cls: Optional[str] = field(
+        default=None,
+        metadata={
+            "help": "The storage class to storage tracer span records",
+        },
+    )
 
 
 @dataclass
diff --git a/pilot/server/base.py b/pilot/server/base.py
index 71faeb821..488c919c3 100644
--- a/pilot/server/base.py
+++ b/pilot/server/base.py
@@ -147,6 +147,12 @@ class WebWerverParameters(BaseParameters):
             "help": "The filename to store tracer span records",
         },
     )
+    tracer_storage_cls: Optional[str] = field(
+        default=None,
+        metadata={
+            "help": "The storage class to storage tracer span records",
+        },
+    )
     disable_alembic_upgrade: Optional[bool] = field(
         default=False,
         metadata={
diff --git a/pilot/server/dbgpt_server.py b/pilot/server/dbgpt_server.py
index e94526b9a..c26803cfd 100644
--- a/pilot/server/dbgpt_server.py
+++ b/pilot/server/dbgpt_server.py
@@ -195,7 +195,11 @@ def run_uvicorn(param: WebWerverParameters):
 def run_webserver(param: WebWerverParameters = None):
     if not param:
         param = _get_webserver_params()
-    initialize_tracer(system_app, os.path.join(LOGDIR, param.tracer_file))
+    initialize_tracer(
+        system_app,
+        os.path.join(LOGDIR, param.tracer_file),
+        tracer_storage_cls=param.tracer_storage_cls,
+    )
 
     with root_tracer.start_span(
         "run_webserver",
diff --git a/pilot/utils/tracer/__init__.py b/pilot/utils/tracer/__init__.py
index cdb536f79..6f77cfd6c 100644
--- a/pilot/utils/tracer/__init__.py
+++ b/pilot/utils/tracer/__init__.py
@@ -7,7 +7,11 @@ from pilot.utils.tracer.base import (
     SpanStorageType,
     TracerContext,
 )
-from pilot.utils.tracer.span_storage import MemorySpanStorage, FileSpanStorage
+from pilot.utils.tracer.span_storage import (
+    MemorySpanStorage,
+    FileSpanStorage,
+    SpanStorageContainer,
+)
 from pilot.utils.tracer.tracer_impl import (
     root_tracer,
     trace,
@@ -26,6 +30,7 @@ __all__ = [
     "TracerContext",
     "MemorySpanStorage",
     "FileSpanStorage",
+    "SpanStorageContainer",
     "root_tracer",
     "trace",
     "initialize_tracer",
diff --git a/pilot/utils/tracer/base.py b/pilot/utils/tracer/base.py
index e227d6314..625e9aabd 100644
--- a/pilot/utils/tracer/base.py
+++ b/pilot/utils/tracer/base.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from typing import Dict, Callable, Optional
+from typing import Dict, Callable, Optional, List
 from dataclasses import dataclass
 from abc import ABC, abstractmethod
 from enum import Enum
@@ -121,6 +121,11 @@ class SpanStorage(BaseComponent, ABC):
     def append_span(self, span: Span):
         """Store the given span. This needs to be implemented by subclasses."""
 
+    def append_span_batch(self, spans: List[Span]):
+        """Store the span batch"""
+        for span in spans:
+            self.append_span(span)
+
 
 class Tracer(BaseComponent, ABC):
     """Abstract base class for tracing operations.
diff --git a/pilot/utils/tracer/span_storage.py b/pilot/utils/tracer/span_storage.py
index 3070fb834..57321a316 100644
--- a/pilot/utils/tracer/span_storage.py
+++ b/pilot/utils/tracer/span_storage.py
@@ -5,11 +5,12 @@ import datetime
 import threading
 import queue
 import logging
+from typing import Optional, List
+from concurrent.futures import Executor, ThreadPoolExecutor
 
 from pilot.component import SystemApp
 from pilot.utils.tracer.base import Span, SpanStorage
 
-
 logger = logging.getLogger(__name__)
 
 
@@ -24,8 +25,81 @@ class MemorySpanStorage(SpanStorage):
         self.spans.append(span)
 
 
+class SpanStorageContainer(SpanStorage):
+    def __init__(
+        self,
+        system_app: SystemApp | None = None,
+        batch_size=10,
+        flush_interval=10,
+        executor: Executor = None,
+    ):
+        super().__init__(system_app)
+        if not executor:
+            executor = ThreadPoolExecutor(thread_name_prefix="trace_storage_sync_")
+        self.executor = executor
+        self.storages: List[SpanStorage] = []
+        self.last_date = (
+            datetime.datetime.now().date()
+        )  # Store the current date for checking date changes
+        self.queue = queue.Queue()
+        self.batch_size = batch_size
+        self.flush_interval = flush_interval
+        self.last_flush_time = time.time()
+        self.flush_signal_queue = queue.Queue()
+        self.flush_thread = threading.Thread(
+            target=self._flush_to_storages, daemon=True
+        )
+        self.flush_thread.start()
+
+    def append_storage(self, storage: SpanStorage):
+        """Append storage to container
+
+        Args:
+            storage ([`SpanStorage`]): The storage to be append to current container
+        """
+        self.storages.append(storage)
+
+    def append_span(self, span: Span):
+        self.queue.put(span)
+        if self.queue.qsize() >= self.batch_size:
+            try:
+                self.flush_signal_queue.put_nowait(True)
+            except queue.Full:
+                pass  # If the signal queue is full, it's okay. The flush thread will handle it.
+
+    def _flush_to_storages(self):
+        while True:
+            interval = time.time() - self.last_flush_time
+            if interval < self.flush_interval:
+                try:
+                    self.flush_signal_queue.get(
+                        block=True, timeout=self.flush_interval - interval
+                    )
+                except Exception:
+                    # Timeout
+                    pass
+
+            spans_to_write = []
+            while not self.queue.empty():
+                spans_to_write.append(self.queue.get())
+            for s in self.storages:
+
+                def append_and_ignore_error(
+                    storage: SpanStorage, spans_to_write: List[Span]
+                ):
+                    try:
+                        storage.append_span_batch(spans_to_write)
+                    except Exception as e:
+                        logger.warning(
+                            f"Append spans to storage {str(storage)} failed: {str(e)}, span_data: {spans_to_write}"
+                        )
+
+                self.executor.submit(append_and_ignore_error, s, spans_to_write)
+            self.last_flush_time = time.time()
+
+
 class FileSpanStorage(SpanStorage):
-    def __init__(self, filename: str, batch_size=10, flush_interval=10):
+    def __init__(self, filename: str):
         super().__init__()
         self.filename = filename
         # Split filename into prefix and suffix
@@ -36,29 +110,18 @@ class FileSpanStorage(SpanStorage):
             datetime.datetime.now().date()
         )  # Store the current date for checking date changes
         self.queue = queue.Queue()
-        self.batch_size = batch_size
-        self.flush_interval = flush_interval
-        self.last_flush_time = time.time()
-        self.flush_signal_queue = queue.Queue()
 
         if not os.path.exists(filename):
             # New file if not exist
             os.makedirs(os.path.dirname(filename), exist_ok=True)
             with open(filename, "a"):
                 pass
-        self.flush_thread = threading.Thread(target=self._flush_to_file, daemon=True)
-        self.flush_thread.start()
 
     def append_span(self, span: Span):
-        span_data = span.to_dict()
-        logger.debug(f"append span: {span_data}")
-        self.queue.put(span_data)
+        self._write_to_file([span])
 
-        if self.queue.qsize() >= self.batch_size:
-            try:
-                self.flush_signal_queue.put_nowait(True)
-            except queue.Full:
-                pass  # If the signal queue is full, it's okay. The flush thread will handle it.
+    def append_span_batch(self, spans: List[Span]):
+        self._write_to_file(spans)
 
     def _get_dated_filename(self, date: datetime.date) -> str:
         """Return the filename based on a specific date."""
@@ -73,31 +136,15 @@ class FileSpanStorage(SpanStorage):
                 os.rename(self.filename, self._get_dated_filename(self.last_date))
             self.last_date = current_date
 
-    def _write_to_file(self):
+    def _write_to_file(self, spans: List[Span]):
         self._roll_over_if_needed()
-        spans_to_write = []
-        while not self.queue.empty():
-            spans_to_write.append(self.queue.get())
 
         with open(self.filename, "a") as file:
-            for span_data in spans_to_write:
+            for span in spans:
+                span_data = span.to_dict()
                 try:
                     file.write(json.dumps(span_data, ensure_ascii=False) + "\n")
                 except Exception as e:
                     logger.warning(
                         f"Write span to file failed: {str(e)}, span_data: {span_data}"
                     )
-
-    def _flush_to_file(self):
-        while True:
-            interval = time.time() - self.last_flush_time
-            if interval < self.flush_interval:
-                try:
-                    self.flush_signal_queue.get(
-                        block=True, timeout=self.flush_interval - interval
-                    )
-                except Exception:
-                    # Timeout
-                    pass
-            self._write_to_file()
-            self.last_flush_time = time.time()
diff --git a/pilot/utils/tracer/tests/test_span_storage.py b/pilot/utils/tracer/tests/test_span_storage.py
index 9ca727995..6da0797fc 100644
--- a/pilot/utils/tracer/tests/test_span_storage.py
+++ b/pilot/utils/tracer/tests/test_span_storage.py
@@ -7,44 +7,53 @@ import time
 from unittest.mock import patch
 from datetime import datetime, timedelta
 
-from pilot.utils.tracer import SpanStorage, FileSpanStorage, Span, SpanType
+from pilot.utils.tracer import (
+    SpanStorage,
+    FileSpanStorage,
+    Span,
+    SpanType,
+    SpanStorageContainer,
+)
 
 
 @pytest.fixture
 def storage(request):
     if not request or not hasattr(request, "param"):
-        batch_size = 10
-        flush_interval = 10
         file_does_not_exist = False
     else:
-        batch_size = request.param.get("batch_size", 10)
-        flush_interval = request.param.get("flush_interval", 10)
         file_does_not_exist = request.param.get("file_does_not_exist", False)
 
     if file_does_not_exist:
         with tempfile.TemporaryDirectory() as tmp_dir:
             filename = os.path.join(tmp_dir, "non_existent_file.jsonl")
-            storage_instance = FileSpanStorage(
-                filename, batch_size=batch_size, flush_interval=flush_interval
-            )
+            storage_instance = FileSpanStorage(filename)
             yield storage_instance
     else:
         with tempfile.NamedTemporaryFile(delete=True) as tmp_file:
             filename = tmp_file.name
-            storage_instance = FileSpanStorage(
-                filename, batch_size=batch_size, flush_interval=flush_interval
-            )
+            storage_instance = FileSpanStorage(filename)
             yield storage_instance
 
 
+@pytest.fixture
+def storage_container(request):
+    if not request or not hasattr(request, "param"):
+        batch_size = 10
+        flush_interval = 10
+    else:
+        batch_size = request.param.get("batch_size", 10)
+        flush_interval = request.param.get("flush_interval", 10)
+    storage_container = SpanStorageContainer(
+        batch_size=batch_size, flush_interval=flush_interval
+    )
+    yield storage_container
+
+
 def read_spans_from_file(filename):
     with open(filename, "r") as f:
         return [json.loads(line) for line in f.readlines()]
 
 
-@pytest.mark.parametrize(
-    "storage", [{"batch_size": 1, "flush_interval": 5}], indirect=True
-)
 def test_write_span(storage: SpanStorage):
     span = Span("1", "a", SpanType.BASE, "b", "op1")
     storage.append_span(span)
@@ -55,9 +64,6 @@ def test_write_span(storage: SpanStorage):
     assert spans_in_file[0]["trace_id"] == "1"
 
 
-@pytest.mark.parametrize(
-    "storage", [{"batch_size": 1, "flush_interval": 5}], indirect=True
-)
 def test_incremental_write(storage: SpanStorage):
     span1 = Span("1", "a", SpanType.BASE, "b", "op1")
     span2 = Span("2", "c", SpanType.BASE, "d", "op2")
@@ -70,9 +76,6 @@ def test_incremental_write(storage: SpanStorage):
     assert len(spans_in_file) == 2
 
 
-@pytest.mark.parametrize(
-    "storage", [{"batch_size": 2, "flush_interval": 5}], indirect=True
-)
 def test_sync_and_async_append(storage: SpanStorage):
     span = Span("1", "a", SpanType.BASE, "b", "op1")
 
@@ -88,27 +91,7 @@ def test_sync_and_async_append(storage: SpanStorage):
     assert len(spans_in_file) == 2
 
 
-@pytest.mark.asyncio
-async def test_flush_policy(storage: SpanStorage):
-    span = Span("1", "a", SpanType.BASE, "b", "op1")
-
-    for _ in range(storage.batch_size - 1):
-        storage.append_span(span)
-
-    spans_in_file = read_spans_from_file(storage.filename)
-    assert len(spans_in_file) == 0
-
-    # Trigger batch write
-    storage.append_span(span)
-    await asyncio.sleep(0.1)
-
-    spans_in_file = read_spans_from_file(storage.filename)
-    assert len(spans_in_file) == storage.batch_size
-
-
-@pytest.mark.parametrize(
-    "storage", [{"batch_size": 2, "file_does_not_exist": True}], indirect=True
-)
+@pytest.mark.parametrize("storage", [{"file_does_not_exist": True}], indirect=True)
 def test_non_existent_file(storage: SpanStorage):
     span = Span("1", "a", SpanType.BASE, "b", "op1")
     span2 = Span("2", "c", SpanType.BASE, "d", "op2")
@@ -116,7 +99,7 @@ def test_non_existent_file(storage: SpanStorage):
     time.sleep(0.1)
 
     spans_in_file = read_spans_from_file(storage.filename)
-    assert len(spans_in_file) == 0
+    assert len(spans_in_file) == 1
 
     storage.append_span(span2)
     time.sleep(0.1)
@@ -126,9 +109,7 @@ def test_non_existent_file(storage: SpanStorage):
     assert spans_in_file[1]["trace_id"] == "2"
 
 
-@pytest.mark.parametrize(
-    "storage", [{"batch_size": 1, "file_does_not_exist": True}], indirect=True
-)
+@pytest.mark.parametrize("storage", [{"file_does_not_exist": True}], indirect=True)
 def test_log_rollover(storage: SpanStorage):
     # mock start date
     mock_start_date = datetime(2023, 10, 18, 23, 59)
@@ -167,3 +148,27 @@ def test_log_rollover(storage: SpanStorage):
     spans_in_dated_file = read_spans_from_file(dated_filename)
     assert len(spans_in_dated_file) == 1
     assert spans_in_dated_file[0]["trace_id"] == "1"
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("storage_container", [{"batch_size": 5}], indirect=True)
+async def test_container_flush_policy(
+    storage_container: SpanStorageContainer, storage: FileSpanStorage
+):
+    storage_container.append_storage(storage)
+    span = Span("1", "a", SpanType.BASE, "b", "op1")
+
+    filename = storage.filename
+
+    for _ in range(storage_container.batch_size - 1):
+        storage_container.append_span(span)
+
+    spans_in_file = read_spans_from_file(filename)
+    assert len(spans_in_file) == 0
+
+    # Trigger batch write
+    storage_container.append_span(span)
+    await asyncio.sleep(0.1)
+
+    spans_in_file = read_spans_from_file(filename)
+    assert len(spans_in_file) == storage_container.batch_size
diff --git a/pilot/utils/tracer/tracer_impl.py b/pilot/utils/tracer/tracer_impl.py
index 2358863bf..6bbad084c 100644
--- a/pilot/utils/tracer/tracer_impl.py
+++ b/pilot/utils/tracer/tracer_impl.py
@@ -3,6 +3,7 @@ from contextvars import ContextVar
 from functools import wraps
 import asyncio
 import inspect
+import logging
 
 
 from pilot.component import SystemApp, ComponentType
@@ -15,6 +16,9 @@ from pilot.utils.tracer.base import (
     TracerContext,
 )
 from pilot.utils.tracer.span_storage import MemorySpanStorage
+from pilot.utils.module_utils import import_from_checked_string
+
+logger = logging.getLogger(__name__)
 
 
 class DefaultTracer(Tracer):
@@ -197,10 +201,11 @@ def initialize_tracer(
     system_app: SystemApp,
     tracer_filename: str,
     root_operation_name: str = "DB-GPT-Web-Entry",
+    tracer_storage_cls: str = None,
 ):
     if not system_app:
         return
-    from pilot.utils.tracer.span_storage import FileSpanStorage
+    from pilot.utils.tracer.span_storage import FileSpanStorage, SpanStorageContainer
 
     trace_context_var = ContextVar(
         "trace_context",
@@ -208,7 +213,15 @@ def initialize_tracer(
     )
     tracer = DefaultTracer(system_app)
 
-    system_app.register_instance(FileSpanStorage(tracer_filename))
+    storage_container = SpanStorageContainer(system_app)
+    storage_container.append_storage(FileSpanStorage(tracer_filename))
+
+    if tracer_storage_cls:
+        logger.info(f"Begin parse storage class {tracer_storage_cls}")
+        storage = import_from_checked_string(tracer_storage_cls, SpanStorage)
+        storage_container.append_storage(storage())
+
+    system_app.register_instance(storage_container)
     system_app.register_instance(tracer)
     root_tracer.initialize(system_app, trace_context_var)
     if system_app.app: