[core] Add password reset function

This commit is contained in:
Cathy Hu 2020-10-08 19:23:47 +02:00 committed by acipm
parent 3fddb960d2
commit f1dc563846

View file

@ -6,6 +6,7 @@
""" REST API endpoints for hood admins. """
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, Response, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from kibicara import email
@ -19,7 +20,7 @@ from nacl.secret import SecretBox
from passlib.hash import argon2
from ormantic.exceptions import NoMatch
from pickle import dumps, loads
from pydantic import BaseModel
from pydantic import BaseModel, validator
from smtplib import SMTPException
from sqlite3 import IntegrityError
@ -27,10 +28,23 @@ from sqlite3 import IntegrityError
logger = getLogger(__name__)
class BodyAdmin(BaseModel):
class BodyEmail(BaseModel):
email: str
class BodyPassword(BaseModel):
password: str
@validator('password')
def valid_password(cls, value):
if len(value) < 8:
raise ValueError('Password is too short')
return value
class BodyAdmin(BodyEmail, BodyPassword):
pass
class BodyAccessToken(BaseModel):
access_token: str
@ -155,6 +169,62 @@ async def admin_login(form_data: OAuth2PasswordRequestForm = Depends()):
return BodyAccessToken(access_token=token)
@router.post(
'/reset',
status_code=status.HTTP_202_ACCEPTED,
response_model=BaseModel,
operation_id='reset',
)
async def admin_reset_password(values: BodyEmail):
"""Sends an email with a password reset link.
- **email**: E-Mail Address of new hood admin
- **password**: Password of new hood admin
"""
register_token = to_token(datetime=datetime.now().isoformat(), **values.__dict__)
logger.debug(f'register_token={register_token}')
try:
admin = await Admin.objects.filter(email=values.email).all()
if not admin:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
body = f'{config["frontend_url"]}/password-reset?token={register_token}'
logger.debug(body)
email.send_email(
to=values.email,
subject='Reset your password',
body=body,
)
except (ConnectionRefusedError, SMTPException):
logger.exception('Email sending failed')
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY)
return {}
@router.post(
'/reset/{reset_token}',
response_model=BodyAccessToken,
operation_id='confirm reset',
)
async def admin_confirm_reset(reset_token: str, values: BodyPassword):
try:
token_values = from_token(reset_token)
if (
datetime.fromisoformat(token_values['datetime']) + timedelta(hours=3)
< datetime.now()
):
raise HTTPException(status_code=status.HTTP_410_GONE)
passhash = argon2.hash(values.password)
admins = await Admin.objects.filter(email=token_values['email']).all()
if len(admins) != 1:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
await admins[0].update(passhash=passhash)
return BodyAccessToken(access_token=reset_token)
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except CryptoError:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
@router.get(
'/hoods/',
# TODO response_model,