# Copyright (C) 2020 by Thomas Lindner # Copyright (C) 2020 by Cathy Hu # Copyright (C) 2020 by Martin Rey # # SPDX-License-Identifier: 0BSD from kibicara.platformapi import Censor, Spawner, Message from kibicara.platforms.mastodon.model import MastodonAccount from mastodon import Mastodon, MastodonError from asyncio import gather import re from logging import getLogger logger = getLogger(__name__) class MastodonBot(Censor): def __init__(self, mastodon_account_model): super().__init__(mastodon_account_model.hood) self.model = mastodon_account_model self.enabled = self.model.enabled async def run(self): await self.model.instance.load() self.account = Mastodon( client_id=self.model.instance.client_id, client_secret=self.model.instance.client_secret, api_base_url=self.model.instance.name, access_token=self.model.access_token, ) await gather(self.poll(), self.push()) async def poll(self): """Get new mentions and DMs from Mastodon""" while True: try: notifications = self.account.notifications() except MastodonError as e: logger.warning("%s in hood %s" % (e, self.model.hood.name)) continue last_seen = int(self.model.last_seen) for status in notifications: try: status_id = int(status['status']['id']) except KeyError: continue # ignore notifications which don't have a status if status_id <= last_seen: continue # toot was already processed in the past if status_id > int(self.model.last_seen): await self.model.update(last_seen=str(status_id)) text = re.sub(r'<[^>]*>', '', status['status']['content']) text = re.sub( "(?<=^|(?<=[^a-zA-Z0-9-_.]))@([A-Za-z]+[A-Za-z0-9-_]+)", "", text ) logger.debug( "Mastodon in %s received toot #%s: %s" % (self.model.hood.name, status_id, text) ) if status['status']['visibility'] == 'public': await self.publish(Message(text, toot_id=status_id)) else: await self.publish(Message(text)) async def push(self): """Push new Ticketfrei reports to Mastodon; if source is mastodon, boost it.""" while True: message = await self.receive() if hasattr(message, "tood_id"): logger.debug("Boosting post %s: %s" % (message.tood_id, message.text)) self.account.status_reblog(message.tood_id) else: logger.debug("Posting message: %s" % (message.text,)) self.account.status_post(message.text) spawner = Spawner(MastodonAccount, MastodonBot)