mirror of
https://github.com/imartinez/privateGPT.git
synced 2025-09-01 15:19:05 +00:00
Semaphore to limit docq async workers. ETA reporting
This commit is contained in:
@@ -20,6 +20,7 @@ from llama_index.core.storage import StorageContext
|
|||||||
from private_gpt.components.ingest.ingest_helper import IngestionHelper
|
from private_gpt.components.ingest.ingest_helper import IngestionHelper
|
||||||
from private_gpt.paths import local_data_path
|
from private_gpt.paths import local_data_path
|
||||||
from private_gpt.settings.settings import Settings
|
from private_gpt.settings.settings import Settings
|
||||||
|
from private_gpt.utils.eta import eta
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -358,7 +359,11 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
|
|||||||
# Using a shallow queue causes the filesystem parser to block
|
# Using a shallow queue causes the filesystem parser to block
|
||||||
# when it reaches capacity. This ensures it doesn't outpace the
|
# when it reaches capacity. This ensures it doesn't outpace the
|
||||||
# computationally intensive embeddings phase, avoiding unnecessary
|
# computationally intensive embeddings phase, avoiding unnecessary
|
||||||
# memory consumption
|
# memory consumption. The semaphore is used to bound the async worker
|
||||||
|
# embedding computations to cause the doc Q to fill and block.
|
||||||
|
self.doc_semaphore = multiprocessing.Semaphore(
|
||||||
|
self.count_workers
|
||||||
|
) # limit the doc queue to # items.
|
||||||
self.doc_q: Queue[tuple[str, str | None, list[Document] | None]] = Queue(20)
|
self.doc_q: Queue[tuple[str, str | None, list[Document] | None]] = Queue(20)
|
||||||
# node_q stores documents parsed into nodes (embeddings).
|
# node_q stores documents parsed into nodes (embeddings).
|
||||||
# Larger queue size so we don't block the embedding workers during a slow
|
# Larger queue size so we don't block the embedding workers during a slow
|
||||||
@@ -379,6 +384,8 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
|
|||||||
) # Documents for a file
|
) # Documents for a file
|
||||||
if cmd == "process":
|
if cmd == "process":
|
||||||
# Push CPU/GPU embedding work to the worker pool
|
# Push CPU/GPU embedding work to the worker pool
|
||||||
|
# Acquire semaphore to control access to worker pool
|
||||||
|
self.doc_semaphore.acquire()
|
||||||
pool.apply_async(
|
pool.apply_async(
|
||||||
self._doc_to_node_worker, (file_name, documents)
|
self._doc_to_node_worker, (file_name, documents)
|
||||||
)
|
)
|
||||||
@@ -398,6 +405,7 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
|
|||||||
)
|
)
|
||||||
self.node_q.put(("process", file_name, documents, nodes))
|
self.node_q.put(("process", file_name, documents, nodes))
|
||||||
finally:
|
finally:
|
||||||
|
self.doc_semaphore.release()
|
||||||
self.doc_q.task_done() # unblock Q joins
|
self.doc_q.task_done() # unblock Q joins
|
||||||
|
|
||||||
def _save_docs(
|
def _save_docs(
|
||||||
@@ -459,7 +467,7 @@ class PipelineIngestComponent(BaseIngestComponentWithIndex):
|
|||||||
|
|
||||||
def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
|
def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
|
||||||
docs = []
|
docs = []
|
||||||
for file_name, file_data in files:
|
for file_name, file_data in eta(files):
|
||||||
try:
|
try:
|
||||||
documents = IngestionHelper.transform_file_into_documents(
|
documents = IngestionHelper.transform_file_into_documents(
|
||||||
file_name, file_data
|
file_name, file_data
|
||||||
|
122
private_gpt/utils/eta.py
Normal file
122
private_gpt/utils/eta.py
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
import datetime
|
||||||
|
import logging
|
||||||
|
import math
|
||||||
|
import time
|
||||||
|
from collections import deque
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def human_time(*args: Any, **kwargs: Any) -> str:
|
||||||
|
def timedelta_total_seconds(timedelta: datetime.timedelta) -> float:
|
||||||
|
return (
|
||||||
|
timedelta.microseconds
|
||||||
|
+ 0.0
|
||||||
|
+ (timedelta.seconds + timedelta.days * 24 * 3600) * 10**6
|
||||||
|
) / 10**6
|
||||||
|
|
||||||
|
secs = float(timedelta_total_seconds(datetime.timedelta(*args, **kwargs)))
|
||||||
|
# We want (ms) precision below 2 seconds
|
||||||
|
if secs < 2:
|
||||||
|
return f"{secs * 1000}ms"
|
||||||
|
units = [("y", 86400 * 365), ("d", 86400), ("h", 3600), ("m", 60), ("s", 1)]
|
||||||
|
parts = []
|
||||||
|
for unit, mul in units:
|
||||||
|
if secs / mul >= 1 or mul == 1:
|
||||||
|
if mul > 1:
|
||||||
|
n = int(math.floor(secs / mul))
|
||||||
|
secs -= n * mul
|
||||||
|
else:
|
||||||
|
# >2s we drop the (ms) component.
|
||||||
|
n = int(secs)
|
||||||
|
if n:
|
||||||
|
parts.append(f"{n}{unit}")
|
||||||
|
return " ".join(parts)
|
||||||
|
|
||||||
|
|
||||||
|
def eta(iterator: list[Any]) -> Any:
|
||||||
|
"""Report an ETA after 30s and every 60s thereafter."""
|
||||||
|
total = len(iterator)
|
||||||
|
_eta = ETA(total)
|
||||||
|
_eta.needReport(30)
|
||||||
|
for processed, data in enumerate(iterator, start=1):
|
||||||
|
yield data
|
||||||
|
_eta.update(processed)
|
||||||
|
if _eta.needReport(60):
|
||||||
|
logger.info(f"{processed}/{total} - ETA {_eta.human_time()}")
|
||||||
|
|
||||||
|
|
||||||
|
class ETA:
|
||||||
|
"""Predict how long something will take to complete."""
|
||||||
|
|
||||||
|
def __init__(self, total: int):
|
||||||
|
self.total: int = total # Total expected records.
|
||||||
|
self.rate: float = 0.0 # per second
|
||||||
|
self._timing_data: deque[tuple[float, int]] = deque(maxlen=100)
|
||||||
|
self.secondsLeft: float = 0.0
|
||||||
|
self.nexttime: float = 0.0
|
||||||
|
|
||||||
|
def human_time(self) -> str:
|
||||||
|
if self._calc():
|
||||||
|
return f"{human_time(seconds=self.secondsLeft)} @ {int(self.rate * 60)}/min"
|
||||||
|
return "(computing)"
|
||||||
|
|
||||||
|
def update(self, count: int) -> None:
|
||||||
|
# count should be in the range 0 to self.total
|
||||||
|
assert count > 0
|
||||||
|
assert count <= self.total
|
||||||
|
self._timing_data.append((time.time(), count)) # (X,Y) for pearson
|
||||||
|
|
||||||
|
def needReport(self, whenSecs: int) -> bool:
|
||||||
|
now = time.time()
|
||||||
|
if now > self.nexttime:
|
||||||
|
self.nexttime = now + whenSecs
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _calc(self) -> bool:
|
||||||
|
# A sample before a prediction. Need two points to compute slope!
|
||||||
|
if len(self._timing_data) < 3:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
|
||||||
|
# Calculate means and standard deviations.
|
||||||
|
samples = len(self._timing_data)
|
||||||
|
# column wise sum of the timing tuples to compute their mean.
|
||||||
|
mean_x, mean_y = (
|
||||||
|
sum(i) / samples for i in zip(*self._timing_data, strict=False)
|
||||||
|
)
|
||||||
|
std_x = math.sqrt(
|
||||||
|
sum(pow(i[0] - mean_x, 2) for i in self._timing_data) / (samples - 1)
|
||||||
|
)
|
||||||
|
std_y = math.sqrt(
|
||||||
|
sum(pow(i[1] - mean_y, 2) for i in self._timing_data) / (samples - 1)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Calculate coefficient.
|
||||||
|
sum_xy, sum_sq_v_x, sum_sq_v_y = 0.0, 0.0, 0
|
||||||
|
for x, y in self._timing_data:
|
||||||
|
x -= mean_x
|
||||||
|
y -= mean_y
|
||||||
|
sum_xy += x * y
|
||||||
|
sum_sq_v_x += pow(x, 2)
|
||||||
|
sum_sq_v_y += pow(y, 2)
|
||||||
|
pearson_r = sum_xy / math.sqrt(sum_sq_v_x * sum_sq_v_y)
|
||||||
|
|
||||||
|
# Calculate regression line.
|
||||||
|
# y = mx + b where m is the slope and b is the y-intercept.
|
||||||
|
m = self.rate = pearson_r * (std_y / std_x)
|
||||||
|
y = self.total
|
||||||
|
b = mean_y - m * mean_x
|
||||||
|
x = (y - b) / m
|
||||||
|
|
||||||
|
# Calculate fitted line (transformed/shifted regression line horizontally).
|
||||||
|
fitted_b = self._timing_data[-1][1] - (m * self._timing_data[-1][0])
|
||||||
|
fitted_x = (y - fitted_b) / m
|
||||||
|
_, count = self._timing_data[-1] # adjust last data point progress count
|
||||||
|
adjusted_x = ((fitted_x - x) * (count / self.total)) + x
|
||||||
|
eta_epoch = adjusted_x
|
||||||
|
|
||||||
|
self.secondsLeft = max([eta_epoch - time.time(), 0])
|
||||||
|
return True
|
Reference in New Issue
Block a user