From 095bcfa82314cc9b2340cc279209462a3e1b9326 Mon Sep 17 00:00:00 2001 From: Pedro Nascimento Date: Fri, 30 Aug 2024 14:27:46 -0300 Subject: [PATCH] Basic Adapting to 2FA use --- app/models.py | 1 + app/routers/auth.py | 52 +++++++++++++++++++++++------- app/security.py | 78 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 120 insertions(+), 11 deletions(-) create mode 100644 app/security.py diff --git a/app/models.py b/app/models.py index 56dd11f..a20df6a 100644 --- a/app/models.py +++ b/app/models.py @@ -273,6 +273,7 @@ class User(Model): cpf = fields.CharField(max_length=11, unique=True, null=True, validators=[CPFValidator()]) email = fields.CharField(max_length=255, unique=True) password = fields.CharField(max_length=255) + #secret_key = fields.CharField(max_length=255, null=True) is_active = fields.BooleanField(default=True) is_superuser = fields.BooleanField(default=False) user_class = fields.CharEnumField(enum_type=UserClassEnum, null=True, default=UserClassEnum.PIPELINE_USER) diff --git a/app/routers/auth.py b/app/routers/auth.py index 3e23581..8c81d2c 100644 --- a/app/routers/auth.py +++ b/app/routers/auth.py @@ -1,14 +1,17 @@ # -*- coding: utf-8 -*- +import io from datetime import timedelta from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm +from fastapi.responses import StreamingResponse from app import config from app.models import User from app.types.pydantic_models import Token from app.utils import authenticate_user, create_access_token +from app.security import TwoFactorAuth, get_two_factor_auth router = APIRouter(prefix="/auth", tags=["AutenticaĆ§Ć£o"]) @@ -23,21 +26,48 @@ async def login_for_access_token( if not user: raise HTTPException( - status_code = status.HTTP_401_UNAUTHORIZED, - detail = "Incorrect username or password", - headers = {"WWW-Authenticate": "Bearer"}, + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, ) - access_token_expires = timedelta( - minutes = config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES - ) + access_token_expires = timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( - data = {"sub": user.username}, - expires_delta = access_token_expires + data={"sub": user.username}, expires_delta=access_token_expires ) + return {"access_token": access_token, "token_type": "bearer"} + + +@router.post("/enable-2fa/{user_id}") +async def enable_2fa( + two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth) +): + return { + "secret_key": two_factor_auth.secret_key + } + + +@router.get("/generate-qr/{user_id}") +async def generate_qr( + two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth) +): + qr_code = two_factor_auth.qr_code + if qr_code is None: + raise HTTPException(status_code=404, detail="User not found") + + return StreamingResponse(io.BytesIO(qr_code), media_type="image/png") + + +@router.post("/verify-totp/{user_id}") +async def verify_totp( + totp_code: str, + two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth), +): + is_valid = two_factor_auth.verify_totp_code(totp_code) + if not is_valid: + raise HTTPException(status_code=400, detail="Code invalid") return { - "access_token": access_token, - "token_type": "bearer" - } \ No newline at end of file + "valid": is_valid + } diff --git a/app/security.py b/app/security.py new file mode 100644 index 0000000..5688285 --- /dev/null +++ b/app/security.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +import base64 +import io +import secrets +from typing import Optional + +import qrcode +from fastapi import Depends +from pyotp import TOTP + +from app.models import User + + +class TwoFactorAuth: + + def __init__(self, user_id: str, secret_key: str): + self._user_id = user_id + self._secret_key = secret_key + self._totp = TOTP(self._secret_key) + self._qr_cache: Optional[bytes] = None + + @property + def totp(self) -> TOTP: + return self._totp + + @property + def secret_key(self) -> str: + return self._secret_key + + @staticmethod + def _generate_secret_key() -> str: + secret_bytes = secrets.token_bytes(20) + secret_key = base64.b32encode(secret_bytes).decode("utf-8") + return secret_key + + @staticmethod + async def get_or_create_secret_key(user_id: str) -> str: + user = await User.get_or_none(id=user_id) + + if not user: + raise ValueError(f"User with id {user_id} not found") + + # If User doesn't have a secret_key, create one + if not user.secret_key: + secret_key = TwoFactorAuth._generate_secret_key() + user.secret_key = secret_key + user.save() + + return user.secret_key + + def _create_qr_code(self) -> bytes: + uri = self.totp.provisioning_uri( + name=self._user_id, + issuer_name="2FA", + ) + img = qrcode.make(uri) + img_byte_array = io.BytesIO() + img.save(img_byte_array, format="PNG") + img_byte_array.seek(0) + return img_byte_array.getvalue() + + @property + def qr_code(self) -> bytes: + if self._qr_cache is None: + self._qr_cache = self._create_qr_code() + return self._qr_cache + + def verify_totp_code(self, totp_code: str) -> bool: + return self.totp.verify(totp_code) + + +async def get_two_factor_auth( + user_id: str +) -> TwoFactorAuth: + secret_key = await TwoFactorAuth.get_or_create_secret_key( + user_id + ) + return TwoFactorAuth(user_id, secret_key)