[misc] Migrate to TortoiseORM

This commit is contained in:
Thomas Lindner 2023-03-19 19:03:25 +01:00
parent 767c92000b
commit 9f0a872c6c
25 changed files with 534 additions and 524 deletions

View file

@ -34,6 +34,7 @@ install_requires =
pytoml
requests
scrypt
tortoise-orm
[options.packages.find]
where = src
@ -52,19 +53,19 @@ skip_install = True
deps =
black
flake8
mypy
types-requests
commands =
black --check --diff src tests
flake8 src tests
# not yet
#mypy --ignore-missing-imports src tests
[testenv]
deps =
mypy
pytest
pytest-asyncio
types-requests
commands =
# not yet
# mypy --ignore-missing-imports src tests
pytest tests
[flake8]

View file

@ -4,17 +4,13 @@
#
# SPDX-License-Identifier: 0BSD
from nacl.secret import SecretBox
from nacl.utils import random
"""Default configuration.
The default configuration gets overwritten by a configuration file if one exists.
"""
config = {
"database_connection": "sqlite:////tmp/kibicara.sqlite",
"database_connection": "sqlite://:memory:",
"frontend_url": "http://localhost:4200", # url of frontend, change in prod
"secret": random(SecretBox.KEY_SIZE).hex(), # generate with: openssl rand -hex 32
# production params
"frontend_path": None, # required, path to frontend html/css/js files
"production": True,

View file

@ -16,9 +16,9 @@ from fastapi.staticfiles import StaticFiles
from hypercorn.asyncio import serve
from hypercorn.config import Config
from pytoml import load
from tortoise import Tortoise
from kibicara.config import config
from kibicara.model import Mapping
from kibicara.platformapi import Spawner
from kibicara.webapi import router
@ -66,12 +66,25 @@ class Main:
format="%(asctime)s %(name)s %(message)s",
)
getLogger("aiosqlite").setLevel(WARNING)
Mapping.create_all()
asyncio_run(self.__run())
async def __run(self):
await Tortoise.init(
db_url=config["database_connection"],
modules={
"models": [
"kibicara.model",
"kibicara.platforms.email.model",
"kibicara.platforms.telegram.model",
"kibicara.platforms.test.model",
"kibicara.platforms.twitter.model",
]
},
)
await Tortoise.generate_schemas()
await Spawner.init_all()
await self.__start_webserver()
await Tortoise.close_connections()
async def __start_webserver(self):
class SinglePageApplication(StaticFiles):

View file

@ -1,4 +1,4 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
@ -6,69 +6,52 @@
"""ORM Models for core."""
from databases import Database
from ormantic import Boolean, ForeignKey, Integer, Model, Text
from sqlalchemy import MetaData, create_engine
from kibicara.config import config
class Mapping:
database = Database(config["database_connection"])
metadata = MetaData()
@classmethod
def create_all(cls):
engine = create_engine(str(cls.database.url))
cls.metadata.create_all(engine)
@classmethod
def drop_all(cls):
engine = create_engine(str(cls.database.url))
cls.metadata.drop_all(engine)
from tortoise import fields
from tortoise.models import Model
class Admin(Model):
id: Integer(primary_key=True) = None
email: Text(unique=True)
passhash: Text()
id = fields.IntField(pk=True)
email = fields.CharField(64, unique=True)
passhash = fields.TextField()
hoods: fields.ManyToManyRelation["Hood"] = fields.ManyToManyField(
"models.Hood", related_name="admins", through="admin_hood_relations"
)
class Mapping(Mapping):
table_name = "admins"
class Meta:
table = "admins"
class Hood(Model):
id: Integer(primary_key=True) = None
name: Text(unique=True)
landingpage: Text()
email_enabled: Boolean() = True
id = fields.IntField(pk=True)
name = fields.CharField(64, unique=True)
landingpage = fields.TextField()
email_enabled = fields.BooleanField(default=True)
admins: fields.ManyToManyRelation[Admin]
include_patterns: fields.ReverseRelation["IncludePattern"]
exclude_patterns: fields.ReverseRelation["ExcludePattern"]
class Mapping(Mapping):
table_name = "hoods"
class Meta:
table = "hoods"
class AdminHoodRelation(Model):
id: Integer(primary_key=True) = None
admin: ForeignKey(Admin)
hood: ForeignKey(Hood)
class IncludePattern(Model):
id = fields.IntField(pk=True)
hood: fields.ForeignKeyRelation[Hood] = fields.ForeignKeyField(
"models.Hood", related_name="include_patterns"
)
pattern = fields.TextField()
class Mapping(Mapping):
table_name = "admin_hood_relations"
class Meta:
table = "include_patterns"
class Trigger(Model):
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
pattern: Text()
class ExcludePattern(Model):
id = fields.IntField(pk=True)
hood: fields.ForeignKeyRelation[Hood] = fields.ForeignKeyField(
"models.Hood", related_name="exclude_patterns"
)
pattern = fields.TextField()
class Mapping(Mapping):
table_name = "triggers"
class BadWord(Model):
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
pattern: Text()
class Mapping(Mapping):
table_name = "badwords"
class Meta:
table = "exclude_patterns"

View file

