Added change password

This commit is contained in:
Saurab-Shrestha 2024-01-14 15:19:44 +05:45
parent e5edaadf24
commit c66feb4187
7 changed files with 104 additions and 62 deletions

View File

@ -1,9 +1,12 @@
import logging
from private_gpt.users.core.config import settings
from private_gpt.users.constants.role import Role
from typing import Union, Any, Generator
from datetime import datetime
from private_gpt.users import crud, models, schemas
from private_gpt.users.db.session import SessionLocal
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from private_gpt.users.core.security import (
ALGORITHM,
JWT_SECRET_KEY
@ -14,11 +17,21 @@ from pydantic import ValidationError
from private_gpt.users.schemas.token import TokenPayload
from sqlalchemy.orm import Session
reuseable_oauth = OAuth2PasswordBearer(
tokenUrl="/login",
scheme_name="JWT"
reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login",
scopes={
Role.GUEST["name"]: Role.GUEST["description"],
# Role.ACCOUNT_ADMIN["name"]: Role.ACCOUNT_ADMIN["description"],
# Role.ACCOUNT_MANAGER["name"]: Role.ACCOUNT_MANAGER["description"],
Role.ADMIN["name"]: Role.ADMIN["description"],
Role.SUPER_ADMIN["name"]: Role.SUPER_ADMIN["description"],
},
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_db() -> Generator:
try:
db = SessionLocal()
@ -27,39 +40,55 @@ def get_db() -> Generator:
db.close()
async def get_current_user(
security_scopes: SecurityScopes,
db: Session = Depends(get_db),
token: str = Depends(reuseable_oauth)
token: str = Depends(reusable_oauth2)
) -> models.User:
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=401,
detail="os",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(
token, JWT_SECRET_KEY, algorithms=[ALGORITHM]
)
token_data = TokenPayload(**payload)
if datetime.fromtimestamp(token_data.exp) < datetime.now():
raise HTTPException(
status_code = status.HTTP_401_UNAUTHORIZED,
detail="Token expired",
headers={"WWW-Authenticate": "Bearer"},
)
except(jwt.JWTError, ValidationError):
if payload.get("id") is None:
raise credentials_exception
print(payload)
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
logger.error("Error Decoding Token", exc_info=True)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user: Union[dict[str, Any], None] = crud.user.get(db, id=token_data.id)
if user is None:
user = crud.user.get(db, id=token_data.id)
if not user:
raise credentials_exception
if security_scopes.scopes and not token_data.role:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Could not find user",
status_code=401,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
if (
security_scopes.scopes
and token_data.role not in security_scopes.scopes
):
raise HTTPException(
status_code=401,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: models.User = Security(get_current_user, scopes=[],),
) -> models.User:

View File

@ -64,7 +64,8 @@ def login_access_token(
),
"refresh_token": security.create_refresh_token(
token_payload, expires_delta=refresh_token_expires
)
),
"token_type": "bearer",
}
@ -95,7 +96,7 @@ def register(
user = crud.user.create(db, obj_in=user_in)
# get role
role = crud.role.get_by_name(db, name=Role.GUEST["name"])
role = crud.role.get_by_name(db, name=Role.SUPER_ADMIN["name"])
print("ROLE:", role)
# assign user_role
user_role_in = schemas.UserRoleCreate(
@ -127,5 +128,6 @@ def register(
),
"refresh_token": security.create_refresh_token(
token_payload, expires_delta=refresh_token_expires
)
),
"token_type": "bearer",
}

View File

