1
0
mirror of https://github.com/imartinez/privateGPT.git synced 2025-05-06 23:38:21 +00:00

Update ingest_helper.py to use chardet

Fixes errors related to character mapping in existing code.
This commit is contained in:
kpcrash 2025-02-04 18:13:36 -05:00 committed by GitHub
parent b7ee43788d
commit 5f8b29c571
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -77,6 +77,7 @@ class IngestionHelper:
IngestionHelper._exclude_metadata(documents)
return documents
# Modified to better support ingesting files that hit charmap (character-encoding) errors
@staticmethod
def _load_file_to_documents(file_name: str, file_data: Path) -> list[Document]:
logger.debug("Transforming file_name=%s into documents", file_name)
@ -89,17 +90,61 @@ class IngestionHelper:
)
# Read as a plain text
string_reader = StringIterableReader()
return string_reader.load_data([file_data.read_text()])
return string_reader.load_data([IngestionHelper._read_all_text(file_data)])
logger.debug("Specific reader found for extension=%s", extension)
documents = reader_cls().load_data(file_data)
# Sanitize NUL bytes in text which can't be stored in Postgres
for i in range(len(documents)):
documents[i].text = documents[i].text.replace("\u0000", "")
return documents
# New helper method, called from _load_file_to_documents
@staticmethod
def _read_all_text(file_data: Path) -> str:
    """Read a file as text, detecting its encoding instead of assuming one.

    The raw bytes are decoded with the encoding reported by ``chardet``.
    If that encoding is missing, unknown to Python, or fails to decode the
    data, UTF-8, then cp1252, then ISO-8859-1 are tried in that order.
    ISO-8859-1 maps every byte value, so decoding always succeeds.

    The decoded text is then cleaned: characters that cannot be encoded as
    UTF-8 are replaced, NUL and U+FFFD replacement characters are removed,
    and leading/trailing whitespace is stripped.

    Args:
        file_data: Path of the file to read.

    Returns:
        The cleaned text; an empty string if nothing is left after cleaning
        (a warning is logged in that case).

    Raises:
        Exception: Any error raised while reading or processing the file
            (e.g. ``OSError``) is logged and re-raised unchanged.
    """
    # Function-scope import: keeps this helper usable even when the module
    # has no top-level ``import chardet``.
    import chardet

    try:
        raw_bytes = file_data.read_bytes()

        detected = chardet.detect(raw_bytes)
        # chardet reports ``None`` when it cannot guess; assume UTF-8 then.
        encoding = detected.get("encoding") or "utf-8"
        confidence = detected.get("confidence", 0)
        logger.debug(
            "Detected encoding %s with confidence %s for %s",
            encoding,
            confidence,
            file_data,
        )

        try:
            text = raw_bytes.decode(encoding)
        except (UnicodeDecodeError, LookupError):
            # LookupError: chardet named a codec Python does not provide.
            for fallback_encoding in ("utf-8", "cp1252"):
                try:
                    text = raw_bytes.decode(fallback_encoding)
                except UnicodeDecodeError:
                    continue
                break
            else:
                # Last resort: ISO-8859-1 assigns a character to every byte
                # value, so this decode cannot fail.
                fallback_encoding = "iso-8859-1"
                text = raw_bytes.decode(fallback_encoding)
            logger.debug(
                "Successfully decoded with fallback encoding: %s",
                fallback_encoding,
            )

        # The encode/decode round trip turns anything UTF-8 cannot represent
        # (lone surrogates) into "?", then NUL and U+FFFD are dropped in a
        # single pass.
        cleaned_text = (
            text.encode("utf-8", errors="replace")
            .decode("utf-8")
            .translate({0x0000: None, 0xFFFD: None})
            .strip()
        )

        if not cleaned_text:
            logger.warning("Cleaned text is empty for %s", file_data)
        return cleaned_text
    except Exception as e:
        logger.error("Error processing file %s: %s", file_data, e)
        raise
@staticmethod
def _exclude_metadata(documents: list[Document]) -> None:
logger.debug("Excluding metadata from count=%s documents", len(documents))