mirror of
https://github.com/imartinez/privateGPT.git
synced 2025-07-16 08:26:20 +00:00
Event listener for updating total documents
This commit is contained in:
parent
bf135b1692
commit
8bc7fb0039
4
.env
4
.env
@ -12,8 +12,8 @@ SUPER_ADMIN_PASSWORD=supersecretpassword
|
|||||||
SUPER_ADMIN_ACCOUNT_NAME=superaccount
|
SUPER_ADMIN_ACCOUNT_NAME=superaccount
|
||||||
|
|
||||||
SECRET_KEY=ba9dc3f976cf8fb40519dcd152a8d7d21c0b7861d841711cdb2602be8e85fd7c
|
SECRET_KEY=ba9dc3f976cf8fb40519dcd152a8d7d21c0b7861d841711cdb2602be8e85fd7c
|
||||||
ACCESS_TOKEN_EXPIRE_MINUTES=60
|
ACCESS_TOKEN_EXPIRE_MINUTES=720
|
||||||
REFRESH_TOKEN_EXPIRE_MINUTES=120
|
REFRESH_TOKEN_EXPIRE_MINUTES=1400
|
||||||
|
|
||||||
SMTP_SERVER=mail.gibl.com.np
|
SMTP_SERVER=mail.gibl.com.np
|
||||||
SMTP_PORT=25
|
SMTP_PORT=25
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
"""Added chat items
|
"""Added chat item index
|
||||||
|
|
||||||
Revision ID: bdfec4101648
|
Revision ID: a715b89210b0
|
||||||
Revises:
|
Revises:
|
||||||
Create Date: 2024-04-09 15:55:47.769162
|
Create Date: 2024-04-15 16:10:00.510135
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from typing import Sequence, Union
|
from typing import Sequence, Union
|
||||||
@ -12,7 +12,7 @@ import sqlalchemy as sa
|
|||||||
|
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
# revision identifiers, used by Alembic.
|
||||||
revision: str = 'bdfec4101648'
|
revision: str = 'a715b89210b0'
|
||||||
down_revision: Union[str, None] = None
|
down_revision: Union[str, None] = None
|
||||||
branch_labels: Union[str, Sequence[str], None] = None
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
depends_on: Union[str, Sequence[str], None] = None
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
@ -32,6 +32,7 @@ def upgrade() -> None:
|
|||||||
)
|
)
|
||||||
op.create_table('chat_items',
|
op.create_table('chat_items',
|
||||||
sa.Column('id', sa.Integer(), nullable=False),
|
sa.Column('id', sa.Integer(), nullable=False),
|
||||||
|
sa.Column('index', sa.Integer(), nullable=False),
|
||||||
sa.Column('sender', sa.String(length=225), nullable=False),
|
sa.Column('sender', sa.String(length=225), nullable=False),
|
||||||
sa.Column('content', sa.JSON(), nullable=True),
|
sa.Column('content', sa.JSON(), nullable=True),
|
||||||
sa.Column('created_at', sa.DateTime(), nullable=True),
|
sa.Column('created_at', sa.DateTime(), nullable=True),
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Binary file not shown.
@ -1,10 +1,5 @@
|
|||||||
# start a fastapi server with uvicorn
|
# start a fastapi server with uvicorn
|
||||||
|
|
||||||
from datetime import datetime
|
|
||||||
from fastapi.middleware import Middleware
|
|
||||||
from private_gpt.users.db.session import SessionLocal
|
|
||||||
from private_gpt.users.models import Audit, User, Department, Document
|
|
||||||
from private_gpt.users.api.deps import get_audit_logger, get_db
|
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|
||||||
from private_gpt.main import app
|
from private_gpt.main import app
|
||||||
|
@ -51,7 +51,7 @@ def create_app(root_injector: Injector) -> FastAPI:
|
|||||||
allow_credentials=True,
|
allow_credentials=True,
|
||||||
allow_origins=["http://localhost:3000/", "http://10.1.101.125:80", "http://quickgpt.gibl.com.np:80", "http://127.0.0.1",
|
allow_origins=["http://localhost:3000/", "http://10.1.101.125:80", "http://quickgpt.gibl.com.np:80", "http://127.0.0.1",
|
||||||
"http://10.1.101.125", "http://quickgpt.gibl.com.np", "http://localhost:8000", "http://192.168.1.93", "http://192.168.1.93:88",
|
"http://10.1.101.125", "http://quickgpt.gibl.com.np", "http://localhost:8000", "http://192.168.1.93", "http://192.168.1.93:88",
|
||||||
"http://192.168.1.98", "http://192.168.1.98:5173", "http://localhost:3000","https://globaldocquery.gibl.com.np/", "http://127.0.0.1/", "http://localhost/",
|
"http://192.168.1.98", "http://192.168.1.98:3000", "http://localhost:3000","https://globaldocquery.gibl.com.np/", "http://127.0.0.1/", "http://localhost/",
|
||||||
"http://localhost:80", "http://192.168.1.131", 'http://192.168.1.131:3000'
|
"http://localhost:80", "http://192.168.1.131", 'http://192.168.1.131:3000'
|
||||||
, "http://192.168.1.127", 'http://192.168.1.127:3000'
|
, "http://192.168.1.127", 'http://192.168.1.127:3000'
|
||||||
, "http://192.168.1.70", 'http://192.168.1.70:3000'
|
, "http://192.168.1.70", 'http://192.168.1.70:3000'
|
||||||
|
@ -129,7 +129,9 @@ class ChatService:
|
|||||||
else None
|
else None
|
||||||
)
|
)
|
||||||
system_prompt = (
|
system_prompt = (
|
||||||
"You can only answer questions about the provided context. If you know the answer but it is not based in the provided context, don't provide the answer, just state the answer is not in the context provided."
|
chat_engine_input.system_message.content
|
||||||
|
if chat_engine_input.system_message
|
||||||
|
else None
|
||||||
)
|
)
|
||||||
chat_history = (
|
chat_history = (
|
||||||
chat_engine_input.chat_history if chat_engine_input.chat_history else None
|
chat_engine_input.chat_history if chat_engine_input.chat_history else None
|
||||||
@ -163,8 +165,18 @@ class ChatService:
|
|||||||
else None
|
else None
|
||||||
)
|
)
|
||||||
system_prompt = (
|
system_prompt = (
|
||||||
"You can only answer questions about the provided context. If you know the answer but it is not based in the provided context, don't provide the answer, just state the answer is not in the context provided."
|
"""
|
||||||
|
You should answer questions only in English or Nepali.
|
||||||
|
Responses should be based on the context documents provided
|
||||||
|
and should be relevant, informative, and easy to understand.
|
||||||
|
You should aim to deliver high-quality responses that are
|
||||||
|
respectful and helpful, using clear and concise language.
|
||||||
|
Avoid providing information outside of the context documents unless
|
||||||
|
it is necessary for clarity or completeness. Focus on providing
|
||||||
|
accurate and reliable answers based on the given context.
|
||||||
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
chat_history = (
|
chat_history = (
|
||||||
chat_engine_input.chat_history if chat_engine_input.chat_history else None
|
chat_engine_input.chat_history if chat_engine_input.chat_history else None
|
||||||
)
|
)
|
||||||
|
@ -26,7 +26,7 @@ def list_chat_histories(
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
chat_histories = crud.chat.get_chat_history(
|
chat_histories = crud.chat.get_chat_history(
|
||||||
db, skip=skip, limit=limit)
|
db, user_id=current_user.id, skip=skip, limit=limit)
|
||||||
return chat_histories
|
return chat_histories
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(traceback.format_exc())
|
print(traceback.format_exc())
|
||||||
|
@ -1,16 +1,15 @@
|
|||||||
import traceback
|
import traceback
|
||||||
from typing import Any, List, Optional
|
from typing import Any, List
|
||||||
|
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
from pydantic.networks import EmailStr
|
|
||||||
from fastapi.responses import JSONResponse
|
from fastapi.responses import JSONResponse
|
||||||
from fastapi.encoders import jsonable_encoder
|
from fastapi.encoders import jsonable_encoder
|
||||||
from fastapi import APIRouter, Body, Depends, HTTPException, Security, status, Path, Request
|
from fastapi import APIRouter, Body, Depends, HTTPException, Security, status, Path, Request
|
||||||
|
|
||||||
from private_gpt.users.api import deps
|
from private_gpt.users.api import deps
|
||||||
from private_gpt.users.constants.role import Role
|
from private_gpt.users.constants.role import Role
|
||||||
from private_gpt.users.core.config import settings
|
|
||||||
from private_gpt.users import crud, models, schemas
|
from private_gpt.users import crud, models, schemas
|
||||||
|
from private_gpt.users.utils.utils import validate_password
|
||||||
from private_gpt.users.core.security import verify_password, get_password_hash
|
from private_gpt.users.core.security import verify_password, get_password_hash
|
||||||
|
|
||||||
router = APIRouter(prefix="/users", tags=["users"])
|
router = APIRouter(prefix="/users", tags=["users"])
|
||||||
@ -182,6 +181,7 @@ def change_password(
|
|||||||
"""
|
"""
|
||||||
Change current user's password.
|
Change current user's password.
|
||||||
"""
|
"""
|
||||||
|
validate_password(new_password)
|
||||||
if not verify_password(old_password, current_user.hashed_password):
|
if not verify_password(old_password, current_user.hashed_password):
|
||||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Old password is incorrect")
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Old password is incorrect")
|
||||||
|
|
||||||
|
@ -11,19 +11,25 @@ from private_gpt.users.schemas.chat import ChatHistoryCreate, ChatHistoryCreate,
|
|||||||
|
|
||||||
class CRUDChat(CRUDBase[ChatHistory, ChatHistoryCreate, ChatHistoryCreate]):
|
class CRUDChat(CRUDBase[ChatHistory, ChatHistoryCreate, ChatHistoryCreate]):
|
||||||
def get_by_id(self, db: Session, *, id: uuid.UUID) -> Optional[ChatHistory]:
|
def get_by_id(self, db: Session, *, id: uuid.UUID) -> Optional[ChatHistory]:
|
||||||
return db.query(self.model).filter(ChatHistory.conversation_id == id).first()
|
return (
|
||||||
|
db.query(self.model)
|
||||||
|
.filter(ChatHistory.conversation_id == id)
|
||||||
|
.order_by(asc(getattr(ChatHistory, 'created_at')))
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
|
||||||
def get_chat_history(
|
def get_chat_history(
|
||||||
self, db: Session, *, skip: int = 0, limit: int = 100
|
self, db: Session, *,user_id:int, skip: int = 0, limit: int = 100
|
||||||
) -> List[ChatHistory]:
|
) -> List[ChatHistory]:
|
||||||
return (
|
return (
|
||||||
db.query(self.model)
|
db.query(self.model)
|
||||||
|
.filter(ChatHistory.user_id == user_id)
|
||||||
.order_by(desc(getattr(ChatHistory, 'created_at')))
|
.order_by(desc(getattr(ChatHistory, 'created_at')))
|
||||||
.offset(skip)
|
.offset(skip)
|
||||||
.limit(limit)
|
.limit(limit)
|
||||||
.all()
|
.all()
|
||||||
)
|
)
|
||||||
|
|
||||||
class CRUDChatItem(CRUDBase[ChatItem, ChatItemCreate, ChatItemUpdate]):
|
class CRUDChatItem(CRUDBase[ChatItem, ChatItemCreate, ChatItemUpdate]):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -5,11 +5,11 @@ from sqlalchemy.exc import IntegrityError
|
|||||||
from sqlalchemy.orm.util import object_mapper
|
from sqlalchemy.orm.util import object_mapper
|
||||||
|
|
||||||
from private_gpt.users.crud.base import CRUDBase
|
from private_gpt.users.crud.base import CRUDBase
|
||||||
from private_gpt.users.models.chat_history import ChatHistory
|
from private_gpt.users.models.chat import ChatHistory
|
||||||
from private_gpt.users.schemas.chat_history import ChatCreate, ChatUpdate
|
from private_gpt.users.schemas.chat import ChatHistoryCreate, ChatHistoryUpdate
|
||||||
|
|
||||||
|
|
||||||
class CRUDChat(CRUDBase[ChatHistory, ChatCreate, ChatUpdate]):
|
class CRUDChat(CRUDBase[ChatHistory, ChatHistoryCreate, ChatHistoryUpdate]):
|
||||||
def get_by_id(self, db: Session, *, id: int) -> Optional[ChatHistory]:
|
def get_by_id(self, db: Session, *, id: int) -> Optional[ChatHistory]:
|
||||||
return db.query(self.model).filter(ChatHistory.conversation_id == id).first()
|
return db.query(self.model).filter(ChatHistory.conversation_id == id).first()
|
||||||
|
|
||||||
@ -18,7 +18,7 @@ class CRUDChat(CRUDBase[ChatHistory, ChatCreate, ChatUpdate]):
|
|||||||
db: Session,
|
db: Session,
|
||||||
*,
|
*,
|
||||||
db_obj: ChatHistory,
|
db_obj: ChatHistory,
|
||||||
obj_in: Union[ChatUpdate, Dict[str, Any]]
|
obj_in: Union[ChatHistoryUpdate, Dict[str, Any]]
|
||||||
) -> ChatHistory:
|
) -> ChatHistory:
|
||||||
try:
|
try:
|
||||||
obj_data = object_mapper(db_obj).data
|
obj_data = object_mapper(db_obj).data
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
import uuid
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from sqlalchemy.orm import relationship
|
from sqlalchemy.orm import relationship, Session
|
||||||
from sqlalchemy.dialects.postgresql import UUID
|
from sqlalchemy.dialects.postgresql import UUID
|
||||||
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Text, Boolean, event, JSON
|
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Text, Boolean, event, JSON
|
||||||
|
|
||||||
@ -48,6 +48,7 @@ class ChatItem(Base):
|
|||||||
__tablename__ = "chat_items"
|
__tablename__ = "chat_items"
|
||||||
|
|
||||||
id = Column(Integer, nullable=False, primary_key=True)
|
id = Column(Integer, nullable=False, primary_key=True)
|
||||||
|
index = Column(Integer, nullable=False)
|
||||||
sender = Column(String(225), nullable=False)
|
sender = Column(String(225), nullable=False)
|
||||||
content = Column(JSON, nullable=True)
|
content = Column(JSON, nullable=True)
|
||||||
created_at = Column(DateTime, default=datetime.now)
|
created_at = Column(DateTime, default=datetime.now)
|
||||||
@ -63,6 +64,23 @@ class ChatItem(Base):
|
|||||||
return f"<ChatItem {self.id!r}>"
|
return f"<ChatItem {self.id!r}>"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def get_next_index(db: Session, conversation_id: uuid.UUID) -> int:
|
||||||
|
"""Get the next index value for the given conversation_id."""
|
||||||
|
max_index = db.query(ChatItem).filter(ChatItem.conversation_id == conversation_id).order_by(ChatItem.index.desc()).first()
|
||||||
|
if max_index is None:
|
||||||
|
return 0
|
||||||
|
return max_index.index + 1
|
||||||
|
|
||||||
|
|
||||||
|
@event.listens_for(ChatItem, "before_insert")
|
||||||
|
def receive_before_insert(mapper, connection, target):
|
||||||
|
"""Set the index value before inserting a new ChatItem."""
|
||||||
|
if target.conversation_id:
|
||||||
|
session = Session.object_session(target)
|
||||||
|
target.index = get_next_index(session, target.conversation_id)
|
||||||
|
|
||||||
|
|
||||||
@event.listens_for(ChatHistory, "after_insert")
|
@event.listens_for(ChatHistory, "after_insert")
|
||||||
def receive_after_insert(mapper, connection, target):
|
def receive_after_insert(mapper, connection, target):
|
||||||
"""Update title after insertion to reflect the conversation_id"""
|
"""Update title after insertion to reflect the conversation_id"""
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
from typing import Optional
|
from typing import Optional
|
||||||
from fastapi_filter.contrib.sqlalchemy import Filter
|
from fastapi_filter.contrib.sqlalchemy import Filter
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from sqlalchemy.orm import relationship
|
from private_gpt.users.models.department import Department
|
||||||
|
from sqlalchemy.orm import relationship, Session
|
||||||
from sqlalchemy import Boolean, event, select, func, update
|
from sqlalchemy import Boolean, event, select, func, update
|
||||||
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime
|
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime
|
||||||
|
|
||||||
@ -58,37 +59,48 @@ class Document(Base):
|
|||||||
|
|
||||||
action_type = Column(Enum(MakerCheckerActionType), nullable=False,
|
action_type = Column(Enum(MakerCheckerActionType), nullable=False,
|
||||||
default=MakerCheckerActionType.INSERT) # 'insert' or 'update' or 'delete'
|
default=MakerCheckerActionType.INSERT) # 'insert' or 'update' or 'delete'
|
||||||
# 'pending', 'approved', or 'rejected'
|
|
||||||
status = Column(Enum(MakerCheckerStatus), nullable=False,
|
status = Column(Enum(MakerCheckerStatus), nullable=False,
|
||||||
default=MakerCheckerStatus.PENDING)
|
default=MakerCheckerStatus.PENDING) # 'pending', 'approved', or 'rejected'
|
||||||
|
|
||||||
verified_at = Column(DateTime, nullable=True)
|
verified_at = Column(DateTime, nullable=True)
|
||||||
verified_by = Column(Integer, ForeignKey("users.id"), nullable=True)
|
verified_by = Column(Integer, ForeignKey("users.id"), nullable=True)
|
||||||
|
|
||||||
|
|
||||||
departments = relationship(
|
departments = relationship(
|
||||||
"Department",
|
"Department",
|
||||||
secondary=document_department_association,
|
secondary=document_department_association,
|
||||||
back_populates="documents"
|
back_populates="documents"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# def get_associated_department(db: Session, document_id: int) -> list:
|
||||||
|
# print(db.query(document_department_association.c.department_id).all())
|
||||||
|
# print("DOcument:", document_id)
|
||||||
|
# associated_departments = db.query(document_department_association).filter(
|
||||||
|
# document_department_association.c.document_id == document_id
|
||||||
|
# ).all()
|
||||||
|
# print("HELLO",associated_departments)
|
||||||
|
# associated_departments_ids = [department.id for department in associated_departments]
|
||||||
|
|
||||||
|
# return associated_departments_ids
|
||||||
|
|
||||||
|
|
||||||
# @event.listens_for(Document, 'after_insert')
|
# @event.listens_for(Document, 'after_insert')
|
||||||
# @event.listens_for(Document, 'after_delete')
|
# @event.listens_for(Document, 'after_delete')
|
||||||
# def update_total_documents(mapper, connection, target):
|
# def update_total_documents(mapper, connection, target):
|
||||||
# total_documents = connection.execute(
|
# session = Session(connection)
|
||||||
# func.count().select().select_from(document_department_association).where(
|
# print("Session object: ", session)
|
||||||
# document_department_association.c.document_id == target.id)
|
# # Get the department IDs associated with the target document
|
||||||
# ).scalar()
|
# associated_department_ids = get_associated_department(session, target.id)
|
||||||
|
# print('Department: ', associated_department_ids)
|
||||||
# department_ids = [assoc.department_id for assoc in connection.execute(
|
|
||||||
# select([document_department_association.c.department_id]).where(
|
|
||||||
# document_department_association.c.document_id == target.id)
|
|
||||||
# )]
|
|
||||||
|
|
||||||
# # Update total_documents for each associated department
|
# # Update total_documents for each associated department
|
||||||
# for department_id in department_ids:
|
# for department_id in associated_department_ids:
|
||||||
# connection.execute(
|
# department = session.query(Department).get(department_id)
|
||||||
# update(Department).values(total_documents=total_documents).where(
|
# department.total_documents = session.query(func.count()).select_from(document_department_association).filter(
|
||||||
# Department.id == department_id)
|
# document_department_association.document_id
|
||||||
# )
|
# ).scalar()
|
||||||
|
|
||||||
|
# session.commit()
|
||||||
|
# session.close()
|
@ -8,7 +8,7 @@ from sqlalchemy import (
|
|||||||
ForeignKey,
|
ForeignKey,
|
||||||
DateTime
|
DateTime
|
||||||
)
|
)
|
||||||
from sqlalchemy.orm import relationship
|
from sqlalchemy.orm import relationship, Session
|
||||||
from sqlalchemy import event, func, select, update
|
from sqlalchemy import event, func, select, update
|
||||||
|
|
||||||
from private_gpt.users.db.base_class import Base
|
from private_gpt.users.db.base_class import Base
|
||||||
@ -60,21 +60,26 @@ class User(Base):
|
|||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
"""Returns string representation of model instance"""
|
"""Returns string representation of model instance"""
|
||||||
return "<User {username!r}>".format(username=self.username)
|
return "<User {username!r}>".format(username=self.username)
|
||||||
|
|
||||||
|
|
||||||
|
@event.listens_for(User, 'after_insert')
|
||||||
|
@event.listens_for(User, 'after_delete')
|
||||||
|
def update_total_users(mapper, connection, target):
|
||||||
|
session = Session.object_session(target)
|
||||||
|
department_id = target.department_id
|
||||||
|
|
||||||
# Event listeners
|
total_users_subquery = (
|
||||||
# @event.listens_for(User, 'after_insert')
|
select([func.count(User.id).label('total_users')])
|
||||||
# @event.listens_for(User, 'after_delete')
|
.where(User.department_id == department_id)
|
||||||
# def update_total_users(mapper, connection, target):
|
.scalar_subquery()
|
||||||
# department_id = target.department_id
|
)
|
||||||
# total_users = connection.execute(
|
update_stmt = (
|
||||||
# select([func.count()]).select_from(User).where(
|
update(Department)
|
||||||
# User.department_id == department_id)
|
.values(total_users=total_users_subquery)
|
||||||
# ).scalar()
|
.where(Department.id == department_id)
|
||||||
# connection.execute(
|
)
|
||||||
# update(Department).values(total_users=total_users).where(
|
session.execute(update_stmt)
|
||||||
# Department.id == department_id)
|
session.commit()
|
||||||
# )
|
|
||||||
|
|
||||||
|
|
||||||
@event.listens_for(User, 'before_insert')
|
@event.listens_for(User, 'before_insert')
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
|
import re
|
||||||
import smtplib
|
import smtplib
|
||||||
from email.mime.text import MIMEText
|
from email.mime.text import MIMEText
|
||||||
from email.mime.multipart import MIMEMultipart
|
from email.mime.multipart import MIMEMultipart
|
||||||
from private_gpt.users.core.config import settings
|
from private_gpt.users.core.config import settings
|
||||||
from fastapi import HTTPException
|
from fastapi import HTTPException, status
|
||||||
|
|
||||||
def send_registration_email(fullname: str, email: str, random_password: str) -> None:
|
def send_registration_email(fullname: str, email: str, random_password: str) -> None:
|
||||||
"""
|
"""
|
||||||
@ -46,4 +47,42 @@ def send_registration_email(fullname: str, email: str, random_password: str) ->
|
|||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=500,
|
status_code=500,
|
||||||
detail=f"Unable to send email."
|
detail=f"Unable to send email."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_password(password: str) -> None:
|
||||||
|
"""
|
||||||
|
Validate the password according to the defined criteria.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
password (str): The new password to validate.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HTTPException: If the password does not meet the criteria.
|
||||||
|
"""
|
||||||
|
# Define the password validation criteria
|
||||||
|
min_length = 6
|
||||||
|
require_upper = re.compile(r'[A-Z]')
|
||||||
|
require_lower = re.compile(r'[a-z]')
|
||||||
|
require_digit = re.compile(r'\d')
|
||||||
|
require_special = re.compile(r'[!@#$%^&*()_+=-]') # Add special characters as needed
|
||||||
|
|
||||||
|
# Check password length
|
||||||
|
if len(password) < min_length:
|
||||||
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Password must be at least {min_length} characters long.")
|
||||||
|
|
||||||
|
# Check for uppercase letter
|
||||||
|
if not require_upper.search(password):
|
||||||
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Password must contain at least one uppercase letter.")
|
||||||
|
|
||||||
|
# Check for lowercase letter
|
||||||
|
if not require_lower.search(password):
|
||||||
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Password must contain at least one lowercase letter.")
|
||||||
|
|
||||||
|
# Check for digit
|
||||||
|
if not require_digit.search(password):
|
||||||
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Password must contain at least one digit.")
|
||||||
|
|
||||||
|
# Check for special character
|
||||||
|
if not require_special.search(password):
|
||||||
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Password must contain at least one special character (e.g., !@#$%^&*()_+=-).")
|
||||||
|
Loading…
Reference in New Issue
Block a user