Skip to content

Commit

Permalink
Basic Adapting to 2FA use
Browse files Browse the repository at this point in the history
  • Loading branch information
TanookiVerde committed Aug 30, 2024
1 parent e05305c commit 095bcfa
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 11 deletions.
1 change: 1 addition & 0 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ class User(Model):
cpf = fields.CharField(max_length=11, unique=True, null=True, validators=[CPFValidator()])
email = fields.CharField(max_length=255, unique=True)
password = fields.CharField(max_length=255)
#secret_key = fields.CharField(max_length=255, null=True)
is_active = fields.BooleanField(default=True)
is_superuser = fields.BooleanField(default=False)
user_class = fields.CharEnumField(enum_type=UserClassEnum, null=True, default=UserClassEnum.PIPELINE_USER)
Expand Down
52 changes: 41 additions & 11 deletions app/routers/auth.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# -*- coding: utf-8 -*-
import io
from datetime import timedelta
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.responses import StreamingResponse

from app import config
from app.models import User
from app.types.pydantic_models import Token
from app.utils import authenticate_user, create_access_token
from app.security import TwoFactorAuth, get_two_factor_auth


router = APIRouter(prefix="/auth", tags=["Autenticação"])
Expand All @@ -23,21 +26,48 @@ async def login_for_access_token(

if not user:
raise HTTPException(
status_code = status.HTTP_401_UNAUTHORIZED,
detail = "Incorrect username or password",
headers = {"WWW-Authenticate": "Bearer"},
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)

access_token_expires = timedelta(
minutes = config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES
)
access_token_expires = timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)

access_token = create_access_token(
data = {"sub": user.username},
expires_delta = access_token_expires
data={"sub": user.username}, expires_delta=access_token_expires
)

return {"access_token": access_token, "token_type": "bearer"}


@router.post("/enable-2fa/{user_id}")
async def enable_2fa(
two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth)
):
return {
"secret_key": two_factor_auth.secret_key
}


@router.get("/generate-qr/{user_id}")
async def generate_qr(
two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth)
):
qr_code = two_factor_auth.qr_code
if qr_code is None:
raise HTTPException(status_code=404, detail="User not found")

return StreamingResponse(io.BytesIO(qr_code), media_type="image/png")


@router.post("/verify-totp/{user_id}")
async def verify_totp(
totp_code: str,
two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth),
):
is_valid = two_factor_auth.verify_totp_code(totp_code)
if not is_valid:
raise HTTPException(status_code=400, detail="Code invalid")
return {
"access_token": access_token,
"token_type": "bearer"
}
"valid": is_valid
}
78 changes: 78 additions & 0 deletions app/security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
import base64
import io
import secrets
from typing import Optional

import qrcode
from fastapi import Depends
from pyotp import TOTP

from app.models import User


class TwoFactorAuth:

def __init__(self, user_id: str, secret_key: str):
self._user_id = user_id
self._secret_key = secret_key
self._totp = TOTP(self._secret_key)
self._qr_cache: Optional[bytes] = None

@property
def totp(self) -> TOTP:
return self._totp

@property
def secret_key(self) -> str:
return self._secret_key

@staticmethod
def _generate_secret_key() -> str:
secret_bytes = secrets.token_bytes(20)
secret_key = base64.b32encode(secret_bytes).decode("utf-8")
return secret_key

@staticmethod
async def get_or_create_secret_key(user_id: str) -> str:
user = await User.get_or_none(id=user_id)

if not user:
raise ValueError(f"User with id {user_id} not found")

# If User doesn't have a secret_key, create one
if not user.secret_key:
secret_key = TwoFactorAuth._generate_secret_key()
user.secret_key = secret_key
user.save()

return user.secret_key

def _create_qr_code(self) -> bytes:
uri = self.totp.provisioning_uri(
name=self._user_id,
issuer_name="2FA",
)
img = qrcode.make(uri)
img_byte_array = io.BytesIO()
img.save(img_byte_array, format="PNG")
img_byte_array.seek(0)
return img_byte_array.getvalue()

@property
def qr_code(self) -> bytes:
if self._qr_cache is None:
self._qr_cache = self._create_qr_code()
return self._qr_cache

def verify_totp_code(self, totp_code: str) -> bool:
return self.totp.verify(totp_code)


async def get_two_factor_auth(
user_id: str
) -> TwoFactorAuth:
secret_key = await TwoFactorAuth.get_or_create_secret_key(
user_id
)
return TwoFactorAuth(user_id, secret_key)

0 comments on commit 095bcfa

Please sign in to comment.