[misc] Use format() instead of %s, f-strings, concat

This commit is contained in:
Martin Rey 2020-09-29 00:39:32 +02:00 committed by Maike
parent e72816d4c4
commit 2db213af69
29 changed files with 162 additions and 129 deletions

View file

@ -33,9 +33,9 @@ def send_email(to, subject, sender='kibicara', body=''):
body (str): The body of the e-mail body (str): The body of the e-mail
""" """
msg = MIMEMultipart() msg = MIMEMultipart()
msg['From'] = 'Kibicara <%s@%s>' % (sender, getfqdn()) msg['From'] = 'Kibicara <{0}@{1}>'.format(sender, getfqdn())
msg['To'] = to msg['To'] = to
msg['Subject'] = '[Kibicara] %s' % subject msg['Subject'] = '[Kibicara] {0}'.format(subject)
msg.attach(MIMEText(body)) msg.attach(MIMEText(body))
with SMTP('localhost') as smtp: with SMTP('localhost') as smtp:

View file

@ -100,7 +100,7 @@ class Censor:
async def __run(self): async def __run(self):
await self.hood.load() await self.hood.load()
self.__task.set_name('%s %s' % (self.__class__.__name__, self.hood.name)) self.__task.set_name('{0} {1}'.format(self.__class__.__name__, self.hood.name))
try: try:
self.status = BotStatus.RUNNING self.status = BotStatus.RUNNING
await self.run() await self.run()
@ -148,13 +148,16 @@ class Censor:
async def __is_appropriate(self, message): async def __is_appropriate(self, message):
for badword in await BadWord.objects.filter(hood=self.hood).all(): for badword in await BadWord.objects.filter(hood=self.hood).all():
if search(badword.pattern, message.text, IGNORECASE): if search(badword.pattern, message.text, IGNORECASE):
logger.debug('Matched bad word - dropped message: %s' % message.text) logger.debug('Matched bad word - dropped message: {0}'.format(
message.text))
return False return False
for trigger in await Trigger.objects.filter(hood=self.hood).all(): for trigger in await Trigger.objects.filter(hood=self.hood).all():
if search(trigger.pattern, message.text, IGNORECASE): if search(trigger.pattern, message.text, IGNORECASE):
logger.debug('Matched trigger - passed message: %s' % message.text) logger.debug('Matched trigger - passed message: {0}'.format(
message.text))
return True return True
logger.debug('Did not match any trigger - dropped message: %s' % message.text) logger.debug('Did not match any trigger - dropped message: {0}'.format(
message.text))
return False return False

View file

@ -1,6 +1,7 @@
# Copyright (C) 2020 by Maike <maike@systemli.org> # Copyright (C) 2020 by Maike <maike@systemli.org>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de> # Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -35,22 +36,22 @@ class EmailBot(Censor):
while True: while True:
message = await self.receive() message = await self.receive()
logger.debug( logger.debug(
'Received message from censor (%s): %s' % (self.hood.name, message.text) 'Received message from censor ({0}): {1}'.format(self.hood.name, message.text)
) )
for subscriber in await EmailSubscribers.objects.filter( for subscriber in await EmailSubscribers.objects.filter(
hood=self.hood hood=self.hood
).all(): ).all():
token = to_token(email=subscriber.email, hood=self.hood.id) token = to_token(email=subscriber.email, hood=self.hood.id)
body = ( body = (
'%s\n\n--\n' '{0}\n\n--\n'
'If you want to stop receiving these mails,' 'If you want to stop receiving these mails,'
'follow this link: %s/hoods/%d/email-unsubscribe?token=%s' 'follow this link: {1}/hoods/{2}/email-unsubscribe?token={3}'
) % (message.text, config['frontend_url'], self.hood.id, token) ).format(message.text, config['frontend_url'], self.hood.id, token)
try: try:
logger.debug('Trying to send: \n%s' % body) logger.debug('Trying to send: \n{0}'.format(body))
email.send_email( email.send_email(
subscriber.email, subscriber.email,
"Kibicara " + self.hood.name, "Kibicara {0}".format(self.hood.name),
body=body, body=body,
) )
except (ConnectionRefusedError, SMTPException): except (ConnectionRefusedError, SMTPException):

View file

@ -1,6 +1,7 @@
# Copyright (C) 2020 by Maike <maike@systemli.org> # Copyright (C) 2020 by Maike <maike@systemli.org>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de> # Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -64,19 +65,19 @@ class Main:
) )
response = post( response = post(
'%s/api/hoods/%d/email/messages/' % (config['root_url'], email.hood.pk), '{0}/api/hoods/{1}/email/messages/'.format(config['root_url'], email.hood.pk),
json={'text': text, 'secret': email.secret}, json={'text': text, 'secret': email.secret},
) )
if response.status_code == status.HTTP_201_CREATED: if response.status_code == status.HTTP_201_CREATED:
exit(0) exit(0)
elif response.status_code == status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS: elif response.status_code == status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS:
logger.error('Message was\'t accepted: %s' % text) logger.error('Message was\'t accepted: {0}'.format(text))
elif response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY: elif response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY:
logger.error('Malformed request: %s' % response.json()) logger.error('Malformed request: {0}'.format(response.json()))
elif response.status_code == status.HTTP_401_UNAUTHORIZED: elif response.status_code == status.HTTP_401_UNAUTHORIZED:
logger.error('Wrong API secret. kibicara_mda seems to be misconfigured') logger.error('Wrong API secret. kibicara_mda seems to be misconfigured')
else: else:
logger.error( logger.error(
'REST-API failed with response status %d' % response.status_code 'REST-API failed with response status {0}'.format(response.status_code)
) )
exit(1) exit(1)

View file

