[misc] Migrate to TortoiseORM

This commit is contained in:
Thomas Lindner 2023-03-19 19:03:25 +01:00
parent 767c92000b
commit 72f66896ee
15 changed files with 400 additions and 410 deletions

View file

@ -34,6 +34,7 @@ install_requires =
pytoml
requests
scrypt
tortoise
[options.packages.find]
where = src
@ -52,19 +53,18 @@ skip_install = True
deps =
black
flake8
mypy
types-requests
commands =
black --check --diff src tests
flake8 src tests
# not yet
#mypy --ignore-missing-imports src tests
[testenv]
deps =
mypy
pytest
pytest-asyncio
types-requests
commands =
mypy --ignore-missing-imports src tests
pytest tests
[flake8]

View file

@ -4,17 +4,13 @@
#
# SPDX-License-Identifier: 0BSD
from nacl.secret import SecretBox
from nacl.utils import random
"""Default configuration.
The default configuration gets overwritten by a configuration file if one exists.
"""
config = {
"database_connection": "sqlite:////tmp/kibicara.sqlite",
"database_connection": "sqlite:///:memory:",
"frontend_url": "http://localhost:4200", # url of frontend, change in prod
"secret": random(SecretBox.KEY_SIZE).hex(), # generate with: openssl rand -hex 32
# production params
"frontend_path": None, # required, path to frontend html/css/js files
"production": True,

View file

@ -16,9 +16,9 @@ from fastapi.staticfiles import StaticFiles
from hypercorn.asyncio import serve
from hypercorn.config import Config
from pytoml import load
from tortoise import Tortoise
from kibicara.config import config
from kibicara.model import Mapping
from kibicara.platformapi import Spawner
from kibicara.webapi import router
@ -66,12 +66,17 @@ class Main:
format="%(asctime)s %(name)s %(message)s",
)
getLogger("aiosqlite").setLevel(WARNING)
Mapping.create_all()
asyncio_run(self.__run())
async def __run(self):
await Tortoise.init(
db_url=config["database_connection"],
modules={"models": ["kibicara.model", "kibicara.platforms.test.model"]},
)
await Tortoise.generate_schemas()
await Spawner.init_all()
await self.__start_webserver()
await Tortoise.close_connections()
async def __start_webserver(self):
class SinglePageApplication(StaticFiles):

View file

@ -1,4 +1,4 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
@ -6,69 +6,52 @@
"""ORM Models for core."""
from databases import Database
from ormantic import Boolean, ForeignKey, Integer, Model, Text
from sqlalchemy import MetaData, create_engine
from kibicara.config import config
class Mapping:
database = Database(config["database_connection"])
metadata = MetaData()
@classmethod
def create_all(cls):
engine = create_engine(str(cls.database.url))
cls.metadata.create_all(engine)
@classmethod
def drop_all(cls):
engine = create_engine(str(cls.database.url))
cls.metadata.drop_all(engine)
from tortoise import fields
from tortoise.models import Model
class Admin(Model):
id: Integer(primary_key=True) = None
email: Text(unique=True)
passhash: Text()
id = fields.IntField(pk=True)
email = fields.TextField(unique=True)
passhash = fields.TextField()
hoods: fields.ManyToManyRelation["Hood"] = fields.ManyToManyField(
"models.Hood", related_name="admins", through="admin_hood_relations"
)
class Mapping(Mapping):
table_name = "admins"
class Meta:
table = "admins"
class Hood(Model):
id: Integer(primary_key=True) = None
name: Text(unique=True)
landingpage: Text()
email_enabled: Boolean() = True
id = fields.IntField(pk=True)
name = fields.TextField(unique=True)
landingpage = fields.TextField()
email_enabled = fields.BooleanField(default=True)
admins: fields.ManyToManyRelation[Admin]
include_patterns: fields.ReverseRelation["IncludePattern"]
exclude_patterns: fields.ReverseRelation["ExcludePattern"]
class Mapping(Mapping):
table_name = "hoods"
class Meta:
table = "hoods"
class AdminHoodRelation(Model):
id: Integer(primary_key=True) = None
admin: ForeignKey(Admin)
hood: ForeignKey(Hood)
class IncludePattern(Model):
id = fields.IntField(pk=True)
hood: fields.ForeignKeyRelation[Hood] = fields.ForeignKeyField(
"models.Hood", related_name="include_patterns"
)
pattern = fields.TextField()
class Mapping(Mapping):
table_name = "admin_hood_relations"
class Meta:
table = "include_patterns"
class Trigger(Model):
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
pattern: Text()
class ExcludePattern(Model):
id = fields.IntField(pk=True)
hood: fields.ForeignKeyRelation[Hood] = fields.ForeignKeyField(
"models.Hood", related_name="exclude_patterns"
)
pattern = fields.TextField()
class Mapping(Mapping):
table_name = "triggers"
class BadWord(Model):
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
pattern: Text()
class Mapping(Mapping):
table_name = "badwords"
class Meta:
table = "exclude_patterns"