@ -6,12 +6,15 @@
"""API classes for implementing bots for platforms."""
from asyncio import Queue, create_task
from asyncio import Queue, Task, create_task
from enum import Enum, auto
from logging import getLogger
from re import IGNORECASE, search
from typing import Generic, Optional, Type, TypeVar
from kibicara.model import BadWord, Trigger
from tortoise.models import Model
from kibicara.model import ExcludePattern, Hood, IncludePattern
logger = getLogger(__name__)
@ -29,7 +32,7 @@ class Message:
**kwargs (object, optional): Other platform-specific data.
"""
def __init__(self, text: str, **kwargs):
def __init__(self, text: str, **kwargs) -> None:
self.text = text
self.__dict__.update(kwargs)
@ -73,11 +76,11 @@ class Censor:
__instances: dict[int, list["Censor"]] = {}
def __init__(self, hood):
def __init__(self, hood: Hood) -> None:
self.hood = hood
self.enabled = True
self._inbox = Queue()
self.__task = None
self._inbox: Queue[Message] = Queue()
self.__task: Optional[Task[None]] = None
self.__hood_censors = self.__instances.setdefault(hood.id, [])
self.__hood_censors.append(self)
self.status = BotStatus.INSTANTIATED
@ -93,7 +96,8 @@ class Censor:
self.__task.cancel()
async def __run(self) -> None:
await self.hood.load()
assert self.__task is not None
await self.hood.refresh_from_db()
self.__task.set_name("{0} {1}".format(self.__class__.__name__, self.hood.name))
try:
self.status = BotStatus.RUNNING
@ -112,7 +116,7 @@ class Censor:
pass
@classmethod
async def destroy_hood(cls, hood) -> None:
async def destroy_hood(cls, hood: Hood) -> None:
"""Remove all of its database entries.
Note: Override this in the derived bot class.
@ -140,25 +144,29 @@ class Censor:
return await self._inbox.get()
async def __is_appropriate(self, message: Message) -> bool:
for badword in await BadWord.objects.filter(hood=self.hood).all():
if search(badword.pattern, message.text, IGNORECASE):
logger.debug(
for exclude in await ExcludePattern.filter(hood=self.hood):
if search(exclude.pattern, message.text, IGNORECASE):
logger.info(
"Matched bad word - dropped message: {0}".format(message.text)
)
return False
for trigger in await Trigger.objects.filter(hood=self.hood).all():
if search(trigger.pattern, message.text, IGNORECASE):
logger.debug(
for include in await IncludePattern.filter(hood=self.hood):
if search(include.pattern, message.text, IGNORECASE):
logger.info(
"Matched trigger - passed message: {0}".format(message.text)
)
return True
logger.debug(
logger.info(
"Did not match any trigger - dropped message: {0}".format(message.text)
)
return False
class Spawner:
ORMClass = TypeVar("ORMClass", bound=Model)
BotClass = TypeVar("BotClass", bound=Censor)
class Spawner(Generic[ORMClass, BotClass]):
"""Spawns a bot with a specific bot model.
Examples:
@ -179,10 +187,10 @@ class Spawner:
__instances: list["Spawner"] = []
def __init__(self, ORMClass, BotClass):
self.ORMClass = ORMClass
self.BotClass = BotClass
self.__bots = {}
def __init__(self, orm_class: Type[ORMClass], bot_class: Type[BotClass]) -> None:
self.ORMClass = orm_class
self.BotClass = bot_class
self.__bots: dict[int, BotClass] = {}
self.__instances.append(self)
@classmethod
@ -192,7 +200,7 @@ class Spawner:
await spawner._init()
@classmethod
async def destroy_hood(cls, hood) -> None:
async def destroy_hood(cls, hood: Hood) -> None:
for spawner in cls.__instances:
for pk in list(spawner.__bots):
bot = spawner.__bots[pk]
@ -202,15 +210,15 @@ class Spawner:
await spawner.BotClass.destroy_hood(hood)
async def _init(self) -> None:
for item in await self.ORMClass.objects.all():
async for item in self.ORMClass.all():
self.start(item)
def start(self, item) -> None:
def start(self, item: ORMClass) -> None:
"""Instantiate and start a bot with the provided ORM object.
Example:
```
xyz = await XYZ.objects.create(hood=hood, **values.__dict__)
xyz = await XYZ.create(hood=hood, **values.__dict__)
spawner.start(xyz)
```
@ -221,7 +229,7 @@ class Spawner:
if bot.enabled:
bot.start()
def stop(self, item) -> None:
def stop(self, item: ORMClass) -> None:
"""Stop and delete a bot.
Args:
@ -231,10 +239,10 @@ class Spawner:
if bot is not None:
bot.stop()
def get(self, item) -> Censor:
def get(self, item: ORMClass) -> BotClass:
"""Get a running bot.
Args:
item (ORM Model object): ORM object corresponding to bot.
"""
return self.__bots.get(item.pk)
return self.__bots[item.pk]

View file

@ -12,7 +12,7 @@ from kibicara import email
from kibicara.config import config
from kibicara.model import Hood
from kibicara.platformapi import Censor, Spawner
from kibicara.platforms.email.model import Email, EmailSubscribers
from kibicara.platforms.email.model import Email, EmailSubscriber
from kibicara.webapi.admin import to_token
logger = getLogger(__name__)
@ -26,9 +26,9 @@ class EmailBot(Censor):
@classmethod
async def destroy_hood(cls, hood):
"""Removes all its database entries."""
for inbox in await Email.objects.filter(hood=hood).all():
for inbox in await Email.filter(hood=hood).all():
await inbox.delete()
for subscriber in await EmailSubscribers.objects.filter(hood=hood).all():
for subscriber in await EmailSubscriber.filter(hood=hood).all():
await subscriber.delete()
async def run(self):
@ -40,9 +40,7 @@ class EmailBot(Censor):
self.hood.name, message.text
)
)
for subscriber in await EmailSubscribers.objects.filter(
hood=self.hood
).all():
for subscriber in await EmailSubscriber.filter(hood=self.hood).all():
token = to_token(email=subscriber.email, hood=self.hood.id)
body = (
"{0}\n\n--\n"

View file

@ -20,7 +20,7 @@ from pytoml import load
from requests import post
from kibicara.config import config
from kibicara.platforms.email.model import Email, EmailSubscribers
from kibicara.platforms.email.model import Email, EmailSubscriber
logger = getLogger(__name__)
@ -53,7 +53,7 @@ class Main:
async def __run(self, email_name):
try:
email = await Email.objects.get(name=email_name)
email = await Email.get(name=email_name)
except NoMatch:
logger.error("No recipient with this name")
exit(1)
@ -75,7 +75,7 @@ class Main:
logger.error("Could not parse sender")
exit(1)
maybe_subscriber = await EmailSubscribers.objects.filter(email=sender).all()
maybe_subscriber = await EmailSubscriber.filter(email=sender).all()
if len(maybe_subscriber) != 1 or maybe_subscriber[0].hood.id != email.hood.id:
logger.error("Not a subscriber")
exit(1)

View file

@ -1,33 +1,38 @@
# Copyright (C) 2020 by Maike <maike@systemli.org>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from ormantic import ForeignKey, Integer, Model, Text
from tortoise import fields
from tortoise.models import Model
from kibicara.model import Hood, Mapping
from kibicara.model import Hood
class Email(Model):
"""This table is used to track the names. It also stores the token secret."""
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
name: Text(unique=True)
secret: Text()
id = fields.IntField(pk=True)
hood: fields.ForeignKeyRelation[Hood] = fields.ForeignKeyField(
"models.Hood", related_name="platforms_email", unique=True
)
name = fields.CharField(32, unique=True)
secret = fields.TextField()
class Mapping(Mapping):
table_name = "email"
class Meta:
table = "platforms_email"
class EmailSubscribers(Model):
class EmailSubscriber(Model):
"""This table stores all subscribers, who want to receive messages via email."""
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
email: Text(unique=True)
id = fields.IntField(pk=True)
hood: fields.ForeignKeyRelation[Hood] = fields.ForeignKeyField(
"models.Hood", related_name="platforms_email_subscribers"
)
email = fields.CharField(64, unique=True)
class Mapping(Mapping):
table_name = "email_subscribers"
class Meta:
table = "platforms_email_subscribers"

View file

@ -1,6 +1,6 @@
# Copyright (C) 2020 by Maike <maike@systemli.org>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
@ -8,18 +8,18 @@
from logging import getLogger
from os import urandom
from smtplib import SMTPException
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from nacl import exceptions
from ormantic.exceptions import NoMatch
from pydantic import BaseModel, validator
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara import email
from kibicara.config import config
from kibicara.model import Hood
from kibicara.platformapi import Message
from kibicara.platforms.email.bot import spawner
from kibicara.platforms.email.model import Email, EmailSubscribers
from kibicara.platforms.email.model import Email, EmailSubscriber
from kibicara.webapi.admin import from_token, to_token
from kibicara.webapi.hoods import get_hood, get_hood_unauthorized
@ -53,7 +53,7 @@ class BodySubscriber(BaseModel):
email: str
async def get_email(email_id: int, hood=Depends(get_hood)):
async def get_email(email_id: int, hood: Hood = Depends(get_hood)):
"""Get Email row by hood.
You can specify an email_id to nail it down, but it works without as well.
@ -62,15 +62,15 @@ async def get_email(email_id: int, hood=Depends(get_hood)):
:return: Email row of the found email bot.
"""
try:
return await Email.objects.get(id=email_id, hood=hood)
except NoMatch:
return await Email.get(id=email_id, hood=hood)
except DoesNotExist:
return HTTPException(status_code=status.HTTP_404_NOT_FOUND)
async def get_subscriber(subscriber_id: int, hood=Depends(get_hood)):
async def get_subscriber(subscriber_id: int, hood: Hood = Depends(get_hood)):
try:
return await EmailSubscribers.objects.get(id=subscriber_id, hood=hood)
except NoMatch:
return await EmailSubscriber.get(id=subscriber_id, hood=hood)
except DoesNotExist:
return HTTPException(status_code=status.HTTP_404_NOT_FOUND)
@ -83,9 +83,9 @@ router = APIRouter()
# TODO response_model
operation_id="get_emails_public",
)
async def email_read_all_public(hood=Depends(get_hood_unauthorized)):
async def email_read_all_public(hood: Hood = Depends(get_hood_unauthorized)):
if hood.email_enabled:
emails = await Email.objects.filter(hood=hood).all()
emails = await Email.filter(hood=hood)
return [BodyEmailPublic(name=email.name) for email in emails]
return []
@ -95,8 +95,8 @@ async def email_read_all_public(hood=Depends(get_hood_unauthorized)):
# TODO response_model
operation_id="get_emails",
)
async def email_read_all(hood=Depends(get_hood)):
return await Email.objects.filter(hood=hood).select_related("hood").all()
async def email_read_all(hood: Hood = Depends(get_hood)):
return await Email.filter(hood=hood)
@router.post(
@ -112,7 +112,7 @@ async def email_create(values: BodyEmail, response: Response, hood=Depends(get_h
:return: Email row of the new email bot.
"""
try:
email = await Email.objects.create(
email = await Email.create(
hood=hood, secret=urandom(32).hex(), **values.__dict__
)
response.headers["Location"] = str(hood.id)
@ -200,7 +200,7 @@ async def email_subscribe(
token,
)
try:
subs = await EmailSubscribers.objects.filter(email=subscriber.email).all()
subs = await EmailSubscriber.filter(email=subscriber.email).all()
if subs:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
email.send_email(
@ -239,7 +239,7 @@ async def email_subscribe_confirm(token, hood=Depends(get_hood_unauthorized)):
if hood.id is not payload["hood"]:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
try:
await EmailSubscribers.objects.create(hood=hood.id, email=payload["email"])
await EmailSubscriber.create(hood=hood.id, email=payload["email"])
return {}
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@ -262,12 +262,12 @@ async def email_unsubscribe(token, hood=Depends(get_hood_unauthorized)):
# If token.hood and url.hood are different, raise an error:
if hood.id is not payload["hood"]:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
subscriber = await EmailSubscribers.objects.filter(
subscriber = await EmailSubscriber.filter(
hood=payload["hood"], email=payload["email"]
).get()
await subscriber.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT)
except NoMatch:
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
except exceptions.CryptoError:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
@ -279,7 +279,7 @@ async def email_unsubscribe(token, hood=Depends(get_hood_unauthorized)):
operation_id="get_subscribers",
)
async def subscribers_read_all(hood=Depends(get_hood)):
return await EmailSubscribers.objects.filter(hood=hood).all()
return await EmailSubscriber.filter(hood=hood).all()
@router.get(
@ -306,7 +306,7 @@ async def email_message_create(
:param hood: Hood the Email bot belongs to.
:return: returns status code 201 if the message is accepted by the censor.
"""
for receiver in await Email.objects.filter(hood=hood).all():
for receiver in await Email.filter(hood=hood).all():
if message.secret == receiver.secret:
# pass message.text to bot.py
if await spawner.get(hood).publish(Message(message.text)):

View file

@ -5,13 +5,12 @@
from asyncio import CancelledError, gather, sleep
from logging import getLogger
from sqlite3 import IntegrityError
from aiogram import Bot, Dispatcher, exceptions, types
from ormantic.exceptions import NoMatch
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara.platformapi import Censor, Message, Spawner
from kibicara.platforms.telegram.model import Telegram, TelegramUser
from kibicara.platforms.telegram.model import Telegram, TelegramSubscriber
logger = getLogger(__name__)
@ -25,8 +24,8 @@ class TelegramBot(Censor):
@classmethod
async def destroy_hood(cls, hood):
"""Removes all its database entries."""
for telegram in await Telegram.objects.filter(hood=hood).all():
for user in await TelegramUser.objects.filter(bot=telegram).all():
for telegram in await Telegram.filter(hood=hood).all():
for user in await TelegramSubscriber.filter(bot=telegram).all():
await user.delete()
await telegram.delete()
@ -69,9 +68,7 @@ class TelegramBot(Censor):
self.telegram_model.hood.name, message.text
)
)
for user in await TelegramUser.objects.filter(
bot=self.telegram_model
).all():
for user in await TelegramSubscriber.filter(bot=self.telegram_model).all():
await self._send_message(user.user_id, message.text)
async def _send_message(self, user_id, message):
@ -116,7 +113,7 @@ class TelegramBot(Censor):
if message.from_user.is_bot:
await message.reply("Error: Bots can not join here.")
return
await TelegramUser.objects.create(
await TelegramSubscriber.create(
user_id=message.from_user.id, bot=self.telegram_model
)
await message.reply(self.telegram_model.welcome_message)
@ -125,12 +122,12 @@ class TelegramBot(Censor):
async def _remove_user(self, message: types.Message):
try:
telegram_user = await TelegramUser.objects.get(
telegram_user = await TelegramSubscriber.get(
user_id=message.from_user.id, bot=self.telegram_model
)
await telegram_user.delete()
await message.reply("You were removed successfully from this bot.")
except NoMatch:
except DoesNotExist:
await message.reply("Error: You are not subscribed to this bot.")
async def _send_help(self, message: types.Message):

View file

@ -1,30 +1,35 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# Copyright (C) 2023 by Thomas Lindner <tom@dl6tom.de>
#
# SPDX-License-Identifier: 0BSD
from ormantic import Boolean, ForeignKey, Integer, Model, Text
from tortoise import fields
from tortoise.models import Model
from kibicara.model import Hood, Mapping
from kibicara.model import Hood
class Telegram(Model):
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
api_token: Text(unique=True)
welcome_message: Text()
username: Text(allow_null=True) = None
enabled: Boolean() = True
id = fields.IntField(pk=True)
hood: fields.ForeignKeyRelation[Hood] = fields.ForeignKeyField(
"models.Hood", related_name="platforms_telegram"
)
api_token = fields.CharField(64, unique=True)
welcome_message = fields.TextField()
username = fields.TextField()
enabled = fields.BooleanField(default=True)
class Mapping(Mapping):
table_name = "telegrambots"
class Meta:
table = "platforms_telegram"
class TelegramUser(Model):
id: Integer(primary_key=True) = None
user_id: Integer(unique=True)
# TODO unique
bot: ForeignKey(Telegram)
class TelegramSubscriber(Model):
id = fields.IntField(pk=True)
hood: fields.ForeignKeyRelation[Hood] = fields.ForeignKeyField(
"models.Hood", related_name="platforms_telegram_subscribers"
)
user_id = fields.IntField()
class Mapping(Mapping):
table_name = "telegramusers"
class Meta:
table = "platforms_telegram_subscribers"

View file

@ -4,16 +4,15 @@
# SPDX-License-Identifier: 0BSD
from logging import getLogger
from sqlite3 import IntegrityError
from aiogram import exceptions
from aiogram.bot.api import check_token
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from pydantic import BaseModel, validator
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara.platforms.telegram.bot import spawner
from kibicara.platforms.telegram.model import Telegram, TelegramUser
from kibicara.platforms.telegram.model import Telegram, TelegramSubscriber
from kibicara.webapi.hoods import get_hood, get_hood_unauthorized
logger = getLogger(__name__)
@ -38,8 +37,8 @@ class BodyTelegramPublic(BaseModel):
async def get_telegram(telegram_id: int, hood=Depends(get_hood)):
try:
return await Telegram.objects.get(id=telegram_id, hood=hood)
except NoMatch:
return await Telegram.get(id=telegram_id, hood=hood)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
@ -53,7 +52,7 @@ telegram_callback_router = APIRouter()
operation_id="get_telegrams_public",
)
async def telegram_read_all_public(hood=Depends(get_hood_unauthorized)):
telegrambots = await Telegram.objects.filter(hood=hood).all()
telegrambots = await Telegram.filter(hood=hood).all()
return [
BodyTelegramPublic(username=telegrambot.username)
for telegrambot in telegrambots
@ -67,7 +66,7 @@ async def telegram_read_all_public(hood=Depends(get_hood_unauthorized)):
operation_id="get_telegrams",
)
async def telegram_read_all(hood=Depends(get_hood)):
return await Telegram.objects.filter(hood=hood).all()
return await Telegram.filter(hood=hood).all()
@router.get(
@ -86,7 +85,7 @@ async def telegram_read(telegram=Depends(get_telegram)):
)
async def telegram_delete(telegram=Depends(get_telegram)):
spawner.stop(telegram)
for user in await TelegramUser.objects.filter(bot=telegram).all():
for user in await TelegramSubscriber.filter(bot=telegram).all():
await user.delete()
await telegram.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT)
@ -102,7 +101,7 @@ async def telegram_create(
response: Response, values: BodyTelegram, hood=Depends(get_hood)
):
try:
telegram = await Telegram.objects.create(hood=hood, **values.__dict__)
telegram = await Telegram.create(hood=hood, **values.__dict__)
spawner.start(telegram)
response.headers["Location"] = str(telegram.id)
return telegram

View file

@ -1,17 +1,17 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from kibicara.platformapi import Censor, Spawner
from kibicara.platformapi import Censor, Message, Spawner
from kibicara.platforms.test.model import Test
class TestBot(Censor):
def __init__(self, test):
def __init__(self, test: Test):
super().__init__(test.hood)
self.messages = []
self.messages: list[Message] = []
async def run(self):
while True:

View file

@ -1,16 +1,19 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from ormantic import ForeignKey, Integer, Model
from tortoise import fields
from tortoise.models import Model
from kibicara.model import Hood, Mapping
from kibicara.model import Hood
class Test(Model):
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
id = fields.IntField(pk=True)
hood: fields.ForeignKeyRelation[Hood] = fields.ForeignKeyField(
"models.Hood", related_name="platforms_test"
)
class Mapping(Mapping):
table_name = "testapi"
class Meta:
table = "platforms_test"

View file

@ -1,15 +1,14 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from pydantic import BaseModel
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara.model import Hood
from kibicara.platformapi import Message
from kibicara.platforms.test.bot import spawner
from kibicara.platforms.test.model import Test
@ -20,10 +19,10 @@ class BodyMessage(BaseModel):
text: str
async def get_test(test_id: int, hood=Depends(get_hood)):
async def get_test(test_id: int, hood: Hood = Depends(get_hood)) -> Test:
try:
return await Test.objects.get(id=test_id, hood=hood)
except NoMatch:
return await Test.get(id=test_id, hood=hood)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
@ -31,14 +30,14 @@ router = APIRouter()
@router.get("/")
async def test_read_all(hood=Depends(get_hood)):
return await Test.objects.filter(hood=hood).all()
async def test_read_all(hood: Hood = Depends(get_hood)):
return await Test.filter(hood=hood)
@router.post("/", status_code=status.HTTP_201_CREATED)
async def test_create(response: Response, hood=Depends(get_hood)):
async def test_create(response: Response, hood: Hood = Depends(get_hood)):
try:
test = await Test.objects.create(hood=hood)
test = await Test.create(hood=hood)
spawner.start(test)
response.headers["Location"] = str(test.id)
return test
@ -47,22 +46,22 @@ async def test_create(response: Response, hood=Depends(get_hood)):
@router.get("/{test_id}")
async def test_read(test=Depends(get_test)):
async def test_read(test: Test = Depends(get_test)):
return test
@router.delete("/{test_id}", status_code=status.HTTP_204_NO_CONTENT)
async def test_delete(test=Depends(get_test)):
async def test_delete(test: Test = Depends(get_test)):
spawner.stop(test)
await test.delete()
@router.get("/{test_id}/messages/")
async def test_message_read_all(test=Depends(get_test)):
async def test_message_read_all(test: Test = Depends(get_test)):
return spawner.get(test).messages
@router.post("/{test_id}/messages/")
async def test_message_create(message: BodyMessage, test=Depends(get_test)):
async def test_message_create(message: BodyMessage, test: Test = Depends(get_test)):
await spawner.get(test).publish(Message(message.text))
return {}

View file

@ -1,23 +1,27 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# Copyright (C) 2023 by Thomas Lindner <tom@dl6tom.de>
#
# SPDX-License-Identifier: 0BSD
from ormantic import Boolean, ForeignKey, Integer, Model, Text
from tortoise import fields
from tortoise.models import Model
from kibicara.model import Hood, Mapping
from kibicara.model import Hood
class Twitter(Model):
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
dms_since_id: Integer(allow_null=True) = None
mentions_since_id: Integer(allow_null=True) = None
access_token: Text()
access_token_secret: Text()
username: Text(allow_null=True) = None
verified: Boolean() = False
enabled: Boolean() = False
id = fields.IntField(pk=True)
hood: fields.ForeignKeyRelation[Hood] = fields.ForeignKeyField(
"models.Hood", related_name="platforms_twitter"
)
dms_since_id = fields.IntField()
mentions_since_id = fields.IntField()
access_token = fields.TextField()
access_token_secret = fields.TextField()
username = fields.TextField()
verified = fields.BooleanField(default=False)
enabled = fields.BooleanField(default=False)
class Mapping(Mapping):
table_name = "twitterbots"
class Meta:
table = "platforms_twitter"

View file

@ -4,13 +4,12 @@
# SPDX-License-Identifier: 0BSD
from logging import getLogger
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from peony.exceptions import NotAuthenticated
from peony.oauth_dance import get_access_token, get_oauth_token
from pydantic import BaseModel
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara.config import config
from kibicara.platforms.twitter.bot import spawner
@ -26,8 +25,8 @@ class BodyTwitterPublic(BaseModel):
async def get_twitter(twitter_id: int, hood=Depends(get_hood)):
try:
return await Twitter.objects.get(id=twitter_id, hood=hood)
except NoMatch:
return await Twitter.get(id=twitter_id, hood=hood)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
@ -41,7 +40,7 @@ twitter_callback_router = APIRouter()
operation_id="get_twitters_public",
)
async def twitter_read_all_public(hood=Depends(get_hood_unauthorized)):
twitterbots = await Twitter.objects.filter(hood=hood).all()
twitterbots = await Twitter.filter(hood=hood).all()
return [
BodyTwitterPublic(username=twitterbot.username)
for twitterbot in twitterbots
@ -55,7 +54,7 @@ async def twitter_read_all_public(hood=Depends(get_hood_unauthorized)):
operation_id="get_twitters",
)
async def twitter_read_all(hood=Depends(get_hood)):
return await Twitter.objects.filter(hood=hood).all()
return await Twitter.filter(hood=hood).all()
@router.get(
@ -125,7 +124,7 @@ async def twitter_create(response: Response, hood=Depends(get_hood)):
"""
try:
# Purge Twitter corpses
for corpse in await Twitter.objects.filter(hood=hood, verified=False).all():
for corpse in await Twitter.filter(hood=hood, verified=False).all():
await corpse.delete()
# Create Twitter
request_token = await get_oauth_token(
@ -137,7 +136,7 @@ async def twitter_create(response: Response, hood=Depends(get_hood)):
)
if request_token["oauth_callback_confirmed"] != "true":
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
twitter = await Twitter.objects.create(
twitter = await Twitter.create(
hood=hood,
access_token=request_token["oauth_token"],
access_token_secret=request_token["oauth_token_secret"],
@ -159,7 +158,7 @@ async def twitter_read_callback(
oauth_token: str, oauth_verifier: str, hood=Depends(get_hood)
):
try:
twitter = await Twitter.objects.filter(access_token=oauth_token).get()
twitter = await Twitter.filter(access_token=oauth_token).get()
access_token = await get_access_token(
config["twitter"]["consumer_key"],
config["twitter"]["consumer_secret"],
@ -177,7 +176,7 @@ async def twitter_read_callback(
return {}
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except NoMatch:
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
except (KeyError, ValueError, NotAuthenticated):
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)

View file

@ -19,16 +19,20 @@ from kibicara.platforms.twitter.webapi import router as twitter_router
from kibicara.platforms.twitter.webapi import twitter_callback_router
from kibicara.webapi.admin import router as admin_router
from kibicara.webapi.hoods import router as hoods_router
from kibicara.webapi.hoods.badwords import router as badwords_router
from kibicara.webapi.hoods.triggers import router as triggers_router
from kibicara.webapi.hoods.exclude_patterns import router as exclude_patterns_router
from kibicara.webapi.hoods.include_patterns import router as include_patterns_router
router = APIRouter()
router.include_router(admin_router, prefix="/admin", tags=["admin"])
hoods_router.include_router(
triggers_router, prefix="/{hood_id}/triggers", tags=["triggers"]
include_patterns_router,
prefix="/{hood_id}/include_patterns",
tags=["include_patterns"],
)
hoods_router.include_router(
badwords_router, prefix="/{hood_id}/badwords", tags=["badwords"]
exclude_patterns_router,
prefix="/{hood_id}/exclude_patterns",
tags=["exclude_patterns"],
)
hoods_router.include_router(test_router, prefix="/{hood_id}/test", tags=["test"])
hoods_router.include_router(

View file

@ -1,4 +1,4 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Christian Hagenest <c.hagenest@pm.me>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
@ -11,20 +11,20 @@ from datetime import datetime, timedelta
from logging import getLogger
from pickle import dumps, loads
from smtplib import SMTPException
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from nacl.encoding import URLSafeBase64Encoder
from nacl.exceptions import CryptoError
from nacl.secret import SecretBox
from ormantic.exceptions import NoMatch
from nacl.utils import random
from passlib.hash import argon2
from pydantic import BaseModel, validator
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara import email
from kibicara.config import config
from kibicara.model import Admin, AdminHoodRelation, Hood
from kibicara.model import Admin, Hood
from kibicara.webapi.utils import delete_hood
logger = getLogger(__name__)
@ -54,41 +54,42 @@ class BodyAccessToken(BaseModel):
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/login")
secret_box = SecretBox(bytes.fromhex(config["secret"]))
secret_box = SecretBox(
bytes.fromhex(str(config.get("secret", random(SecretBox.KEY_SIZE).hex())))
)
def to_token(**kwargs):
def to_token(**kwargs) -> str:
return secret_box.encrypt(dumps(kwargs), encoder=URLSafeBase64Encoder).decode(
"ascii"
)
def from_token(token):
def from_token(token: str) -> dict:
return loads(
secret_box.decrypt(token.encode("ascii"), encoder=URLSafeBase64Encoder)
)
async def get_auth(email, password):
async def get_auth(email: str, password: str) -> Admin:
try:
admin = await Admin.objects.get(email=email)
admin = await Admin.get(email=email)
if argon2.verify(password, admin.passhash):
return admin
raise ValueError
except NoMatch:
except DoesNotExist:
raise ValueError
async def get_admin(access_token=Depends(oauth2_scheme)):
async def get_admin(access_token: str = Depends(oauth2_scheme)) -> Admin:
try:
admin = await get_auth(**from_token(access_token))
return await get_auth(**from_token(access_token))
except (CryptoError, ValueError):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return admin
router = APIRouter()
@ -109,9 +110,9 @@ async def admin_register(values: BodyAdmin):
register_token = to_token(**values.__dict__)
logger.debug("register_token={0}".format(register_token))
try:
admin = await Admin.objects.filter(email=values.email).all()
if admin:
if await Admin.exists(email=values.email):
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
# link goes to frontend. this is not the confirm API endpoint below!
body = "{0}/confirm?token={1}".format(config["frontend_url"], register_token)
logger.debug(body)
email.send_email(
@ -138,7 +139,8 @@ async def admin_confirm(register_token: str):
try:
values = from_token(register_token)
passhash = argon2.hash(values["password"])
await Admin.objects.create(email=values["email"], passhash=passhash)
await Admin.create(email=values["email"], passhash=passhash)
# XXX login and registration tokens are exchangeable. does this hurt?
return BodyAccessToken(access_token=register_token)
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@ -178,14 +180,14 @@ async def admin_reset_password(values: BodyEmail):
- **email**: E-Mail Address of new hood admin
- **password**: Password of new hood admin
"""
register_token = to_token(datetime=datetime.now().isoformat(), **values.__dict__)
logger.debug("register_token={0}".format(register_token))
reset_token = to_token(datetime=datetime.now().isoformat(), **values.__dict__)
logger.debug("reset_token={0}".format(reset_token))
try:
admin = await Admin.objects.filter(email=values.email).all()
if not admin:
if await Admin.exists(email=values.email):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
# link goes to frontend. this is not the reset API endpoint below!
body = "{0}/password-reset?token={1}".format(
config["frontend_url"], register_token
config["frontend_url"], reset_token
)
logger.debug(body)
email.send_email(
@ -213,11 +215,10 @@ async def admin_confirm_reset(reset_token: str, values: BodyPassword):
):
raise HTTPException(status_code=status.HTTP_410_GONE)
passhash = argon2.hash(values.password)
admins = await Admin.objects.filter(email=token_values["email"]).all()
if len(admins) != 1:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
await admins[0].update(passhash=passhash)
await Admin.filter(email=token_values["email"]).update(passhash=passhash)
return BodyAccessToken(access_token=reset_token)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except CryptoError:
@ -229,11 +230,9 @@ async def admin_confirm_reset(reset_token: str, values: BodyPassword):
# TODO response_model,
operation_id="get_hoods_admin",
)
async def admin_hood_read_all(admin=Depends(get_admin)):
async def admin_hood_read_all(admin: Admin = Depends(get_admin)):
"""Get a list of all hoods of a given admin."""
return (
await AdminHoodRelation.objects.select_related("hood").filter(admin=admin).all()
)
return await Hood.filter(admin=admin)
@router.get(
@ -241,12 +240,8 @@ async def admin_hood_read_all(admin=Depends(get_admin)):
# TODO response_model,
operation_id="get_admin",
)
async def admin_read(admin=Depends(get_admin)):
"""Get a list of all hoods of a given admin."""
admin = await Admin.objects.filter(email=admin.email).all()
if len(admin) != 1:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return BodyEmail(email=admin[0].email)
async def admin_read(admin: Admin = Depends(get_admin)):
return BodyEmail(email=admin.email)
@router.delete(
@ -254,18 +249,10 @@ async def admin_read(admin=Depends(get_admin)):
status_code=status.HTTP_204_NO_CONTENT,
operation_id="delete_admin",
)
async def admin_delete(admin=Depends(get_admin)):
hood_relations = (
await AdminHoodRelation.objects.select_related("hood").filter(admin=admin).all()
)
for hood in hood_relations:
admins = (
await AdminHoodRelation.objects.select_related("admin")
.filter(hood=hood.id)
.all()
)
if len(admins) == 1 and admins[0].id == admin.id:
actual_hood = await Hood.objects.filter(id=hood.id).all()
await delete_hood(actual_hood[0])
async def admin_delete(admin: Admin = Depends(get_admin)):
async for hood in Hood.filter(admins__contains=admin):
await hood.admins.remove(admin)
await hood.fetch_related("admins")
if len(hood.admins) == 0:
await delete_hood(hood)
await admin.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT)

View file

@ -1,4 +1,4 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
@ -6,13 +6,11 @@
"""REST API Endpoints for managing hoods."""
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from pydantic import BaseModel
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara.model import AdminHoodRelation, Hood, Trigger
from kibicara.model import Admin, Hood, IncludePattern
from kibicara.platforms.email.bot import spawner
from kibicara.webapi.admin import get_admin
from kibicara.webapi.utils import delete_hood
@ -20,28 +18,24 @@ from kibicara.webapi.utils import delete_hood
class BodyHood(BaseModel):
name: str
landingpage: str = """
Default Landing Page
"""
landingpage: str = "Default Landing Page"
async def get_hood_unauthorized(hood_id: int):
async def get_hood_unauthorized(hood_id: int) -> Hood:
try:
hood = await Hood.objects.get(id=hood_id)
except NoMatch:
return await Hood.get(id=hood_id)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return hood
async def get_hood(hood=Depends(get_hood_unauthorized), admin=Depends(get_admin)):
async def get_hood(hood_id: int, admin: Admin = Depends(get_admin)) -> Hood:
try:
await AdminHoodRelation.objects.get(admin=admin, hood=hood)
except NoMatch:
return await Hood.get(id=hood_id, admins__in=[admin])
except DoesNotExist:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
headers={"WWW-Authenticate": "Bearer"},
)
return hood
router = APIRouter()
@ -55,7 +49,7 @@ router = APIRouter()
)
async def hood_read_all():
"""Get all existing hoods."""
return await Hood.objects.all()
return await Hood.all()
@router.post(
@ -65,19 +59,21 @@ async def hood_read_all():
operation_id="create_hood",
tags=["hoods"],
)
async def hood_create(values: BodyHood, response: Response, admin=Depends(get_admin)):
async def hood_create(
values: BodyHood, response: Response, admin: Admin = Depends(get_admin)
):
"""Creates a hood.
- **name**: Name of the hood
- **landingpage**: Markdown formatted description of the hood
"""
try:
hood = await Hood.objects.create(**values.__dict__)
await AdminHoodRelation.objects.create(admin=admin.id, hood=hood.id)
hood = await Hood.create(**values.__dict__)
await admin.hoods.add(hood)
spawner.start(hood)
# Initialize Triggers to match all
await Trigger.objects.create(hood=hood, pattern=".")
await IncludePattern.create(hood=hood, pattern=".")
response.headers["Location"] = str(hood.id)
return hood
@ -91,25 +87,24 @@ async def hood_create(values: BodyHood, response: Response, admin=Depends(get_ad
operation_id="get_hood",
tags=["hoods"],
)
async def hood_read(hood=Depends(get_hood_unauthorized)):
async def hood_read(hood: Hood = Depends(get_hood_unauthorized)):
"""Get hood with id **hood_id**."""
return hood
@router.put(
"/{hood_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="update_hood",
tags=["hoods"],
)
async def hood_update(values: BodyHood, hood=Depends(get_hood)):
async def hood_update(values: BodyHood, hood: Hood = Depends(get_hood)):
"""Updates hood with id **hood_id**.
- **name**: New name of the hood
- **landingpage**: New Markdown formatted description of the hood
"""
await hood.update(**values.__dict__)
return Response(status_code=status.HTTP_204_NO_CONTENT)
await Hood.filter(id=hood).update(**values.__dict__)
return hood
@router.delete(
@ -121,4 +116,3 @@ async def hood_update(values: BodyHood, hood=Depends(get_hood)):
async def hood_delete(hood=Depends(get_hood)):
"""Deletes hood with id **hood_id**."""
await delete_hood(hood)
return Response(status_code=status.HTTP_204_NO_CONTENT)

View file

@ -1,106 +0,0 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
"""REST API endpoints for managing badwords.
Provides API endpoints for adding, removing and reading regular expressions that block a
received message to be dropped by a censor. This provides a message filter customizable
by the hood admins.
"""
from re import compile as regex_compile
from re import error as RegexError
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from pydantic import BaseModel
from kibicara.model import BadWord
from kibicara.webapi.hoods import get_hood
class BodyBadWord(BaseModel):
pattern: str
async def get_badword(badword_id: int, hood=Depends(get_hood)):
try:
return await BadWord.objects.get(id=badword_id, hood=hood)
except NoMatch:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
router = APIRouter()
@router.get(
"/",
# TODO response_model,
operation_id="get_badwords",
)
async def badword_read_all(hood=Depends(get_hood)):
"""Get all badwords of hood with id **hood_id**."""
return await BadWord.objects.filter(hood=hood).all()
@router.post(
"/",
status_code=status.HTTP_201_CREATED,
# TODO response_model,
operation_id="create_badword",
)
async def badword_create(
values: BodyBadWord, response: Response, hood=Depends(get_hood)
):
"""Creates a new badword for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a badword.
"""
try:
regex_compile(values.pattern)
badword = await BadWord.objects.create(hood=hood, **values.__dict__)
response.headers["Location"] = str(badword.id)
return badword
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except RegexError:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
@router.get(
"/{badword_id}",
# TODO response_model,
operation_id="get_badword",
)
async def badword_read(badword=Depends(get_badword)):
"""Reads badword with id **badword_id** for hood with id **hood_id**."""
return badword
@router.put(
"/{badword_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="update_badword",
)
async def badword_update(values: BodyBadWord, badword=Depends(get_badword)):
"""Updates badword with id **badword_id** for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a badword
"""
await badword.update(**values.__dict__)
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.delete(
"/{badword_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="delete_badword",
)
async def badword_delete(badword=Depends(get_badword)):
"""Deletes badword with id **badword_id** for hood with id **hood_id**."""
await badword.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT)

View file

@ -0,0 +1,116 @@
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
"""REST API endpoints for managing exclude_patterns.
Provides API endpoints for adding, removing and reading regular expressions that block a
received message to be dropped by a censor. This provides a message filter customizable
by the hood admins.
"""
from re import compile as regex_compile, error as RegexError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from pydantic import BaseModel
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara.model import ExcludePattern, Hood
from kibicara.webapi.hoods import get_hood
class BodyExcludePattern(BaseModel):
pattern: str
async def get_exclude_pattern(
exclude_pattern_id: int, hood: Hood = Depends(get_hood)
) -> ExcludePattern:
try:
return await ExcludePattern.get(id=exclude_pattern_id, hood=hood)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
router = APIRouter()
@router.get(
"/",
# TODO response_model,
operation_id="get_exclude_patterns",
)
async def exclude_pattern_read_all(hood: Hood = Depends(get_hood)):
"""Get all exclude_patterns of hood with id **hood_id**."""
return await ExcludePattern.filter(hood=hood)
@router.post(
"/",
status_code=status.HTTP_201_CREATED,
# TODO response_model,
operation_id="create_exclude_pattern",
)
async def exclude_pattern_create(
values: BodyExcludePattern, response: Response, hood: Hood = Depends(get_hood)
):
"""Creates a new exclude_pattern for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a exclude_pattern.
"""
try:
regex_compile(values.pattern)
exclude_pattern = await ExcludePattern.create(hood=hood, **values.__dict__)
response.headers["Location"] = str(exclude_pattern.id)
return exclude_pattern
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except RegexError:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
@router.get(
"/{exclude_pattern_id}",
# TODO response_model,
operation_id="get_exclude_pattern",
)
async def exclude_pattern_read(
exclude_pattern: ExcludePattern = Depends(get_exclude_pattern),
):
"""
Reads exclude_pattern with id **exclude_pattern_id** for hood with id **hood_id**.
"""
return exclude_pattern
@router.put(
"/{exclude_pattern_id}",
operation_id="update_exclude_pattern",
)
async def exclude_pattern_update(
values: BodyExcludePattern,
exclude_pattern: ExcludePattern = Depends(get_exclude_pattern),
):
"""
Updates exclude_pattern with id **exclude_pattern_id** for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a exclude_pattern
"""
await ExcludePattern.filter(id=exclude_pattern).update(**values.__dict__)
return exclude_pattern
@router.delete(
"/{exclude_pattern_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="delete_exclude_pattern",
)
async def exclude_pattern_delete(
exclude_pattern: ExcludePattern = Depends(get_exclude_pattern),
):
"""
Deletes exclude_pattern with id **exclude_pattern_id** for hood with id **hood_id**.
"""
await exclude_pattern.delete()

View file

@ -0,0 +1,115 @@
# Copyright (C) 2020, 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
"""REST API endpoints for managing include_patterns.
Provides API endpoints for adding, removing and reading regular expressions that allow a
message to be passed through by a censor. A published message must match one of these
regular expressions otherwise it gets dropped by the censor. This provides a message
filter customizable by the hood admins.
"""
from re import compile as regex_compile, error as RegexError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from pydantic import BaseModel
from tortoise.exceptions import DoesNotExist, IntegrityError
from kibicara.model import IncludePattern, Hood
from kibicara.webapi.hoods import get_hood
class BodyIncludePattern(BaseModel):
pattern: str
async def get_include_pattern(include_pattern_id: int, hood: Hood = Depends(get_hood)):
try:
return await IncludePattern.get(id=include_pattern_id, hood=hood)
except DoesNotExist:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
router = APIRouter()
@router.get(
"/",
# TODO response_model,
operation_id="get_include_patterns",
)
async def include_pattern_read_all(hood: Hood = Depends(get_hood)):
"""Get all include_patterns of hood with id **hood_id**."""
return await IncludePattern.filter(hood=hood)
@router.post(
"/",
status_code=status.HTTP_201_CREATED,
# TODO response_model,
operation_id="create_include_pattern",
)
async def include_pattern_create(
values: BodyIncludePattern, response: Response, hood: Hood = Depends(get_hood)
):
"""Creates a new include_pattern for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a include_pattern.
"""
try:
regex_compile(values.pattern)
include_pattern = await IncludePattern.create(hood=hood, **values.__dict__)
response.headers["Location"] = str(include_pattern.id)
return include_pattern
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except RegexError:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
@router.get(
"/{include_pattern_id}",
# TODO response_model,
operation_id="get_include_pattern",
)
async def include_pattern_read(
include_pattern: IncludePattern = Depends(get_include_pattern),
):
"""
Reads include_pattern with id **include_pattern_id** for hood with id **hood_id**.
"""
return include_pattern
@router.put(
"/{include_pattern_id}",
operation_id="update_include_pattern",
)
async def include_pattern_update(
values: BodyIncludePattern,
include_pattern: IncludePattern = Depends(get_include_pattern),
):
"""
Updates include_pattern with id **include_pattern_id** for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a include_pattern
"""
await IncludePattern.filter(id=include_pattern).update(**values.__dict__)
return include_pattern
@router.delete(
"/{include_pattern_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="delete_include_pattern",
)
async def include_pattern_delete(
include_pattern: IncludePattern = Depends(get_include_pattern),
):
"""
Deletes include_pattern with id **include_pattern_id** for hood with id **hood_id**.
"""
await include_pattern.delete()

View file

@ -1,107 +0,0 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
"""REST API endpoints for managing triggers.
Provides API endpoints for adding, removing and reading regular expressions that allow a
message to be passed through by a censor. A published message must match one of these
regular expressions otherwise it gets dropped by the censor. This provides a message
filter customizable by the hood admins.
"""
from re import compile as regex_compile
from re import error as RegexError
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from pydantic import BaseModel
from kibicara.model import Trigger
from kibicara.webapi.hoods import get_hood
class BodyTrigger(BaseModel):
pattern: str
async def get_trigger(trigger_id: int, hood=Depends(get_hood)):
try:
return await Trigger.objects.get(id=trigger_id, hood=hood)
except NoMatch:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
router = APIRouter()
@router.get(
"/",
# TODO response_model,
operation_id="get_triggers",
)
async def trigger_read_all(hood=Depends(get_hood)):
"""Get all triggers of hood with id **hood_id**."""
return await Trigger.objects.filter(hood=hood).all()
@router.post(
"/",
status_code=status.HTTP_201_CREATED,
# TODO response_model,
operation_id="create_trigger",
)
async def trigger_create(
values: BodyTrigger, response: Response, hood=Depends(get_hood)
):
"""Creates a new trigger for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a trigger.
"""
try:
regex_compile(values.pattern)
trigger = await Trigger.objects.create(hood=hood, **values.__dict__)
response.headers["Location"] = str(trigger.id)
return trigger
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except RegexError:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
@router.get(
"/{trigger_id}",
# TODO response_model,
operation_id="get_trigger",
)
async def trigger_read(trigger=Depends(get_trigger)):
"""Reads trigger with id **trigger_id** for hood with id **hood_id**."""
return trigger
@router.put(
"/{trigger_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="update_trigger",
)
async def trigger_update(values: BodyTrigger, trigger=Depends(get_trigger)):
"""Updates trigger with id **trigger_id** for hood with id **hood_id**.
- **pattern**: Regular expression which is used to match a trigger
"""
await trigger.update(**values.__dict__)
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.delete(
"/{trigger_id}",
status_code=status.HTTP_204_NO_CONTENT,
operation_id="delete_trigger",
)
async def trigger_delete(trigger=Depends(get_trigger)):
"""Deletes trigger with id **trigger_id** for hood with id **hood_id**."""
await trigger.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT)

View file

@ -1,17 +1,14 @@
# Copyright (C) 2023 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
#
# SPDX-License-Identifier: 0BSD
from kibicara.model import AdminHoodRelation, BadWord, Trigger
from kibicara.model import ExcludePattern, Hood, IncludePattern
from kibicara.platformapi import Spawner
async def delete_hood(hood):
async def delete_hood(hood: Hood) -> None:
await Spawner.destroy_hood(hood)
for trigger in await Trigger.objects.filter(hood=hood).all():
await trigger.delete()
for badword in await BadWord.objects.filter(hood=hood).all():
await badword.delete()
for relation in await AdminHoodRelation.objects.filter(hood=hood).all():
await relation.delete()
await IncludePattern.filter(hood=hood).delete()
await ExcludePattern.filter(hood=hood).delete()
await hood.delete()