mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-08-03 01:12:15 +00:00
164 lines
5.6 KiB
Python
164 lines
5.6 KiB
Python
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import queue
|
|
import threading
|
|
import time
|
|
from concurrent.futures import Executor, ThreadPoolExecutor
|
|
from typing import List, Optional
|
|
|
|
from dbgpt.component import SystemApp
|
|
from dbgpt.util.tracer.base import Span, SpanStorage
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MemorySpanStorage(SpanStorage):
|
|
def __init__(self, system_app: SystemApp | None = None):
|
|
super().__init__(system_app)
|
|
self.spans = []
|
|
self._lock = threading.Lock()
|
|
|
|
def append_span(self, span: Span):
|
|
with self._lock:
|
|
self.spans.append(span)
|
|
|
|
|
|
class SpanStorageContainer(SpanStorage):
|
|
def __init__(
|
|
self,
|
|
system_app: SystemApp | None = None,
|
|
batch_size=10,
|
|
flush_interval=10,
|
|
executor: Executor = None,
|
|
):
|
|
super().__init__(system_app)
|
|
if not executor:
|
|
executor = ThreadPoolExecutor(thread_name_prefix="trace_storage_sync_")
|
|
self.executor = executor
|
|
self.storages: List[SpanStorage] = []
|
|
self.last_date = (
|
|
datetime.datetime.now().date()
|
|
) # Store the current date for checking date changes
|
|
self.queue = queue.Queue()
|
|
self.batch_size = batch_size
|
|
self.flush_interval = flush_interval
|
|
self.last_flush_time = time.time()
|
|
self.flush_signal_queue = queue.Queue()
|
|
self.flush_thread = threading.Thread(
|
|
target=self._flush_to_storages, daemon=True
|
|
)
|
|
self._stop_event = threading.Event()
|
|
self.flush_thread.start()
|
|
self._stop_event.clear()
|
|
|
|
def append_storage(self, storage: SpanStorage):
|
|
"""Append sotrage to container
|
|
|
|
Args:
|
|
storage ([`SpanStorage`]): The storage to be append to current container
|
|
"""
|
|
self.storages.append(storage)
|
|
|
|
def append_span(self, span: Span):
|
|
self.queue.put(span)
|
|
if self.queue.qsize() >= self.batch_size:
|
|
try:
|
|
self.flush_signal_queue.put_nowait(True)
|
|
except queue.Full:
|
|
pass # If the signal queue is full, it's okay. The flush thread will handle it.
|
|
|
|
def _flush_to_storages(self):
|
|
while not self._stop_event.is_set():
|
|
interval = time.time() - self.last_flush_time
|
|
if interval < self.flush_interval:
|
|
try:
|
|
self.flush_signal_queue.get(
|
|
block=True, timeout=self.flush_interval - interval
|
|
)
|
|
except Exception:
|
|
# Timeout
|
|
pass
|
|
|
|
spans_to_write = []
|
|
while not self.queue.empty():
|
|
spans_to_write.append(self.queue.get())
|
|
for s in self.storages:
|
|
|
|
def append_and_ignore_error(
|
|
storage: SpanStorage, spans_to_write: List[SpanStorage]
|
|
):
|
|
try:
|
|
storage.append_span_batch(spans_to_write)
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Append spans to storage {str(storage)} failed: {str(e)}, span_data: {spans_to_write}"
|
|
)
|
|
|
|
try:
|
|
self.executor.submit(append_and_ignore_error, s, spans_to_write)
|
|
except RuntimeError:
|
|
append_and_ignore_error(s, spans_to_write)
|
|
self.last_flush_time = time.time()
|
|
|
|
def before_stop(self):
|
|
try:
|
|
self.flush_signal_queue.put(True)
|
|
self._stop_event.set()
|
|
self.flush_thread.join()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
class FileSpanStorage(SpanStorage):
|
|
def __init__(self, filename: str):
|
|
super().__init__()
|
|
self.filename = filename
|
|
# Split filename into prefix and suffix
|
|
self.filename_prefix, self.filename_suffix = os.path.splitext(filename)
|
|
if not self.filename_suffix:
|
|
self.filename_suffix = ".log"
|
|
self.last_date = (
|
|
datetime.datetime.now().date()
|
|
) # Store the current date for checking date changes
|
|
self.queue = queue.Queue()
|
|
|
|
if not os.path.exists(filename):
|
|
# New file if not exist
|
|
os.makedirs(os.path.dirname(filename), exist_ok=True)
|
|
with open(filename, "a"):
|
|
pass
|
|
|
|
def append_span(self, span: Span):
|
|
self._write_to_file([span])
|
|
|
|
def append_span_batch(self, spans: List[Span]):
|
|
self._write_to_file(spans)
|
|
|
|
def _get_dated_filename(self, date: datetime.date) -> str:
|
|
"""Return the filename based on a specific date."""
|
|
date_str = date.strftime("%Y-%m-%d")
|
|
return f"{self.filename_prefix}_{date_str}{self.filename_suffix}"
|
|
|
|
def _roll_over_if_needed(self):
|
|
"""Checks if a day has changed since the last write, and if so, renames the current file."""
|
|
current_date = datetime.datetime.now().date()
|
|
if current_date != self.last_date:
|
|
if os.path.exists(self.filename):
|
|
os.rename(self.filename, self._get_dated_filename(self.last_date))
|
|
self.last_date = current_date
|
|
|
|
def _write_to_file(self, spans: List[Span]):
|
|
self._roll_over_if_needed()
|
|
|
|
with open(self.filename, "a", encoding="utf8") as file:
|
|
for span in spans:
|
|
span_data = span.to_dict()
|
|
try:
|
|
file.write(json.dumps(span_data, ensure_ascii=False) + "\n")
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Write span to file failed: {str(e)}, span_data: {span_data}"
|
|
)
|