View file

@ -6,12 +6,15 @@
"""API classes for implementing bots for platforms."""
from asyncio import Queue, create_task
from asyncio import Queue, Task, create_task
from enum import Enum, auto
from logging import getLogger
from re import IGNORECASE, search
from typing import Generic, Optional, Type, TypeVar
from kibicara.model import BadWord, Trigger
from tortoise.models import Model
from kibicara.model import ExcludePattern, Hood, IncludePattern
logger = getLogger(__name__)
@ -29,7 +32,7 @@ class Message:
**kwargs (object, optional): Other platform-specific data.
"""
def __init__(self, text: str, **kwargs):
def __init__(self, text: str, **kwargs) -> None:
self.text = text
self.__dict__.update(kwargs)
@ -73,11 +76,11 @@ class Censor:
__instances: dict[int, list["Censor"]] = {}
def __init__(self, hood):
def __init__(self, hood: Hood) -> None:
self.hood = hood
self.enabled = True
self._inbox = Queue()
self.__task = None
self._inbox: Queue[Message] = Queue()
self.__task: Optional[Task[None]] = None
self.__hood_censors = self.__instances.setdefault(hood.id, [])
self.__hood_censors.append(self)
self.status = BotStatus.INSTANTIATED
@ -93,7 +96,8 @@ class Censor:
self.__task.cancel()
async def __run(self) -> None:
await self.hood.load()
assert self.__task is not None
await self.hood.refresh_from_db()
self.__task.set_name("{0} {1}".format(self.__class__.__name__, self.hood.name))
try:
self.status = BotStatus.RUNNING
@ -112,7 +116,7 @@ class Censor:
pass
@classmethod
async def destroy_hood(cls, hood) -> None:
async def destroy_hood(cls, hood: Hood) -> None:
"""Remove all of its database entries.
Note: Override this in the derived bot class.
@ -140,25 +144,29 @@ class Censor:
return await self._inbox.get()
async def __is_appropriate(self, message: Message) -> bool:
for badword in await BadWord.objects.filter(hood=self.hood).all():
if search(badword.pattern, message.text, IGNORECASE):
logger.debug(
for exclude in await ExcludePattern.filter(hood=self.hood):
if search(exclude.pattern, message.text, IGNORECASE):
logger.info(
"Matched bad word - dropped message: {0}".format(message.text)
)
return False
for trigger in await Trigger.objects.filter(hood=self.hood).all():
if search(trigger.pattern, message.text, IGNORECASE):
logger.debug(
for include in await IncludePattern.filter(hood=self.hood):
if search(include.pattern, message.text, IGNORECASE):
logger.info(
"Matched trigger - passed message: {0}".format(message.text)
)
return True
logger.debug(
logger.info(
"Did not match any trigger - dropped message: {0}".format(message.text)
)
return False
class Spawner:
ORMClass = TypeVar("ORMClass", bound=Model)
BotClass = TypeVar("BotClass", bound=Censor)
class Spawner(Generic[ORMClass, BotClass]):
"""Spawns a bot with a specific bot model.
Examples:
@ -179,10 +187,10 @@ class Spawner:
__instances: list["Spawner"] = []
def __init__(self, ORMClass, BotClass):
self.ORMClass = ORMClass
self.BotClass = BotClass
self.__bots = {}
def __init__(self, orm_class: Type[ORMClass], bot_class: Type[BotClass]) -> None:
self.ORMClass = orm_class
self.BotClass = bot_class
self.__bots: dict[int, BotClass] = {}
self.__instances.append(self)
@classmethod
@ -192,7 +200,7 @@ class Spawner:
await spawner._init()
@classmethod
async def destroy_hood(cls, hood) -> None:
async def destroy_hood(cls, hood: Hood) -> None:
for spawner in cls.__instances:
for pk in list(spawner.__bots):
bot = spawner.__bots[pk]
@ -202,10 +210,10 @@ class Spawner:
await spawner.BotClass.destroy_hood(hood)
async def _init(self) -> None:
for item in await self.ORMClass.objects.all():
async for item in self.ORMClass.all():
self.start(item)
def start(self, item) -> None:
def start(self, item: ORMClass) -> None:
"""Instantiate and start a bot with the provided ORM object.
Example:
@ -221,7 +229,7 @@ class Spawner:
if bot.enabled:
bot.start()
def stop(self, item) -> None:
def stop(self, item: ORMClass) -> None:
"""Stop and delete a bot.
Args:
@ -231,10 +239,10 @@ class Spawner:
if bot is not None:
bot.stop()
def get(self, item) -> Censor:
def get(self, item: ORMClass) -> BotClass:
"""Get a running bot.
Args:
item (ORM Model object): ORM object corresponding to bot.
"""
return self.__bots.get(item.pk)
return self.__bots[item.pk]

View file

@ -1,17 +1,17 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from kibicara.platformapi import Censor, Spawner
from kibicara.platformapi import Censor, Message, Spawner
from kibicara.platforms.test.model import Test
class TestBot(Censor):
def __init__(self, test):
def __init__(self, test: Test):
super().__init__(test.hood)
self.messages = []
self.messages: list[Message] = []
async def run(self):
while True:

View file

@ -1,16 +1,19 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from ormantic import ForeignKey, Integer, Model
from tortoise import fields
from tortoise.models import Model
from kibicara.model import Hood, Mapping
from kibicara.model import Hood
class Test(Model):
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
id = fields.IntField(pk=True)
hood: fields.ForeignKeyRelation[Hood] = fields.ForeignKeyField(
"models.Hood", related_name="platforms_test"
)
class Mapping(Mapping):
table_name = "testapi"
class Meta:
table_name = "platforms_test"

View file

@ -1,15 +1,14 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from pydantic import BaseModel
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara.model import Hood
from kibicara.platformapi import Message
from kibicara.platforms.test.bot import spawner
from kibicara.platforms.test.model import Test
@ -20,10 +19,10 @@ class BodyMessage(BaseModel):
text: str
async def get_test(test_id: int, hood=Depends(get_hood)):
async def get_test(test_id: int, hood: Hood = Depends(get_hood)) -> Test:
try:
return await Test.objects.get(id=test_id, hood=hood)
except NoMatch:
return await Test.get(id=test_id, hood=hood)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
@ -31,14 +30,14 @@ router = APIRouter()
@router.get("/")
async def test_read_all(hood=Depends(get_hood)):
return await Test.objects.filter(hood=hood).all()
async def test_read_all(hood: Hood = Depends(get_hood)):
return await Test.filter(hood=hood)
@router.post("/", status_code=status.HTTP_201_CREATED)
async def test_create(response: Response, hood=Depends(get_hood)):
async def test_create(response: Response, hood: Hood = Depends(get_hood)):
try:
test = await Test.objects.create(hood=hood)
test = await Test.create(hood=hood)
spawner.start(test)
response.headers["Location"] = str(test.id)
return test
@ -47,22 +46,22 @@ async def test_create(response: Response, hood=Depends(get_hood)):
@router.get("/{test_id}")
async def test_read(test=Depends(get_test)):
async def test_read(test: Test = Depends(get_test)):
return test
@router.delete("/{test_id}", status_code=status.HTTP_204_NO_CONTENT)
async def test_delete(test=Depends(get_test)):
async def test_delete(test: Test = Depends(get_test)):
spawner.stop(test)
await test.delete()
@router.get("/{test_id}/messages/")
async def test_message_read_all(test=Depends(get_test)):
async def test_message_read_all(test: Test = Depends(get_test)):
return spawner.get(test).messages
@router.post("/{test_id}/messages/")
async def test_message_create(message: BodyMessage, test=Depends(get_test)):
async def test_message_create(message: BodyMessage, test: Test = Depends(get_test)):
await spawner.get(test).publish(Message(message.text))
return {}

View file

@ -11,20 +11,20 @@ from datetime import datetime, timedelta
from logging import getLogger
from pickle import dumps, loads
from smtplib import SMTPException
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from nacl.encoding import URLSafeBase64Encoder
from nacl.exceptions import CryptoError
from nacl.secret import SecretBox
from ormantic.exceptions import NoMatch
from nacl.utils import random
from passlib.hash import argon2
from pydantic import BaseModel, validator
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara import email
from kibicara.config import config
from kibicara.model import Admin, AdminHoodRelation, Hood
from kibicara.model import Admin, Hood
from kibicara.webapi.utils import delete_hood
logger = getLogger(__name__)
@ -54,41 +54,42 @@ class BodyAccessToken(BaseModel):
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/login")
secret_box = SecretBox(bytes.fromhex(config["secret"]))
secret_box = SecretBox(
bytes.fromhex(str(config.get("secret", random(SecretBox.KEY_SIZE).hex())))
)
def to_token(**kwargs):
def to_token(**kwargs) -> str:
return secret_box.encrypt(dumps(kwargs), encoder=URLSafeBase64Encoder).decode(
"ascii"
)
def from_token(token):
def from_token(token: str) -> dict:
return loads(
secret_box.decrypt(token.encode("ascii"), encoder=URLSafeBase64Encoder)
)
async def get_auth(email, password):
async def get_auth(email: str, password: str) -> Admin:
try:
admin = await Admin.objects.get(email=email)
admin = await Admin.get(email=email)
if argon2.verify(password, admin.passhash):
return admin
raise ValueError
except NoMatch:
except DoesNotExist:
raise ValueError
async def get_admin(access_token=Depends(oauth2_scheme)):
async def get_admin(access_token: str = Depends(oauth2_scheme)) -> Admin:
try:
admin = await get_auth(**from_token(access_token))
return await get_auth(**from_token(access_token))
except (CryptoError, ValueError):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return admin
router = APIRouter()
@ -109,9 +110,9 @@ async def admin_register(values: BodyAdmin):
register_token = to_token(**values.__dict__)
logger.debug("register_token={0}".format(register_token))
try:
admin = await Admin.objects.filter(email=values.email).all()
if admin:
if await Admin.exists(email=values.email):
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
# link goes to frontend. this is not the confirm API endpoint below!
body = "{0}/confirm?token={1}".format(config["frontend_url"], register_token)
logger.debug(body)
email.send_email(
@ -138,7 +139,8 @@ async def admin_confirm(register_token: str):
try:
values = from_token(register_token)
passhash = argon2.hash(values["password"])
await Admin.objects.create(email=values["email"], passhash=passhash)
await Admin.create(email=values["email"], passhash=passhash)
# XXX login and registration tokens are exchangeable. does this hurt?
return BodyAccessToken(access_token=register_token)
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@ -178,14 +180,14 @@ async def admin_reset_password(values: BodyEmail):
- **email**: E-Mail Address of new hood admin
- **password**: Password of new hood admin
"""
register_token = to_token(datetime=datetime.now().isoformat(), **values.__dict__)
logger.debug("register_token={0}".format(register_token))
reset_token = to_token(datetime=datetime.now().isoformat(), **values.__dict__)
logger.debug("reset_token={0}".format(reset_token))
try:
admin = await Admin.objects.filter(email=values.email).all()
if not admin:
if await Admin.exists(email=values.email):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
# link goes to frontend. this is not the reset API endpoint below!
body = "{0}/password-reset?token={1}".format(
config["frontend_url"], register_token
config["frontend_url"], reset_token
)
logger.debug(body)
email.send_email(
@ -213,11 +215,10 @@ async def admin_confirm_reset(reset_token: str, values: BodyPassword):
):
raise HTTPException(status_code=status.HTTP_410_GONE)
passhash = argon2.hash(values.password)
admins = await Admin.objects.filter(email=token_values["email"]).all()
if len(admins) != 1:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
await admins[0].update(passhash=passhash)
await Admin.get(email=token_values["email"]).update(passhash=passhash)
return BodyAccessToken(access_token=reset_token)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except CryptoError:
@ -229,11 +230,9 @@ async def admin_confirm_reset(reset_token: str, values: BodyPassword):
# TODO response_model,
operation_id="get_hoods_admin",
)
async def admin_hood_read_all(admin=Depends(get_admin)):
async def admin_hood_read_all(admin: Admin = Depends(get_admin)):
"""Get a list of all hoods of a given admin."""
return (
await AdminHoodRelation.objects.select_related("hood").filter(admin=admin).all()
)
return await Hood.filter(admin=admin)
@router.get(
@ -241,12 +240,8 @@ async def admin_hood_read_all(admin=Depends(get_admin)):
# TODO response_model,
operation_id="get_admin",
)
async def admin_read(admin=Depends(get_admin)):
"""Get a list of all hoods of a given admin."""
admin = await Admin.objects.filter(email=admin.email).all()
if len(admin) != 1:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return BodyEmail(email=admin[0].email)
async def admin_read(admin: Admin = Depends(get_admin)):
return BodyEmail(email=admin.email)
@router.delete(
@ -254,18 +249,10 @@ async def admin_read(admin=Depends(get_admin)):
status_code=status.HTTP_204_NO_CONTENT,
operation_id="delete_admin",
)
async def admin_delete(admin=Depends(get_admin)):
hood_relations = (
await AdminHoodRelation.objects.select_related("hood").filter(admin=admin).all()
)
for hood in hood_relations:
admins = (
await AdminHoodRelation.objects.select_related("admin")
.filter(hood=hood.id)
.all()
)
if len(admins) == 1 and admins[0].id == admin.id:
actual_hood = await Hood.objects.filter(id=hood.id).all()
await delete_hood(actual_hood[0])
async def admin_delete(admin: Admin = Depends(get_admin)):
async for hood in Hood.filter(admins__contains=admin):
await hood.admins.remove(admin)
await hood.fetch_related("admins")
if len(hood.admins) == 0:
await delete_hood(hood)
await admin.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT)

View file

@ -1,4 +1,4 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
@ -6,13 +6,11 @@
"""REST API Endpoints for managing hoods."""
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from pydantic import BaseModel
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara.model import AdminHoodRelation, Hood, Trigger
from kibicara.model import Admin, Hood, IncludePattern
from kibicara.platforms.email.bot import spawner
from kibicara.webapi.admin import get_admin
from kibicara.webapi.utils import delete_hood
@ -20,28 +18,24 @@ from kibicara.webapi.utils import delete_hood
class BodyHood(BaseModel):
name: str
landingpage: str = """
Default Landing Page
"""
landingpage: str = "Default Landing Page"
async def get_hood_unauthorized(hood_id: int):
async def get_hood_unauthorized(hood_id: int) -> Hood:
try:
hood = await Hood.objects.get(id=hood_id)
except NoMatch:
return await Hood.get(id=hood_id)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return hood
async def get_hood(hood=Depends(get_hood_unauthorized), admin=Depends(get_admin)):
async def get_hood(hood_id: int, admin: Admin = Depends(get_admin)) -> Hood:
try:
await AdminHoodRelation.objects.get(admin=admin, hood=hood)
except NoMatch:
return await Hood.get(id=hood_id, admins__in=[admin])
except DoesNotExist:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
headers={"WWW-Authenticate": "Bearer"},
)
return hood
router = APIRouter()
@ -55,7 +49,7 @@ router = APIRouter()
)
async def hood_read_all():
"""Get all existing hoods."""
return await Hood.objects.all()
return await Hood.all()
@router.post(
@ -65,7 +59,9 @@ async def hood_read_all():
operation_id="create_hood",
tags=["hoods"],
)
async def hood_create(values: BodyHood, response: Response, admin=Depends(get_admin)):
async def hood_create(
values: BodyHood, response: Response, admin: Admin = Depends(get_admin)
):
"""Creates a hood.
- **name**: Name of the hood
@ -73,11 +69,11 @@ async def hood_create(values: BodyHood, response: Response, admin=Depends(get_ad
"""
try:
hood = await Hood.objects.create(**values.__dict__)
await AdminHoodRelation.objects.create(admin=admin.id, hood=hood.id)
await admin.hoods.add(hood)
spawner.start(hood)
# Initialize Triggers to match all
await Trigger.objects.create(hood=hood, pattern=".")
await IncludePattern.create(hood=hood, pattern=".")
response.headers["Location"] = str(hood.id)
return hood
@ -91,25 +87,24 @@ async def hood_create(values: BodyHood, response: Response, admin=Depends(get_ad
operation_id="get_hood",
tags=["hoods"],
)
async def hood_read(hood=Depends(get_hood_unauthorized)):
async def hood_read(hood: Hood = Depends(get_hood_unauthorized)):
"""Get hood with id **hood_id**."""
return hood
@router.put(
"/{hood_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="update_hood",
tags=["hoods"],
)
async def hood_update(values: BodyHood, hood=Depends(get_hood)):
async def hood_update(values: BodyHood, hood: Hood = Depends(get_hood)):
"""Updates hood with id **hood_id**.
- **name**: New name of the hood
- **landingpage**: New Markdown formatted description of the hood
"""
await hood.update(**values.__dict__)
return Response(status_code=status.HTTP_204_NO_CONTENT)
return hood
@router.delete(
@ -121,4 +116,3 @@ async def hood_update(values: BodyHood, hood=Depends(get_hood)):
async def hood_delete(hood=Depends(get_hood)):
"""Deletes hood with id **hood_id**."""
await delete_hood(hood)
return Response(status_code=status.HTTP_204_NO_CONTENT)

View file

@ -1,106 +0,0 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
"""REST API endpoints for managing badwords.
Provides API endpoints for adding, removing and reading regular expressions that block a
received message to be dropped by a censor. This provides a message filter customizable
by the hood admins.
"""
from re import compile as regex_compile
from re import error as RegexError
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from pydantic import BaseModel
from kibicara.model import BadWord
from kibicara.webapi.hoods import get_hood
class BodyBadWord(BaseModel):
pattern: str
async def get_badword(badword_id: int, hood=Depends(get_hood)):
try:
return await BadWord.objects.get(id=badword_id, hood=hood)
except NoMatch:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
router = APIRouter()
@router.get(
"/",
# TODO response_model,
operation_id="get_badwords",
)
async def badword_read_all(hood=Depends(get_hood)):
"""Get all badwords of hood with id **hood_id**."""
return await BadWord.objects.filter(hood=hood).all()
@router.post(
"/",
status_code=status.HTTP_201_CREATED,
# TODO response_model,
operation_id="create_badword",
)
async def badword_create(
values: BodyBadWord, response: Response, hood=Depends(get_hood)
):
"""Creates a new badword for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a badword.
"""
try:
regex_compile(values.pattern)
badword = await BadWord.objects.create(hood=hood, **values.__dict__)
response.headers["Location"] = str(badword.id)
return badword
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except RegexError:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
@router.get(
"/{badword_id}",
# TODO response_model,
operation_id="get_badword",
)
async def badword_read(badword=Depends(get_badword)):
"""Reads badword with id **badword_id** for hood with id **hood_id**."""
return badword
@router.put(
"/{badword_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="update_badword",
)
async def badword_update(values: BodyBadWord, badword=Depends(get_badword)):
"""Updates badword with id **badword_id** for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a badword
"""
await badword.update(**values.__dict__)
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.delete(
"/{badword_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="delete_badword",
)
async def badword_delete(badword=Depends(get_badword)):
"""Deletes badword with id **badword_id** for hood with id **hood_id**."""
await badword.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT)

View file

@ -0,0 +1,116 @@
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
"""REST API endpoints for managing exclude_patterns.
Provides API endpoints for adding, removing and reading regular expressions that block a
received message to be dropped by a censor. This provides a message filter customizable
by the hood admins.
"""
from re import compile as regex_compile, error as RegexError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from pydantic import BaseModel
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara.model import ExcludePattern, Hood
from kibicara.webapi.hoods import get_hood
class BodyExcludePattern(BaseModel):
pattern: str
async def get_exclude_pattern(
exclude_pattern_id: int, hood: Hood = Depends(get_hood)
) -> ExcludePattern:
try:
return await ExcludePattern.get(id=exclude_pattern_id, hood=hood)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
router = APIRouter()
@router.get(
"/",
# TODO response_model,
operation_id="get_exclude_patterns",
)
async def exclude_pattern_read_all(hood: Hood = Depends(get_hood)):
"""Get all exclude_patterns of hood with id **hood_id**."""
return await ExcludePattern.filter(hood=hood)
@router.post(
"/",
status_code=status.HTTP_201_CREATED,
# TODO response_model,
operation_id="create_exclude_pattern",
)
async def exclude_pattern_create(
values: BodyExcludePattern, response: Response, hood: Hood = Depends(get_hood)
):
"""Creates a new exclude_pattern for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a exclude_pattern.
"""
try:
regex_compile(values.pattern)
exclude_pattern = await ExcludePattern.create(hood=hood, **values.__dict__)
response.headers["Location"] = str(exclude_pattern.id)
return exclude_pattern
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except RegexError:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
@router.get(
"/{exclude_pattern_id}",
# TODO response_model,
operation_id="get_exclude_pattern",
)
async def exclude_pattern_read(
exclude_pattern: ExcludePattern = Depends(get_exclude_pattern),
):
"""
Reads exclude_pattern with id **exclude_pattern_id** for hood with id **hood_id**.
"""
return exclude_pattern
@router.put(
"/{exclude_pattern_id}",
operation_id="update_exclude_pattern",
)
async def exclude_pattern_update(
values: BodyExcludePattern,
exclude_pattern: ExcludePattern = Depends(get_exclude_pattern),
):
"""
Updates exclude_pattern with id **exclude_pattern_id** for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a exclude_pattern
"""
await exclude_pattern.update(**values.__dict__)
return exclude_pattern
@router.delete(
"/{exclude_pattern_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="delete_exclude_pattern",
)
async def exclude_pattern_delete(
exclude_pattern: ExcludePattern = Depends(get_exclude_pattern),
):
"""
Deletes exclude_pattern with id **exclude_pattern_id** for hood with id **hood_id**.
"""
await exclude_pattern.delete()

View file

@ -0,0 +1,115 @@
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
"""REST API endpoints for managing include_patterns.
Provides API endpoints for adding, removing and reading regular expressions that allow a
message to be passed through by a censor. A published message must match one of these
regular expressions otherwise it gets dropped by the censor. This provides a message
filter customizable by the hood admins.
"""
from re import compile as regex_compile, error as RegexError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from pydantic import BaseModel
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara.model import IncludePattern, Hood
from kibicara.webapi.hoods import get_hood
class BodyIncludePattern(BaseModel):
pattern: str
async def get_include_pattern(include_pattern_id: int, hood: Hood = Depends(get_hood)):
try:
return await IncludePattern.get(id=include_pattern_id, hood=hood)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
router = APIRouter()
@router.get(
"/",
# TODO response_model,
operation_id="get_include_patterns",
)
async def include_pattern_read_all(hood: Hood = Depends(get_hood)):
"""Get all include_patterns of hood with id **hood_id**."""
return await IncludePattern.filter(hood=hood)
@router.post(
"/",
status_code=status.HTTP_201_CREATED,
# TODO response_model,
operation_id="create_include_pattern",
)
async def include_pattern_create(
values: BodyIncludePattern, response: Response, hood: Hood = Depends(get_hood)
):
"""Creates a new include_pattern for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a include_pattern.
"""
try:
regex_compile(values.pattern)
include_pattern = await IncludePattern.create(hood=hood, **values.__dict__)
response.headers["Location"] = str(include_pattern.id)
return include_pattern
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except RegexError:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
@router.get(
"/{include_pattern_id}",
# TODO response_model,
operation_id="get_include_pattern",
)
async def include_pattern_read(
include_pattern: IncludePattern = Depends(get_include_pattern),
):
"""
Reads include_pattern with id **include_pattern_id** for hood with id **hood_id**.
"""
return include_pattern
@router.put(
"/{include_pattern_id}",
operation_id="update_include_pattern",
)
async def include_pattern_update(
values: BodyIncludePattern,
include_pattern: IncludePattern = Depends(get_include_pattern),
):
"""
Updates include_pattern with id **include_pattern_id** for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a include_pattern
"""
await include_pattern.update(**values.__dict__)
return include_pattern
@router.delete(
"/{include_pattern_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="delete_include_pattern",
)
async def include_pattern_delete(
include_pattern: IncludePattern = Depends(get_include_pattern),
):
"""
Deletes include_pattern with id **include_pattern_id** for hood with id **hood_id**.
"""
await include_pattern.delete()

View file

@ -1,107 +0,0 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
"""REST API endpoints for managing triggers.
Provides API endpoints for adding, removing and reading regular expressions that allow a
message to be passed through by a censor. A published message must match one of these
regular expressions otherwise it gets dropped by the censor. This provides a message
filter customizable by the hood admins.
"""
from re import compile as regex_compile
from re import error as RegexError
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from pydantic import BaseModel
from kibicara.model import Trigger
from kibicara.webapi.hoods import get_hood
class BodyTrigger(BaseModel):
pattern: str
async def get_trigger(trigger_id: int, hood=Depends(get_hood)):
try:
return await Trigger.objects.get(id=trigger_id, hood=hood)
except NoMatch:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
router = APIRouter()
@router.get(
"/",
# TODO response_model,
operation_id="get_triggers",
)
async def trigger_read_all(hood=Depends(get_hood)):
"""Get all triggers of hood with id **hood_id**."""
return await Trigger.objects.filter(hood=hood).all()
@router.post(
"/",
status_code=status.HTTP_201_CREATED,
# TODO response_model,
operation_id="create_trigger",
)
async def trigger_create(
values: BodyTrigger, response: Response, hood=Depends(get_hood)
):
"""Creates a new trigger for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a trigger.
"""
try:
regex_compile(values.pattern)
trigger = await Trigger.objects.create(hood=hood, **values.__dict__)
response.headers["Location"] = str(trigger.id)
return trigger
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except RegexError:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
@router.get(
"/{trigger_id}",
# TODO response_model,
operation_id="get_trigger",
)
async def trigger_read(trigger=Depends(get_trigger)):
"""Reads trigger with id **trigger_id** for hood with id **hood_id**."""
return trigger
@router.put(
"/{trigger_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="update_trigger",
)
async def trigger_update(values: BodyTrigger, trigger=Depends(get_trigger)):
"""Updates trigger with id **trigger_id** for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a trigger
"""
await trigger.update(**values.__dict__)
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.delete(
"/{trigger_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="delete_trigger",
)
async def trigger_delete(trigger=Depends(get_trigger)):
"""Deletes trigger with id **trigger_id** for hood with id **hood_id**."""
await trigger.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT)

View file

@ -1,17 +1,14 @@
# Copyright (C) 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
#
# SPDX-License-Identifier: 0BSD
from kibicara.model import AdminHoodRelation, BadWord, Trigger
from kibicara.model import IncludePattern, ExcludePattern
from kibicara.platformapi import Spawner
async def delete_hood(hood):
await Spawner.destroy_hood(hood)
for trigger in await Trigger.objects.filter(hood=hood).all():
await trigger.delete()
for badword in await BadWord.objects.filter(hood=hood).all():
await badword.delete()
for relation in await AdminHoodRelation.objects.filter(hood=hood).all():
await relation.delete()
await IncludePattern.filter(hood=hood).delete()
await ExcludePattern.filter(hood=hood).delete()
await hood.delete()