[mastodon] Some web routes to add a Mastodon Account to Ticketfrei

pull/2/head
missytake 2022-03-01 18:37:01 +01:00
parent 4dc4b9cfca
commit b1f8c08d25
1 changed files with 168 additions and 0 deletions

View File

@ -0,0 +1,168 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from pydantic import BaseModel
from kibicara.config import config
from kibicara.platforms.mastodon.bot import spawner
from kibicara.platforms.mastodon.model import MastodonAccount, MastodonInstance
from kibicara.webapi.hoods import get_hood, get_hood_unauthorized
from mastodon import Mastodon, MastodonError
from logging import getLogger
logger = getLogger(__name__)
class BodyMastodonPublic(BaseModel):
username: str
instance: str
async def get_mastodon(mastodon_id, hood=Depends(get_hood)):
try:
return await MastodonAccount.objects.get(id=mastodon_id, hood=hood)
except NoMatch:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
async def get_mastodon_instance(instance_url: str) -> MastodonInstance:
"""Return a MastodonInstance ORM object with valid client_id and client_secret.
:param: instance_url: the API base URL of the mastodon server
:return the MastodonInstance ORM object
"""
try:
return await MastodonInstance.objects.get(name=instance_url)
except NoMatch:
app_name = config.get("frontend_url")
client_id, client_secret = Mastodon.create_app(
app_name, api_base_url=instance_url
)
await MastodonInstance.objects.create(
name=instance_url, client_id=client_id, client_secret=client_secret
)
return await MastodonInstance.objects.get(name=instance_url)
router = APIRouter()
twitter_callback_router = APIRouter()
@router.get(
'/public',
# TODO response_model,
operation_id='get_mastodons_public',
)
async def mastodon_read_all_public(hood=Depends(get_hood_unauthorized)):
mastodonbots = await MastodonAccount.objects.filter(hood=hood).all()
return [
BodyMastodonPublic(username=mbot.username, instance=mbot.model.instance.name)
for mbot in mastodonbots
if mbot.enabled == 1 and mbot.username
]
@router.get(
'/',
# TODO response_model,
operation_id='get_mastodons',
)
async def mastodon_read_all(hood=Depends(get_hood)):
return await MastodonAccount.objects.filter(hood=hood).all()
@router.get(
'/{mastodon_id}',
# TODO response_model
operation_id='get_mastodon',
)
async def mastodon_read(mastodon=Depends(get_mastodon)):
return mastodon
@router.delete(
'/{mastodon_id}',
status_code=status.HTTP_204_NO_CONTENT,
# TODO response_model
operation_id='delete_mastodon',
)
async def mastodon_delete(mastodon=Depends(get_mastodon)):
spawner.stop(mastodon)
await mastodon.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.get(
'/{mastodon_id}/status',
status_code=status.HTTP_200_OK,
# TODO response_model
operation_id='status_mastodon',
)
async def mastodon_status(mastodon=Depends(get_mastodon)):
return {'status': spawner.get(mastodon).status.name}
@router.post(
'/{mastodon_id}/start',
status_code=status.HTTP_200_OK,
# TODO response_model
operation_id='start_mastodon',
)
async def mastodon_start(mastodon=Depends(get_mastodon)):
await mastodon.update(enabled=True)
spawner.get(mastodon).start()
return {}
@router.post(
'/{mastodon_id}/stop',
status_code=status.HTTP_200_OK,
# TODO response_model
operation_id='stop_mastodon',
)
async def mastodon_stop(mastodon=Depends(get_mastodon)):
await mastodon.update(enabled=False)
spawner.get(mastodon).stop()
return {}
@router.post(
'/',
status_code=status.HTTP_201_CREATED,
# TODO response_model
operation_id='create_mastodon',
)
async def mastodon_create(instance_url, username, password, hood=Depends(get_hood)):
"""Add a Mastodon Account to a Ticketfrei account.
open questions:
do we really get the username + password like this?
can the instance_url have different ways of writing?
:param: instance_url: the API base URL of the mastodon server
:param: username: the username of the Mastodon account
:param: password: the password of the Mastodon account
:param: hood: the hood ORM object
"""
instance = await get_mastodon_instance(instance_url)
account = Mastodon(
instance.client_id, instance.client_secret, api_base_url=instance_url
)
try:
access_token = account.log_in(username, password)
except MastodonError:
logger.warning("Login to Mastodon failed.", exc_info=True)
return # show error to user
MastodonAccount.objects.create(
hood=hood,
instance=instance,
access_token=access_token,
enabled=True,
last_seen="0",
)