feat(core): The tracer FileSpanStorage support file roll over

This commit is contained in:
FangYin Cheng 2023-10-18 18:47:47 +08:00
parent 79846db65e
commit 2e750f6b9c
2 changed files with 67 additions and 0 deletions

View File

@ -1,6 +1,7 @@
import os import os
import json import json
import time import time
import datetime
import threading import threading
import queue import queue
import logging import logging
@ -27,6 +28,13 @@ class FileSpanStorage(SpanStorage):
def __init__(self, filename: str, batch_size=10, flush_interval=10): def __init__(self, filename: str, batch_size=10, flush_interval=10):
super().__init__() super().__init__()
self.filename = filename self.filename = filename
# Split filename into prefix and suffix
self.filename_prefix, self.filename_suffix = os.path.splitext(filename)
if not self.filename_suffix:
self.filename_suffix = ".log"
self.last_date = (
datetime.datetime.now().date()
) # Store the current date for checking date changes
self.queue = queue.Queue() self.queue = queue.Queue()
self.batch_size = batch_size self.batch_size = batch_size
self.flush_interval = flush_interval self.flush_interval = flush_interval
@ -52,7 +60,21 @@ class FileSpanStorage(SpanStorage):
except queue.Full: except queue.Full:
pass # If the signal queue is full, it's okay. The flush thread will handle it. pass # If the signal queue is full, it's okay. The flush thread will handle it.
def _get_dated_filename(self, date: datetime.date) -> str:
"""Return the filename based on a specific date."""
date_str = date.strftime("%Y-%m-%d")
return f"{self.filename_prefix}_{date_str}{self.filename_suffix}"
def _roll_over_if_needed(self):
"""Checks if a day has changed since the last write, and if so, renames the current file."""
current_date = datetime.datetime.now().date()
if current_date != self.last_date:
if os.path.exists(self.filename):
os.rename(self.filename, self._get_dated_filename(self.last_date))
self.last_date = current_date
def _write_to_file(self): def _write_to_file(self):
self._roll_over_if_needed()
spans_to_write = [] spans_to_write = []
while not self.queue.empty(): while not self.queue.empty():
spans_to_write.append(self.queue.get()) spans_to_write.append(self.queue.get())

View File

@ -4,6 +4,8 @@ import asyncio
import json import json
import tempfile import tempfile
import time import time
from unittest.mock import patch
from datetime import datetime, timedelta
from pilot.utils.tracer import SpanStorage, FileSpanStorage, Span, SpanType from pilot.utils.tracer import SpanStorage, FileSpanStorage, Span, SpanType
@ -122,3 +124,46 @@ def test_non_existent_file(storage: SpanStorage):
assert len(spans_in_file) == 2 assert len(spans_in_file) == 2
assert spans_in_file[0]["trace_id"] == "1" assert spans_in_file[0]["trace_id"] == "1"
assert spans_in_file[1]["trace_id"] == "2" assert spans_in_file[1]["trace_id"] == "2"
@pytest.mark.parametrize(
"storage", [{"batch_size": 1, "file_does_not_exist": True}], indirect=True
)
def test_log_rollover(storage: SpanStorage):
# mock start date
mock_start_date = datetime(2023, 10, 18, 23, 59)
with patch("datetime.datetime") as mock_datetime:
mock_datetime.now.return_value = mock_start_date
span1 = Span("1", "a", SpanType.BASE, "b", "op1")
storage.append_span(span1)
time.sleep(0.1)
# mock new day
mock_datetime.now.return_value = mock_start_date + timedelta(minutes=1)
span2 = Span("2", "c", SpanType.BASE, "d", "op2")
storage.append_span(span2)
time.sleep(0.1)
# origin filename need exists
assert os.path.exists(storage.filename)
# get roll over filename
dated_filename = os.path.join(
os.path.dirname(storage.filename),
f"{os.path.basename(storage.filename).split('.')[0]}_2023-10-18.jsonl",
)
assert os.path.exists(dated_filename)
# check origin filename just include the second span
spans_in_original_file = read_spans_from_file(storage.filename)
assert len(spans_in_original_file) == 1
assert spans_in_original_file[0]["trace_id"] == "2"
# check the roll over filename just include the first span
spans_in_dated_file = read_spans_from_file(dated_filename)
assert len(spans_in_dated_file) == 1
assert spans_in_dated_file[0]["trace_id"] == "1"