From 603e6a417ed671de302134cc5136a048cc77a809 Mon Sep 17 00:00:00 2001 From: Pedro Nascimento Date: Fri, 30 Aug 2024 15:57:30 -0300 Subject: [PATCH] Implemented 2FA new login endpoint --- app/models.py | 2 +- app/routers/auth.py | 91 +++++++++++++++++----- app/security.py | 6 +- migrations/app/27_20240830145823_update.py | 12 +++ 4 files changed, 87 insertions(+), 24 deletions(-) create mode 100644 migrations/app/27_20240830145823_update.py diff --git a/app/models.py b/app/models.py index a20df6a..8ccc082 100644 --- a/app/models.py +++ b/app/models.py @@ -273,7 +273,7 @@ class User(Model): cpf = fields.CharField(max_length=11, unique=True, null=True, validators=[CPFValidator()]) email = fields.CharField(max_length=255, unique=True) password = fields.CharField(max_length=255) - #secret_key = fields.CharField(max_length=255, null=True) + secret_key = fields.CharField(max_length=255, null=True) is_active = fields.BooleanField(default=True) is_superuser = fields.BooleanField(default=False) user_class = fields.CharEnumField(enum_type=UserClassEnum, null=True, default=UserClassEnum.PIPELINE_USER) diff --git a/app/routers/auth.py b/app/routers/auth.py index 8c81d2c..909d139 100644 --- a/app/routers/auth.py +++ b/app/routers/auth.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import io from datetime import timedelta -from typing import Annotated +from typing import Annotated, Optional from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm @@ -11,7 +11,10 @@ from app.models import User from app.types.pydantic_models import Token from app.utils import authenticate_user, create_access_token -from app.security import TwoFactorAuth, get_two_factor_auth +from app.security import TwoFactorAuth +from app.dependencies import ( + get_current_frontend_user +) router = APIRouter(prefix="/auth", tags=["AutenticaĆ§Ć£o"]) @@ -39,35 +42,83 @@ async def login_for_access_token( return {"access_token": access_token, "token_type": "bearer"} +@router.post("/2fa/login") +async def login( + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], + totp_code: Optional[str] = None, +) -> Token: + + user: User = await authenticate_user(form_data.username, form_data.password) + + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + + # IF 2FA is not Enabled + is_2fa_disabled = user.secret_key is None + is_trying_2fa = totp_code is not None + + # If 2FA is enabled and user is initializing the session + if not is_2fa_disabled and not is_trying_2fa: + # User must provide a OTP + return { + "2fa_enabled": True + } + + # If 2FA is enabled and user is trying to login with OTP + if is_trying_2fa and not is_2fa_disabled: + secret_key = await TwoFactorAuth.get_or_create_secret_key(user.id) + two_factor_auth = TwoFactorAuth(user.id, secret_key) + + is_valid = two_factor_auth.verify_totp_code(totp_code) + if not is_valid: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect OTP", + headers={"WWW-Authenticate": "Bearer"}, + ) + + # If 2FA is disabled + if is_2fa_disabled: + is_valid = True + + # Generate Token + access_token_expires = timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = create_access_token( + data={"sub": user.username}, expires_delta=access_token_expires + ) -@router.post("/enable-2fa/{user_id}") + return { + "2fa_enabled": False, + "access_token": access_token, + "token_type": "bearer" + } + + +@router.post("/2fa/enable/") async def enable_2fa( - two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth) + current_user: Annotated[User, Depends(get_current_frontend_user)], ): + secret_key = await TwoFactorAuth.get_or_create_secret_key(current_user.id) + two_factor_auth = TwoFactorAuth(current_user.id, secret_key) + return { "secret_key": two_factor_auth.secret_key } -@router.get("/generate-qr/{user_id}") +@router.get("/2fa/generate-qr/") async def generate_qr( - two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth) + current_user: Annotated[User, Depends(get_current_frontend_user)], ): + secret_key = await TwoFactorAuth.get_or_create_secret_key(current_user.id) + two_factor_auth = TwoFactorAuth(current_user.id, secret_key) + qr_code = two_factor_auth.qr_code if qr_code is None: raise HTTPException(status_code=404, detail="User not found") - return StreamingResponse(io.BytesIO(qr_code), media_type="image/png") - - -@router.post("/verify-totp/{user_id}") -async def verify_totp( - totp_code: str, - two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth), -): - is_valid = two_factor_auth.verify_totp_code(totp_code) - if not is_valid: - raise HTTPException(status_code=400, detail="Code invalid") - return { - "valid": is_valid - } + return StreamingResponse(io.BytesIO(qr_code), media_type="image/png") \ No newline at end of file diff --git a/app/security.py b/app/security.py index 5688285..84fd716 100644 --- a/app/security.py +++ b/app/security.py @@ -44,18 +44,18 @@ async def get_or_create_secret_key(user_id: str) -> str: if not user.secret_key: secret_key = TwoFactorAuth._generate_secret_key() user.secret_key = secret_key - user.save() + await user.save() return user.secret_key def _create_qr_code(self) -> bytes: uri = self.totp.provisioning_uri( - name=self._user_id, + name=str(self._user_id), issuer_name="2FA", ) img = qrcode.make(uri) img_byte_array = io.BytesIO() - img.save(img_byte_array, format="PNG") + img.save(img_byte_array) img_byte_array.seek(0) return img_byte_array.getvalue() diff --git a/migrations/app/27_20240830145823_update.py b/migrations/app/27_20240830145823_update.py new file mode 100644 index 0000000..f17dac5 --- /dev/null +++ b/migrations/app/27_20240830145823_update.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +from tortoise import BaseDBAsyncClient + + +async def upgrade(db: BaseDBAsyncClient) -> str: + return """ + ALTER TABLE "user" ADD "secret_key" VARCHAR(255);""" + + +async def downgrade(db: BaseDBAsyncClient) -> str: + return """ + ALTER TABLE "user" DROP COLUMN "secret_key";"""