ticketfrei3/kibicara/webapi/admin.py

167 lines
4.8 KiB
Python
Raw Normal View History

2020-07-01 19:34:16 +00:00
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
2020-07-16 12:02:52 +00:00
# Copyright (C) 2020 by Christian Hagenest <c.hagenest@pm.me>
2020-07-01 19:34:16 +00:00
#
# SPDX-License-Identifier: 0BSD
2020-07-11 10:54:07 +00:00
""" REST API endpoints for hood admins. """
2020-07-01 19:34:16 +00:00
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
2020-07-11 16:23:27 +00:00
from kibicara import email
from kibicara.config import config
2020-07-01 19:34:16 +00:00
from kibicara.model import Admin, AdminHoodRelation
from logging import getLogger
from nacl.encoding import URLSafeBase64Encoder
from nacl.exceptions import CryptoError
from nacl.secret import SecretBox
from passlib.hash import argon2
from ormantic.exceptions import NoMatch
from pickle import dumps, loads
from pydantic import BaseModel
2020-07-07 01:16:23 +00:00
from smtplib import SMTPException
2020-07-01 19:34:16 +00:00
from sqlite3 import IntegrityError
# Module-level logger, named after this module via __name__.
logger = getLogger(__name__)
# Request body accepted by the register endpoint (admin_register).
class BodyAdmin(BaseModel):
    # E-mail address of the admin to register.
    email: str
    # Plain-text password; admin_register rejects values shorter than 8 chars.
    password: str
# Response body returned by the confirm and login endpoints.
class BodyAccessToken(BaseModel):
    # Token string as produced by to_token().
    access_token: str
    # Token scheme reported to the client; defaults to 'bearer'.
    token_type: str = 'bearer'
2020-07-01 19:34:16 +00:00
# Bearer-token dependency used by get_admin(); advertises the login URL as the
# place to obtain a token.
# NOTE(review): the '/api/admin' prefix is assumed to match where this router
# is mounted -- confirm against the application setup.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/api/admin/login')
2020-09-11 22:01:30 +00:00
# Symmetric box keyed with the hex-encoded 'secret' config entry; to_token()
# and from_token() use it to seal and open token payloads.
secret_box = SecretBox(bytes.fromhex(config['secret']))
2020-07-01 19:34:16 +00:00
def to_token(**kwargs):
    """Seal the given keyword arguments into an ASCII token string.

    The keyword arguments are pickled as a dict, encrypted with the
    module-level ``secret_box`` and encoded as URL-safe base64.
    """
    payload = dumps(kwargs)
    sealed = secret_box.encrypt(payload, encoder=URLSafeBase64Encoder)
    return sealed.decode('ascii')
2020-07-01 19:34:16 +00:00
def from_token(token):
    """Open a token created by ``to_token`` and return its payload.

    The token is ASCII-encoded, base64-decoded and decrypted with the
    module-level ``secret_box``; the resulting bytes are unpickled.
    Errors from encoding, decoding or decryption propagate to the caller.
    """
    sealed = token.encode('ascii')
    payload = secret_box.decrypt(sealed, encoder=URLSafeBase64Encoder)
    # NOTE(review): pickle is only reached after decryption succeeded, so the
    # payload must have been sealed with the configured secret. Anyone holding
    # that secret could craft a payload that executes code on unpickling.
    return loads(payload)
2020-07-01 19:34:16 +00:00
async def get_auth(email, password):
    """Return the admin matching the given credentials.

    Looks up the admin by e-mail and verifies the password against the
    stored argon2 hash.

    Raises:
        ValueError: if no admin has that e-mail or the password is wrong.
    """
    try:
        candidate = await Admin.objects.get(email=email)
    except NoMatch:
        # Unknown e-mail is reported exactly like a wrong password.
        raise ValueError
    if not argon2.verify(password, candidate.passhash):
        raise ValueError
    return candidate
