From b0197e702793fa6c415321151ced4d1bf712a6e3 Mon Sep 17 00:00:00 2001 From: Brett England Date: Tue, 19 Mar 2024 11:36:43 -0400 Subject: [PATCH] Semaphore to limit docq async workers. ETA reporting --- .../components/ingest/ingest_component.py | 12 +- private_gpt/utils/eta.py | 122 ++++++++++++++++++ 2 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 private_gpt/utils/eta.py diff --git a/private_gpt/components/ingest/ingest_component.py b/private_gpt/components/ingest/ingest_component.py index c8b830a2..5ed03959 100644 --- a/private_gpt/components/ingest/ingest_component.py +++ b/private_gpt/components/ingest/ingest_component.py @@ -20,6 +20,7 @@ from llama_index.core.storage import StorageContext from private_gpt.components.ingest.ingest_helper import IngestionHelper from private_gpt.paths import local_data_path from private_gpt.settings.settings import Settings +from private_gpt.utils.eta import eta logger = logging.getLogger(__name__) @@ -358,7 +359,11 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex): # Using a shallow queue causes the filesystem parser to block # when it reaches capacity. This ensures it doesn't outpace the # computationally intensive embeddings phase, avoiding unnecessary - # memory consumption + # memory consumption. The semaphore is used to bound the async worker + # embedding computations to cause the doc Q to fill and block. + self.doc_semaphore = multiprocessing.Semaphore( + self.count_workers + ) # limit the doc queue to # items. self.doc_q: Queue[tuple[str, str | None, list[Document] | None]] = Queue(20) # node_q stores documents parsed into nodes (embeddings). # Larger queue size so we don't block the embedding workers during a slow @@ -379,6 +384,8 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex): ) # Documents for a file if cmd == "process": # Push CPU/GPU embedding work to the worker pool + # Acquire semaphore to control access to worker pool + self.doc_semaphore.acquire() pool.apply_async( self._doc_to_node_worker, (file_name, documents) ) @@ -398,6 +405,7 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex): ) self.node_q.put(("process", file_name, documents, nodes)) finally: + self.doc_semaphore.release() self.doc_q.task_done() # unblock Q joins def _save_docs( @@ -459,7 +467,7 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex): def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]: docs = [] - for file_name, file_data in files: + for file_name, file_data in eta(files): try: documents = IngestionHelper.transform_file_into_documents( file_name, file_data diff --git a/private_gpt/utils/eta.py b/private_gpt/utils/eta.py new file mode 100644 index 00000000..9315334f --- /dev/null +++ b/private_gpt/utils/eta.py @@ -0,0 +1,122 @@ +import datetime +import logging +import math +import time +from collections import deque +from typing import Any + +logger = logging.getLogger(__name__) + + +def human_time(*args: Any, **kwargs: Any) -> str: + def timedelta_total_seconds(timedelta: datetime.timedelta) -> float: + return ( + timedelta.microseconds + + 0.0 + + (timedelta.seconds + timedelta.days * 24 * 3600) * 10**6 + ) / 10**6 + + secs = float(timedelta_total_seconds(datetime.timedelta(*args, **kwargs))) + # We want (ms) precision below 2 seconds + if secs < 2: + return f"{secs * 1000}ms" + units = [("y", 86400 * 365), ("d", 86400), ("h", 3600), ("m", 60), ("s", 1)] + parts = [] + for unit, mul in units: + if secs / mul >= 1 or mul == 1: + if mul > 1: + n = int(math.floor(secs / mul)) + secs -= n * mul + else: + # >2s we drop the (ms) component. + n = int(secs) + if n: + parts.append(f"{n}{unit}") + return " ".join(parts) + + +def eta(iterator: list[Any]) -> Any: + """Report an ETA after 30s and every 60s thereafter.""" + total = len(iterator) + _eta = ETA(total) + _eta.needReport(30) + for processed, data in enumerate(iterator, start=1): + yield data + _eta.update(processed) + if _eta.needReport(60): + logger.info(f"{processed}/{total} - ETA {_eta.human_time()}") + + +class ETA: + """Predict how long something will take to complete.""" + + def __init__(self, total: int): + self.total: int = total # Total expected records. + self.rate: float = 0.0 # per second + self._timing_data: deque[tuple[float, int]] = deque(maxlen=100) + self.secondsLeft: float = 0.0 + self.nexttime: float = 0.0 + + def human_time(self) -> str: + if self._calc(): + return f"{human_time(seconds=self.secondsLeft)} @ {int(self.rate * 60)}/min" + return "(computing)" + + def update(self, count: int) -> None: + # count should be in the range 0 to self.total + assert count > 0 + assert count <= self.total + self._timing_data.append((time.time(), count)) # (X,Y) for pearson + + def needReport(self, whenSecs: int) -> bool: + now = time.time() + if now > self.nexttime: + self.nexttime = now + whenSecs + return True + return False + + def _calc(self) -> bool: + # A sample before a prediction. Need two points to compute slope! + if len(self._timing_data) < 3: + return False + + # http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient + # Calculate means and standard deviations. + samples = len(self._timing_data) + # column wise sum of the timing tuples to compute their mean. + mean_x, mean_y = ( + sum(i) / samples for i in zip(*self._timing_data, strict=False) + ) + std_x = math.sqrt( + sum(pow(i[0] - mean_x, 2) for i in self._timing_data) / (samples - 1) + ) + std_y = math.sqrt( + sum(pow(i[1] - mean_y, 2) for i in self._timing_data) / (samples - 1) + ) + + # Calculate coefficient. + sum_xy, sum_sq_v_x, sum_sq_v_y = 0.0, 0.0, 0 + for x, y in self._timing_data: + x -= mean_x + y -= mean_y + sum_xy += x * y + sum_sq_v_x += pow(x, 2) + sum_sq_v_y += pow(y, 2) + pearson_r = sum_xy / math.sqrt(sum_sq_v_x * sum_sq_v_y) + + # Calculate regression line. + # y = mx + b where m is the slope and b is the y-intercept. + m = self.rate = pearson_r * (std_y / std_x) + y = self.total + b = mean_y - m * mean_x + x = (y - b) / m + + # Calculate fitted line (transformed/shifted regression line horizontally). + fitted_b = self._timing_data[-1][1] - (m * self._timing_data[-1][0]) + fitted_x = (y - fitted_b) / m + _, count = self._timing_data[-1] # adjust last data point progress count + adjusted_x = ((fitted_x - x) * (count / self.total)) + x + eta_epoch = adjusted_x + + self.secondsLeft = max([eta_epoch - time.time(), 0]) + return True