Skip to content

Commit

Permalink
Implemented 2FA new login endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
TanookiVerde committed Aug 30, 2024
1 parent 90d764f commit 603e6a4
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 24 deletions.
2 changes: 1 addition & 1 deletion app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class User(Model):
cpf = fields.CharField(max_length=11, unique=True, null=True, validators=[CPFValidator()])
email = fields.CharField(max_length=255, unique=True)
password = fields.CharField(max_length=255)
#secret_key = fields.CharField(max_length=255, null=True)
secret_key = fields.CharField(max_length=255, null=True)
is_active = fields.BooleanField(default=True)
is_superuser = fields.BooleanField(default=False)
user_class = fields.CharEnumField(enum_type=UserClassEnum, null=True, default=UserClassEnum.PIPELINE_USER)
Expand Down
91 changes: 71 additions & 20 deletions app/routers/auth.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import io
from datetime import timedelta
from typing import Annotated
from typing import Annotated, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
Expand All @@ -11,7 +11,10 @@
from app.models import User
from app.types.pydantic_models import Token
from app.utils import authenticate_user, create_access_token
from app.security import TwoFactorAuth, get_two_factor_auth
from app.security import TwoFactorAuth
from app.dependencies import (
get_current_frontend_user
)


router = APIRouter(prefix="/auth", tags=["Autenticação"])
Expand Down Expand Up @@ -39,35 +42,83 @@ async def login_for_access_token(

return {"access_token": access_token, "token_type": "bearer"}

@router.post("/2fa/login")
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
totp_code: Optional[str] = None,
) -> Token:

user: User = await authenticate_user(form_data.username, form_data.password)

if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)

# IF 2FA is not Enabled
is_2fa_disabled = user.secret_key is None
is_trying_2fa = totp_code is not None

# If 2FA is enabled and user is initializing the session
if not is_2fa_disabled and not is_trying_2fa:
# User must provide a OTP
return {
"2fa_enabled": True
}

# If 2FA is enabled and user is trying to login with OTP
if is_trying_2fa and not is_2fa_disabled:
secret_key = await TwoFactorAuth.get_or_create_secret_key(user.id)
two_factor_auth = TwoFactorAuth(user.id, secret_key)

is_valid = two_factor_auth.verify_totp_code(totp_code)
if not is_valid:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect OTP",
headers={"WWW-Authenticate": "Bearer"},
)

# If 2FA is disabled
if is_2fa_disabled:
is_valid = True

# Generate Token
access_token_expires = timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)

@router.post("/enable-2fa/{user_id}")
return {
"2fa_enabled": False,
"access_token": access_token,
"token_type": "bearer"
}


@router.post("/2fa/enable/")
async def enable_2fa(
two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth)
current_user: Annotated[User, Depends(get_current_frontend_user)],
):
secret_key = await TwoFactorAuth.get_or_create_secret_key(current_user.id)
two_factor_auth = TwoFactorAuth(current_user.id, secret_key)

return {
"secret_key": two_factor_auth.secret_key
}


@router.get("/generate-qr/{user_id}")
@router.get("/2fa/generate-qr/")
async def generate_qr(
two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth)
current_user: Annotated[User, Depends(get_current_frontend_user)],
):
secret_key = await TwoFactorAuth.get_or_create_secret_key(current_user.id)
two_factor_auth = TwoFactorAuth(current_user.id, secret_key)

qr_code = two_factor_auth.qr_code
if qr_code is None:
raise HTTPException(status_code=404, detail="User not found")

return StreamingResponse(io.BytesIO(qr_code), media_type="image/png")


@router.post("/verify-totp/{user_id}")
async def verify_totp(
totp_code: str,
two_factor_auth: TwoFactorAuth = Depends(get_two_factor_auth),
):
is_valid = two_factor_auth.verify_totp_code(totp_code)
if not is_valid:
raise HTTPException(status_code=400, detail="Code invalid")
return {
"valid": is_valid
}
return StreamingResponse(io.BytesIO(qr_code), media_type="image/png")
6 changes: 3 additions & 3 deletions app/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ async def get_or_create_secret_key(user_id: str) -> str:
if not user.secret_key:
secret_key = TwoFactorAuth._generate_secret_key()
user.secret_key = secret_key
user.save()
await user.save()

return user.secret_key

def _create_qr_code(self) -> bytes:
uri = self.totp.provisioning_uri(
name=self._user_id,
name=str(self._user_id),
issuer_name="2FA",
)
img = qrcode.make(uri)
img_byte_array = io.BytesIO()
img.save(img_byte_array, format="PNG")
img.save(img_byte_array)
img_byte_array.seek(0)
return img_byte_array.getvalue()

Expand Down
12 changes: 12 additions & 0 deletions migrations/app/27_20240830145823_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
from tortoise import BaseDBAsyncClient


async def upgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "user" ADD "secret_key" VARCHAR(255);"""


async def downgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "user" DROP COLUMN "secret_key";"""

0 comments on commit 603e6a4

Please sign in to comment.