Updated with chunk file upload

This commit is contained in:
Saurab-Shrestha 2024-02-26 16:58:11 +05:45
parent e307e73b03
commit b9556626b1
6 changed files with 331 additions and 139 deletions

View File

@ -0,0 +1,34 @@
"""Set null on delete audit
Revision ID: 186dc125c8c4
Revises: 9aa759c05b19
Create Date: 2024-02-26 15:43:49.759556
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '186dc125c8c4'
down_revision: Union[str, None] = '9aa759c05b19'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint('audit_user_id_fkey', 'audit', type_='foreignkey')
op.create_foreign_key(None, 'audit', 'users', ['user_id'], ['id'], ondelete='SET NULL')
# op.create_unique_constraint('unique_user_role', 'user_roles', ['user_id', 'role_id', 'company_id'])
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# op.drop_constraint('unique_user_role', 'user_roles', type_='unique')
op.drop_constraint(None, 'audit', type_='foreignkey')
op.create_foreign_key('audit_user_id_fkey', 'audit', 'users', ['user_id'], ['id'])
# ### end Alembic commands ###

View File

@ -1,51 +1,50 @@
import os import os
import aiofiles
import fitz import fitz
import requests import requests
from docx import Document from docx import Document
from fastapi import HTTPException, status, File, UploadFile, APIRouter, Request, Security, Depends from fastapi import HTTPException, status, File, UploadFile, APIRouter, Request, Security, Depends
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from private_gpt.users import models from private_gpt.users import models
from private_gpt.users.api import deps from private_gpt.users.api import deps
from private_gpt.users.constants.role import Role from private_gpt.users.constants.role import Role
from private_gpt.components.ocr_components.TextExtraction import ImageToTable
from private_gpt.components.ocr_components.table_ocr import GetOCRText
from private_gpt.server.ingest.ingest_router import common_ingest_logic, IngestResponse from private_gpt.server.ingest.ingest_router import common_ingest_logic, IngestResponse
from private_gpt.constants import OCR_UPLOAD from private_gpt.constants import OCR_UPLOAD
from private_gpt.components.ocr_components.TextExtraction import ImageToTable
from private_gpt.components.ocr_components.table_ocr import GetOCRText
import traceback
pdf_router = APIRouter(prefix="/v1", tags=["ocr"]) pdf_router = APIRouter(prefix="/v1", tags=["ocr"])
@pdf_router.post("/pdf_ocr") async def save_uploaded_file(file: UploadFile, upload_dir: str):
async def get_pdf_ocr( file_path = os.path.join(upload_dir, file.filename)
request: Request, print("The file name is: ",file.filename);
db: Session = Depends(deps.get_db), print("THe file path is: ", file_path)
file: UploadFile = File(...),
current_user: models.User = Security(
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
)
):
UPLOAD_DIR = OCR_UPLOAD
try: try:
contents = await file.read() contents = await file.read()
except Exception as e: async with aiofiles.open(file_path, 'wb') as f:
await f.write(contents)
except Exception:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"There was an error reading the file: {e}" detail=f"There was an error uploading the file."
) )
finally:
await file.close()
file_path = os.path.join(UPLOAD_DIR, file.filename) # with open(file_path, "wb") as f:
with open(file_path, "wb") as f: # f.write(file.file.read())
f.write(contents)
return file_path
async def process_images_and_generate_doc(pdf_path: str, upload_dir: str):
doc = Document() doc = Document()
ocr = GetOCRText() ocr = GetOCRText()
img_tab = ImageToTable() img_tab = ImageToTable()
pdf_doc = fitz.open(file_path) pdf_doc = fitz.open(pdf_path)
# try:
for page_index in range(len(pdf_doc)): for page_index in range(len(pdf_doc)):
page = pdf_doc[page_index] page = pdf_doc[page_index]
image_list = page.get_images() image_list = page.get_images()
@ -68,20 +67,87 @@ async def get_pdf_ocr(
doc.add_paragraph(extracted_text) doc.add_paragraph(extracted_text)
table_data = img_tab.table_to_csv(image_path) table_data = img_tab.table_to_csv(image_path)
doc.add_paragraph(table_data) doc.add_paragraph(table_data)
os.remove(image_path) os.remove(image_path)
save_path = os.path.join( save_path = os.path.join(
UPLOAD_DIR, f"{file.filename.replace('.pdf', '_ocr.docx')}") upload_dir, f"{os.path.splitext(os.path.basename(pdf_path))[0]}_ocr.docx")
doc.save(save_path) doc.save(save_path)
return save_path
with open(save_path, 'rb') as f:
file_content = f.read() async def process_pdf_ocr(
if not file_content: request: Request,
raise HTTPException( db: Session,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, file: UploadFile,
detail="Empty file content after processing OCR" current_user: models.User,
) log_audit: models.Audit
ingested_documents = await common_ingest_logic( ):
request=request,db=db, ocr_file=save_path, current_user=current_user UPLOAD_DIR = OCR_UPLOAD
try:
print("The file name is: ", file.filename)
pdf_path = await save_uploaded_file(file, UPLOAD_DIR)
print("The file path: ", pdf_path)
ocr_doc_path = await process_images_and_generate_doc(pdf_path, UPLOAD_DIR)
ingested_documents = await common_ingest_logic(
request=request, db=db, ocr_file=ocr_doc_path, current_user=current_user, original_file=None, log_audit=log_audit
)
return IngestResponse(object="list", model="private-gpt", data=ingested_documents)
except Exception as e:
print(traceback.print_exc())
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"There was an error processing OCR: {e}"
)
async def process_both(
request: Request,
db: Session,
file: UploadFile,
current_user: models.User,
log_audit: models.Audit
):
UPLOAD_DIR = OCR_UPLOAD
try:
pdf_path = await save_uploaded_file(file, UPLOAD_DIR)
ocr_doc_path = await process_images_and_generate_doc(pdf_path, UPLOAD_DIR)
ingested_documents = await common_ingest_logic(
request=request, db=db, ocr_file=ocr_doc_path, current_user=current_user, original_file=pdf_path, log_audit=log_audit
)
return IngestResponse(object="list", model="private-gpt", data=ingested_documents)
except Exception as e:
print(traceback.print_exc())
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"There was an error processing OCR: {e}"
)
@pdf_router.post("/pdf_ocr")
async def get_pdf_ocr_wrapper(
request: Request,
db: Session = Depends(deps.get_db),
log_audit: models.Audit = Depends(deps.get_audit_logger),
file: UploadFile = File(...),
current_user: models.User = Security(
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
) )
return IngestResponse(object="list", model="private-gpt", data=ingested_documents) ):
return await process_pdf_ocr(request, db, file, current_user, log_audit)
@pdf_router.post("/both")
async def get_both_wrapper(
request: Request,
db: Session = Depends(deps.get_db),
log_audit: models.Audit = Depends(deps.get_audit_logger),
file: UploadFile = File(...),
current_user: models.User = Security(
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
)
):
return await process_both(request, db, file, current_user, log_audit)

View File

@ -4,6 +4,7 @@ import traceback
from pathlib import Path from pathlib import Path
from typing import Literal, Optional, List from typing import Literal, Optional, List
import aiofiles
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File, status, Security, Body from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File, status, Security, Body
@ -17,7 +18,7 @@ from private_gpt.users.constants.role import Role
from private_gpt.server.ingest.ingest_service import IngestService from private_gpt.server.ingest.ingest_service import IngestService
from private_gpt.server.ingest.model import IngestedDoc from private_gpt.server.ingest.model import IngestedDoc
from private_gpt.server.utils.auth import authenticated from private_gpt.server.utils.auth import authenticated
from private_gpt.constants import UPLOAD_DIR from private_gpt.constants import UPLOAD_DIR, OCR_UPLOAD
ingest_router = APIRouter(prefix="/v1", dependencies=[Depends(authenticated)]) ingest_router = APIRouter(prefix="/v1", dependencies=[Depends(authenticated)])
@ -146,20 +147,17 @@ def delete_file(
filename = delete_input.filename filename = delete_input.filename
service = request.state.injector.get(IngestService) service = request.state.injector.get(IngestService)
try: try:
doc_ids = service.get_doc_ids_by_filename(filename)
if not doc_ids:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"No documents found with filename '{filename}'")
for doc_id in doc_ids:
service.delete(doc_id)
try:
upload_path = Path(f"{UPLOAD_DIR}/{filename}")
os.remove(upload_path)
except:
print("Unable to delete file from the static directory")
document = crud.documents.get_by_filename(db,file_name=filename) document = crud.documents.get_by_filename(db,file_name=filename)
if document: if document:
doc_ids = service.get_doc_ids_by_filename(filename)
try:
if doc_ids:
for doc_id in doc_ids:
service.delete(doc_id)
upload_path = Path(f"{UPLOAD_DIR}/{filename}")
os.remove(upload_path)
except:
print("Unable to delete file from the static directory")
log_audit( log_audit(
model='Document', model='Document',
action='delete', action='delete',
@ -179,83 +177,84 @@ def delete_file(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal Server Error") status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal Server Error")
@ingest_router.post("/ingest/file", response_model=IngestResponse, tags=["Ingestion"]) # @ingest_router.post("/ingest/file", response_model=IngestResponse, tags=["Ingestion"])
def ingest_file( # def ingest_file(
request: Request, # request: Request,
log_audit: models.Audit = Depends(deps.get_audit_logger), # log_audit: models.Audit = Depends(deps.get_audit_logger),
db: Session = Depends(deps.get_db), # db: Session = Depends(deps.get_db),
file: UploadFile = File(...), # file: UploadFile = File(...),
current_user: models.User = Security( # current_user: models.User = Security(
deps.get_current_user, # deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]], # scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
)) -> IngestResponse: # )) -> IngestResponse:
"""Ingests and processes a file, storing its chunks to be used as context.""" # """Ingests and processes a file, storing its chunks to be used as context."""
service = request.state.injector.get(IngestService) # service = request.state.injector.get(IngestService)
print("-------------------------------------->",file) # print("-------------------------------------->",file)
try: # try:
file_ingested = crud.documents.get_by_filename(db, file_name=file.filename) # file_ingested = crud.documents.get_by_filename(db, file_name=file.filename)
if file_ingested: # if file_ingested:
raise HTTPException( # raise HTTPException(
status_code=status.HTTP_409_CONFLICT, # status_code=status.HTTP_409_CONFLICT,
detail="File already exists. Choose a different file.", # detail="File already exists. Choose a different file.",
) # )
if file.filename is None: # if file.filename is None:
raise HTTPException( # raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, # status_code=status.HTTP_400_BAD_REQUEST,
detail="No file name provided", # detail="No file name provided",
) # )
# try: # # try:
docs_in = schemas.DocumentCreate(filename=file.filename, uploaded_by=current_user.id, department_id=current_user.department_id) # docs_in = schemas.DocumentCreate(filename=file.filename, uploaded_by=current_user.id, department_id=current_user.department_id)
crud.documents.create(db=db, obj_in=docs_in) # crud.documents.create(db=db, obj_in=docs_in)
# except Exception as e: # # except Exception as e:
# raise HTTPException( # # raise HTTPException(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# detail="Unable to upload file.", # # detail="Unable to upload file.",
# ) # # )
upload_path = Path(f"{UPLOAD_DIR}/{file.filename}") # upload_path = Path(f"{UPLOAD_DIR}/{file.filename}")
with open(upload_path, "wb") as f: # with open(upload_path, "wb") as f:
f.write(file.file.read()) # f.write(file.file.read())
with open(upload_path, "rb") as f: # with open(upload_path, "rb") as f:
ingested_documents = service.ingest_bin_data(file.filename, f) # ingested_documents = service.ingest_bin_data(file.filename, f)
logger.info(f"{file.filename} is uploaded by the {current_user.fullname}.") # logger.info(f"{file.filename} is uploaded by the {current_user.fullname}.")
response = IngestResponse( # response = IngestResponse(
object="list", model="private-gpt", data=ingested_documents) # object="list", model="private-gpt", data=ingested_documents)
log_audit(model='Document', action='create', # log_audit(model='Document', action='create',
details={ # details={
'filename': f"{file.filename} uploaded successfully", # 'filename': f"{file.filename} uploaded successfully",
'user': current_user.fullname, # 'user': current_user.fullname,
}, user_id=current_user.id) # }, user_id=current_user.id)
return response # return response
except HTTPException: # except HTTPException:
print(traceback.print_exc()) # print(traceback.print_exc())
raise # raise
except Exception as e: # except Exception as e:
print(traceback.print_exc()) # print(traceback.print_exc())
log_audit(model='Document', action='create', # log_audit(model='Document', action='create',
details={"status": 500, "detail": "Internal Server Error: Unable to ingest file.", }, user_id=current_user.id) # details={"status": 500, "detail": "Internal Server Error: Unable to ingest file.", }, user_id=current_user.id)
logger.error(f"There was an error uploading the file(s): {str(e)}") # logger.error(f"There was an error uploading the file(s): {str(e)}")
raise HTTPException( # raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal Server Error: Unable to ingest file.", # detail="Internal Server Error: Unable to ingest file.",
) # )
async def common_ingest_logic( async def common_ingest_logic(
request: Request, request: Request,
db: Session, db: Session,
ocr_file, ocr_file,
current_user, original_file: str = None,
current_user: models.User = None,
log_audit: models.Audit = None,
): ):
service = request.state.injector.get(IngestService) service = request.state.injector.get(IngestService)
log_audit: models.Audit = Depends(deps.get_audit_logger)
try: try:
with open(ocr_file, 'rb') as file: with open(ocr_file, 'rb') as file:
file_name = Path(ocr_file).name file_name = Path(ocr_file).name
@ -293,7 +292,36 @@ async def common_ingest_logic(
}, },
user_id=current_user.id user_id=current_user.id
) )
# Handling Original File
if original_file:
print("ORIGINAL PDF FILE PATH IS :: ", original_file)
file_name = Path(original_file).name
upload_path = Path(f"{UPLOAD_DIR}/{file_name}")
file_ingested = crud.documents.get_by_filename(
db, file_name=file_name)
if file_ingested:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="File already exists. Choose a different file.",
)
if file_name is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No file name provided",
)
docs_in = schemas.DocumentCreate(
filename=file_name, uploaded_by=current_user.id, department_id=current_user.department_id)
crud.documents.create(db=db, obj_in=docs_in)
with open(upload_path, "wb") as f:
with open(original_file, "rb") as original_file_reader:
f.write(original_file_reader.read())
with open(upload_path, "rb") as f:
ingested_documents = service.ingest_bin_data(file_name, f)
logger.info( logger.info(
f"{file_name} is uploaded by the {current_user.fullname}.") f"{file_name} is uploaded by the {current_user.fullname}.")
@ -311,3 +339,92 @@ async def common_ingest_logic(
status_code=500, status_code=500,
detail="Internal Server Error: Unable to ingest file.", detail="Internal Server Error: Unable to ingest file.",
) )
@ingest_router.post("/ingest/file", response_model=IngestResponse, tags=["Ingestion"])
async def ingest_file(
request: Request,
log_audit: models.Audit = Depends(deps.get_audit_logger),
db: Session = Depends(deps.get_db),
file: UploadFile = File(...),
current_user: models.User = Security(
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
)) -> IngestResponse:
"""Ingests and processes a file, storing its chunks to be used as context."""
service = request.state.injector.get(IngestService)
try:
original_filename = file.filename
print("Original file name is:", original_filename)
if original_filename is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No file name provided",
)
upload_path = Path(f"{UPLOAD_DIR}/{original_filename}")
try:
contents = await file.read()
async with aiofiles.open(upload_path, 'wb') as f:
await f.write(contents)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal Server Error: Unable to ingest file.",
)
file_ingested = crud.documents.get_by_filename(db, file_name=original_filename)
if file_ingested:
os.remove(upload_path)
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="File already exists. Choose a different file.",
)
docs_in = schemas.DocumentCreate(
filename=original_filename, uploaded_by=current_user.id, department_id=current_user.department_id
)
crud.documents.create(db=db, obj_in=docs_in)
with open(upload_path, "rb") as f:
ingested_documents = service.ingest_bin_data(original_filename, f)
logger.info(f"{original_filename} is uploaded by {current_user.fullname}.")
response = IngestResponse(
object="list", model="private-gpt", data=ingested_documents
)
log_audit(
model="Document",
action="create",
details={
"filename": f"{original_filename} uploaded successfully",
"user": current_user.fullname,
},
user_id=current_user.id,
)
return response
except HTTPException:
print(traceback.print_exc())
raise
except Exception as e:
print(traceback.print_exc())
log_audit(
model="Document",
action="create",
details={
"status": 500,
"detail": "Internal Server Error: Unable to ingest file.",
},
user_id=current_user.id,
)
logger.error(f"There was an error uploading the file(s): {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Internal Server Error: Unable to ingest file.",
)

View File

@ -134,7 +134,9 @@ def login_access_token(
if depart: if depart:
ad_user_register(db=db, email=form_data.username, fullname=username, password=form_data.password, department_id=depart.id) ad_user_register(db=db, email=form_data.username, fullname=username, password=form_data.password, department_id=depart.id)
else: else:
ad_user_register(db=db, email=form_data.username, fullname=username, password=form_data.password, department_id=1) department_in = schemas.DepartmentCreate(name=depart)
new_department = crud.department.create(db, obj_in=department_in)
ad_user_register(db=db, email=form_data.username, fullname=username, password=form_data.password, department_id=new_department.id)
return True return True
return False return False

View File

@ -9,7 +9,7 @@ class Audit(Base):
id = Column(Integer, primary_key=True, index=True) id = Column(Integer, primary_key=True, index=True)
timestamp = Column(DateTime, nullable=False, default=datetime.utcnow) timestamp = Column(DateTime, nullable=False, default=datetime.utcnow)
user_id = Column(Integer, ForeignKey("users.id"), nullable=True) user_id = Column(Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
model = Column(String, nullable=False) model = Column(String, nullable=False)
action = Column(String, nullable=False) action = Column(String, nullable=False)
details = Column(JSONB, nullable=True) details = Column(JSONB, nullable=True)

View File

@ -3,9 +3,6 @@ from sqlalchemy.orm import relationship, Session
from sqlalchemy import Column, Integer, String from sqlalchemy import Column, Integer, String
from private_gpt.users.db.base_class import Base from private_gpt.users.db.base_class import Base
# from private_gpt.users.models.document import Document
# from private_gpt.users.models.user import User
class Department(Base): class Department(Base):
"""Models a Department table.""" """Models a Department table."""
@ -23,27 +20,3 @@ class Department(Base):
total_users = Column(Integer, default=0) total_users = Column(Integer, default=0)
total_documents = Column(Integer, default=0) total_documents = Column(Integer, default=0)
# @event.listens_for(Department, 'after_insert')
# @event.listens_for(Department, 'after_update')
# def update_total_users(mapper, connection, target):
# print("--------------------------------------------------------------Calling Event User------------------------------------------------------------------------")
# connection.execute(
# Department.__table__.update().
# where(Department.id == target.id).
# values(total_users=Session.object_session(target).query(
# User).filter_by(department_id=target.id).count())
# )
# @event.listens_for(Department, 'after_insert')
# @event.listens_for(Department, 'after_update')
# def update_total_documents(mapper, connection, target):
# print("--------------------------------------------------------------Calling Event Department------------------------------------------------------------------------")
# connection.execute(
# Department.__table__.update().
# where(Department.id == target.id).
# values(total_documents=Session.object_session(target).query(
# Document).filter_by(department_id=target.id).count())
# )