[twitter] Deprecate Twitter backend

This commit is contained in:
missytake 2023-03-18 15:45:07 +01:00
parent 39a21fe34a
commit 9802632237
10 changed files with 0 additions and 684 deletions

View file

@ -1,164 +0,0 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from asyncio import CancelledError, gather, sleep
from logging import getLogger
from peony import PeonyClient, exceptions
from kibicara.config import config
from kibicara.platformapi import Censor, Message, Spawner
from kibicara.platforms.twitter.model import Twitter
logger = getLogger(__name__)
class TwitterBot(Censor):
def __init__(self, twitter_model):
super().__init__(twitter_model.hood)
self.twitter_model = twitter_model
self.enabled = self.twitter_model.enabled
self.polling_interval_sec = 60
self.mentions_since_id = self.twitter_model.mentions_since_id
self.dms_since_id = self.twitter_model.dms_since_id
@classmethod
async def destroy_hood(cls, hood):
"""Removes all its database entries."""
for twitter in await Twitter.objects.filter(hood=hood).all():
await twitter.delete()
async def run(self):
try:
if not self.twitter_model.verified:
raise ValueError('Oauth Handshake not completed')
self.client = PeonyClient(
consumer_key=config['twitter']['consumer_key'],
consumer_secret=config['twitter']['consumer_secret'],
access_token=self.twitter_model.access_token,
access_token_secret=self.twitter_model.access_token_secret,
)
if self.twitter_model.mentions_since_id is None:
logger.debug('since_id is None in model, fetch newest mention id')
await self._poll_mentions()
if self.twitter_model.dms_since_id is None:
logger.debug('since_id is None in model, fetch newest dm id')
await self._poll_direct_messages()
user = await self.client.user
if user.screen_name:
await self.twitter_model.update(username=user.screen_name)
logger.debug(
'Starting Twitter bot: {0}'.format(self.twitter_model.__dict__)
)
await gather(self.poll(), self.push())
except CancelledError:
logger.debug(
'Bot {0} received Cancellation.'.format(self.twitter_model.hood.name)
)
except exceptions.Unauthorized:
logger.debug(
'Bot {0} has invalid auth token.'.format(self.twitter_model.hood.name)
)
await self.twitter_model.update(enabled=False)
self.enabled = self.twitter_model.enabled
except (KeyError, ValueError, exceptions.NotAuthenticated):
logger.warning('Missing consumer_keys for Twitter in your configuration.')
await self.twitter_model.update(enabled=False)
self.enabled = self.twitter_model.enabled
finally:
logger.debug('Bot {0} stopped.'.format(self.twitter_model.hood.name))
async def poll(self):
while True:
dms = await self._poll_direct_messages()
logger.debug(
'Polled dms ({0}): {1}'.format(self.twitter_model.hood.name, str(dms))
)
mentions = await self._poll_mentions()
logger.debug(
'Polled mentions ({0}): {1}'.format(
self.twitter_model.hood.name, str(mentions)
)
)
await self.twitter_model.update(
dms_since_id=self.dms_since_id, mentions_since_id=self.mentions_since_id
)
for message in dms:
await self.publish(Message(message))
for message_id, message in mentions:
await self.publish(Message(message, twitter_mention_id=message_id))
await sleep(self.polling_interval_sec)
async def _poll_direct_messages(self):
dms = await self.client.api.direct_messages.events.list.get()
dms = dms.events
# TODO check for next_cursor (see twitter api)
dms_filtered = []
if dms:
for dm in dms:
if int(dm.id) == self.dms_since_id:
break
dms_filtered.append(dm)
self.dms_since_id = int(dms[0].id)
messages = []
for dm in dms_filtered:
filtered_text = await self._filter_text(
dm.message_create.message_data.entities,
dm.message_create.message_data.text,
)
if not filtered_text:
continue
messages.append(filtered_text)
return messages
async def _poll_mentions(self):
mentions = await self.client.api.statuses.mentions_timeline.get(
since_id=self.mentions_since_id
)
if mentions:
self.mentions_since_id = mentions[0].id
messages = []
for mention in mentions:
filtered_text = await self._filter_text(mention.entities, mention.text)
if not filtered_text:
continue
messages.append((mention.id, filtered_text))
return messages
async def _filter_text(self, entities, text):
remove_indices = set()
for user in entities.user_mentions:
remove_indices.update(range(user.indices[0], user.indices[1] + 1))
for url in entities.urls:
remove_indices.update(range(url.indices[0], url.indices[1] + 1))
for symbol in entities.symbols:
remove_indices.update(range(symbol.indices[0], symbol.indices[1] + 1))
filtered_text = ''
for index, character in enumerate(text):
if index not in remove_indices:
filtered_text += character
return filtered_text.strip()
async def push(self):
while True:
message = await self.receive()
logger.debug(
'Received message from censor ({0}): {1}'.format(
self.twitter_model.hood.name, message.text
)
)
if hasattr(message, 'twitter_mention_id'):
await self._retweet(message.twitter_mention_id)
else:
await self._post_tweet(message.text)
async def _post_tweet(self, message):
return await self.client.api.statuses.update.post(status=message)
async def _retweet(self, message_id):
return await self.client.api.statuses.retweet.post(id=message_id)
spawner = Spawner(Twitter, TwitterBot)

