From 2e750f6b9c4ffee051e3bf785175e82ec87b2c07 Mon Sep 17 00:00:00 2001
From: FangYin Cheng
Date: Wed, 18 Oct 2023 18:47:47 +0800
Subject: [PATCH] feat(core): The tracer FileSpanStorage support file roll over

---
 pilot/utils/tracer/span_storage.py            | 31 +++++++++++++
 pilot/utils/tracer/tests/test_span_storage.py | 44 +++++++++++++++++++
 2 files changed, 75 insertions(+)

diff --git a/pilot/utils/tracer/span_storage.py b/pilot/utils/tracer/span_storage.py
index 914aa0126..3070fb834 100644
--- a/pilot/utils/tracer/span_storage.py
+++ b/pilot/utils/tracer/span_storage.py
@@ -1,6 +1,7 @@
 import os
 import json
 import time
+import datetime
 import threading
 import queue
 import logging
@@ -27,6 +28,13 @@ class FileSpanStorage(SpanStorage):
     def __init__(self, filename: str, batch_size=10, flush_interval=10):
         super().__init__()
         self.filename = filename
+        # Split filename into prefix and suffix
+        self.filename_prefix, self.filename_suffix = os.path.splitext(filename)
+        if not self.filename_suffix:
+            self.filename_suffix = ".log"
+        self.last_date = (
+            datetime.datetime.now().date()
+        )  # Store the current date for checking date changes
         self.queue = queue.Queue()
         self.batch_size = batch_size
         self.flush_interval = flush_interval
@@ -52,7 +60,30 @@ class FileSpanStorage(SpanStorage):
             except queue.Full:
                 pass  # If the signal queue is full, it's okay. The flush thread will handle it.
 
+    def _get_dated_filename(self, date: datetime.date) -> str:
+        """Return the archive filename for the given date.
+
+        The name has the form ``<prefix>_<YYYY-MM-DD><suffix>``.
+        """
+        date_str = date.strftime("%Y-%m-%d")
+        return f"{self.filename_prefix}_{date_str}{self.filename_suffix}"
+
+    def _roll_over_if_needed(self):
+        """Archive the current file when the date has changed since the last check.
+
+        The current file, if it exists, is moved to the dated filename of
+        the previously recorded date; the recorded date is then updated.
+        """
+        current_date = datetime.datetime.now().date()
+        if current_date != self.last_date:
+            if os.path.exists(self.filename):
+                # os.replace overwrites an existing target on every platform,
+                # whereas os.rename raises on Windows if the target exists.
+                os.replace(self.filename, self._get_dated_filename(self.last_date))
+            self.last_date = current_date
+
     def _write_to_file(self):
+        self._roll_over_if_needed()
         spans_to_write = []
         while not self.queue.empty():
             spans_to_write.append(self.queue.get())
diff --git a/pilot/utils/tracer/tests/test_span_storage.py b/pilot/utils/tracer/tests/test_span_storage.py
index 0c63992a6..9ca727995 100644
--- a/pilot/utils/tracer/tests/test_span_storage.py
+++ b/pilot/utils/tracer/tests/test_span_storage.py
@@ -4,6 +4,8 @@ import asyncio
 import json
 import tempfile
 import time
+from unittest.mock import patch
+from datetime import datetime, timedelta
 
 from pilot.utils.tracer import SpanStorage, FileSpanStorage, Span, SpanType
 
@@ -122,3 +124,45 @@ def test_non_existent_file(storage: SpanStorage):
     assert len(spans_in_file) == 2
     assert spans_in_file[0]["trace_id"] == "1"
     assert spans_in_file[1]["trace_id"] == "2"
+
+
+@pytest.mark.parametrize(
+    "storage", [{"batch_size": 1, "file_does_not_exist": True}], indirect=True
+)
+def test_log_rollover(storage: SpanStorage):
+    """Spans appended on different dates end up in separate files."""
+    # mock start date
+    mock_start_date = datetime(2023, 10, 18, 23, 59)
+
+    with patch("datetime.datetime") as mock_datetime:
+        mock_datetime.now.return_value = mock_start_date
+
+        span1 = Span("1", "a", SpanType.BASE, "b", "op1")
+        storage.append_span(span1)
+        time.sleep(0.1)
+
+        # mock new day
+        mock_datetime.now.return_value = mock_start_date + timedelta(minutes=1)
+
+        span2 = Span("2", "c", SpanType.BASE, "d", "op2")
+        storage.append_span(span2)
+        time.sleep(0.1)
+
+    # the original file must still exist
+    assert os.path.exists(storage.filename)
+
+    # derive the rolled-over filename the same way FileSpanStorage does
+    prefix, suffix = os.path.splitext(storage.filename)
+    dated_filename = f"{prefix}_2023-10-18{suffix or '.log'}"
+
+    assert os.path.exists(dated_filename)
+
+    # the original file contains only the second span
+    spans_in_original_file = read_spans_from_file(storage.filename)
+    assert len(spans_in_original_file) == 1
+    assert spans_in_original_file[0]["trace_id"] == "2"
+
+    # the rolled-over file contains only the first span
+    spans_in_dated_file = read_spans_from_file(dated_filename)
+    assert len(spans_in_dated_file) == 1
+    assert spans_in_dated_file[0]["trace_id"] == "1"