Merge pull request #212 from prefeitura-rio/development
Development
Showing
14 changed files
with
389 additions
and
203 deletions.
@@ -1,43 +1,127 @@ | ||
# -*- coding: utf-8 -*- | ||
from datetime import timedelta | ||
import io | ||
from typing import Annotated | ||
|
||
from fastapi import APIRouter, Depends, HTTPException, status | ||
from fastapi.security import OAuth2PasswordRequestForm | ||
from fastapi.responses import StreamingResponse | ||
|
||
from app import config | ||
from app.models import User | ||
from app.types.pydantic_models import Token | ||
from app.utils import authenticate_user, create_access_token | ||
from app.utils import authenticate_user, generate_user_token | ||
from app.security import TwoFactorAuth | ||
from app.dependencies import ( | ||
get_current_frontend_user | ||
) | ||
|
||
|
||
router = APIRouter(prefix="/auth", tags=["Autenticação"]) | ||
|
||
|
||
@router.post("/token") | ||
async def login_for_access_token( | ||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()] | ||
async def login_without_2fa( | ||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()], | ||
) -> Token: | ||
|
||
user: User = await authenticate_user(form_data.username, form_data.password) | ||
user = await authenticate_user(form_data.username, form_data.password) | ||
if not user: | ||
raise HTTPException( | ||
status_code=status.HTTP_401_UNAUTHORIZED, | ||
detail="Incorrect username or password", | ||
headers={"WWW-Authenticate": "Bearer"}, | ||
) | ||
|
||
if user.is_2fa_required: | ||
raise HTTPException( | ||
status_code=status.HTTP_401_UNAUTHORIZED, | ||
detail="2FA required. Use the /2fa/login/ endpoint", | ||
headers={"WWW-Authenticate": "Bearer"}, | ||
) | ||
|
||
return { | ||
"access_token": generate_user_token(user), | ||
"token_type": "bearer" | ||
} | ||
|
||
|
||
@router.post("/2fa/is-2fa-active/") | ||
async def is_2fa_active( | ||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()], | ||
) -> bool: | ||
user = await authenticate_user(form_data.username, form_data.password) | ||
if not user: | ||
raise HTTPException( | ||
status_code=status.HTTP_401_UNAUTHORIZED, | ||
detail="Incorrect username or password", | ||
headers={"WWW-Authenticate": "Bearer"}, | ||
) | ||
|
||
return user.is_2fa_activated | ||
|
||
|
||
@router.post("/2fa/login/") | ||
async def login_with_2fa( | ||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()], | ||
totp_code: str, | ||
) -> Token: | ||
|
||
user = await authenticate_user(form_data.username, form_data.password) | ||
if not user: | ||
raise HTTPException( | ||
status_code = status.HTTP_401_UNAUTHORIZED, | ||
detail = "Incorrect username or password", | ||
headers = {"WWW-Authenticate": "Bearer"}, | ||
status_code=status.HTTP_401_UNAUTHORIZED, | ||
detail="Incorrect username or password", | ||
headers={"WWW-Authenticate": "Bearer"}, | ||
) | ||
|
||
access_token_expires = timedelta( | ||
minutes = config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES | ||
) | ||
secret_key = await TwoFactorAuth.get_or_create_secret_key(user.id) | ||
two_factor_auth = TwoFactorAuth(user.id, secret_key) | ||
|
||
access_token = create_access_token( | ||
data = {"sub": user.username}, | ||
expires_delta = access_token_expires | ||
) | ||
is_valid = two_factor_auth.verify_totp_code(totp_code) | ||
if not is_valid: | ||
raise HTTPException( | ||
status_code=status.HTTP_401_UNAUTHORIZED, | ||
detail="Incorrect OTP", | ||
headers={"WWW-Authenticate": "Bearer"}, | ||
) | ||
if not user.is_2fa_activated: | ||
user.is_2fa_activated = True | ||
await user.save() | ||
|
||
return { | ||
"access_token": access_token, | ||
"token_type": "bearer" | ||
} | ||
"access_token": generate_user_token(user), | ||
"token_type": "bearer", | ||
} | ||
|
||
|
||
@router.post("/2fa/enable/") | ||
async def enable_2fa( | ||
current_user: Annotated[User, Depends(get_current_frontend_user)], | ||
): | ||
secret_key = await TwoFactorAuth.get_or_create_secret_key(current_user.id) | ||
two_factor_auth = TwoFactorAuth(current_user.id, secret_key) | ||
|
||
return { | ||
"secret_key": two_factor_auth.secret_key | ||
} | ||
|
||
|
||
@router.get("/2fa/generate-qrcode/") | ||
async def generate_qrcode( | ||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()], | ||
): | ||
current_user = await authenticate_user(form_data.username, form_data.password) | ||
if not current_user: | ||
raise HTTPException( | ||
status_code=status.HTTP_401_UNAUTHORIZED, | ||
detail="Incorrect username or password", | ||
headers={"WWW-Authenticate": "Bearer"}, | ||
) | ||
|
||
secret_key = await TwoFactorAuth.get_or_create_secret_key(current_user.id) | ||
two_factor_auth = TwoFactorAuth(current_user.id, secret_key) | ||
|
||
qr_code = two_factor_auth.qr_code | ||
if qr_code is None: | ||
raise HTTPException(status_code=404, detail="User not found") | ||
|
||
return StreamingResponse(io.BytesIO(qr_code), media_type="image/png") |
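Review bot: `generate_qrcode` hands out the TOTP shared secret (encoded in the QR code) to anyone who presents a valid username and password, with no check of `current_user.is_2fa_activated`, so a password-only attacker can enroll their own authenticator and then satisfy `/2fa/login/` — the second factor protects nothing once the password leaks. After the `authenticate_user` check, refuse re-enrollment with `if current_user.is_2fa_activated:` / `raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="2FA already activated")`, and put any secret reset behind a separate flow that verifies the current TOTP or an administrator. The same concern applies to `/2fa/enable/`, which returns `secret_key` in plaintext to the bearer-token holder. Separately, `totp_code: str` on `login_with_2fa` is a bare scalar, which FastAPI binds as a query parameter, so the one-time code lands in the URL and in proxy/access logs; declaring it as a form field (`totp_code: Annotated[str, Form()]`, importing `Form` from fastapi) keeps it in the POST body alongside the credentials. I cannot see whether `TwoFactorAuth.verify_totp_code` rate-limits attempts or rejects replayed codes, so confirm a six-digit code cannot simply be brute-forced against this endpoint.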