View file

@ -1,23 +0,0 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from ormantic import Boolean, ForeignKey, Integer, Model, Text
from kibicara.model import Hood, Mapping
class Twitter(Model):
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
dms_since_id: Integer(allow_null=True) = None
mentions_since_id: Integer(allow_null=True) = None
access_token: Text()
access_token_secret: Text()
username: Text(allow_null=True) = None
verified: Boolean() = False
enabled: Boolean() = False
class Mapping(Mapping):
table_name = 'twitterbots'

View file

@ -1,183 +0,0 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from logging import getLogger
from sqlite3 import IntegrityError
from fastapi import APIRouter, Depends, HTTPException, Response, status
from ormantic.exceptions import NoMatch
from peony.exceptions import NotAuthenticated
from peony.oauth_dance import get_access_token, get_oauth_token
from pydantic import BaseModel
from kibicara.config import config
from kibicara.platforms.twitter.bot import spawner
from kibicara.platforms.twitter.model import Twitter
from kibicara.webapi.hoods import get_hood, get_hood_unauthorized
logger = getLogger(__name__)
class BodyTwitterPublic(BaseModel):
username: str
async def get_twitter(twitter_id: int, hood=Depends(get_hood)):
try:
return await Twitter.objects.get(id=twitter_id, hood=hood)
except NoMatch:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
router = APIRouter()
twitter_callback_router = APIRouter()
@router.get(
'/public',
# TODO response_model,
operation_id='get_twitters_public',
)
async def twitter_read_all_public(hood=Depends(get_hood_unauthorized)):
twitterbots = await Twitter.objects.filter(hood=hood).all()
return [
BodyTwitterPublic(username=twitterbot.username)
for twitterbot in twitterbots
if twitterbot.verified == 1 and twitterbot.enabled == 1 and twitterbot.username
]
@router.get(
'/',
# TODO response_model,
operation_id='get_twitters',
)
async def twitter_read_all(hood=Depends(get_hood)):
return await Twitter.objects.filter(hood=hood).all()
@router.get(
'/{twitter_id}',
# TODO response_model
operation_id='get_twitter',
)
async def twitter_read(twitter=Depends(get_twitter)):
return twitter
@router.delete(
'/{twitter_id}',
status_code=status.HTTP_204_NO_CONTENT,
# TODO response_model
operation_id='delete_twitter',
)
async def twitter_delete(twitter=Depends(get_twitter)):
spawner.stop(twitter)
await twitter.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.get(
'/{twitter_id}/status',
status_code=status.HTTP_200_OK,
# TODO response_model
operation_id='status_twitter',
)
async def twitter_status(twitter=Depends(get_twitter)):
return {'status': spawner.get(twitter).status.name}
@router.post(
'/{twitter_id}/start',
status_code=status.HTTP_200_OK,
# TODO response_model
operation_id='start_twitter',
)
async def twitter_start(twitter=Depends(get_twitter)):
await twitter.update(enabled=True)
spawner.get(twitter).start()
return {}
@router.post(
'/{twitter_id}/stop',
status_code=status.HTTP_200_OK,
# TODO response_model
operation_id='stop_twitter',
)
async def twitter_stop(twitter=Depends(get_twitter)):
await twitter.update(enabled=False)
spawner.get(twitter).stop()
return {}
@router.post(
'/',
status_code=status.HTTP_201_CREATED,
# TODO response_model
operation_id='create_twitter',
)
async def twitter_create(response: Response, hood=Depends(get_hood)):
"""
`https://api.twitter.com/oauth/authorize?oauth_token=`
"""
try:
# Purge Twitter corpses
for corpse in await Twitter.objects.filter(hood=hood, verified=False).all():
await corpse.delete()
# Create Twitter
request_token = await get_oauth_token(
config['twitter']['consumer_key'],
config['twitter']['consumer_secret'],
callback_uri='{0}/dashboard/twitter-callback?hood={1}'.format(
config['frontend_url'], hood.id
),
)
if request_token['oauth_callback_confirmed'] != 'true':
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
twitter = await Twitter.objects.create(
hood=hood,
access_token=request_token['oauth_token'],
access_token_secret=request_token['oauth_token_secret'],
)
response.headers['Location'] = str(twitter.id)
return twitter
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except (KeyError, ValueError, NotAuthenticated):
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
@twitter_callback_router.get(
'/callback',
# TODO response_model
operation_id='callback_twitter',
)
async def twitter_read_callback(
oauth_token: str, oauth_verifier: str, hood=Depends(get_hood)
):
try:
twitter = await Twitter.objects.filter(access_token=oauth_token).get()
access_token = await get_access_token(
config['twitter']['consumer_key'],
config['twitter']['consumer_secret'],
twitter.access_token,
twitter.access_token_secret,
oauth_verifier,
)
await twitter.update(
access_token=access_token['oauth_token'],
access_token_secret=access_token['oauth_token_secret'],
verified=True,
enabled=True,
)
spawner.start(twitter)
return {}
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
except NoMatch:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
except (KeyError, ValueError, NotAuthenticated):
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)