@ -5,8 +5,10 @@ from private_gpt.users.api import deps
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
router = APIRouter(prefix='/roles', tags=['roles'])
@router.get("/", response_model=List[schemas.Role])
def get_roles(
db: Session = Depends(deps.get_db), skip: int = 0, limit: int = 100,

View File

@ -14,7 +14,7 @@ def assign_user_role(
*,
db: Session = Depends(deps.get_db),
user_role_in: schemas.UserRoleCreate,
current_user: models.User = Depends(deps.get_current_active_user),
current_user: models.User = Depends(deps.get_current_user),
) -> Any:
"""
Assign a role to a user after creation of a user
@ -35,14 +35,13 @@ def update_user_role(
db: Session = Depends(deps.get_db),
user_id: int,
user_role_in: schemas.UserRoleUpdate,
# current_user: models.User = Security(
# deps.get_current_active_user,
# scopes=[
# Role.ADMIN["name"],
# Role.SUPER_ADMIN["name"],
# Role.ACCOUNT_ADMIN["name"],
# ],
# ),
current_user: models.User = Security(
deps.get_current_user,
scopes=[
Role.ADMIN["name"],
Role.SUPER_ADMIN["name"],
],
),
) -> Any:
"""
Update a users role.

View File

@ -1,9 +1,9 @@
from typing import Any, List
from private_gpt.users import crud, models, schemas
from private_gpt.users.api import deps
from private_gpt.users.constants.role import Role
from private_gpt.users.core.config import settings
from private_gpt.users.core.security import verify_password, get_password_hash
from fastapi import APIRouter, Body, Depends, HTTPException, Security
from fastapi.encoders import jsonable_encoder
from pydantic.networks import EmailStr
@ -17,10 +17,10 @@ def read_users(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
# current_user: models.User = Security(
# deps.get_current_active_user,
# scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
# ),
current_user: models.User = Security(
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
),
) -> Any:
"""
Retrieve all users.
@ -35,7 +35,7 @@ def create_user(
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
current_user: models.User = Security(
deps.get_current_active_user,
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
),
) -> Any:
@ -57,21 +57,20 @@ def update_user_me(
*,
db: Session = Depends(deps.get_db),
fullname: str = Body(None),
phone_number: str = Body(None),
email: EmailStr = Body(None),
current_user: models.User = Depends(deps.get_current_active_user),
current_user: models.User = Depends(deps.get_current_user),
) -> Any:
"""
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
print("Current user data: ", current_user_data)
user_in = schemas.UserUpdate(**current_user_data)
if phone_number is not None:
user_in.phone_number = phone_number
if fullname is not None:
user_in.fullname = fullname
if email is not None:
user_in.email = email
print(f"DB obj: {current_user}\n obj IN : {user_in}")
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user
@ -79,7 +78,7 @@ def update_user_me(
@router.get("/me", response_model=schemas.User)
def read_user_me(
db: Session = Depends(deps.get_db),
current_user: models.User = Depends(deps.get_current_active_user),
current_user: models.User = Depends(deps.get_current_user),
) -> Any:
"""
Get current user.
@ -95,25 +94,33 @@ def read_user_me(
fullname=current_user.fullname,
created_at=current_user.created_at,
updated_at=current_user.updated_at,
role=role,
last_login = current_user.last_login,
role=role
)
return user_data
@router.get("/me/change-password", response_model=schemas.User)
def read_user_me(
@router.patch("/me/change-password", response_model=schemas.User)
def change_password(
*,
db: Session = Depends(deps.get_db),
current_user: models.User = Depends(deps.get_current_active_user),
old_password: str = Body(None),
new_password: str = Body(None),
current_user: models.User = Depends(deps.get_current_user),
old_password: str = Body(..., embed=True),
new_password: str = Body(..., embed=True),
) -> Any:
"""
Get current user.
Change current user's password.
"""
if not current_user.user_role:
role = None
else:
role = current_user.user_role.role.name
# Verify the old password
if not verify_password(old_password, current_user.hashed_password):
raise HTTPException(status_code=400, detail="Old password is incorrect")
# Change the password
new_password_hashed = get_password_hash(new_password)
current_user.hashed_password = new_password_hashed
db.commit()
role = current_user.user_role.role.name if current_user.user_role else None
user_data = schemas.User(
id=current_user.id,
email=current_user.email,
@ -121,15 +128,17 @@ def read_user_me(
fullname=current_user.fullname,
created_at=current_user.created_at,
updated_at=current_user.updated_at,
last_login=current_user.last_login,
role=role,
)
return user_data
@router.get("/{user_id}", response_model=schemas.User)
def read_user_by_id(
user_id: int,
current_user: models.User = Security(
deps.get_current_active_user,
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
),
db: Session = Depends(deps.get_db),
@ -137,6 +146,8 @@ def read_user_by_id(
"""
Get a specific user by id.
"""
if user_id is None:
return "User id is not given."
user = crud.user.get(db, id=user_id)
return user
@ -148,7 +159,7 @@ def update_user(
user_id: int,
user_in: schemas.UserUpdate,
current_user: models.User = Security(
deps.get_current_active_user,
deps.get_current_user,
scopes=[Role.ADMIN["name"], Role.SUPER_ADMIN["name"]],
),
) -> Any:

View File

@ -15,7 +15,7 @@ SQLALCHEMY_DATABASE_URI = "postgresql+psycopg2://{username}:{password}@{host}:{p
class Settings(BaseSettings):
PROJECT_NAME: str = "AUTHENTICATION AND AUTHORIZATION"
API_V1_STR: str = "/api/v1"
API_V1_STR: str = "/v1"
SECRET_KEY: str
ACCESS_TOKEN_EXPIRE_MINUTES: int
REFRESH_TOKEN_EXPIRE_MINUTES: int

View File

@ -31,15 +31,14 @@ class UserLoginSchema(BaseModel):
class UserSchema(UserBaseSchema):
id: int
user_role: Optional[UserRole]
last_login: Optional[datetime]
user_role: Optional[UserRole] = None
last_login: Optional[datetime] = None
created_at: datetime
updated_at: datetime
is_active: bool = Field(default=False)
class Config:
orm_mode = True
json_exclude = {'user_role'}
# Additional properties to return via API
class User(UserSchema):