Skip to content

Commit

Permalink
refactor: 2FA logic improving use flow
Browse files Browse the repository at this point in the history
  • Loading branch information
TanookiVerde committed Aug 30, 2024
1 parent 603e6a4 commit 8d064e3
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 60 deletions.
5 changes: 4 additions & 1 deletion app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,14 @@ class User(Model):
cpf = fields.CharField(max_length=11, unique=True, null=True, validators=[CPFValidator()])
email = fields.CharField(max_length=255, unique=True)
password = fields.CharField(max_length=255)
secret_key = fields.CharField(max_length=255, null=True)
is_active = fields.BooleanField(default=True)
is_superuser = fields.BooleanField(default=False)
user_class = fields.CharEnumField(enum_type=UserClassEnum, null=True, default=UserClassEnum.PIPELINE_USER)
data_source = fields.ForeignKeyField("app.DataSource", related_name="users", null=True)
# 2FA
secret_key = fields.CharField(max_length=255, null=True)
is_2fa_required = fields.BooleanField(default=False)
is_2fa_activated = fields.BooleanField(default=False)
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)

Expand Down
127 changes: 69 additions & 58 deletions app/routers/auth.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
# -*- coding: utf-8 -*-
import io
from datetime import timedelta
from typing import Annotated, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.responses import StreamingResponse

from app import config
from app.models import User
from app.types.pydantic_models import Token
from app.utils import authenticate_user, create_access_token
from app.utils import authenticate_user, generate_user_token
from app.security import TwoFactorAuth
from app.dependencies import (
get_current_frontend_user
Expand All @@ -21,83 +19,69 @@


@router.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
async def login_without_2fa(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:

user: User = await authenticate_user(form_data.username, form_data.password)

user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)

access_token_expires = timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
if user.is_2fa_required:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="2FA required. Use the /2fa/login/ endpoint",
headers={"WWW-Authenticate": "Bearer"},
)

access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {
"access_token": generate_user_token(user),
"token_type": "bearer"
}

return {"access_token": access_token, "token_type": "bearer"}

@router.post("/2fa/login")
async def login(
@router.post("/2fa/login/")
async def login_with_2fa(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
totp_code: Optional[str] = None,
totp_code: str,
) -> Token:

user: User = await authenticate_user(form_data.username, form_data.password)

user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)

# IF 2FA is not Enabled
is_2fa_disabled = user.secret_key is None
is_trying_2fa = totp_code is not None

# If 2FA is enabled and user is initializing the session
if not is_2fa_disabled and not is_trying_2fa:
# User must provide a OTP
return {
"2fa_enabled": True
}

# If 2FA is enabled and user is trying to login with OTP
if is_trying_2fa and not is_2fa_disabled:
secret_key = await TwoFactorAuth.get_or_create_secret_key(user.id)
two_factor_auth = TwoFactorAuth(user.id, secret_key)

is_valid = two_factor_auth.verify_totp_code(totp_code)
if not is_valid:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect OTP",
headers={"WWW-Authenticate": "Bearer"},
)

# If 2FA is disabled
if is_2fa_disabled:
is_valid = True

# Generate Token
access_token_expires = timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
# Caso 1: Usuário não registrou 2FA e está tentando logar
if user.is_2fa_required and not user.is_2fa_activated:
raise HTTPException(
status_code=status.HTTP_412_PRECONDITION_FAILED,
detail="2FA not activated. Use the /2fa/enable/ endpoint",
headers={"WWW-Authenticate": "Bearer"},
)

# Caso 2: Usuário registrou 2FA e está tentando logar
secret_key = await TwoFactorAuth.get_or_create_secret_key(user.id)
two_factor_auth = TwoFactorAuth(user.id, secret_key)

is_valid = two_factor_auth.verify_totp_code(totp_code)
if not is_valid:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect OTP",
headers={"WWW-Authenticate": "Bearer"},
)
return {
"2fa_enabled": False,
"access_token": access_token,
"token_type": "bearer"
"access_token": generate_user_token(user),
"token_type": "bearer",
}


@router.post("/2fa/enable/")
async def enable_2fa(
current_user: Annotated[User, Depends(get_current_frontend_user)],
Expand All @@ -110,15 +94,42 @@ async def enable_2fa(
}


@router.get("/2fa/generate-qr/")
async def generate_qr(
current_user: Annotated[User, Depends(get_current_frontend_user)],
@router.get("/2fa/activate/generate-qrcode/")
async def generate_qrcode(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
):
current_user = await authenticate_user(form_data.username, form_data.password)
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)

secret_key = await TwoFactorAuth.get_or_create_secret_key(current_user.id)
two_factor_auth = TwoFactorAuth(current_user.id, secret_key)

qr_code = two_factor_auth.qr_code
if qr_code is None:
raise HTTPException(status_code=404, detail="User not found")

return StreamingResponse(io.BytesIO(qr_code), media_type="image/png")
return StreamingResponse(io.BytesIO(qr_code), media_type="image/png")


@router.post('/2fa/activate/verify-code/')
async def verify_code(
totp_code: str,
current_user: Annotated[User, Depends(get_current_frontend_user)],
):
secret_key = await TwoFactorAuth.get_or_create_secret_key(current_user.id)
two_factor_auth = TwoFactorAuth(current_user.id, secret_key)

is_valid_totp = two_factor_auth.verify_totp_code(totp_code)

if is_valid_totp:
current_user.is_2fa_activated = True
await current_user.save()

return {
'success': is_valid_totp
}
2 changes: 1 addition & 1 deletion app/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,4 @@ async def get_two_factor_auth(
secret_key = await TwoFactorAuth.get_or_create_secret_key(
user_id
)
return TwoFactorAuth(user_id, secret_key)
return TwoFactorAuth(user_id, secret_key)
9 changes: 9 additions & 0 deletions app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def generate_user_token(user: User) -> str:
access_token_expires = timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)

access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)

return access_token

async def authenticate_user(username: str, password: str) -> User:
"""Authenticate a user.
Expand Down
12 changes: 12 additions & 0 deletions migrations/app/28_20240830163720_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
from tortoise import BaseDBAsyncClient


async def upgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "user" ADD "is_2fa_enabled" BOOL NOT NULL DEFAULT False;"""


async def downgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "user" DROP COLUMN "is_2fa_enabled";"""
12 changes: 12 additions & 0 deletions migrations/app/29_20240830164032_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
from tortoise import BaseDBAsyncClient


async def upgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "user" ADD "is_2fa_active" BOOL NOT NULL DEFAULT False;"""


async def downgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "user" DROP COLUMN "is_2fa_active";"""
16 changes: 16 additions & 0 deletions migrations/app/30_20240830164307_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
from tortoise import BaseDBAsyncClient


async def upgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "user" RENAME COLUMN "is_2fa_enabled" TO "is_2fa_required";
ALTER TABLE "user" RENAME COLUMN "is_2fa_active" TO "is_2fa_activated";"""


async def downgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "user" RENAME COLUMN "is_2fa_required" TO "is_2fa_active";
ALTER TABLE "user" RENAME COLUMN "is_2fa_activated" TO "is_2fa_active";
ALTER TABLE "user" RENAME COLUMN "is_2fa_required" TO "is_2fa_enabled";
ALTER TABLE "user" RENAME COLUMN "is_2fa_activated" TO "is_2fa_enabled";"""

0 comments on commit 8d064e3

Please sign in to comment.