feat(core): Support custom trace storage

FangYin Cheng
2023-11-06 18:56:25 +08:00
parent 47b1d0dad4
commit 5accaa44be
9 changed files with 189 additions and 85 deletions
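For illustration, here is a minimal sketch of the kind of custom storage this commit enables. The class and module names are hypothetical; the only assumptions taken from the diff are that a custom class must subclass SpanStorage, be constructible with no arguments (initialize_tracer instantiates it as `storage()`), and may optionally override append_span_batch:

```python
# Hypothetical custom storage for the new `tracer_storage_cls` option.
# The class name is illustrative and not part of this commit.
from typing import List

from pilot.utils.tracer import Span, SpanStorage


class PrintSpanStorage(SpanStorage):
    """Prints spans to stdout; a stand-in for a real backend (DB, OTLP, ...)."""

    def append_span(self, span: Span):
        print(span.to_dict())

    def append_span_batch(self, spans: List[Span]):
        # Optional override: receive a whole flushed batch at once.
        for span in spans:
            print(span.to_dict())
```

Assuming import_from_checked_string expects a dotted import path, such a class would be referenced as something like `my_module.PrintSpanStorage`.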

View File

@@ -1021,6 +1021,7 @@ def run_worker_manager(
         system_app,
         os.path.join(LOGDIR, worker_params.tracer_file),
         root_operation_name="DB-GPT-WorkerManager-Entry",
+        tracer_storage_cls=worker_params.tracer_storage_cls,
     )
     _start_local_worker(worker_manager, worker_params)

View File

@@ -88,6 +88,12 @@ class ModelControllerParameters(BaseParameters):
"help": "The filename to store tracer span records", "help": "The filename to store tracer span records",
}, },
) )
tracer_storage_cls: Optional[str] = field(
default=None,
metadata={
"help": "The storage class to storage tracer span records",
},
)
@dataclass @dataclass
@@ -138,6 +144,12 @@ class ModelAPIServerParameters(BaseParameters):
"help": "The filename to store tracer span records", "help": "The filename to store tracer span records",
}, },
) )
tracer_storage_cls: Optional[str] = field(
default=None,
metadata={
"help": "The storage class to storage tracer span records",
},
)
@dataclass @dataclass
@@ -226,6 +238,12 @@ class ModelWorkerParameters(BaseModelParameters):
"help": "The filename to store tracer span records", "help": "The filename to store tracer span records",
}, },
) )
tracer_storage_cls: Optional[str] = field(
default=None,
metadata={
"help": "The storage class to storage tracer span records",
},
)
@dataclass @dataclass
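The same tracer_storage_cls field is added to each parameter dataclass above. As a rough illustration of how such metadata-carrying fields typically surface as command-line flags (a generic sketch, not DB-GPT's actual parameter parser):

```python
# Generic sketch: turning dataclass fields with `metadata={"help": ...}`
# into argparse flags. Not DB-GPT's real parser; names are illustrative.
import argparse
from dataclasses import dataclass, field, fields
from typing import Optional


@dataclass
class DemoParameters:
    tracer_storage_cls: Optional[str] = field(
        default=None,
        metadata={"help": "The storage class to store tracer span records"},
    )


parser = argparse.ArgumentParser()
for f in fields(DemoParameters):
    parser.add_argument(f"--{f.name}", default=f.default, help=f.metadata.get("help"))

args = parser.parse_args(["--tracer_storage_cls", "my_module.MySpanStorage"])
print(args.tracer_storage_cls)  # my_module.MySpanStorage
```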

View File

@@ -147,6 +147,12 @@ class WebWerverParameters(BaseParameters):
"help": "The filename to store tracer span records", "help": "The filename to store tracer span records",
}, },
) )
tracer_storage_cls: Optional[str] = field(
default=None,
metadata={
"help": "The storage class to storage tracer span records",
},
)
disable_alembic_upgrade: Optional[bool] = field( disable_alembic_upgrade: Optional[bool] = field(
default=False, default=False,
metadata={ metadata={

View File

@@ -195,7 +195,11 @@ def run_uvicorn(param: WebWerverParameters):
 def run_webserver(param: WebWerverParameters = None):
     if not param:
         param = _get_webserver_params()
-    initialize_tracer(system_app, os.path.join(LOGDIR, param.tracer_file))
+    initialize_tracer(
+        system_app,
+        os.path.join(LOGDIR, param.tracer_file),
+        tracer_storage_cls=param.tracer_storage_cls,
+    )

     with root_tracer.start_span(
         "run_webserver",

View File

@@ -7,7 +7,11 @@ from pilot.utils.tracer.base import (
     SpanStorageType,
     TracerContext,
 )
-from pilot.utils.tracer.span_storage import MemorySpanStorage, FileSpanStorage
+from pilot.utils.tracer.span_storage import (
+    MemorySpanStorage,
+    FileSpanStorage,
+    SpanStorageContainer,
+)
 from pilot.utils.tracer.tracer_impl import (
     root_tracer,
     trace,
@@ -26,6 +30,7 @@ __all__ = [
"TracerContext", "TracerContext",
"MemorySpanStorage", "MemorySpanStorage",
"FileSpanStorage", "FileSpanStorage",
"SpanStorageContainer",
"root_tracer", "root_tracer",
"trace", "trace",
"initialize_tracer", "initialize_tracer",

View File

@@ -1,6 +1,6 @@
 from __future__ import annotations
-from typing import Dict, Callable, Optional
+from typing import Dict, Callable, Optional, List
 from dataclasses import dataclass
 from abc import ABC, abstractmethod
 from enum import Enum
@@ -121,6 +121,11 @@ class SpanStorage(BaseComponent, ABC):
     def append_span(self, span: Span):
         """Store the given span. This needs to be implemented by subclasses."""

+    def append_span_batch(self, spans: List[Span]):
+        """Store a batch of spans. By default, appends them one by one."""
+        for span in spans:
+            self.append_span(span)
+

 class Tracer(BaseComponent, ABC):
     """Abstract base class for tracing operations.

View File

@@ -5,11 +5,12 @@ import datetime
 import threading
 import queue
 import logging
+from typing import Optional, List
+from concurrent.futures import Executor, ThreadPoolExecutor

 from pilot.component import SystemApp
 from pilot.utils.tracer.base import Span, SpanStorage

 logger = logging.getLogger(__name__)
@@ -24,8 +25,81 @@ class MemorySpanStorage(SpanStorage):
         self.spans.append(span)


+class SpanStorageContainer(SpanStorage):
+    def __init__(
+        self,
+        system_app: SystemApp | None = None,
+        batch_size=10,
+        flush_interval=10,
+        executor: Executor = None,
+    ):
+        super().__init__(system_app)
+        if not executor:
+            executor = ThreadPoolExecutor(thread_name_prefix="trace_storage_sync_")
+        self.executor = executor
+        self.storages: List[SpanStorage] = []
+        self.last_date = (
+            datetime.datetime.now().date()
+        )  # Store the current date for checking date changes
+        self.queue = queue.Queue()
+        self.batch_size = batch_size
+        self.flush_interval = flush_interval
+        self.last_flush_time = time.time()
+        self.flush_signal_queue = queue.Queue()
+        self.flush_thread = threading.Thread(
+            target=self._flush_to_storages, daemon=True
+        )
+        self.flush_thread.start()
+
+    def append_storage(self, storage: SpanStorage):
+        """Append a storage to the container.
+
+        Args:
+            storage (SpanStorage): The storage to append to the current container.
+        """
+        self.storages.append(storage)
+
+    def append_span(self, span: Span):
+        self.queue.put(span)
+        if self.queue.qsize() >= self.batch_size:
+            try:
+                self.flush_signal_queue.put_nowait(True)
+            except queue.Full:
+                pass  # If the signal queue is full, it's okay. The flush thread will handle it.
+
+    def _flush_to_storages(self):
+        while True:
+            interval = time.time() - self.last_flush_time
+            if interval < self.flush_interval:
+                try:
+                    self.flush_signal_queue.get(
+                        block=True, timeout=self.flush_interval - interval
+                    )
+                except Exception:
+                    # Timeout
+                    pass
+
+            spans_to_write = []
+            while not self.queue.empty():
+                spans_to_write.append(self.queue.get())
+            for s in self.storages:
+
+                def append_and_ignore_error(
+                    storage: SpanStorage, spans_to_write: List[Span]
+                ):
+                    try:
+                        storage.append_span_batch(spans_to_write)
+                    except Exception as e:
+                        logger.warning(
+                            f"Append spans to storage {str(storage)} failed: {str(e)}, span_data: {spans_to_write}"
+                        )
+
+                self.executor.submit(append_and_ignore_error, s, spans_to_write)
+            self.last_flush_time = time.time()
+
+
 class FileSpanStorage(SpanStorage):
-    def __init__(self, filename: str, batch_size=10, flush_interval=10):
+    def __init__(self, filename: str):
         super().__init__()
         self.filename = filename
         # Split filename into prefix and suffix
@@ -36,29 +110,18 @@ class FileSpanStorage(SpanStorage):
             datetime.datetime.now().date()
         )  # Store the current date for checking date changes
         self.queue = queue.Queue()
-        self.batch_size = batch_size
-        self.flush_interval = flush_interval
-        self.last_flush_time = time.time()
-        self.flush_signal_queue = queue.Queue()

         if not os.path.exists(filename):
             # New file if not exist
             os.makedirs(os.path.dirname(filename), exist_ok=True)
             with open(filename, "a"):
                 pass
-        self.flush_thread = threading.Thread(target=self._flush_to_file, daemon=True)
-        self.flush_thread.start()

     def append_span(self, span: Span):
-        span_data = span.to_dict()
-        logger.debug(f"append span: {span_data}")
-        self.queue.put(span_data)
-        if self.queue.qsize() >= self.batch_size:
-            try:
-                self.flush_signal_queue.put_nowait(True)
-            except queue.Full:
-                pass  # If the signal queue is full, it's okay. The flush thread will handle it.
+        self._write_to_file([span])
+
+    def append_span_batch(self, spans: List[Span]):
+        self._write_to_file(spans)

     def _get_dated_filename(self, date: datetime.date) -> str:
         """Return the filename based on a specific date."""
@@ -73,31 +136,15 @@ class FileSpanStorage(SpanStorage):
             os.rename(self.filename, self._get_dated_filename(self.last_date))
             self.last_date = current_date

-    def _write_to_file(self):
+    def _write_to_file(self, spans: List[Span]):
         self._roll_over_if_needed()
-        spans_to_write = []
-        while not self.queue.empty():
-            spans_to_write.append(self.queue.get())
         with open(self.filename, "a") as file:
-            for span_data in spans_to_write:
+            for span in spans:
+                span_data = span.to_dict()
                 try:
                     file.write(json.dumps(span_data, ensure_ascii=False) + "\n")
                 except Exception as e:
                     logger.warning(
                         f"Write span to file failed: {str(e)}, span_data: {span_data}"
                     )
-
-    def _flush_to_file(self):
-        while True:
-            interval = time.time() - self.last_flush_time
-            if interval < self.flush_interval:
-                try:
-                    self.flush_signal_queue.get(
-                        block=True, timeout=self.flush_interval - interval
-                    )
-                except Exception:
-                    # Timeout
-                    pass
-            self._write_to_file()
-            self.last_flush_time = time.time()
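Taken together, the container now owns batching: append_span only enqueues, and the daemon thread flushes to every registered storage once batch_size spans accumulate or flush_interval seconds pass, whichever comes first. A small usage sketch (the file path is illustrative):

```python
# Hedged usage sketch of the container's flush policy.
import time

from pilot.utils.tracer import FileSpanStorage, Span, SpanType, SpanStorageContainer

container = SpanStorageContainer(batch_size=2, flush_interval=5)
container.append_storage(FileSpanStorage("/tmp/spans.jsonl"))

container.append_span(Span("1", "a", SpanType.BASE, "b", "op1"))  # queued only
container.append_span(Span("2", "c", SpanType.BASE, "d", "op2"))  # hits batch_size
time.sleep(0.1)  # give the daemon flush thread a moment to write
```

Each storage gets its own executor task, so a slow or failing backend cannot block the others; passing the storage and batch as explicit arguments to executor.submit also sidesteps Python's late-binding closure pitfall inside the loop.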

View File

@@ -7,44 +7,53 @@ import time
 from unittest.mock import patch
 from datetime import datetime, timedelta

-from pilot.utils.tracer import SpanStorage, FileSpanStorage, Span, SpanType
+from pilot.utils.tracer import (
+    SpanStorage,
+    FileSpanStorage,
+    Span,
+    SpanType,
+    SpanStorageContainer,
+)


 @pytest.fixture
 def storage(request):
     if not request or not hasattr(request, "param"):
-        batch_size = 10
-        flush_interval = 10
         file_does_not_exist = False
     else:
-        batch_size = request.param.get("batch_size", 10)
-        flush_interval = request.param.get("flush_interval", 10)
         file_does_not_exist = request.param.get("file_does_not_exist", False)

     if file_does_not_exist:
         with tempfile.TemporaryDirectory() as tmp_dir:
             filename = os.path.join(tmp_dir, "non_existent_file.jsonl")
-            storage_instance = FileSpanStorage(
-                filename, batch_size=batch_size, flush_interval=flush_interval
-            )
+            storage_instance = FileSpanStorage(filename)
             yield storage_instance
     else:
         with tempfile.NamedTemporaryFile(delete=True) as tmp_file:
             filename = tmp_file.name
-            storage_instance = FileSpanStorage(
-                filename, batch_size=batch_size, flush_interval=flush_interval
-            )
+            storage_instance = FileSpanStorage(filename)
             yield storage_instance


+@pytest.fixture
+def storage_container(request):
+    if not request or not hasattr(request, "param"):
+        batch_size = 10
+        flush_interval = 10
+    else:
+        batch_size = request.param.get("batch_size", 10)
+        flush_interval = request.param.get("flush_interval", 10)
+    storage_container = SpanStorageContainer(
+        batch_size=batch_size, flush_interval=flush_interval
+    )
+    yield storage_container
+
+
 def read_spans_from_file(filename):
     with open(filename, "r") as f:
         return [json.loads(line) for line in f.readlines()]


-@pytest.mark.parametrize(
-    "storage", [{"batch_size": 1, "flush_interval": 5}], indirect=True
-)
 def test_write_span(storage: SpanStorage):
     span = Span("1", "a", SpanType.BASE, "b", "op1")
     storage.append_span(span)
@@ -55,9 +64,6 @@ def test_write_span(storage: SpanStorage):
assert spans_in_file[0]["trace_id"] == "1" assert spans_in_file[0]["trace_id"] == "1"
@pytest.mark.parametrize(
"storage", [{"batch_size": 1, "flush_interval": 5}], indirect=True
)
def test_incremental_write(storage: SpanStorage): def test_incremental_write(storage: SpanStorage):
span1 = Span("1", "a", SpanType.BASE, "b", "op1") span1 = Span("1", "a", SpanType.BASE, "b", "op1")
span2 = Span("2", "c", SpanType.BASE, "d", "op2") span2 = Span("2", "c", SpanType.BASE, "d", "op2")
@@ -70,9 +76,6 @@ def test_incremental_write(storage: SpanStorage):
     assert len(spans_in_file) == 2


-@pytest.mark.parametrize(
-    "storage", [{"batch_size": 2, "flush_interval": 5}], indirect=True
-)
 def test_sync_and_async_append(storage: SpanStorage):
     span = Span("1", "a", SpanType.BASE, "b", "op1")
@@ -88,27 +91,7 @@ def test_sync_and_async_append(storage: SpanStorage):
     assert len(spans_in_file) == 2


-@pytest.mark.asyncio
-async def test_flush_policy(storage: SpanStorage):
-    span = Span("1", "a", SpanType.BASE, "b", "op1")
-
-    for _ in range(storage.batch_size - 1):
-        storage.append_span(span)
-    spans_in_file = read_spans_from_file(storage.filename)
-    assert len(spans_in_file) == 0
-
-    # Trigger batch write
-    storage.append_span(span)
-    await asyncio.sleep(0.1)
-
-    spans_in_file = read_spans_from_file(storage.filename)
-    assert len(spans_in_file) == storage.batch_size
-
-
-@pytest.mark.parametrize(
-    "storage", [{"batch_size": 2, "file_does_not_exist": True}], indirect=True
-)
+@pytest.mark.parametrize("storage", [{"file_does_not_exist": True}], indirect=True)
 def test_non_existent_file(storage: SpanStorage):
     span = Span("1", "a", SpanType.BASE, "b", "op1")
     span2 = Span("2", "c", SpanType.BASE, "d", "op2")
@@ -116,7 +99,7 @@ def test_non_existent_file(storage: SpanStorage):
     time.sleep(0.1)
     spans_in_file = read_spans_from_file(storage.filename)
-    assert len(spans_in_file) == 0
+    assert len(spans_in_file) == 1

     storage.append_span(span2)
     time.sleep(0.1)
@@ -126,9 +109,7 @@ def test_non_existent_file(storage: SpanStorage):
     assert spans_in_file[1]["trace_id"] == "2"


-@pytest.mark.parametrize(
-    "storage", [{"batch_size": 1, "file_does_not_exist": True}], indirect=True
-)
+@pytest.mark.parametrize("storage", [{"file_does_not_exist": True}], indirect=True)
 def test_log_rollover(storage: SpanStorage):
     # mock start date
     mock_start_date = datetime(2023, 10, 18, 23, 59)
@@ -167,3 +148,27 @@ def test_log_rollover(storage: SpanStorage):
     spans_in_dated_file = read_spans_from_file(dated_filename)
     assert len(spans_in_dated_file) == 1
     assert spans_in_dated_file[0]["trace_id"] == "1"
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("storage_container", [{"batch_size": 5}], indirect=True)
+async def test_container_flush_policy(
+    storage_container: SpanStorageContainer, storage: FileSpanStorage
+):
+    storage_container.append_storage(storage)
+    span = Span("1", "a", SpanType.BASE, "b", "op1")
+    filename = storage.filename
+
+    for _ in range(storage_container.batch_size - 1):
+        storage_container.append_span(span)
+    spans_in_file = read_spans_from_file(filename)
+    assert len(spans_in_file) == 0
+
+    # Trigger batch write
+    storage_container.append_span(span)
+    await asyncio.sleep(0.1)
+
+    spans_in_file = read_spans_from_file(filename)
+    assert len(spans_in_file) == storage_container.batch_size

View File

@@ -3,6 +3,7 @@ from contextvars import ContextVar
 from functools import wraps
 import asyncio
 import inspect
+import logging

 from pilot.component import SystemApp, ComponentType
@@ -15,6 +16,9 @@ from pilot.utils.tracer.base import (
     TracerContext,
 )
 from pilot.utils.tracer.span_storage import MemorySpanStorage
+from pilot.utils.module_utils import import_from_checked_string
+
+logger = logging.getLogger(__name__)


 class DefaultTracer(Tracer):
@@ -197,10 +201,11 @@ def initialize_tracer(
     system_app: SystemApp,
     tracer_filename: str,
     root_operation_name: str = "DB-GPT-Web-Entry",
+    tracer_storage_cls: str = None,
 ):
     if not system_app:
         return
-    from pilot.utils.tracer.span_storage import FileSpanStorage
+    from pilot.utils.tracer.span_storage import FileSpanStorage, SpanStorageContainer

     trace_context_var = ContextVar(
         "trace_context",
@@ -208,7 +213,15 @@ def initialize_tracer(
     )
     tracer = DefaultTracer(system_app)

-    system_app.register_instance(FileSpanStorage(tracer_filename))
+    storage_container = SpanStorageContainer(system_app)
+    storage_container.append_storage(FileSpanStorage(tracer_filename))
+    if tracer_storage_cls:
+        logger.info(f"Parsing storage class {tracer_storage_cls}")
+        storage = import_from_checked_string(tracer_storage_cls, SpanStorage)
+        storage_container.append_storage(storage())
+    system_app.register_instance(storage_container)
+
     system_app.register_instance(tracer)
     root_tracer.initialize(system_app, trace_context_var)
     if system_app.app:
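End-to-end, the new parameter threads through to initialize_tracer. A hedged sketch of wiring a custom storage at startup; the dotted-path format and the module/class names are assumptions based on import_from_checked_string:

```python
# Sketch only: assumes SystemApp can be constructed without an ASGI app
# here, and that `tracer_storage_cls` takes a dotted import path.
# `my_module.PrintSpanStorage` is hypothetical.
from pilot.component import SystemApp
from pilot.utils.tracer import initialize_tracer

system_app = SystemApp()
initialize_tracer(
    system_app,
    "/tmp/dbgpt/tracer.jsonl",
    tracer_storage_cls="my_module.PrintSpanStorage",
)
```

With this in place, every span is written both to the JSONL file and to the custom storage, since the container fans each flushed batch out to all registered storages.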