View file

@ -15,8 +15,6 @@ from fastapi import APIRouter
from kibicara.platforms.email.webapi import router as email_router
from kibicara.platforms.telegram.webapi import router as telegram_router
from kibicara.platforms.test.webapi import router as test_router
from kibicara.platforms.twitter.webapi import router as twitter_router
from kibicara.platforms.twitter.webapi import twitter_callback_router
from kibicara.webapi.admin import router as admin_router
from kibicara.webapi.hoods import router as hoods_router
from kibicara.webapi.hoods.badwords import router as badwords_router
@ -34,9 +32,5 @@ hoods_router.include_router(test_router, prefix='/{hood_id}/test', tags=['test']
hoods_router.include_router(
telegram_router, prefix='/{hood_id}/telegram', tags=['telegram']
)
hoods_router.include_router(
twitter_router, prefix='/{hood_id}/twitter', tags=['twitter']
)
router.include_router(twitter_callback_router, prefix='/twitter', tags=['twitter'])
hoods_router.include_router(email_router, prefix='/{hood_id}/email', tags=['email'])
router.include_router(hoods_router, prefix='/hoods')

View file

@ -1,21 +0,0 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from pytest import fixture
from kibicara.model import Hood
from kibicara.platforms.twitter.model import Twitter
@fixture(scope='function')
def twitter(event_loop, hood_id):
hood = event_loop.run_until_complete(Hood.objects.get(id=hood_id))
return event_loop.run_until_complete(
Twitter.objects.create(
hood=hood,
access_token='access_token123',
access_token_secret='access_token_secret123',
)
)

View file

@ -1,159 +0,0 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from fastapi import status
from pytest import fixture, mark
from kibicara import config
from kibicara.platforms import twitter
from kibicara.platforms.twitter.model import Twitter
@fixture(scope='function')
def receive_oauth_request_token(monkeypatch, twitter_request_response):
@mark.asyncio
async def mock_get_oauth_request_token(
consumer_key, consumer_secret, callback_uri=''
):
return twitter_request_response
monkeypatch.setattr(twitter.webapi, 'get_oauth_token', mock_get_oauth_request_token)
@fixture(scope='function')
def receive_oauth_access_token(monkeypatch, twitter_access_response):
@mark.asyncio
async def mock_get_oauth_access_token(
consumer_key, consumer_secret, access_token, access_token_secret, oauth_verifier
):
return twitter_access_response
monkeypatch.setattr(twitter.webapi, 'get_access_token', mock_get_oauth_access_token)
@fixture(scope='function')
def disable_spawner(monkeypatch):
class DoNothing:
def start(self, bot):
assert bot is not None
monkeypatch.setattr(twitter.webapi, 'spawner', DoNothing())
@mark.parametrize(
'twitter_request_response, twitter_access_response',
[
(
{
'oauth_callback_confirmed': 'true',
'oauth_token': 'oauth_request_token123',
'oauth_token_secret': 'oauth_request_secret123',
},
{
'oauth_token': 'oauth_access_token123',
'oauth_token_secret': 'oauth_access_secret123',
},
)
],
)
def test_twitter_create_bot(
client,
event_loop,
monkeypatch,
auth_header,
hood_id,
receive_oauth_request_token,
receive_oauth_access_token,
disable_spawner,
twitter_request_response,
twitter_access_response,
):
monkeypatch.setitem(
config.config,
'twitter',
{'consumer_key': 'consumer_key123', 'consumer_secret': 'consumer_secret123'},
)
# Twitter create endpoint
response = client.post(
'/api/hoods/{0}/twitter/'.format(hood_id), headers=auth_header
)
assert response.status_code == status.HTTP_201_CREATED
bot_id = response.json()['id']
twitter = event_loop.run_until_complete(Twitter.objects.get(id=bot_id))
assert (
response.json()['access_token']
== twitter_request_response['oauth_token']
== twitter.access_token
)
assert (
response.json()['access_token_secret']
== twitter_request_response['oauth_token_secret']
== twitter.access_token_secret
)
assert not twitter.verified
assert response.json()['verified'] == twitter.verified
assert not twitter.enabled
assert response.json()['enabled'] == twitter.enabled
assert response.json()['hood']['id'] == hood_id
# Twitter callback endpoint should enable bot
response = client.get(
'/api/twitter/callback',
headers=auth_header,
params={
'hood_id': hood_id,
'oauth_token': twitter_request_response['oauth_token'],
'oauth_verifier': 'oauth_verifier123',
},
)
assert response.status_code == status.HTTP_200_OK
assert response.json() == {}
twitter = event_loop.run_until_complete(Twitter.objects.get(id=bot_id))
assert twitter_access_response['oauth_token'] == twitter.access_token
assert twitter_access_response['oauth_token_secret'] == twitter.access_token_secret
assert twitter.verified
assert twitter.enabled
def test_twitter_callback_invalid_oauth_token(client, auth_header):
response = client.get(
'/api/twitter/callback',
headers=auth_header,
params={'hood_id': '1', 'oauth_token': 'abc', 'oauth_verifier': 'def'},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_twitter_create_twitter_invalid_id(client, auth_header):
response = client.post('/api/hoods/1337/twitter/', headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.post('/api/hoods/wrong/twitter/', headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_twitter_create_unauthorized(client, hood_id):
response = client.post('/api/hoods/{hood_id}/twitter/')
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_twitter_create_wrong_consumer_keys(client, monkeypatch, auth_header, hood_id):
# No consumer keys
response = client.post(
'/api/hoods/{0}/twitter/'.format(hood_id), headers=auth_header
)
assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
# Invalid consumer keys
monkeypatch.setitem(
config.config,
'twitter',
{'consumer_key': 'consumer_key123', 'consumer_secret': 'consumer_secret123'},
)
response = client.post(
'/api/hoods/{0}/twitter/'.format(hood_id), headers=auth_header
)
assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR

View file

@ -1,42 +0,0 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from fastapi import status
from ormantic.exceptions import NoMatch
from pytest import raises
from kibicara.platforms.twitter.model import Twitter
def test_twitter_delete_bot(client, event_loop, twitter, auth_header):
response = client.delete(
'/api/hoods/{0}/twitter/{1}'.format(twitter.hood.id, twitter.id),
headers=auth_header,
)
assert response.status_code == status.HTTP_204_NO_CONTENT
with raises(NoMatch):
event_loop.run_until_complete(Twitter.objects.get(id=twitter.id))
def test_twitter_delete_bot_invalid_id(client, auth_header, hood_id):
response = client.delete('/api/hoods/1337/twitter/123', headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.delete('/api/hoods/wrong/twitter/123', headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
response = client.delete(
'/api/hoods/{0}/twitter/7331'.format(hood_id), headers=auth_header
)
assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.delete(
'/api/hoods/{0}/twitter/wrong'.format(hood_id), headers=auth_header
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_twitter_delete_bot_unauthorized(client, twitter):
response = client.delete(
'/api/hoods/{0}/twitter/{1}'.format(twitter.hood.id, twitter.id)
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -1,39 +0,0 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from fastapi import status
def test_twitter_get_bot(client, auth_header, event_loop, twitter):
response = client.get(
'/api/hoods/{0}/twitter/{1}'.format(twitter.hood.id, twitter.id),
headers=auth_header,
)
assert response.status_code == status.HTTP_200_OK
assert response.json()['id'] == twitter.id
assert response.json()['access_token'] == twitter.access_token
assert response.json()['access_token_secret'] == twitter.access_token_secret
def test_twitter_get_bot_invalid_id(client, auth_header, hood_id):
response = client.get('/api/hoods/1337/twitter/123', headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.get('/api/hoods/wrong/twitter/123', headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
response = client.get(
'/api/hoods/{0}/twitter/7331'.format(hood_id), headers=auth_header
)
assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.get(
'/api/hoods/{0}/twitter/wrong'.format(hood_id), headers=auth_header
)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_twitter_get_bot_unauthorized(client, twitter):
response = client.get(
'/api/hoods/{0}/twitter/{1}'.format(twitter.hood.id, twitter.id)
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -1,47 +0,0 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
#
# SPDX-License-Identifier: 0BSD
from fastapi import status
from kibicara.model import Hood
from kibicara.platforms.twitter.model import Twitter
def test_twitter_get_bots(client, auth_header, event_loop, hood_id):
hood = event_loop.run_until_complete(Hood.objects.get(id=hood_id))
twitter0 = event_loop.run_until_complete(
Twitter.objects.create(
hood=hood,
access_token='access_token123',
access_token_secret='access_token_secret123',
)
)
twitter1 = event_loop.run_until_complete(
Twitter.objects.create(
hood=hood,
access_token='access_token456',
access_token_secret='access_token_secret456',
)
)
response = client.get(
'/api/hoods/{0}/twitter'.format(twitter0.hood.id), headers=auth_header
)
assert response.status_code == status.HTTP_200_OK
assert response.json()[0]['id'] == twitter0.id
assert response.json()[0]['access_token'] == twitter0.access_token
assert response.json()[1]['id'] == twitter1.id
assert response.json()[1]['access_token'] == twitter1.access_token
def test_twitter_get_bots_invalid_id(client, auth_header, hood_id):
response = client.get('/api/hoods/1337/twitter', headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.get('/api/hoods/wrong/twitter', headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_twitter_get_bots_unauthorized(client, hood_id):
response = client.get('/api/hoods/{0}/twitter'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED