From b9556626b15b682b8217fb2b0b1675bc27ee16b4 Mon Sep 17 00:00:00 2001 From: Saurab-Shrestha Date: Mon, 26 Feb 2024 16:58:11 +0545 Subject: [PATCH] Updated with chunk file upload --- .../186dc125c8c4_set_null_on_delete_audit.py | 34 +++ .../ocr_components/table_ocr_api.py | 138 ++++++--- private_gpt/server/ingest/ingest_router.py | 265 +++++++++++++----- private_gpt/users/api/v1/routers/auth.py | 4 +- private_gpt/users/models/audit.py | 2 +- private_gpt/users/models/department.py | 27 -- 6 files changed, 331 insertions(+), 139 deletions(-) create mode 100644 alembic/versions/186dc125c8c4_set_null_on_delete_audit.py diff --git a/alembic/versions/186dc125c8c4_set_null_on_delete_audit.py b/alembic/versions/186dc125c8c4_set_null_on_delete_audit.py new file mode 100644 index 00000000..f48e906e --- /dev/null +++ b/alembic/versions/186dc125c8c4_set_null_on_delete_audit.py @@ -0,0 +1,34 @@ +"""Set null on delete audit + +Revision ID: 186dc125c8c4 +Revises: 9aa759c05b19 +Create Date: 2024-02-26 15:43:49.759556 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '186dc125c8c4' +down_revision: Union[str, None] = '9aa759c05b19' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint('audit_user_id_fkey', 'audit', type_='foreignkey') + op.create_foreign_key(None, 'audit', 'users', ['user_id'], ['id'], ondelete='SET NULL') + # op.create_unique_constraint('unique_user_role', 'user_roles', ['user_id', 'role_id', 'company_id']) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + # op.drop_constraint('unique_user_role', 'user_roles', type_='unique') + op.drop_constraint(None, 'audit', type_='foreignkey') + op.create_foreign_key('audit_user_id_fkey', 'audit', 'users', ['user_id'], ['id']) + # ### end Alembic commands ### diff --git a/private_gpt/components/ocr_components/table_ocr_api.py b/private_gpt/components/ocr_components/table_ocr_api.py index 088d7b9c..dc5e4c01 100644 --- a/private_gpt/components/ocr_components/table_ocr_api.py +++ b/private_gpt/components/ocr_components/table_ocr_api.py @@ -1,51 +1,50 @@ import os +import aiofiles import fitz import requests from docx import Document - from fastapi import HTTPException, status, File, UploadFile, APIRouter, Request, Security, Depends from sqlalchemy.orm import Session - from private_gpt.users import models from private_gpt.users.api import deps from private_gpt.users.constants.role import Role -from private_gpt.components.ocr_components.TextExtraction import ImageToTable -from private_gpt.components.ocr_components.table_ocr import GetOCRText from private_gpt.server.ingest.ingest_router import common_ingest_logic, IngestResponse from private_gpt.constants import OCR_UPLOAD - - +from private_gpt.components.ocr_components.TextExtraction import ImageToTable +from private_gpt.components.ocr_components.table_ocr import GetOCRText +import traceback pdf_router = APIRouter(prefix="/v1", tags=["ocr"]) -@pdf_router.post("/pdf_ocr") -async def get_pdf_ocr( - request: Request, - db: Session = Depends(deps.get_db), - file: UploadFile = File(...), - current_user: models.User = Security( - deps.get_current_user, - scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]], - ) -): - UPLOAD_DIR = OCR_UPLOAD +async def save_uploaded_file(file: UploadFile, upload_dir: str): + file_path = os.path.join(upload_dir, file.filename) + print("The file name is: ",file.filename); + print("THe file path is: ", file_path) + try: contents = await file.read() - except Exception as e: + async with aiofiles.open(file_path, 'wb') as f: + await f.write(contents) + except Exception: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"There was an error reading the file: {e}" + detail=f"There was an error uploading the file." ) + finally: + await file.close() - file_path = os.path.join(UPLOAD_DIR, file.filename) - with open(file_path, "wb") as f: - f.write(contents) + # with open(file_path, "wb") as f: + # f.write(file.file.read()) + + return file_path + +async def process_images_and_generate_doc(pdf_path: str, upload_dir: str): doc = Document() ocr = GetOCRText() img_tab = ImageToTable() - pdf_doc = fitz.open(file_path) - # try: + pdf_doc = fitz.open(pdf_path) + for page_index in range(len(pdf_doc)): page = pdf_doc[page_index] image_list = page.get_images() @@ -68,20 +67,87 @@ async def get_pdf_ocr( doc.add_paragraph(extracted_text) table_data = img_tab.table_to_csv(image_path) doc.add_paragraph(table_data) - os.remove(image_path) + os.remove(image_path) save_path = os.path.join( - UPLOAD_DIR, f"{file.filename.replace('.pdf', '_ocr.docx')}") + upload_dir, f"{os.path.splitext(os.path.basename(pdf_path))[0]}_ocr.docx") doc.save(save_path) + return save_path - with open(save_path, 'rb') as f: - file_content = f.read() - if not file_content: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Empty file content after processing OCR" - ) - ingested_documents = await common_ingest_logic( - request=request,db=db, ocr_file=save_path, current_user=current_user + +async def process_pdf_ocr( + request: Request, + db: Session, + file: UploadFile, + current_user: models.User, + log_audit: models.Audit +): + UPLOAD_DIR = OCR_UPLOAD + try: + print("The file name is: ", file.filename) + pdf_path = await save_uploaded_file(file, UPLOAD_DIR) + print("The file path: ", pdf_path) + ocr_doc_path = await process_images_and_generate_doc(pdf_path, UPLOAD_DIR) + ingested_documents = await common_ingest_logic( + request=request, db=db, ocr_file=ocr_doc_path, current_user=current_user, original_file=None, log_audit=log_audit + ) + return IngestResponse(object="list", model="private-gpt", data=ingested_documents) + + except Exception as e: + print(traceback.print_exc()) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"There was an error processing OCR: {e}" + ) + +async def process_both( + request: Request, + db: Session, + file: UploadFile, + current_user: models.User, + log_audit: models.Audit +): + UPLOAD_DIR = OCR_UPLOAD + try: + pdf_path = await save_uploaded_file(file, UPLOAD_DIR) + ocr_doc_path = await process_images_and_generate_doc(pdf_path, UPLOAD_DIR) + ingested_documents = await common_ingest_logic( + request=request, db=db, ocr_file=ocr_doc_path, current_user=current_user, original_file=pdf_path, log_audit=log_audit + ) + return IngestResponse(object="list", model="private-gpt", data=ingested_documents) + + except Exception as e: + print(traceback.print_exc()) + + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"There was an error processing OCR: {e}" + ) + + +@pdf_router.post("/pdf_ocr") +async def get_pdf_ocr_wrapper( + request: Request, + db: Session = Depends(deps.get_db), + log_audit: models.Audit = Depends(deps.get_audit_logger), + file: UploadFile = File(...), + current_user: models.User = Security( + deps.get_current_user, + scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]], ) - return IngestResponse(object="list", model="private-gpt", data=ingested_documents) +): + return await process_pdf_ocr(request, db, file, current_user, log_audit) + + +@pdf_router.post("/both") +async def get_both_wrapper( + request: Request, + db: Session = Depends(deps.get_db), + log_audit: models.Audit = Depends(deps.get_audit_logger), + file: UploadFile = File(...), + current_user: models.User = Security( + deps.get_current_user, + scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]], + ) +): + return await process_both(request, db, file, current_user, log_audit) diff --git a/private_gpt/server/ingest/ingest_router.py b/private_gpt/server/ingest/ingest_router.py index b80ca1c6..5bc768cc 100644 --- a/private_gpt/server/ingest/ingest_router.py +++ b/private_gpt/server/ingest/ingest_router.py @@ -4,6 +4,7 @@ import traceback from pathlib import Path from typing import Literal, Optional, List +import aiofiles from sqlalchemy.orm import Session from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File, status, Security, Body @@ -17,7 +18,7 @@ from private_gpt.users.constants.role import Role from private_gpt.server.ingest.ingest_service import IngestService from private_gpt.server.ingest.model import IngestedDoc from private_gpt.server.utils.auth import authenticated -from private_gpt.constants import UPLOAD_DIR +from private_gpt.constants import UPLOAD_DIR, OCR_UPLOAD ingest_router = APIRouter(prefix="/v1", dependencies=[Depends(authenticated)]) @@ -146,20 +147,17 @@ def delete_file( filename = delete_input.filename service = request.state.injector.get(IngestService) try: - doc_ids = service.get_doc_ids_by_filename(filename) - if not doc_ids: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, - detail=f"No documents found with filename '{filename}'") - - for doc_id in doc_ids: - service.delete(doc_id) - try: - upload_path = Path(f"{UPLOAD_DIR}/{filename}") - os.remove(upload_path) - except: - print("Unable to delete file from the static directory") document = crud.documents.get_by_filename(db,file_name=filename) if document: + doc_ids = service.get_doc_ids_by_filename(filename) + try: + if doc_ids: + for doc_id in doc_ids: + service.delete(doc_id) + upload_path = Path(f"{UPLOAD_DIR}/{filename}") + os.remove(upload_path) + except: + print("Unable to delete file from the static directory") log_audit( model='Document', action='delete', @@ -179,83 +177,84 @@ def delete_file( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal Server Error") -@ingest_router.post("/ingest/file", response_model=IngestResponse, tags=["Ingestion"]) -def ingest_file( - request: Request, - log_audit: models.Audit = Depends(deps.get_audit_logger), +# @ingest_router.post("/ingest/file", response_model=IngestResponse, tags=["Ingestion"]) +# def ingest_file( +# request: Request, +# log_audit: models.Audit = Depends(deps.get_audit_logger), - db: Session = Depends(deps.get_db), - file: UploadFile = File(...), - current_user: models.User = Security( - deps.get_current_user, - scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]], - )) -> IngestResponse: - """Ingests and processes a file, storing its chunks to be used as context.""" - service = request.state.injector.get(IngestService) - print("-------------------------------------->",file) - try: - file_ingested = crud.documents.get_by_filename(db, file_name=file.filename) - if file_ingested: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail="File already exists. Choose a different file.", - ) +# db: Session = Depends(deps.get_db), +# file: UploadFile = File(...), +# current_user: models.User = Security( +# deps.get_current_user, +# scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]], +# )) -> IngestResponse: +# """Ingests and processes a file, storing its chunks to be used as context.""" +# service = request.state.injector.get(IngestService) +# print("-------------------------------------->",file) +# try: +# file_ingested = crud.documents.get_by_filename(db, file_name=file.filename) +# if file_ingested: +# raise HTTPException( +# status_code=status.HTTP_409_CONFLICT, +# detail="File already exists. Choose a different file.", +# ) - if file.filename is None: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="No file name provided", - ) +# if file.filename is None: +# raise HTTPException( +# status_code=status.HTTP_400_BAD_REQUEST, +# detail="No file name provided", +# ) - # try: - docs_in = schemas.DocumentCreate(filename=file.filename, uploaded_by=current_user.id, department_id=current_user.department_id) - crud.documents.create(db=db, obj_in=docs_in) - # except Exception as e: - # raise HTTPException( - # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - # detail="Unable to upload file.", - # ) - upload_path = Path(f"{UPLOAD_DIR}/{file.filename}") +# # try: +# docs_in = schemas.DocumentCreate(filename=file.filename, uploaded_by=current_user.id, department_id=current_user.department_id) +# crud.documents.create(db=db, obj_in=docs_in) +# # except Exception as e: +# # raise HTTPException( +# # status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, +# # detail="Unable to upload file.", +# # ) +# upload_path = Path(f"{UPLOAD_DIR}/{file.filename}") - with open(upload_path, "wb") as f: - f.write(file.file.read()) +# with open(upload_path, "wb") as f: +# f.write(file.file.read()) - with open(upload_path, "rb") as f: - ingested_documents = service.ingest_bin_data(file.filename, f) - logger.info(f"{file.filename} is uploaded by the {current_user.fullname}.") - response = IngestResponse( - object="list", model="private-gpt", data=ingested_documents) +# with open(upload_path, "rb") as f: +# ingested_documents = service.ingest_bin_data(file.filename, f) +# logger.info(f"{file.filename} is uploaded by the {current_user.fullname}.") +# response = IngestResponse( +# object="list", model="private-gpt", data=ingested_documents) - log_audit(model='Document', action='create', - details={ - 'filename': f"{file.filename} uploaded successfully", - 'user': current_user.fullname, - }, user_id=current_user.id) +# log_audit(model='Document', action='create', +# details={ +# 'filename': f"{file.filename} uploaded successfully", +# 'user': current_user.fullname, +# }, user_id=current_user.id) - return response - except HTTPException: - print(traceback.print_exc()) - raise +# return response +# except HTTPException: +# print(traceback.print_exc()) +# raise - except Exception as e: - print(traceback.print_exc()) - log_audit(model='Document', action='create', - details={"status": 500, "detail": "Internal Server Error: Unable to ingest file.", }, user_id=current_user.id) - logger.error(f"There was an error uploading the file(s): {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Internal Server Error: Unable to ingest file.", - ) +# except Exception as e: +# print(traceback.print_exc()) +# log_audit(model='Document', action='create', +# details={"status": 500, "detail": "Internal Server Error: Unable to ingest file.", }, user_id=current_user.id) +# logger.error(f"There was an error uploading the file(s): {str(e)}") +# raise HTTPException( +# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, +# detail="Internal Server Error: Unable to ingest file.", +# ) async def common_ingest_logic( request: Request, db: Session, ocr_file, - current_user, + original_file: str = None, + current_user: models.User = None, + log_audit: models.Audit = None, ): service = request.state.injector.get(IngestService) - log_audit: models.Audit = Depends(deps.get_audit_logger) try: with open(ocr_file, 'rb') as file: file_name = Path(ocr_file).name @@ -293,7 +292,36 @@ async def common_ingest_logic( }, user_id=current_user.id ) + # Handling Original File + if original_file: + print("ORIGINAL PDF FILE PATH IS :: ", original_file) + file_name = Path(original_file).name + upload_path = Path(f"{UPLOAD_DIR}/{file_name}") + file_ingested = crud.documents.get_by_filename( + db, file_name=file_name) + if file_ingested: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="File already exists. Choose a different file.", + ) + + if file_name is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="No file name provided", + ) + + docs_in = schemas.DocumentCreate( + filename=file_name, uploaded_by=current_user.id, department_id=current_user.department_id) + crud.documents.create(db=db, obj_in=docs_in) + + with open(upload_path, "wb") as f: + with open(original_file, "rb") as original_file_reader: + f.write(original_file_reader.read()) + + with open(upload_path, "rb") as f: + ingested_documents = service.ingest_bin_data(file_name, f) logger.info( f"{file_name} is uploaded by the {current_user.fullname}.") @@ -311,3 +339,92 @@ async def common_ingest_logic( status_code=500, detail="Internal Server Error: Unable to ingest file.", ) + + + +@ingest_router.post("/ingest/file", response_model=IngestResponse, tags=["Ingestion"]) +async def ingest_file( + request: Request, + log_audit: models.Audit = Depends(deps.get_audit_logger), + db: Session = Depends(deps.get_db), + file: UploadFile = File(...), + current_user: models.User = Security( + deps.get_current_user, + scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]], +)) -> IngestResponse: + """Ingests and processes a file, storing its chunks to be used as context.""" + service = request.state.injector.get(IngestService) + + try: + original_filename = file.filename + print("Original file name is:", original_filename) + if original_filename is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="No file name provided", + ) + upload_path = Path(f"{UPLOAD_DIR}/{original_filename}") + try: + contents = await file.read() + async with aiofiles.open(upload_path, 'wb') as f: + await f.write(contents) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Internal Server Error: Unable to ingest file.", + ) + + file_ingested = crud.documents.get_by_filename(db, file_name=original_filename) + + if file_ingested: + os.remove(upload_path) + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="File already exists. Choose a different file.", + ) + + docs_in = schemas.DocumentCreate( + filename=original_filename, uploaded_by=current_user.id, department_id=current_user.department_id + ) + crud.documents.create(db=db, obj_in=docs_in) + + with open(upload_path, "rb") as f: + ingested_documents = service.ingest_bin_data(original_filename, f) + + logger.info(f"{original_filename} is uploaded by {current_user.fullname}.") + response = IngestResponse( + object="list", model="private-gpt", data=ingested_documents + ) + + log_audit( + model="Document", + action="create", + details={ + "filename": f"{original_filename} uploaded successfully", + "user": current_user.fullname, + }, + user_id=current_user.id, + ) + + return response + + except HTTPException: + print(traceback.print_exc()) + raise + + except Exception as e: + print(traceback.print_exc()) + log_audit( + model="Document", + action="create", + details={ + "status": 500, + "detail": "Internal Server Error: Unable to ingest file.", + }, + user_id=current_user.id, + ) + logger.error(f"There was an error uploading the file(s): {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Internal Server Error: Unable to ingest file.", + ) diff --git a/private_gpt/users/api/v1/routers/auth.py b/private_gpt/users/api/v1/routers/auth.py index e3eddf9e..f099e309 100644 --- a/private_gpt/users/api/v1/routers/auth.py +++ b/private_gpt/users/api/v1/routers/auth.py @@ -134,7 +134,9 @@ def login_access_token( if depart: ad_user_register(db=db, email=form_data.username, fullname=username, password=form_data.password, department_id=depart.id) else: - ad_user_register(db=db, email=form_data.username, fullname=username, password=form_data.password, department_id=1) + department_in = schemas.DepartmentCreate(name=depart) + new_department = crud.department.create(db, obj_in=department_in) + ad_user_register(db=db, email=form_data.username, fullname=username, password=form_data.password, department_id=new_department.id) return True return False diff --git a/private_gpt/users/models/audit.py b/private_gpt/users/models/audit.py index 0855473c..acb1aac5 100644 --- a/private_gpt/users/models/audit.py +++ b/private_gpt/users/models/audit.py @@ -9,7 +9,7 @@ class Audit(Base): id = Column(Integer, primary_key=True, index=True) timestamp = Column(DateTime, nullable=False, default=datetime.utcnow) - user_id = Column(Integer, ForeignKey("users.id"), nullable=True) + user_id = Column(Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True) model = Column(String, nullable=False) action = Column(String, nullable=False) details = Column(JSONB, nullable=True) diff --git a/private_gpt/users/models/department.py b/private_gpt/users/models/department.py index ba23e52c..02bc4184 100644 --- a/private_gpt/users/models/department.py +++ b/private_gpt/users/models/department.py @@ -3,9 +3,6 @@ from sqlalchemy.orm import relationship, Session from sqlalchemy import Column, Integer, String from private_gpt.users.db.base_class import Base -# from private_gpt.users.models.document import Document -# from private_gpt.users.models.user import User - class Department(Base): """Models a Department table.""" @@ -23,27 +20,3 @@ class Department(Base): total_users = Column(Integer, default=0) total_documents = Column(Integer, default=0) - - -# @event.listens_for(Department, 'after_insert') -# @event.listens_for(Department, 'after_update') -# def update_total_users(mapper, connection, target): -# print("--------------------------------------------------------------Calling Event User------------------------------------------------------------------------") -# connection.execute( -# Department.__table__.update(). -# where(Department.id == target.id). -# values(total_users=Session.object_session(target).query( -# User).filter_by(department_id=target.id).count()) -# ) - - -# @event.listens_for(Department, 'after_insert') -# @event.listens_for(Department, 'after_update') -# def update_total_documents(mapper, connection, target): -# print("--------------------------------------------------------------Calling Event Department------------------------------------------------------------------------") -# connection.execute( -# Department.__table__.update(). -# where(Department.id == target.id). -# values(total_documents=Session.object_session(target).query( -# Document).filter_by(department_id=target.id).count()) -# )