@ -1,6 +1,7 @@
# Copyright (C) 2020 by Maike <maike@systemli.org> # Copyright (C) 2020 by Maike <maike@systemli.org>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de> # Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -112,7 +113,7 @@ async def email_create(values: BodyEmail, response: Response, hood=Depends(get_h
email = await Email.objects.create( email = await Email.objects.create(
hood=hood, secret=urandom(32).hex(), **values.__dict__ hood=hood, secret=urandom(32).hex(), **values.__dict__
) )
response.headers['Location'] = '%d' % hood.id response.headers['Location'] = str(hood.id)
return email return email
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@ -190,7 +191,7 @@ async def email_subscribe(
:return: Returns status code 200 after sending confirmation email. :return: Returns status code 200 after sending confirmation email.
""" """
token = to_token(hood=hood.id, email=subscriber.email) token = to_token(hood=hood.id, email=subscriber.email)
confirm_link = '%s/hoods/%d/email-confirm?token=%s' % ( confirm_link = '{0}/hoods/{1}/email-confirm?token={2}'.format(
config['frontend_url'], config['frontend_url'],
hood.id, hood.id,
token, token,
@ -201,8 +202,8 @@ async def email_subscribe(
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
email.send_email( email.send_email(
subscriber.email, subscriber.email,
"Subscribe to Kibicara " + hood.name, "Subscribe to Kibicara {0}".format(hood.name),
body='To confirm your subscription, follow this link: ' + confirm_link, body='To confirm your subscription, follow this link: {0}'.format(confirm_link),
) )
return {} return {}
except ConnectionRefusedError: except ConnectionRefusedError:
@ -251,7 +252,7 @@ async def email_unsubscribe(token, hood=Depends(get_hood_unauthorized)):
:param hood: Hood the Email bot belongs to. :param hood: Hood the Email bot belongs to.
""" """
try: try:
logger.warning("token is: " + token) logger.warning("token is: {0}".format(token))
payload = from_token(token) payload = from_token(token)
# If token.hood and url.hood are different, raise an error: # If token.hood and url.hood are different, raise an error:
if hood.id is not payload['hood']: if hood.id is not payload['hood']:
@ -304,10 +305,10 @@ async def email_message_create(
if message.secret == receiver.secret: if message.secret == receiver.secret:
# pass message.text to bot.py # pass message.text to bot.py
if await spawner.get(hood).publish(Message(message.text)): if await spawner.get(hood).publish(Message(message.text)):
logger.warning("Message was accepted: " + message.text) logger.warning("Message was accepted: {0}".format(message.text))
return {} return {}
else: else:
logger.warning("Message was't accepted: " + message.text) logger.warning("Message was't accepted: {0}".format(message.text))
raise HTTPException( raise HTTPException(
status_code=status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS status_code=status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS
) )

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -40,27 +41,29 @@ class TelegramBot(Censor):
try: try:
self.bot = Bot(token=self.telegram_model.api_token) self.bot = Bot(token=self.telegram_model.api_token)
self.dp = self._create_dispatcher() self.dp = self._create_dispatcher()
logger.debug(f'Bot {self.telegram_model.hood.name} starting.') logger.debug('Bot {0} starting.'.format(self.telegram_model.hood.name))
user = await self.bot.get_me() user = await self.bot.get_me()
if user.username: if user.username:
await self.telegram_model.update(username=user.username) await self.telegram_model.update(username=user.username)
await gather(self.dp.start_polling(), self._push()) await gather(self.dp.start_polling(), self._push())
except CancelledError: except CancelledError:
logger.debug(f'Bot {self.telegram_model.hood.name} received Cancellation.') logger.debug('Bot {0} received Cancellation.'.format(
self.telegram_model.hood.name))
self.dp = None self.dp = None
raise raise
except exceptions.ValidationError: except exceptions.ValidationError:
logger.debug(f'Bot {self.telegram_model.hood.name} has invalid auth token.') logger.debug('Bot {0} has invalid auth token.'.format(
self.telegram_model.hood.name))
await self.telegram_model.update(enabled=False) await self.telegram_model.update(enabled=False)
finally: finally:
logger.debug(f'Bot {self.telegram_model.hood.name} stopped.') logger.debug('Bot {0} stopped.'.format(self.telegram_model.hood.name))
async def _push(self): async def _push(self):
while True: while True:
message = await self.receive() message = await self.receive()
logger.debug( logger.debug(
'Received message from censor (%s): %s' 'Received message from censor ({0}): {1}'.format(
% (self.telegram_model.hood.name, message.text) self.telegram_model.hood.name, message.text)
) )
for user in await TelegramUser.objects.filter( for user in await TelegramUser.objects.filter(
bot=self.telegram_model bot=self.telegram_model
@ -72,29 +75,31 @@ class TelegramBot(Censor):
await self.bot.send_message(user_id, message, disable_notification=False) await self.bot.send_message(user_id, message, disable_notification=False)
except exceptions.BotBlocked: except exceptions.BotBlocked:
logger.error( logger.error(
'Target [ID:%s] (%s): blocked by user' 'Target [ID:{0}] ({1}): blocked by user'.format(user_id,
% (user_id, self.telegram_model.hood.name) self.telegram_model.hood.name)
) )
except exceptions.ChatNotFound: except exceptions.ChatNotFound:
logger.error( logger.error(
'Target [ID:%s] (%s): invalid user ID' 'Target [ID:{0}] ({1}): invalid user ID'.format(user_id,
% (user_id, self.telegram_model.hood.name) self.telegram_model.hood.name)
) )
except exceptions.RetryAfter as e: except exceptions.RetryAfter as e:
logger.error( logger.error(
'Target [ID:%s] (%s): Flood limit is exceeded. Sleep %d seconds.' 'Target [ID:{0}] ({1}): Flood limit is exceeded.'.format(
% (user_id, self.telegram_model.hood.name, e.timeout) user_id, self.telegram_model.hood.name
)
+ 'Sleep {0} seconds.'.format(e.timeout)
) )
await sleep(e.timeout) await sleep(e.timeout)
return await self._send_message(user_id, message) return await self._send_message(user_id, message)
except exceptions.UserDeactivated: except exceptions.UserDeactivated:
logger.error( logger.error(
'Target [ID:%s] (%s): user is deactivated' 'Target [ID:{0}] ({1}): user is deactivated'.format(user_id,
% (user_id, self.telegram_model.hood.name) self.telegram_model.hood.name)
) )
except exceptions.TelegramAPIError: except exceptions.TelegramAPIError:
logger.exception( logger.exception(
'Target [ID:%s] (%s): failed' % (user_id, self.telegram_model.hood.name) 'Target [ID:{0}] ({1}): failed'.format(user_id, self.telegram_model.hood.name)
) )
async def _send_welcome(self, message: types.Message): async def _send_welcome(self, message: types.Message):

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -102,7 +103,7 @@ async def telegram_create(
try: try:
telegram = await Telegram.objects.create(hood=hood, **values.__dict__) telegram = await Telegram.objects.create(hood=hood, **values.__dict__)
spawner.start(telegram) spawner.start(telegram)
response.headers['Location'] = '%d' % telegram.id response.headers['Location'] = str(telegram.id)
return telegram return telegram
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)

View file

@ -1,5 +1,6 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de> # Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -37,7 +38,7 @@ async def test_create(response: Response, hood=Depends(get_hood)):
try: try:
test = await Test.objects.create(hood=hood) test = await Test.objects.create(hood=hood)
spawner.start(test) spawner.start(test)
response.headers['Location'] = '%d' % test.id response.headers['Location'] = str(test.id)
return test return test
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -47,12 +48,14 @@ class TwitterBot(Censor):
user = await self.client.user user = await self.client.user
if user.screen_name: if user.screen_name:
await self.twitter_model.update(username=user.screen_name) await self.twitter_model.update(username=user.screen_name)
logger.debug('Starting Twitter bot: %s' % self.twitter_model.__dict__) logger.debug('Starting Twitter bot: {0}'.format(self.twitter_model.__dict__))
await gather(self.poll(), self.push()) await gather(self.poll(), self.push())
except CancelledError: except CancelledError:
logger.debug(f'Bot {self.twitter_model.hood.name} received Cancellation.') logger.debug('Bot {0} received Cancellation.'.format(
self.twitter_model.hood.name))
except exceptions.Unauthorized: except exceptions.Unauthorized:
logger.debug(f'Bot {self.twitter_model.hood.name} has invalid auth token.') logger.debug('Bot {0} has invalid auth token.'.format(
self.twitter_model.hood.name))
await self.twitter_model.update(enabled=False) await self.twitter_model.update(enabled=False)
self.enabled = self.twitter_model.enabled self.enabled = self.twitter_model.enabled
except (KeyError, ValueError, exceptions.NotAuthenticated): except (KeyError, ValueError, exceptions.NotAuthenticated):
@ -60,18 +63,18 @@ class TwitterBot(Censor):
await self.twitter_model.update(enabled=False) await self.twitter_model.update(enabled=False)
self.enabled = self.twitter_model.enabled self.enabled = self.twitter_model.enabled
finally: finally:
logger.debug(f'Bot {self.twitter_model.hood.name} stopped.') logger.debug('Bot {0} stopped.'.format(self.twitter_model.hood.name))
async def poll(self): async def poll(self):
while True: while True:
dms = await self._poll_direct_messages() dms = await self._poll_direct_messages()
logger.debug( logger.debug(
'Polled dms (%s): %s' % (self.twitter_model.hood.name, str(dms)) 'Polled dms ({0}): {1}'.format(self.twitter_model.hood.name, str(dms))
) )
mentions = await self._poll_mentions() mentions = await self._poll_mentions()
logger.debug( logger.debug(
'Polled mentions (%s): %s' 'Polled mentions ({0}): {1}'.format(
% (self.twitter_model.hood.name, str(mentions)) self.twitter_model.hood.name, str(mentions))
) )
await self.twitter_model.update( await self.twitter_model.update(
dms_since_id=self.dms_since_id, mentions_since_id=self.mentions_since_id dms_since_id=self.dms_since_id, mentions_since_id=self.mentions_since_id
@ -137,8 +140,8 @@ class TwitterBot(Censor):
while True: while True:
message = await self.receive() message = await self.receive()
logger.debug( logger.debug(
'Received message from censor (%s): %s' 'Received message from censor ({0}): {1}'.format(
% (self.twitter_model.hood.name, message.text) self.twitter_model.hood.name, message.text)
) )
if hasattr(message, 'twitter_mention_id'): if hasattr(message, 'twitter_mention_id'):
await self._retweet(message.twitter_mention_id) await self._retweet(message.twitter_mention_id)

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -129,8 +130,9 @@ async def twitter_create(response: Response, hood=Depends(get_hood)):
request_token = await get_oauth_token( request_token = await get_oauth_token(
config['twitter']['consumer_key'], config['twitter']['consumer_key'],
config['twitter']['consumer_secret'], config['twitter']['consumer_secret'],
callback_uri=f"""""" callback_uri="{0}/dashboard/twitter-callback?hood={1}".format(
f"""{config["frontend_url"]}/dashboard/twitter-callback?hood={hood.id}""", config["frontend_url"], hood.id
),
) )
if request_token['oauth_callback_confirmed'] != 'true': if request_token['oauth_callback_confirmed'] != 'true':
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
@ -139,7 +141,7 @@ async def twitter_create(response: Response, hood=Depends(get_hood)):
access_token=request_token['oauth_token'], access_token=request_token['oauth_token'],
access_token_secret=request_token['oauth_token_secret'], access_token_secret=request_token['oauth_token_secret'],
) )
response.headers['Location'] = '%d' % twitter.id response.headers['Location'] = str(twitter.id)
return twitter return twitter
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)

View file

@ -105,12 +105,12 @@ async def admin_register(values: BodyAdmin):
- **password**: Password of new hood admin - **password**: Password of new hood admin
""" """
register_token = to_token(**values.__dict__) register_token = to_token(**values.__dict__)
logger.debug(f'register_token={register_token}') logger.debug('register_token={0}'.format(register_token))
try: try:
admin = await Admin.objects.filter(email=values.email).all() admin = await Admin.objects.filter(email=values.email).all()
if admin: if admin:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
body = f'{config["frontend_url"]}/confirm?token={register_token}' body = '{0}/confirm?token={1}'.format(config["frontend_url"], register_token)
logger.debug(body) logger.debug(body)
email.send_email( email.send_email(
to=values.email, to=values.email,
@ -177,12 +177,12 @@ async def admin_reset_password(values: BodyEmail):
- **password**: Password of new hood admin - **password**: Password of new hood admin
""" """
register_token = to_token(datetime=datetime.now().isoformat(), **values.__dict__) register_token = to_token(datetime=datetime.now().isoformat(), **values.__dict__)
logger.debug(f'register_token={register_token}') logger.debug('register_token={0}'.format(register_token))
try: try:
admin = await Admin.objects.filter(email=values.email).all() admin = await Admin.objects.filter(email=values.email).all()
if not admin: if not admin:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
body = f'{config["frontend_url"]}/password-reset?token={register_token}' body = '{0}/password-reset?token={1}'.format(config["frontend_url"], register_token)
logger.debug(body) logger.debug(body)
email.send_email( email.send_email(
to=values.email, to=values.email,

View file

@ -76,7 +76,7 @@ async def hood_create(values: BodyHood, response: Response, admin=Depends(get_ad
# Initialize Triggers to match all # Initialize Triggers to match all
await Trigger.objects.create(hood=hood, pattern='.') await Trigger.objects.create(hood=hood, pattern='.')
response.headers['Location'] = '%d' % hood.id response.headers['Location'] = str(hood.id)
return hood return hood
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)

View file

@ -59,7 +59,7 @@ async def badword_create(
try: try:
regex_compile(values.pattern) regex_compile(values.pattern)
badword = await BadWord.objects.create(hood=hood, **values.__dict__) badword = await BadWord.objects.create(hood=hood, **values.__dict__)
response.headers['Location'] = '%d' % badword.id response.headers['Location'] = str(badword.id)
return badword return badword
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)

View file

@ -60,7 +60,7 @@ async def trigger_create(
try: try:
regex_compile(values.pattern) regex_compile(values.pattern)
trigger = await Trigger.objects.create(hood=hood, **values.__dict__) trigger = await Trigger.objects.create(hood=hood, **values.__dict__)
response.headers['Location'] = '%d' % trigger.id response.headers['Location'] = str(trigger.id)
return trigger return trigger
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)

View file

@ -1,5 +1,6 @@
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de> # Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Christian Hagenest <c.hagenest@pm.me> # Copyright (C) 2020 by Christian Hagenest <c.hagenest@pm.me>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -55,7 +56,7 @@ def register_token(client, receive_email):
@fixture(scope='module') @fixture(scope='module')
def register_confirmed(client, register_token): def register_confirmed(client, register_token):
response = client.post('/api/admin/confirm/%s' % register_token) response = client.post('/api/admin/confirm/{0}'.format(register_token))
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
@ -70,7 +71,7 @@ def access_token(client, register_confirmed):
@fixture(scope='module') @fixture(scope='module')
def auth_header(access_token): def auth_header(access_token):
return {'Authorization': 'Bearer %s' % access_token} return {'Authorization': 'Bearer {0}'.format(access_token)}
@fixture(scope='function') @fixture(scope='function')
@ -79,13 +80,13 @@ def hood_id(client, auth_header):
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
hood_id = int(response.headers['Location']) hood_id = int(response.headers['Location'])
yield hood_id yield hood_id
client.delete('/api/hoods/%d' % hood_id, headers=auth_header) client.delete('/api/hoods/{0}'.format(hood_id), headers=auth_header)
@fixture(scope='function') @fixture(scope='function')
def trigger_id(client, hood_id, auth_header): def trigger_id(client, hood_id, auth_header):
response = client.post( response = client.post(
'/api/hoods/%d/triggers/' % hood_id, '/api/hoods/{0}/triggers/'.format(hood_id),
json={'pattern': 'test'}, json={'pattern': 'test'},
headers=auth_header, headers=auth_header,
) )
@ -93,29 +94,29 @@ def trigger_id(client, hood_id, auth_header):
trigger_id = int(response.headers['Location']) trigger_id = int(response.headers['Location'])
yield trigger_id yield trigger_id
client.delete( client.delete(
'/api/hoods/%d/triggers/%d' % (hood_id, trigger_id), headers=auth_header '/api/hoods/{0}/triggers/{1}'.format(hood_id, trigger_id), headers=auth_header
) )
@fixture(scope='function') @fixture(scope='function')
def badword_id(client, hood_id, auth_header): def badword_id(client, hood_id, auth_header):
response = client.post( response = client.post(
'/api/hoods/%d/badwords/' % hood_id, json={'pattern': ''}, headers=auth_header '/api/hoods/{0}/badwords/'.format(hood_id), json={'pattern': ''}, headers=auth_header
) )
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
badword_id = int(response.headers['Location']) badword_id = int(response.headers['Location'])
yield badword_id yield badword_id
client.delete( client.delete(
'/api/hoods/%d/badwords/%d' % (hood_id, badword_id), headers=auth_header '/api/hoods/{0}/badwords/{1}'.format(hood_id, badword_id), headers=auth_header
) )
@fixture(scope='function') @fixture(scope='function')
def test_id(client, hood_id, auth_header): def test_id(client, hood_id, auth_header):
response = client.post( response = client.post(
'/api/hoods/%d/test/' % hood_id, json={}, headers=auth_header '/api/hoods/{0}/test/'.format(hood_id), json={}, headers=auth_header
) )
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
test_id = int(response.headers['Location']) test_id = int(response.headers['Location'])
yield test_id yield test_id
client.delete('/api/hoods/%d/test/%d' % (hood_id, test_id), headers=auth_header) client.delete('/api/hoods/{0}/test/{1}'.format(hood_id, test_id), headers=auth_header)

View file

@ -1,5 +1,6 @@
# Copyright (C) 2020 by Christian Hagenest <c.hagenest@pm.me> # Copyright (C) 2020 by Christian Hagenest <c.hagenest@pm.me>
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de> # Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -17,65 +18,65 @@ def test_hood_create_unauthorized(client, hood_id):
def test_hood_read(client, hood_id): def test_hood_read(client, hood_id):
response = client.get('/api/hoods/%d' % hood_id) response = client.get('/api/hoods/{0}'.format(hood_id))
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
def test_hood_update_unauthorized(client, hood_id): def test_hood_update_unauthorized(client, hood_id):
response = client.put('/api/hoods/%d' % hood_id) response = client.put('/api/hoods/{0}'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_hood_delete_unauthorized(client, hood_id): def test_hood_delete_unauthorized(client, hood_id):
response = client.delete('/api/hoods/%d' % hood_id) response = client.delete('/api/hoods/{0}'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_trigger_read_all_unauthorized(client, hood_id): def test_trigger_read_all_unauthorized(client, hood_id):
response = client.get('/api/hoods/%d/triggers/' % hood_id) response = client.get('/api/hoods/{0}/triggers/'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_trigger_create_unauthorized(client, hood_id): def test_trigger_create_unauthorized(client, hood_id):
response = client.post('/api/hoods/%d/triggers/' % hood_id) response = client.post('/api/hoods/{0}/triggers/'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_trigger_read_unauthorized(client, hood_id, trigger_id): def test_trigger_read_unauthorized(client, hood_id, trigger_id):
response = client.get('/api/hoods/%d/triggers/%d' % (hood_id, trigger_id)) response = client.get('/api/hoods/{0}/triggers/{1}'.format(hood_id, trigger_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_trigger_update_unauthorized(client, hood_id, trigger_id): def test_trigger_update_unauthorized(client, hood_id, trigger_id):
response = client.put('/api/hoods/%d/triggers/%d' % (hood_id, trigger_id)) response = client.put('/api/hoods/{0}/triggers/{1}'.format(hood_id, trigger_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_trigger_delete_unauthorized(client, hood_id, trigger_id): def test_trigger_delete_unauthorized(client, hood_id, trigger_id):
response = client.delete('/api/hoods/%d/triggers/%d' % (hood_id, trigger_id)) response = client.delete('/api/hoods/{0}/triggers/{1}'.format(hood_id, trigger_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_badword_read_all_unauthorized(client, hood_id): def test_badword_read_all_unauthorized(client, hood_id):
response = client.get('/api/hoods/%d/badwords/' % hood_id) response = client.get('/api/hoods/{0}/badwords/'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_badword_create_unauthorized(client, hood_id): def test_badword_create_unauthorized(client, hood_id):
response = client.post('/api/hoods/%d/badwords/' % hood_id) response = client.post('/api/hoods/{0}/badwords/'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_badword_read_unauthorized(client, hood_id, badword_id): def test_badword_read_unauthorized(client, hood_id, badword_id):
response = client.get('/api/hoods/%d/badwords/%d' % (hood_id, badword_id)) response = client.get('/api/hoods/{0}/badwords/{1}'.format(hood_id, badword_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_badword_update_unauthorized(client, hood_id, badword_id): def test_badword_update_unauthorized(client, hood_id, badword_id):
response = client.put('/api/hoods/%d/badwords/%d' % (hood_id, badword_id)) response = client.put('/api/hoods/{0}/badwords/{1}'.format(hood_id, badword_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_badword_delete_unauthorized(client, hood_id, badword_id): def test_badword_delete_unauthorized(client, hood_id, badword_id):
response = client.delete('/api/hoods/%d/badwords/%d' % (hood_id, badword_id)) response = client.delete('/api/hoods/{0}/badwords/{1}'.format(hood_id, badword_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -1,5 +1,6 @@
# Copyright (C) 2020 by Christian <c.hagenest@pm.me> # Copyright (C) 2020 by Christian <c.hagenest@pm.me>
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de> # Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -7,30 +8,30 @@ from fastapi import status
def test_test_read_all_unauthorized(client, hood_id): def test_test_read_all_unauthorized(client, hood_id):
response = client.get('/api/hoods/%d/test/' % hood_id) response = client.get('/api/hoods/{0}/test/'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_test_create_unauthorized(client, hood_id): def test_test_create_unauthorized(client, hood_id):
response = client.post('/api/hoods/%d/test/' % hood_id) response = client.post('/api/hoods/{0}/test/'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_test_read_unauthorized(client, hood_id, test_id): def test_test_read_unauthorized(client, hood_id, test_id):
response = client.get('/api/hoods/%d/test/%d' % (hood_id, test_id)) response = client.get('/api/hoods/{0}/test/{1}'.format(hood_id, test_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_test_delete_unauthorized(client, hood_id, test_id): def test_test_delete_unauthorized(client, hood_id, test_id):
response = client.delete('/api/hoods/%d/test/%d' % (hood_id, test_id)) response = client.delete('/api/hoods/{0}/test/{1}'.format(hood_id, test_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_test_message_read_all_unauthorized(client, hood_id, test_id): def test_test_message_read_all_unauthorized(client, hood_id, test_id):
response = client.get('/api/hoods/%d/test/%d/messages/' % (hood_id, test_id)) response = client.get('/api/hoods/{0}/test/{1}/messages/'.format(hood_id, test_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_test_message_create_unauthorized(client, hood_id, test_id): def test_test_message_create_unauthorized(client, hood_id, test_id):
response = client.post('/api/hoods/%d/test/%d/messages/' % (hood_id, test_id)) response = client.post('/api/hoods/{0}/test/{1}/messages/'.format(hood_id, test_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -1,5 +1,6 @@
# Copyright (C) 2020 by Maike <maike@systemli.org> # Copyright (C) 2020 by Maike <maike@systemli.org>
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -11,11 +12,11 @@ from pytest import fixture
@fixture(scope="function") @fixture(scope="function")
def email_row(client, hood_id, auth_header): def email_row(client, hood_id, auth_header):
response = client.post( response = client.post(
'/api/hoods/%d/email/' % hood_id, '/api/hoods/{0}/email/'.format(hood_id),
json={'name': 'kibicara-test'}, json={'name': 'kibicara-test'},
headers=auth_header, headers=auth_header,
) )
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
email_id = int(response.headers['Location']) email_id = int(response.headers['Location'])
yield response.json() yield response.json()
client.delete('/api/hoods/%d/email/%d' % (hood_id, email_id), headers=auth_header) client.delete('/api/hoods/{0}/email/{1}'.format(hood_id, email_id), headers=auth_header)

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Maike <maike@systemli.org> # Copyright (C) 2020 by Maike <maike@systemli.org>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -12,7 +13,7 @@ from urllib.parse import urlparse
def test_email_subscribe_unsubscribe(client, hood_id, receive_email): def test_email_subscribe_unsubscribe(client, hood_id, receive_email):
response = client.post( response = client.post(
'/api/hoods/%d/email/subscribe/' % hood_id, json={'email': 'test@localhost'} '/api/hoods/{0}/email/subscribe/'.format(hood_id), json={'email': 'test@localhost'}
) )
assert response.status_code == status.HTTP_202_ACCEPTED assert response.status_code == status.HTTP_202_ACCEPTED
mail = receive_email() mail = receive_email()
@ -24,17 +25,17 @@ def test_email_subscribe_unsubscribe(client, hood_id, receive_email):
)[0] )[0]
start = len('token=') start = len('token=')
response = client.post( response = client.post(
'/api/hoods/%d/email/subscribe/confirm/%s' '/api/hoods/{0}/email/subscribe/confirm/{1}'
% (hood_id, urlparse(confirm_url).query[start:]) .format(hood_id, urlparse(confirm_url).query[start:])
) )
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
response = client.post( response = client.post(
'/api/hoods/%d/email/subscribe/confirm/%s' '/api/hoods/{0}/email/subscribe/confirm/{1}'
% (hood_id, urlparse(confirm_url).query[start:]) .format(hood_id, urlparse(confirm_url).query[start:])
) )
assert response.status_code == status.HTTP_409_CONFLICT assert response.status_code == status.HTTP_409_CONFLICT
token = to_token(email=mail['to'], hood=hood_id) token = to_token(email=mail['to'], hood=hood_id)
response = client.delete('/api/hoods/%d/email/unsubscribe/%s' % (hood_id, token)) response = client.delete('/api/hoods/{0}/email/unsubscribe/{1}'.format(hood_id, token))
assert response.status_code == status.HTTP_204_NO_CONTENT assert response.status_code == status.HTTP_204_NO_CONTENT
@ -44,7 +45,7 @@ def test_email_message(client, hood_id, trigger_id, email_row):
'author': "test@localhost", 'author': "test@localhost",
'secret': email_row['secret'], 'secret': email_row['secret'],
} }
response = client.post('/api/hoods/%d/email/messages/' % hood_id, json=body) response = client.post('/api/hoods/{0}/email/messages/'.format(hood_id), json=body)
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED

View file

@ -1,5 +1,6 @@
# Copyright (C) 2020 by Maike <maike@systemli.org> # Copyright (C) 2020 by Maike <maike@systemli.org>
# Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de> # Copyright (C) 2020 by Thomas Lindner <tom@dl6tom.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -7,16 +8,16 @@ from fastapi import status
def test_email_create_unauthorized(client, hood_id): def test_email_create_unauthorized(client, hood_id):
response = client.post('/api/hoods/%d/email/' % hood_id) response = client.post('/api/hoods/{0}/email/'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_email_delete_unauthorized(client, hood_id, email_row): def test_email_delete_unauthorized(client, hood_id, email_row):
response = client.delete('/api/hoods/%d/email/%d' % (hood_id, email_row['id'])) response = client.delete('/api/hoods/{0}/email/{1}'.format(hood_id, email_row['id']))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_email_message_unauthorized(client, hood_id, email_row): def test_email_message_unauthorized(client, hood_id, email_row):
body = {"text": "test", "author": "author", "secret": "wrong"} body = {"text": "test", "author": "author", "secret": "wrong"}
response = client.post('/api/hoods/%d/email/messages/' % hood_id, json=body) response = client.post('/api/hoods/{0}/email/messages/'.format(hood_id), json=body)
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Maike <maike@systemli.org> # Copyright (C) 2020 by Maike <maike@systemli.org>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -7,15 +8,15 @@ from nacl.exceptions import CryptoError
def test_email_subscribe_empty(client, hood_id): def test_email_subscribe_empty(client, hood_id):
response = client.post('/api/hoods/%d/email/subscribe/' % hood_id) response = client.post('/api/hoods/{0}/email/subscribe/'.format(hood_id))
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_email_subscribe_confirm_wrong_token(client, hood_id): def test_email_subscribe_confirm_wrong_token(client, hood_id):
try: try:
response = client.post( response = client.post(
'/api/hoods/%d/email/subscribe/confirm/asdfasdfasdfasdfasdfasdfasdfasdf' '/api/hoods/{0}/email/subscribe/confirm/'.format(hood_id)
% hood_id + 'asdfasdfasdfasdfasdfasdfasdfasdf'
) )
assert response.status_code is not status.HTTP_201_CREATED assert response.status_code is not status.HTTP_201_CREATED
except CryptoError: except CryptoError:
@ -35,14 +36,14 @@ def test_email_message_wrong(client, hood_id, email_row):
'author': "test@localhost", 'author': "test@localhost",
'secret': email_row['secret'], 'secret': email_row['secret'],
} }
response = client.post('/api/hoods/%d/email/messages/' % hood_id, json=body) response = client.post('/api/hoods/{0}/email/messages/'.format(hood_id), json=body)
assert response.status_code == status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS assert response.status_code == status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS
def test_email_unsubscribe_wrong_token(client, hood_id): def test_email_unsubscribe_wrong_token(client, hood_id):
try: try:
client.delete( client.delete(
'/api/hoods/%d/email/unsubscribe/asdfasdfasdfasdfasdfasdfasdfasdf' % hood_id '/api/hoods/{0}/email/unsubscribe/asdfasdfasdfasdfasdfasdfasdfasdf'.format(hood_id)
) )
except CryptoError: except CryptoError:
pass pass

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -33,7 +34,7 @@ def test_telegram_create_bot(
monkeypatch.setattr(telegram.webapi, 'check_token', check_token_mock) monkeypatch.setattr(telegram.webapi, 'check_token', check_token_mock)
response = client.post( response = client.post(
f'/api/hoods/{hood_id}/telegram/', '/api/hoods/{0}/telegram/'.format(hood_id),
json=body, json=body,
headers=auth_header, headers=auth_header,
) )
@ -61,7 +62,7 @@ def test_telegram_invalid_api_token(
body, body,
): ):
response = client.post( response = client.post(
f'/api/hoods/{hood_id}/telegram/', '/api/hoods/{0}/telegram/'.format(hood_id),
json=body, json=body,
headers=auth_header, headers=auth_header,
) )

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -17,7 +18,7 @@ def test_telegram_delete_bot(client, event_loop, bot, telegram, auth_header):
TelegramUser.objects.create(user_id=5678, bot=telegram.id) TelegramUser.objects.create(user_id=5678, bot=telegram.id)
) )
response = client.delete( response = client.delete(
f'/api/hoods/{telegram.hood.id}/telegram/{telegram.id}', headers=auth_header '/api/hoods/{0}/telegram/{1}'.format(telegram.hood.id, telegram.id), headers=auth_header
) )
assert response.status_code == status.HTTP_204_NO_CONTENT assert response.status_code == status.HTTP_204_NO_CONTENT
with raises(NoMatch): with raises(NoMatch):
@ -31,15 +32,15 @@ def test_telegram_delete_bot_invalid_id(client, auth_header, hood_id):
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.delete('/api/hoods/wrong/telegram/123', headers=auth_header) response = client.delete('/api/hoods/wrong/telegram/123', headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
response = client.delete(f'/api/hoods/{hood_id}/telegram/7331', headers=auth_header) response = client.delete('/api/hoods/{0}/telegram/7331'.format(hood_id), headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.delete( response = client.delete(
f'/api/hoods/{hood_id}/telegram/wrong', headers=auth_header '/api/hoods/{0}/telegram/wrong'.format(hood_id), headers=auth_header
) )
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@mark.parametrize('bot', [{'api_token': 'apitoken123', 'welcome_message': 'msg'}]) @mark.parametrize('bot', [{'api_token': 'apitoken123', 'welcome_message': 'msg'}])
def test_telegram_delete_bot_unauthorized(client, bot, telegram): def test_telegram_delete_bot_unauthorized(client, bot, telegram):
response = client.delete(f'/api/hoods/{telegram.hood.id}/telegram/{telegram.id}') response = client.delete('/api/hoods/{0}/telegram/{1}'.format(telegram.hood.id, telegram.id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -9,7 +10,7 @@ from pytest import mark
@mark.parametrize('bot', [{'api_token': 'apitoken123', 'welcome_message': 'msg'}]) @mark.parametrize('bot', [{'api_token': 'apitoken123', 'welcome_message': 'msg'}])
def test_telegram_get_bot(client, auth_header, event_loop, bot, telegram): def test_telegram_get_bot(client, auth_header, event_loop, bot, telegram):
response = client.get( response = client.get(
f'/api/hoods/{telegram.hood.id}/telegram/{telegram.id}', headers=auth_header '/api/hoods/{0}/telegram/{1}'.format(telegram.hood.id, telegram.id), headers=auth_header
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
assert response.json()['id'] == telegram.id assert response.json()['id'] == telegram.id
@ -22,13 +23,13 @@ def test_telegram_get_bot_invalid_id(client, auth_header, hood_id):
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.get('/api/hoods/wrong/telegram/123', headers=auth_header) response = client.get('/api/hoods/wrong/telegram/123', headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
response = client.get(f'/api/hoods/{hood_id}/telegram/7331', headers=auth_header) response = client.get('/api/hoods/{0}/telegram/7331'.format(hood_id), headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.get(f'/api/hoods/{hood_id}/telegram/wrong', headers=auth_header) response = client.get('/api/hoods/{0}/telegram/wrong'.format(hood_id), headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@mark.parametrize('bot', [{'api_token': 'apitoken456', 'welcome_message': 'msg'}]) @mark.parametrize('bot', [{'api_token': 'apitoken456', 'welcome_message': 'msg'}])
def test_telegram_get_bot_unauthorized(client, bot, telegram): def test_telegram_get_bot_unauthorized(client, bot, telegram):
response = client.get(f'/api/hoods/{telegram.hood.id}/telegram/{telegram.id}') response = client.get('/api/hoods/{0}/telegram/{1}'.format(telegram.hood.id, telegram.id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -24,7 +25,7 @@ def test_telegram_get_bots(client, auth_header, event_loop, hood_id):
) )
) )
response = client.get( response = client.get(
f'/api/hoods/{telegram0.hood.id}/telegram', headers=auth_header '/api/hoods/{0}/telegram'.format(telegram0.hood.id), headers=auth_header
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
assert response.json()[0]['id'] == telegram0.id assert response.json()[0]['id'] == telegram0.id
@ -41,5 +42,5 @@ def test_telegram_get_bots_invalid_id(client, auth_header, hood_id):
def test_telegram_get_bots_unauthorized(client, hood_id): def test_telegram_get_bots_unauthorized(client, hood_id):
response = client.get(f'/api/hoods/{hood_id}/telegram') response = client.get('/api/hoods/{0}/telegram'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -75,7 +76,7 @@ def test_twitter_create_bot(
) )
# Twitter create endpoint # Twitter create endpoint
response = client.post(f'/api/hoods/{hood_id}/twitter/', headers=auth_header) response = client.post('/api/hoods/{0}/twitter/'.format(hood_id), headers=auth_header)
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
bot_id = response.json()['id'] bot_id = response.json()['id']
twitter = event_loop.run_until_complete(Twitter.objects.get(id=bot_id)) twitter = event_loop.run_until_complete(Twitter.objects.get(id=bot_id))
@ -137,7 +138,7 @@ def test_twitter_create_unauthorized(client, hood_id):
def test_twitter_create_wrong_consumer_keys(client, monkeypatch, auth_header, hood_id): def test_twitter_create_wrong_consumer_keys(client, monkeypatch, auth_header, hood_id):
# No consumer keys # No consumer keys
response = client.post(f'/api/hoods/{hood_id}/twitter/', headers=auth_header) response = client.post('/api/hoods/{0}/twitter/'.format(hood_id), headers=auth_header)
assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
# Invalid consumer keys # Invalid consumer keys
@ -147,5 +148,5 @@ def test_twitter_create_wrong_consumer_keys(client, monkeypatch, auth_header, ho
{'consumer_key': 'consumer_key123', 'consumer_secret': 'consumer_secret123'}, {'consumer_key': 'consumer_key123', 'consumer_secret': 'consumer_secret123'},
) )
response = client.post(f'/api/hoods/{hood_id}/twitter/', headers=auth_header) response = client.post('/api/hoods/{0}/twitter/'.format(hood_id), headers=auth_header)
assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -10,7 +11,7 @@ from pytest import raises
def test_twitter_delete_bot(client, event_loop, twitter, auth_header): def test_twitter_delete_bot(client, event_loop, twitter, auth_header):
response = client.delete( response = client.delete(
f'/api/hoods/{twitter.hood.id}/twitter/{twitter.id}', headers=auth_header '/api/hoods/{0}/twitter/{1}'.format(twitter.hood.id, twitter.id), headers=auth_header
) )
assert response.status_code == status.HTTP_204_NO_CONTENT assert response.status_code == status.HTTP_204_NO_CONTENT
with raises(NoMatch): with raises(NoMatch):
@ -22,12 +23,12 @@ def test_twitter_delete_bot_invalid_id(client, auth_header, hood_id):
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.delete('/api/hoods/wrong/twitter/123', headers=auth_header) response = client.delete('/api/hoods/wrong/twitter/123', headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
response = client.delete(f'/api/hoods/{hood_id}/twitter/7331', headers=auth_header) response = client.delete('/api/hoods/{0}/twitter/7331'.format(hood_id), headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.delete(f'/api/hoods/{hood_id}/twitter/wrong', headers=auth_header) response = client.delete('/api/hoods/{0}/twitter/wrong'.format(hood_id), headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_twitter_delete_bot_unauthorized(client, twitter): def test_twitter_delete_bot_unauthorized(client, twitter):
response = client.delete(f'/api/hoods/{twitter.hood.id}/twitter/{twitter.id}') response = client.delete('/api/hoods/{0}/twitter/{1}'.format(twitter.hood.id, twitter.id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -7,7 +8,7 @@ from fastapi import status
def test_twitter_get_bot(client, auth_header, event_loop, twitter): def test_twitter_get_bot(client, auth_header, event_loop, twitter):
response = client.get( response = client.get(
f'/api/hoods/{twitter.hood.id}/twitter/{twitter.id}', headers=auth_header '/api/hoods/{0}/twitter/{1}'.format(twitter.hood.id, twitter.id), headers=auth_header
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
assert response.json()['id'] == twitter.id assert response.json()['id'] == twitter.id
@ -20,12 +21,12 @@ def test_twitter_get_bot_invalid_id(client, auth_header, hood_id):
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.get('/api/hoods/wrong/twitter/123', headers=auth_header) response = client.get('/api/hoods/wrong/twitter/123', headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
response = client.get(f'/api/hoods/{hood_id}/twitter/7331', headers=auth_header) response = client.get('/api/hoods/{0}/twitter/7331'.format(hood_id), headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.get(f'/api/hoods/{hood_id}/twitter/wrong', headers=auth_header) response = client.get('/api/hoods/{0}/twitter/wrong'.format(hood_id), headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_twitter_get_bot_unauthorized(client, twitter): def test_twitter_get_bot_unauthorized(client, twitter):
response = client.get(f'/api/hoods/{twitter.hood.id}/twitter/{twitter.id}') response = client.get('/api/hoods/{0}/twitter/{1}'.format(twitter.hood.id, twitter.id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -1,4 +1,5 @@
# Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de> # Copyright (C) 2020 by Cathy Hu <cathy.hu@fau.de>
# Copyright (C) 2020 by Martin Rey <martin.rey@mailbox.org>
# #
# SPDX-License-Identifier: 0BSD # SPDX-License-Identifier: 0BSD
@ -23,7 +24,7 @@ def test_twitter_get_bots(client, auth_header, event_loop, hood_id):
access_token_secret='access_token_secret456', access_token_secret='access_token_secret456',
) )
) )
response = client.get(f'/api/hoods/{twitter0.hood.id}/twitter', headers=auth_header) response = client.get('/api/hoods/{0}/twitter'.format(twitter0.hood.id), headers=auth_header)
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
assert response.json()[0]['id'] == twitter0.id assert response.json()[0]['id'] == twitter0.id
assert response.json()[0]['access_token'] == twitter0.access_token assert response.json()[0]['access_token'] == twitter0.access_token
@ -39,5 +40,5 @@ def test_twitter_get_bots_invalid_id(client, auth_header, hood_id):
def test_twitter_get_bots_unauthorized(client, hood_id): def test_twitter_get_bots_unauthorized(client, hood_id):
response = client.get(f'/api/hoods/{hood_id}/twitter') response = client.get('/api/hoods/{0}/twitter'.format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED