Documents assignment with departments

This commit is contained in:
Saurab-Shrestha 2024-03-07 13:23:59 +05:45
parent 6818cba858
commit f011bb6a7a
17 changed files with 253 additions and 148 deletions

View File

@ -14,8 +14,10 @@ from private_gpt.users.models.role import Role
from private_gpt.users.models.user_role import UserRole
from private_gpt.users.models.subscription import Subscription
from private_gpt.users.models.company import Company
from private_gpt.users.models.document import Document
from private_gpt.users.models.department import Department
from private_gpt.users.models.document_department import document_department_association
from private_gpt.users.models.audit import Audit
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.

View File

@ -0,0 +1,30 @@
"""Documents association
Revision ID: 2f490371bf6c
Revises: f2978211af18
Create Date: 2024-03-06 17:17:54.701414
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '2f490371bf6c'
down_revision: Union[str, None] = 'f2978211af18'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# op.create_unique_constraint('unique_user_role', 'user_roles', ['user_id', 'role_id', 'company_id'])
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# op.drop_constraint('unique_user_role', 'user_roles', type_='unique')
# ### end Alembic commands ###

View File

@ -0,0 +1,41 @@
"""Documents association with multiple departments
Revision ID: f2978211af18
Revises: caa694775d4e
Create Date: 2024-03-06 16:43:16.492578
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'f2978211af18'
down_revision: Union[str, None] = 'caa694775d4e'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('document_department_association',
sa.Column('department_id', sa.Integer(), nullable=True),
sa.Column('document_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['department_id'], ['departments.id'], ),
sa.ForeignKeyConstraint(['document_id'], ['document.id'], )
)
op.drop_constraint('document_department_id_fkey', 'document', type_='foreignkey')
op.drop_column('document', 'department_id')
# op.create_unique_constraint('unique_user_role', 'user_roles', ['user_id', 'role_id', 'company_id'])
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# op.drop_constraint('unique_user_role', 'user_roles', type_='unique')
op.add_column('document', sa.Column('department_id', sa.INTEGER(), autoincrement=False, nullable=False))
op.create_foreign_key('document_department_id_fkey', 'document', 'departments', ['department_id'], ['id'])
op.drop_table('document_department_association')
# ### end Alembic commands ###

View File

@ -1,26 +1,24 @@
import os
import aiofiles
import fitz
import aiofiles
import requests
from docx import Document
from fastapi import HTTPException, status, File, UploadFile, APIRouter, Request, Security, Depends
from sqlalchemy.orm import Session
from private_gpt.users import models
from private_gpt.users.api import deps
from private_gpt.users.constants.role import Role
from private_gpt.server.ingest.ingest_router import common_ingest_logic, IngestResponse
from private_gpt.constants import OCR_UPLOAD
from private_gpt.components.ocr_components.TextExtraction import ImageToTable
from private_gpt.components.ocr_components.table_ocr import GetOCRText
import traceback
from docx import Document
from sqlalchemy.orm import Session
from fastapi import HTTPException, status, File, UploadFile, APIRouter, Request, Security, Depends
from private_gpt.users.api import deps
from private_gpt.constants import OCR_UPLOAD
from private_gpt.users import models, schemas
from private_gpt.users.constants.role import Role
from private_gpt.components.ocr_components.table_ocr import GetOCRText
from private_gpt.components.ocr_components.TextExtraction import ImageToTable
from private_gpt.server.ingest.ingest_router import common_ingest_logic, IngestResponse
pdf_router = APIRouter(prefix="/v1", tags=["ocr"])
async def save_uploaded_file(file: UploadFile, upload_dir: str):
file_path = os.path.join(upload_dir, file.filename)
print("The file name is: ",file.filename);
print("THe file path is: ", file_path)
try:
contents = await file.read()
async with aiofiles.open(file_path, 'wb') as f:
@ -31,11 +29,7 @@ async def save_uploaded_file(file: UploadFile, upload_dir: str):
detail=f"There was an error uploading the file."
)
finally:
await file.close()
# with open(file_path, "wb") as f:
# f.write(file.file.read())
await file.close()
return file_path
@ -44,8 +38,6 @@ async def process_images_and_generate_doc(request: Request, pdf_path: str, uploa
ocr = request.state.injector.get(GetOCRText)
img_tab = request.state.injector.get(ImageToTable)
# ocr = GetOCRText()
# img_tab = ImageToTable()
pdf_doc = fitz.open(pdf_path)
for page_index in range(len(pdf_doc)):
@ -83,7 +75,8 @@ async def process_pdf_ocr(
db: Session,
file: UploadFile,
current_user: models.User,
log_audit: models.Audit
log_audit: models.Audit,
departments: schemas.DepartmentList = Depends()
):
UPLOAD_DIR = OCR_UPLOAD
try:
@ -92,7 +85,7 @@ async def process_pdf_ocr(
print("The file path: ", pdf_path)
ocr_doc_path = await process_images_and_generate_doc(request, pdf_path, UPLOAD_DIR)
ingested_documents = await common_ingest_logic(
request=request, db=db, ocr_file=ocr_doc_path, current_user=current_user, original_file=None, log_audit=log_audit
request=request, db=db, ocr_file=ocr_doc_path, current_user=current_user, original_file=None, log_audit=log_audit, departments=departments
)
return IngestResponse(object="list", model="private-gpt", data=ingested_documents)
@ -108,14 +101,15 @@ async def process_both(
db: Session,
file: UploadFile,
current_user: models.User,
log_audit: models.Audit
log_audit: models.Audit,
departments: schemas.DepartmentList = Depends()
):
UPLOAD_DIR = OCR_UPLOAD
try:
pdf_path = await save_uploaded_file(file, UPLOAD_DIR)
ocr_doc_path = await process_images_and_generate_doc(request, pdf_path, UPLOAD_DIR)
ingested_documents = await common_ingest_logic(
request=request, db=db, ocr_file=ocr_doc_path, current_user=current_user, original_file=pdf_path, log_audit=log_audit
request=request, db=db, ocr_file=ocr_doc_path, current_user=current_user, original_file=pdf_path, log_audit=log_audit, departments=departments
)
return IngestResponse(object="list", model="private-gpt", data=ingested_documents)
@ -131,6 +125,7 @@ async def process_both(
@pdf_router.post("/pdf_ocr")
async def get_pdf_ocr_wrapper(
request: Request,
departments: schemas.DepartmentList = Depends(),
db: Session = Depends(deps.get_db),
log_audit: models.Audit = Depends(deps.get_audit_logger),
file: UploadFile = File(...),
@ -139,12 +134,13 @@ async def get_pdf_ocr_wrapper(
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
)
):
return await process_pdf_ocr(request, db, file, current_user, log_audit)
return await process_pdf_ocr(request, db, file, current_user, log_audit, departments)
@pdf_router.post("/both")
async def get_both_wrapper(
request: Request,
departments: schemas.DepartmentList = Depends(),
db: Session = Depends(deps.get_db),
log_audit: models.Audit = Depends(deps.get_audit_logger),
file: UploadFile = File(...),
@ -153,4 +149,4 @@ async def get_both_wrapper(
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
)
):
return await process_both(request, db, file, current_user, log_audit)
return await process_both(request, db, file, current_user, log_audit, departments)

View File

@ -3,11 +3,11 @@ import logging
import traceback
from pathlib import Path
from typing import Literal, Optional, List
from typing import Literal
import aiofiles
from sqlalchemy.orm import Session
from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File, status, Security, Body
from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File, status, Security, Body, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
@ -136,7 +136,7 @@ def delete_file(
db: Session = Depends(deps.get_db),
current_user: models.User = Security(
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"], Role.OPERATOR["name"]],
)) -> dict:
"""Delete the specified filename.
@ -168,6 +168,9 @@ def delete_file(
user_id=current_user.id
)
crud.documents.remove(db=db, id=document.id)
db.execute(models.document_department_association.delete().where(
models.document_department_association.c.document_id == document.id
))
return {"status": "SUCCESS", "message": f"{filename}' deleted successfully."}
except Exception as e:
print(traceback.print_exc())
@ -177,12 +180,53 @@ def delete_file(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal Server Error")
async def create_documents(
db: Session,
file_name: str = None,
current_user: models.User = None,
departments: schemas.DepartmentList = Depends(),
log_audit: models.Audit = None,
):
"""
Create documents in the `Document` table and update the \n
`Document Department Association` table with the departments ids for the documents.
"""
department_ids = departments.departments_ids
print("Department IDS: ", department_ids)
file_ingested = crud.documents.get_by_filename(
db, file_name=file_name)
if file_ingested:
raise HTTPException(
status_code=409,
detail="File already exists. Choose a different file.",
)
docs_in = schemas.DocumentCreate(
filename=file_name, uploaded_by=current_user.id
)
document = crud.documents.create(db=db, obj_in=docs_in)
department_ids = [int(number) for number in department_ids.split(",")]
for department_id in department_ids:
db.execute(models.document_department_association.insert().values(document_id=document.id, department_id=department_id))
log_audit(
model='Document',
action='create',
details={
'detail': f"{file_name} uploaded successfully",
'user': f"{current_user.fullname}",
'departments': f"{department_ids}"
},
user_id=current_user.id
)
return document
async def common_ingest_logic(
request: Request,
db: Session,
ocr_file,
original_file: str = None,
current_user: models.User = None,
departments: schemas.DepartmentList = Depends(),
log_audit: models.Audit = None,
):
service = request.state.injector.get(IngestService)
@ -191,38 +235,18 @@ async def common_ingest_logic(
file_name = Path(ocr_file).name
upload_path = Path(f"{UPLOAD_DIR}/{file_name}")
file_ingested = crud.documents.get_by_filename(
db, file_name=file_name)
if file_ingested:
raise HTTPException(
status_code=409,
detail="File already exists. Choose a different file.",
)
if file_name is None:
raise HTTPException(
status_code=400,
detail="No file name provided",
)
docs_in = schemas.DocumentCreate(
filename=file_name, uploaded_by=current_user.id, department_id=current_user.department_id)
crud.documents.create(db=db, obj_in=docs_in)
await create_documents(db, file_name, current_user, departments, log_audit)
with open(upload_path, "wb") as f:
f.write(file.read())
file.seek(0)
ingested_documents = service.ingest_bin_data(file_name, file)
log_audit(
model='Document',
action='create',
details={
'detail': f"{file_name} uploaded successfully",
'user': f"{current_user.fullname}"
},
user_id=current_user.id
)
# Handling Original File
if original_file:
try:
@ -230,23 +254,13 @@ async def common_ingest_logic(
file_name = Path(original_file).name
upload_path = Path(f"{UPLOAD_DIR}/{file_name}")
file_ingested = crud.documents.get_by_filename(
db, file_name=file_name)
if file_ingested:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="File already exists. Choose a different file.",
)
if file_name is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No file name provided",
)
docs_in = schemas.DocumentCreate(
filename=file_name, uploaded_by=current_user.id, department_id=current_user.department_id)
crud.documents.create(db=db, obj_in=docs_in)
await create_documents(db, file_name, current_user, departments, log_audit)
with open(upload_path, "wb") as f:
with open(original_file, "rb") as original_file_reader:
@ -280,20 +294,19 @@ async def common_ingest_logic(
)
@ingest_router.post("/ingest/file", response_model=IngestResponse, tags=["Ingestion"])
async def ingest_file(
request: Request,
departments: schemas.DepartmentList = Depends(),
file: UploadFile = File(...),
log_audit: models.Audit = Depends(deps.get_audit_logger),
db: Session = Depends(deps.get_db),
file: UploadFile = File(...),
current_user: models.User = Security(
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"], Role.OPERATOR["name"]],
)) -> IngestResponse:
"""Ingests and processes a file, storing its chunks to be used as context."""
service = request.state.injector.get(IngestService)
try:
original_filename = file.filename
print("Original file name is:", original_filename)
@ -313,38 +326,14 @@ async def ingest_file(
detail="Internal Server Error: Unable to ingest file.",
)
file_ingested = crud.documents.get_by_filename(db, file_name=original_filename)
if file_ingested:
os.remove(upload_path)
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="File already exists. Choose a different file.",
)
docs_in = schemas.DocumentCreate(
filename=original_filename, uploaded_by=current_user.id, department_id=current_user.department_id
)
crud.documents.create(db=db, obj_in=docs_in)
await create_documents(db, original_filename, current_user, departments, log_audit)
with open(upload_path, "rb") as f:
ingested_documents = service.ingest_bin_data(original_filename, f)
logger.info(f"{original_filename} is uploaded by {current_user.fullname}.")
logger.info(f"{original_filename} is uploaded by {current_user.fullname} in {departments.departments_ids}")
response = IngestResponse(
object="list", model="private-gpt", data=ingested_documents
)
log_audit(
model="Document",
action="create",
details={
"filename": f"{original_filename} uploaded successfully",
"user": current_user.fullname,
},
user_id=current_user.id,
)
return response
except HTTPException:

View File

@ -98,7 +98,6 @@ def ad_user_register(
Register a new user in the database. Company id is directly given here.
"""
user_in = schemas.UserCreate(email=email, password=password, fullname=fullname, company_id=1, department_id=department_id)
print("user is: ", user_in)
user = crud.user.create(db, obj_in=user_in)
user_role_name = Role.GUEST["name"]
company = crud.company.get(db, 1)
@ -190,8 +189,12 @@ def login_access_token(
"user": token_payload,
"token_type": "bearer",
}
log_audit(model='User', action='login',
details=token_payload, user_id=user.id)
log_audit(
model='User',
action='login',
details=token_payload,
user_id=user.id
)
return JSONResponse(content=response_dict)
@ -244,8 +247,12 @@ def register(
existing_user = crud.user.get_by_email(db, email=email)
if existing_user:
log_audit(model='User', action='creation',
details={"status": '409', 'detail': "The user with this email already exists!", }, user_id=current_user.id)
log_audit(
model='User',
action='creation',
details={"status": '409', 'detail': "The user with this email already exists!", },
user_id=current_user.id
)
raise HTTPException(
status_code=409,
detail="The user with this email already exists!",

View File

@ -24,7 +24,7 @@ def list_files(
limit: int = 100,
current_user: models.User = Security(
deps.get_current_user,
scopes=[Role.SUPER_ADMIN["name"], Role.ADMIN["name"]],
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"], Role.OPERATOR["name"]],
)
):
def get_department_name(db, id):
@ -34,21 +34,23 @@ def list_files(
def get_username(db, id):
user = crud.user.get_by_id(db=db, id=id)
return user.fullname
try:
role = current_user.user_role.role.name if current_user.user_role else None
if role == "SUPER_ADMIN":
if (role == "SUPER_ADMIN") or (role == "OPERATOR"):
docs = crud.documents.get_multi(db, skip=skip, limit=limit)
else:
docs = crud.documents.get_multi_documents(
db, department_id=current_user.department_id, skip=skip, limit=limit)
docs = [
schemas.Document(
id=doc.id,
filename=doc.filename,
uploaded_at=doc.uploaded_at,
uploaded_by=get_username(db, doc.uploaded_by),
department=get_department_name(db, doc.department_id)
# department=get_department_name(db, doc.department_id)
department="deparments"
)
for doc in docs
]

View File

@ -20,6 +20,7 @@ def assign_user_role(
deps.get_current_user,
scopes=[
Role.SUPER_ADMIN["name"],
Role.OPERATOR["name"],
],
),
) -> Any:
@ -49,6 +50,7 @@ def update_user_role(
deps.get_current_user,
scopes=[
Role.SUPER_ADMIN["name"],
Role.OPERATOR["name"],
],
),
) -> Any:

View File

@ -44,7 +44,7 @@ def read_users(
db: Session = Depends(deps.get_db),
current_user: models.User = Security(
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"], Role.OPERATOR["name"]],
),
) -> Any:
"""
@ -219,7 +219,7 @@ def read_user_by_id(
user_id: int,
current_user: models.User = Security(
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"], Role.OPERATOR["name"]],
),
db: Session = Depends(deps.get_db),
) -> Any:
@ -299,7 +299,7 @@ def admin_change_password(
new_password: str = Body(..., embed=True),
current_user: models.User = Security(
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"], Role.OPERATOR["name"]],
),
) -> Any:
"""
@ -402,7 +402,7 @@ def admin_update_user(
)
role = crud.role.get_by_name(db, name=user_update.role)
if role.id == 1:
if (role.id == 1) or (role.id == 4) :
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Cannot create SUPER ADMIN!",

View File

@ -5,13 +5,18 @@ class Role:
GUEST = {
"name": "GUEST",
"description": "A Guest Account",
"description": "General users",
}
ADMIN = {
"name": "ADMIN",
"description": "Admin of Application Ecosystem",
"description": "Admin of Department Ecosystem",
}
SUPER_ADMIN = {
"name": "SUPER_ADMIN",
"description": "Super Administrator of Application Ecosystem",
}
}
OPERATOR = {
"name": "OPERATOR",
"description": "Document Administrator",
}

View File

@ -5,4 +5,5 @@ from .role import Role
from .document import Document
from .subscription import Subscription
from .department import Department
from .audit import Audit
from .audit import Audit
from .document_department import document_department_association

View File

@ -1,8 +1,10 @@
from sqlalchemy import ForeignKey, event
from sqlalchemy.orm import relationship, Session
from sqlalchemy import Column, Integer, String
from sqlalchemy import Column, Integer, String, Table
from private_gpt.users.db.base_class import Base
from private_gpt.users.models.document_department import document_department_association
class Department(Base):
"""Models a Department table."""
@ -16,7 +18,8 @@ class Department(Base):
company = relationship("Company", back_populates="departments")
users = relationship("User", back_populates="department")
documents = relationship("Document", back_populates="department")
documents = relationship("Document", secondary=document_department_association, back_populates="departments")
total_users = Column(Integer, default=0)
total_documents = Column(Integer, default=0)

View File

@ -1,16 +1,17 @@
from private_gpt.users.db.base_class import Base
from datetime import datetime
from sqlalchemy import event, select, func, update
from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime, Table
from private_gpt.users.models.department import Department
from private_gpt.users.db.base_class import Base
from private_gpt.users.models.document_department import document_department_association
class Document(Base):
"""Models a document table"""
__tablename__ = "document"
__tablename__ = "document"
id = Column(Integer, primary_key=True, index=True)
filename = Column(String(225), nullable=False, unique=True)
uploaded_by = Column(
@ -25,30 +26,31 @@ class Document(Base):
)
uploaded_by_user = relationship(
"User", back_populates="uploaded_documents")
department_id = Column(Integer, ForeignKey(
"departments.id"), nullable=False)
department = relationship("Department", back_populates="documents")
# Use document_department_association as the secondary for the relationship
departments = relationship(
"Department",
secondary=document_department_association,
back_populates="documents"
)
# Event listeners for updating total_documents in Department
@event.listens_for(Document, 'after_insert')
@event.listens_for(Document, 'after_delete')
def update_total_documents(mapper, connection, target):
department_id = target.department_id
print(f"Department ID is: {department_id}")
# Use SQLAlchemy's ORM constructs for better readability and maintainability:
total_documents = connection.execute(
select([func.count()]).select_from(Document).where(
Document.department_id == department_id)
select([func.count()]).select_from(document_department_association).where(
document_department_association.c.document_id == target.id)
).scalar()
print(f"Total documents is: {total_documents}")
print("Updating total documents")
department_ids = [assoc.department_id for assoc in connection.execute(
select([document_department_association.c.department_id]).where(
document_department_association.c.document_id == target.id)
)]
# Use the correct update construct for SQLAlchemy:
connection.execute(
update(Department).values(total_documents=total_documents).where(
Department.id == department_id)
)
# Update total_documents for each associated department
for department_id in department_ids:
connection.execute(
update(Department).values(total_documents=total_documents).where(
Department.id == department_id)
)

View File

@ -0,0 +1,10 @@
from private_gpt.users.db.base_class import Base
from sqlalchemy import Column, Integer, String, Table, ForeignKey
document_department_association = Table(
"document_department_association",
Base.metadata,
Column("department_id", Integer, ForeignKey("departments.id")),
Column("document_id", Integer, ForeignKey("document.id")),
extend_existing=True
)

View File

@ -5,5 +5,5 @@ from .user_role import UserRole, UserRoleCreate, UserRoleInDB, UserRoleUpdate
from .subscription import Subscription, SubscriptionBase, SubscriptionCreate, SubscriptionUpdate
from .company import Company, CompanyBase, CompanyCreate, CompanyUpdate
from .documents import Document, DocumentCreate, DocumentsBase, DocumentUpdate, DocumentList
from .department import Department, DepartmentCreate, DepartmentUpdate, DepartmentAdminCreate, DepartmentDelete
from .department import Department, DepartmentCreate, DepartmentUpdate, DepartmentAdminCreate, DepartmentDelete, DepartmentList
from .audit import AuditBase, AuditCreate, AuditUpdate, Audit, GetAudit

View File

@ -1,5 +1,6 @@
from typing import List, Optional
from pydantic import BaseModel
from pydantic import BaseModel, EmailStr
from fastapi import Form
class DepartmentBase(BaseModel):
@ -13,11 +14,11 @@ class DepartmentCreate(DepartmentBase):
class DepartmentUpdate(DepartmentBase):
id: int
class DepartmentDelete(BaseModel):
id: int
class DepartmentInDB(DepartmentBase):
id: int
company_id: int
@ -27,16 +28,28 @@ class DepartmentInDB(DepartmentBase):
class Config:
orm_mode = True
class DepartmentAdminCreate(DepartmentBase):
company_id: int
class Config:
orm_mode = True
class Department(BaseModel):
class DepartmentList(DepartmentBase):
id: int
name: str
total_users: Optional[int]
total_documents: Optional[int]
class Department(DepartmentBase):
id: int
total_users: Optional[int]
total_documents: Optional[int]
class Config:
orm_mode = True
class DepartmentList(BaseModel):
departments_ids: str = Form(...)

View File

@ -1,31 +1,33 @@
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
from typing import List
class DocumentsBase(BaseModel):
filename: str
class DepartmentList(BaseModel):
id: int
name: str
class DocumentCreate(DocumentsBase):
uploaded_by: int
department_id: int
class DocumentUpdate(DocumentsBase):
pass
class DocumentList(DocumentsBase):
id: int
uploaded_by: int
uploaded_at: datetime
departments: List[DepartmentList] = []
class Document(DocumentsBase):
id: int
uploaded_by: str
filename: str
uploaded_by: int
uploaded_at: datetime
department: str
departments: List[DepartmentList] = []
class Config:
orm_mode = True