diff --git a/kibicara/webapi/admin.py b/kibicara/webapi/admin.py index d45eff4..b563259 100644 --- a/kibicara/webapi/admin.py +++ b/kibicara/webapi/admin.py @@ -6,6 +6,7 @@ """ REST API endpoints for hood admins. """ +from datetime import datetime, timedelta from fastapi import APIRouter, Depends, HTTPException, Response, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from kibicara import email @@ -19,7 +20,7 @@ from nacl.secret import SecretBox from passlib.hash import argon2 from ormantic.exceptions import NoMatch from pickle import dumps, loads -from pydantic import BaseModel +from pydantic import BaseModel, validator from smtplib import SMTPException from sqlite3 import IntegrityError @@ -27,10 +28,23 @@ from sqlite3 import IntegrityError logger = getLogger(__name__) -class BodyAdmin(BaseModel): +class BodyEmail(BaseModel): email: str + + +class BodyPassword(BaseModel): password: str + @validator('password') + def valid_password(cls, value): + if len(value) < 8: + raise ValueError('Password is too short') + return value + + +class BodyAdmin(BodyEmail, BodyPassword): + pass + class BodyAccessToken(BaseModel): access_token: str @@ -155,6 +169,62 @@ async def admin_login(form_data: OAuth2PasswordRequestForm = Depends()): return BodyAccessToken(access_token=token) +@router.post( + '/reset', + status_code=status.HTTP_202_ACCEPTED, + response_model=BaseModel, + operation_id='reset', +) +async def admin_reset_password(values: BodyEmail): + """Sends an email with a password reset link. + + - **email**: E-Mail Address of new hood admin + - **password**: Password of new hood admin + """ + register_token = to_token(datetime=datetime.now().isoformat(), **values.__dict__) + logger.debug(f'register_token={register_token}') + try: + admin = await Admin.objects.filter(email=values.email).all() + if not admin: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) + body = f'{config["frontend_url"]}/password-reset?token={register_token}' + logger.debug(body) + email.send_email( + to=values.email, + subject='Reset your password', + body=body, + ) + except (ConnectionRefusedError, SMTPException): + logger.exception('Email sending failed') + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY) + return {} + + +@router.post( + '/reset/{reset_token}', + response_model=BodyAccessToken, + operation_id='confirm reset', +) +async def admin_confirm_reset(reset_token: str, values: BodyPassword): + try: + token_values = from_token(reset_token) + if ( + datetime.fromisoformat(token_values['datetime']) + timedelta(hours=3) + < datetime.now() + ): + raise HTTPException(status_code=status.HTTP_410_GONE) + passhash = argon2.hash(values.password) + admins = await Admin.objects.filter(email=token_values['email']).all() + if len(admins) != 1: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) + await admins[0].update(passhash=passhash) + return BodyAccessToken(access_token=reset_token) + except IntegrityError: + raise HTTPException(status_code=status.HTTP_409_CONFLICT) + except CryptoError: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) + + @router.get( '/hoods/', # TODO response_model,