async def get_admin(access_token=Depends(oauth2_scheme)):
    """Resolve the bearer token of a request to the authenticated admin.

    The token is opened with ``from_token`` and its contents are passed to
    ``get_auth`` as keyword arguments.

    Raises:
        HTTPException: 401 if the token cannot be opened or the credentials
            it carries are rejected.
    """
    try:
        credentials = from_token(access_token)
        return await get_auth(**credentials)
    except (CryptoError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Invalid authentication credentials',
            headers={'WWW-Authenticate': 'Bearer'},
        )
# Router on which the admin endpoints below are registered.
router = APIRouter()
@router.post(
    '/register/',
    status_code=status.HTTP_202_ACCEPTED,
    response_model=BaseModel,
    operation_id='register',
)
async def admin_register(values: BodyAdmin):
    """Sends an email with a confirmation link.

    - **email**: E-Mail Address of new hood admin
    - **password**: Password of new hood admin
    """
    # Responses: 400 for a password shorter than 8 characters, 409 if an
    # admin with that e-mail already exists, 502 if the mail cannot be sent.
    password_too_short = len(values.password) < 8
    if password_too_short:
        logger.debug('Password is too short')
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail='Password is too short'
        )

    # The submitted fields (e-mail and password) are sealed into the token;
    # no account is created until the token is confirmed.
    token = to_token(**vars(values))
    # NOTE(review): admin_confirm hands this same token back as the access
    # token, so the two debug lines below write a credential to the log.
    logger.debug(f'register_token={token}')
    try:
        existing = await Admin.objects.filter(email=values.email).all()
        if existing:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT)
        confirm_url = f'{config["frontend_url"]}/confirm?token={token}'
        logger.debug(confirm_url)
        email.send_email(
            to=values.email,
            subject='Confirm Account',
            body=confirm_url,
        )
    except (ConnectionRefusedError, SMTPException):
        logger.exception('Email sending failed')
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY)
    return {}
@router.post(
    '/confirm/{register_token}',
    response_model=BodyAccessToken,
    operation_id='confirm',
)
async def admin_confirm(register_token: str):
    """Registration confirmation and account creation.

    - **register_token**: Registration token received in email from /register
    """
    # Responses: 400 if the token cannot be opened or lacks the expected
    # fields, 409 if the account cannot be inserted (IntegrityError).
    try:
        values = from_token(register_token)
        address = values['email']
        password = values['password']
    except (CryptoError, ValueError, KeyError):
        # A malformed, tampered or foreign token is a client error. Without
        # this handler the decryption/decoding failure would escape as a 500.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='Invalid registration token',
        )

    # Only the hash is stored; the plain password lives on inside the token.
    passhash = argon2.hash(password)
    try:
        await Admin.objects.create(email=address, passhash=passhash)
    except IntegrityError:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT)
    # The register token carries e-mail and password, which is exactly what
    # get_admin expects, so it is returned as the access token.
    return BodyAccessToken(access_token=register_token)
@router.post(
    '/login/',
    response_model=BodyAccessToken,
    operation_id='login',
)
async def admin_login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Get an access token.

    - **username**: Email of a registered hood admin
    - **password**: Password of a registered hood admin
    """
    # The form's "username" field carries the admin's e-mail address.
    username = form_data.username
    password = form_data.password
    try:
        await get_auth(username, password)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='Incorrect email or password',
        )
    # The verified credentials are sealed into the token that get_admin
    # later opens and re-checks on every request.
    access_token = to_token(email=username, password=password)
    return BodyAccessToken(access_token=access_token)
2020-07-01 19:34:16 +00:00
@router.get(
    '/hoods/',
    # TODO response_model,
    operation_id='get_hoods_admin',
)
async def admin_hood_read_all(admin=Depends(get_admin)):
    """ Get a list of all hoods of a given admin. """
    # Query the admin-hood relations of the authenticated admin, selecting
    # the related hood along with each relation.
    relations = AdminHoodRelation.objects.select_related('hood')
    return await relations.filter(admin=admin).all()