[misc] Use double-quotes (black default)

This commit is contained in:
Thomas Lindner 2023-03-18 18:47:02 +01:00
parent fd09b381a6
commit 13c20ca245
35 changed files with 464 additions and 464 deletions

View file

@ -55,7 +55,7 @@ deps =
mypy mypy
types-requests types-requests
commands = commands =
black -S --check --diff src tests black --check --diff src tests
flake8 src tests flake8 src tests
# not yet # not yet
#mypy --ignore-missing-imports src tests #mypy --ignore-missing-imports src tests

View file

@ -12,16 +12,16 @@ from nacl.utils import random
The default configuration gets overwritten by a configuration file if one exists. The default configuration gets overwritten by a configuration file if one exists.
""" """
config = { config = {
'database_connection': 'sqlite:////tmp/kibicara.sqlite', "database_connection": "sqlite:////tmp/kibicara.sqlite",
'frontend_url': 'http://localhost:4200', # url of frontend, change in prod "frontend_url": "http://localhost:4200", # url of frontend, change in prod
'secret': random(SecretBox.KEY_SIZE).hex(), # generate with: openssl rand -hex 32 "secret": random(SecretBox.KEY_SIZE).hex(), # generate with: openssl rand -hex 32
# production params # production params
'frontend_path': None, # required, path to frontend html/css/js files "frontend_path": None, # required, path to frontend html/css/js files
'production': True, "production": True,
'behind_proxy': False, "behind_proxy": False,
'keyfile': None, # optional for ssl "keyfile": None, # optional for ssl
'certfile': None, # optional for ssl "certfile": None, # optional for ssl
# dev params # dev params
'root_url': 'http://localhost:8000', # url of backend "root_url": "http://localhost:8000", # url of backend
'cors_allow_origin': 'http://localhost:4200', "cors_allow_origin": "http://localhost:4200",
} }

View file

@ -15,7 +15,7 @@ from socket import getfqdn
logger = getLogger(__name__) logger = getLogger(__name__)
def send_email(to, subject, sender='kibicara', body=''): def send_email(to, subject, sender="kibicara", body=""):
"""E-Mail sender. """E-Mail sender.
Sends an E-Mail to a specified recipient with a body Sends an E-Mail to a specified recipient with a body
@ -33,10 +33,10 @@ def send_email(to, subject, sender='kibicara', body=''):
body (str): The body of the e-mail body (str): The body of the e-mail
""" """
msg = MIMEMultipart() msg = MIMEMultipart()
msg['From'] = 'Kibicara <{0}@{1}>'.format(sender, getfqdn()) msg["From"] = "Kibicara <{0}@{1}>".format(sender, getfqdn())
msg['To'] = to msg["To"] = to
msg['Subject'] = '[Kibicara] {0}'.format(subject) msg["Subject"] = "[Kibicara] {0}".format(subject)
msg.attach(MIMEText(body)) msg.attach(MIMEText(body))
with SMTP('localhost') as smtp: with SMTP("localhost") as smtp:
smtp.send_message(msg) smtp.send_message(msg)

View file

@ -35,17 +35,17 @@ class Main:
def __init__(self): def __init__(self):
parser = ArgumentParser() parser = ArgumentParser()
parser.add_argument( parser.add_argument(
'-f', "-f",
'--config', "--config",
dest='configfile', dest="configfile",
default='/etc/kibicara.conf', default="/etc/kibicara.conf",
help='path to config file', help="path to config file",
) )
parser.add_argument( parser.add_argument(
'-v', "-v",
'--verbose', "--verbose",
action='count', action="count",
help='Raise verbosity level', help="Raise verbosity level",
) )
args = parser.parse_args() args = parser.parse_args()
@ -63,9 +63,9 @@ class Main:
} }
basicConfig( basicConfig(
level=LOGLEVELS.get(args.verbose, DEBUG), level=LOGLEVELS.get(args.verbose, DEBUG),
format='%(asctime)s %(name)s %(message)s', format="%(asctime)s %(name)s %(message)s",
) )
getLogger('aiosqlite').setLevel(WARNING) getLogger("aiosqlite").setLevel(WARNING)
Mapping.create_all() Mapping.create_all()
asyncio_run(self.__run()) asyncio_run(self.__run())
@ -78,31 +78,31 @@ class Main:
async def get_response(self, path, scope): async def get_response(self, path, scope):
response = await super().get_response(path, scope) response = await super().get_response(path, scope)
if response.status_code == 404: if response.status_code == 404:
response = await super().get_response('.', scope) response = await super().get_response(".", scope)
return response return response
app = FastAPI() app = FastAPI()
server_config = Config() server_config = Config()
server_config.accesslog = '-' server_config.accesslog = "-"
server_config.behind_proxy = config['behind_proxy'] server_config.behind_proxy = config["behind_proxy"]
server_config.keyfile = config['keyfile'] server_config.keyfile = config["keyfile"]
server_config.certfile = config['certfile'] server_config.certfile = config["certfile"]
if config['production']: if config["production"]:
server_config.bind = ['0.0.0.0:8000', '[::]:8000'] server_config.bind = ["0.0.0.0:8000", "[::]:8000"]
api = FastAPI() api = FastAPI()
api.include_router(router) api.include_router(router)
app.mount('/api', api) app.mount("/api", api)
if not config['production'] and config['cors_allow_origin']: if not config["production"] and config["cors_allow_origin"]:
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=config['cors_allow_origin'], allow_origins=config["cors_allow_origin"],
allow_credentials=True, allow_credentials=True,
allow_methods=['*'], allow_methods=["*"],
allow_headers=['*'], allow_headers=["*"],
) )
if config['frontend_path'] is not None: if config["frontend_path"] is not None:
app.mount( app.mount(
'/', "/",
app=SinglePageApplication(directory=config['frontend_path'], html=True), app=SinglePageApplication(directory=config["frontend_path"], html=True),
) )
await serve(app, server_config) await serve(app, server_config)

View file

@ -14,7 +14,7 @@ from kibicara.config import config
class Mapping: class Mapping:
database = Database(config['database_connection']) database = Database(config["database_connection"])
metadata = MetaData() metadata = MetaData()
@classmethod @classmethod
@ -34,7 +34,7 @@ class Admin(Model):
passhash: Text() passhash: Text()
class Mapping(Mapping): class Mapping(Mapping):
table_name = 'admins' table_name = "admins"
class Hood(Model): class Hood(Model):
@ -44,7 +44,7 @@ class Hood(Model):
email_enabled: Boolean() = True email_enabled: Boolean() = True
class Mapping(Mapping): class Mapping(Mapping):
table_name = 'hoods' table_name = "hoods"
class AdminHoodRelation(Model): class AdminHoodRelation(Model):
@ -53,7 +53,7 @@ class AdminHoodRelation(Model):
hood: ForeignKey(Hood) hood: ForeignKey(Hood)
class Mapping(Mapping): class Mapping(Mapping):
table_name = 'admin_hood_relations' table_name = "admin_hood_relations"
class Trigger(Model): class Trigger(Model):
@ -62,7 +62,7 @@ class Trigger(Model):
pattern: Text() pattern: Text()
class Mapping(Mapping): class Mapping(Mapping):
table_name = 'triggers' table_name = "triggers"
class BadWord(Model): class BadWord(Model):
@ -71,4 +71,4 @@ class BadWord(Model):
pattern: Text() pattern: Text()
class Mapping(Mapping): class Mapping(Mapping):
table_name = 'badwords' table_name = "badwords"

View file

@ -94,7 +94,7 @@ class Censor:
async def __run(self): async def __run(self):
await self.hood.load() await self.hood.load()
self.__task.set_name('{0} {1}'.format(self.__class__.__name__, self.hood.name)) self.__task.set_name("{0} {1}".format(self.__class__.__name__, self.hood.name))
try: try:
self.status = BotStatus.RUNNING self.status = BotStatus.RUNNING
await self.run() await self.run()
@ -143,17 +143,17 @@ class Censor:
for badword in await BadWord.objects.filter(hood=self.hood).all(): for badword in await BadWord.objects.filter(hood=self.hood).all():
if search(badword.pattern, message.text, IGNORECASE): if search(badword.pattern, message.text, IGNORECASE):
logger.debug( logger.debug(
'Matched bad word - dropped message: {0}'.format(message.text) "Matched bad word - dropped message: {0}".format(message.text)
) )
return False return False
for trigger in await Trigger.objects.filter(hood=self.hood).all(): for trigger in await Trigger.objects.filter(hood=self.hood).all():
if search(trigger.pattern, message.text, IGNORECASE): if search(trigger.pattern, message.text, IGNORECASE):
logger.debug( logger.debug(
'Matched trigger - passed message: {0}'.format(message.text) "Matched trigger - passed message: {0}".format(message.text)
) )
return True return True
logger.debug( logger.debug(
'Did not match any trigger - dropped message: {0}'.format(message.text) "Did not match any trigger - dropped message: {0}".format(message.text)
) )
return False return False

View file

@ -36,7 +36,7 @@ class EmailBot(Censor):
while True: while True:
message = await self.receive() message = await self.receive()
logger.debug( logger.debug(
'Received message from censor ({0}): {1}'.format( "Received message from censor ({0}): {1}".format(
self.hood.name, message.text self.hood.name, message.text
) )
) )
@ -45,19 +45,19 @@ class EmailBot(Censor):
).all(): ).all():
token = to_token(email=subscriber.email, hood=self.hood.id) token = to_token(email=subscriber.email, hood=self.hood.id)
body = ( body = (
'{0}\n\n--\n' "{0}\n\n--\n"
+ 'If you want to stop receiving these mails,' + "If you want to stop receiving these mails,"
+ 'follow this link: {1}/hoods/{2}/email-unsubscribe?token={3}' + "follow this link: {1}/hoods/{2}/email-unsubscribe?token={3}"
).format(message.text, config['frontend_url'], self.hood.id, token) ).format(message.text, config["frontend_url"], self.hood.id, token)
try: try:
logger.debug('Trying to send: \n{0}'.format(body)) logger.debug("Trying to send: \n{0}".format(body))
email.send_email( email.send_email(
subscriber.email, subscriber.email,
'Kibicara {0}'.format(self.hood.name), "Kibicara {0}".format(self.hood.name),
body=body, body=body,
) )
except (ConnectionRefusedError, SMTPException): except (ConnectionRefusedError, SMTPException):
logger.exception('Sending email to subscriber failed.') logger.exception("Sending email to subscriber failed.")
spawner = Spawner(Hood, EmailBot) spawner = Spawner(Hood, EmailBot)

View file

@ -29,14 +29,14 @@ class Main:
def __init__(self): def __init__(self):
parser = ArgumentParser() parser = ArgumentParser()
parser.add_argument( parser.add_argument(
'-f', "-f",
'--config', "--config",
dest='configfile', dest="configfile",
default='/etc/kibicara.conf', default="/etc/kibicara.conf",
help='path to config file', help="path to config file",
) )
# the MDA passes the recipient address as command line argument # the MDA passes the recipient address as command line argument
parser.add_argument('recipient') parser.add_argument("recipient")
args = parser.parse_args() args = parser.parse_args()
try: try:
@ -55,54 +55,54 @@ class Main:
try: try:
email = await Email.objects.get(name=email_name) email = await Email.objects.get(name=email_name)
except NoMatch: except NoMatch:
logger.error('No recipient with this name') logger.error("No recipient with this name")
exit(1) exit(1)
# read mail from STDIN and parse to EmailMessage object # read mail from STDIN and parse to EmailMessage object
message = BytesParser(policy=default).parsebytes(stdin.buffer.read()) message = BytesParser(policy=default).parsebytes(stdin.buffer.read())
sender = '' sender = ""
if message.get('sender'): if message.get("sender"):
sender = message.get('sender') sender = message.get("sender")
elif message.get('from'): elif message.get("from"):
sender = message.get('from') sender = message.get("from")
else: else:
logger.error('No Sender of From header') logger.error("No Sender of From header")
exit(1) exit(1)
sender = parseaddr(sender)[1] sender = parseaddr(sender)[1]
if not sender: if not sender:
logger.error('Could not parse sender') logger.error("Could not parse sender")
exit(1) exit(1)
maybe_subscriber = await EmailSubscribers.objects.filter(email=sender).all() maybe_subscriber = await EmailSubscribers.objects.filter(email=sender).all()
if len(maybe_subscriber) != 1 or maybe_subscriber[0].hood.id != email.hood.id: if len(maybe_subscriber) != 1 or maybe_subscriber[0].hood.id != email.hood.id:
logger.error('Not a subscriber') logger.error("Not a subscriber")
exit(1) exit(1)
# extract relevant data from mail # extract relevant data from mail
text = sub( text = sub(
r'<[^>]*>', r"<[^>]*>",
'', "",
message.get_body(preferencelist=('plain', 'html')).get_content(), message.get_body(preferencelist=("plain", "html")).get_content(),
) )
response = post( response = post(
'{0}/api/hoods/{1}/email/messages/'.format( "{0}/api/hoods/{1}/email/messages/".format(
config['root_url'], email.hood.pk config["root_url"], email.hood.pk
), ),
json={'text': text, 'secret': email.secret}, json={"text": text, "secret": email.secret},
) )
if response.status_code == status.HTTP_201_CREATED: if response.status_code == status.HTTP_201_CREATED:
exit(0) exit(0)
elif response.status_code == status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS: elif response.status_code == status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS:
logger.error('Message was\'t accepted: {0}'.format(text)) logger.error("Message was't accepted: {0}".format(text))
elif response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY: elif response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY:
logger.error('Malformed request: {0}'.format(response.json())) logger.error("Malformed request: {0}".format(response.json()))
elif response.status_code == status.HTTP_401_UNAUTHORIZED: elif response.status_code == status.HTTP_401_UNAUTHORIZED:
logger.error('Wrong API secret. kibicara_mda seems to be misconfigured') logger.error("Wrong API secret. kibicara_mda seems to be misconfigured")
else: else:
logger.error( logger.error(
'REST-API failed with response status {0}'.format(response.status_code) "REST-API failed with response status {0}".format(response.status_code)
) )
exit(1) exit(1)

View file

@ -19,7 +19,7 @@ class Email(Model):
secret: Text() secret: Text()
class Mapping(Mapping): class Mapping(Mapping):
table_name = 'email' table_name = "email"
class EmailSubscribers(Model): class EmailSubscribers(Model):
@ -30,4 +30,4 @@ class EmailSubscribers(Model):
email: Text(unique=True) email: Text(unique=True)
class Mapping(Mapping): class Mapping(Mapping):
table_name = 'email_subscribers' table_name = "email_subscribers"

View file

@ -29,10 +29,10 @@ logger = getLogger(__name__)
class BodyEmail(BaseModel): class BodyEmail(BaseModel):
name: str name: str
@validator('name') @validator("name")
def valid_prefix(cls, value): def valid_prefix(cls, value):
if not value.startswith('kibicara-'): if not value.startswith("kibicara-"):
raise ValueError('Recipient address didn\'t start with kibicara-') raise ValueError("Recipient address didn't start with kibicara-")
return value return value
@ -79,9 +79,9 @@ router = APIRouter()
@router.get( @router.get(
'/public', "/public",
# TODO response_model # TODO response_model
operation_id='get_emails_public', operation_id="get_emails_public",
) )
async def email_read_all_public(hood=Depends(get_hood_unauthorized)): async def email_read_all_public(hood=Depends(get_hood_unauthorized)):
if hood.email_enabled: if hood.email_enabled:
@ -91,19 +91,19 @@ async def email_read_all_public(hood=Depends(get_hood_unauthorized)):
@router.get( @router.get(
'/', "/",
# TODO response_model # TODO response_model
operation_id='get_emails', operation_id="get_emails",
) )
async def email_read_all(hood=Depends(get_hood)): async def email_read_all(hood=Depends(get_hood)):
return await Email.objects.filter(hood=hood).select_related('hood').all() return await Email.objects.filter(hood=hood).select_related("hood").all()
@router.post( @router.post(
'/', "/",
status_code=status.HTTP_201_CREATED, status_code=status.HTTP_201_CREATED,
# TODO response_model # TODO response_model
operation_id='create_email', operation_id="create_email",
) )
async def email_create(values: BodyEmail, response: Response, hood=Depends(get_hood)): async def email_create(values: BodyEmail, response: Response, hood=Depends(get_hood)):
"""Create an Email bot. Call this when creating a hood. """Create an Email bot. Call this when creating a hood.
@ -115,27 +115,27 @@ async def email_create(values: BodyEmail, response: Response, hood=Depends(get_h
email = await Email.objects.create( email = await Email.objects.create(
hood=hood, secret=urandom(32).hex(), **values.__dict__ hood=hood, secret=urandom(32).hex(), **values.__dict__
) )
response.headers['Location'] = str(hood.id) response.headers["Location"] = str(hood.id)
return email return email
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@router.get( @router.get(
'/status', "/status",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
# TODO response_model # TODO response_model
operation_id='status_email', operation_id="status_email",
) )
async def email_status(hood=Depends(get_hood)): async def email_status(hood=Depends(get_hood)):
return {'status': spawner.get(hood).status.name} return {"status": spawner.get(hood).status.name}
@router.post( @router.post(
'/start', "/start",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
# TODO response_model # TODO response_model
operation_id='start_email', operation_id="start_email",
) )
async def email_start(hood=Depends(get_hood)): async def email_start(hood=Depends(get_hood)):
await hood.update(email_enabled=True) await hood.update(email_enabled=True)
@ -144,10 +144,10 @@ async def email_start(hood=Depends(get_hood)):
@router.post( @router.post(
'/stop', "/stop",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
# TODO response_model # TODO response_model
operation_id='stop_email', operation_id="stop_email",
) )
async def email_stop(hood=Depends(get_hood)): async def email_stop(hood=Depends(get_hood)):
await hood.update(email_enabled=False) await hood.update(email_enabled=False)
@ -156,16 +156,16 @@ async def email_stop(hood=Depends(get_hood)):
@router.get( @router.get(
'/{email_id}', "/{email_id}",
# TODO response_model # TODO response_model
operation_id='get_email', operation_id="get_email",
) )
async def email_read(email=Depends(get_email)): async def email_read(email=Depends(get_email)):
return email return email
@router.delete( @router.delete(
'/{email_id}', status_code=status.HTTP_204_NO_CONTENT, operation_id='delete_email' "/{email_id}", status_code=status.HTTP_204_NO_CONTENT, operation_id="delete_email"
) )
async def email_delete(email=Depends(get_email)): async def email_delete(email=Depends(get_email)):
"""Delete an Email bot. """Delete an Email bot.
@ -179,9 +179,9 @@ async def email_delete(email=Depends(get_email)):
@router.post( @router.post(
'/subscribe/', "/subscribe/",
status_code=status.HTTP_202_ACCEPTED, status_code=status.HTTP_202_ACCEPTED,
operation_id='subscribe', operation_id="subscribe",
response_model=BaseModel, response_model=BaseModel,
) )
async def email_subscribe( async def email_subscribe(
@ -194,8 +194,8 @@ async def email_subscribe(
:return: Returns status code 200 after sending confirmation email. :return: Returns status code 200 after sending confirmation email.
""" """
token = to_token(hood=hood.id, email=subscriber.email) token = to_token(hood=hood.id, email=subscriber.email)
confirm_link = '{0}/hoods/{1}/email-confirm?token={2}'.format( confirm_link = "{0}/hoods/{1}/email-confirm?token={2}".format(
config['frontend_url'], config["frontend_url"],
hood.id, hood.id,
token, token,
) )
@ -205,26 +205,26 @@ async def email_subscribe(
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
email.send_email( email.send_email(
subscriber.email, subscriber.email,
'Subscribe to Kibicara {0}'.format(hood.name), "Subscribe to Kibicara {0}".format(hood.name),
body='To confirm your subscription, follow this link: {0}'.format( body="To confirm your subscription, follow this link: {0}".format(
confirm_link confirm_link
), ),
) )
return {} return {}
except ConnectionRefusedError: except ConnectionRefusedError:
logger.info(token) logger.info(token)
logger.error('Sending subscription confirmation email failed.', exc_info=True) logger.error("Sending subscription confirmation email failed.", exc_info=True)
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY) raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY)
except SMTPException: except SMTPException:
logger.info(token) logger.info(token)
logger.error('Sending subscription confirmation email failed.', exc_info=True) logger.error("Sending subscription confirmation email failed.", exc_info=True)
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY) raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY)
@router.post( @router.post(
'/subscribe/confirm/{token}', "/subscribe/confirm/{token}",
status_code=status.HTTP_201_CREATED, status_code=status.HTTP_201_CREATED,
operation_id='confirm_subscriber', operation_id="confirm_subscriber",
response_model=BaseModel, response_model=BaseModel,
) )
async def email_subscribe_confirm(token, hood=Depends(get_hood_unauthorized)): async def email_subscribe_confirm(token, hood=Depends(get_hood_unauthorized)):
@ -236,19 +236,19 @@ async def email_subscribe_confirm(token, hood=Depends(get_hood_unauthorized)):
""" """
payload = from_token(token) payload = from_token(token)
# If token.hood and url.hood are different, raise an error: # If token.hood and url.hood are different, raise an error:
if hood.id is not payload['hood']: if hood.id is not payload["hood"]:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
try: try:
await EmailSubscribers.objects.create(hood=hood.id, email=payload['email']) await EmailSubscribers.objects.create(hood=hood.id, email=payload["email"])
return {} return {}
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@router.delete( @router.delete(
'/unsubscribe/{token}', "/unsubscribe/{token}",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
operation_id='unsubscribe', operation_id="unsubscribe",
) )
async def email_unsubscribe(token, hood=Depends(get_hood_unauthorized)): async def email_unsubscribe(token, hood=Depends(get_hood_unauthorized)):
"""Remove a subscriber from the database when they click on an unsubscribe link. """Remove a subscriber from the database when they click on an unsubscribe link.
@ -257,13 +257,13 @@ async def email_unsubscribe(token, hood=Depends(get_hood_unauthorized)):
:param hood: Hood the Email bot belongs to. :param hood: Hood the Email bot belongs to.
""" """
try: try:
logger.warning('token is: {0}'.format(token)) logger.warning("token is: {0}".format(token))
payload = from_token(token) payload = from_token(token)
# If token.hood and url.hood are different, raise an error: # If token.hood and url.hood are different, raise an error:
if hood.id is not payload['hood']: if hood.id is not payload["hood"]:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
subscriber = await EmailSubscribers.objects.filter( subscriber = await EmailSubscribers.objects.filter(
hood=payload['hood'], email=payload['email'] hood=payload["hood"], email=payload["email"]
).get() ).get()
await subscriber.delete() await subscriber.delete()
return Response(status_code=status.HTTP_204_NO_CONTENT) return Response(status_code=status.HTTP_204_NO_CONTENT)
@ -274,28 +274,28 @@ async def email_unsubscribe(token, hood=Depends(get_hood_unauthorized)):
@router.get( @router.get(
'/subscribers/', "/subscribers/",
# TODO response_model # TODO response_model
operation_id='get_subscribers', operation_id="get_subscribers",
) )
async def subscribers_read_all(hood=Depends(get_hood)): async def subscribers_read_all(hood=Depends(get_hood)):
return await EmailSubscribers.objects.filter(hood=hood).all() return await EmailSubscribers.objects.filter(hood=hood).all()
@router.get( @router.get(
'/subscribers/{subscriber_id}', "/subscribers/{subscriber_id}",
# TODO response_model # TODO response_model
operation_id='get_subscriber', operation_id="get_subscriber",
) )
async def subscribers_read(subscriber=Depends(get_subscriber)): async def subscribers_read(subscriber=Depends(get_subscriber)):
return subscriber return subscriber
@router.post( @router.post(
'/messages/', "/messages/",
status_code=status.HTTP_201_CREATED, status_code=status.HTTP_201_CREATED,
# TODO response_model # TODO response_model
operation_id='send_message', operation_id="send_message",
) )
async def email_message_create( async def email_message_create(
message: BodyMessage, hood=Depends(get_hood_unauthorized) message: BodyMessage, hood=Depends(get_hood_unauthorized)
@ -310,14 +310,14 @@ async def email_message_create(
if message.secret == receiver.secret: if message.secret == receiver.secret:
# pass message.text to bot.py # pass message.text to bot.py
if await spawner.get(hood).publish(Message(message.text)): if await spawner.get(hood).publish(Message(message.text)):
logger.warning('Message was accepted: {0}'.format(message.text)) logger.warning("Message was accepted: {0}".format(message.text))
return {} return {}
else: else:
logger.warning('Message wasn\'t accepted: {0}'.format(message.text)) logger.warning("Message wasn't accepted: {0}".format(message.text))
raise HTTPException( raise HTTPException(
status_code=status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS status_code=status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS
) )
logger.warning( logger.warning(
'Someone is trying to submit an email without the correct API secret' "Someone is trying to submit an email without the correct API secret"
) )
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

View file

@ -32,9 +32,9 @@ class TelegramBot(Censor):
def _create_dispatcher(self): def _create_dispatcher(self):
dp = Dispatcher(self.bot) dp = Dispatcher(self.bot)
dp.register_message_handler(self._send_welcome, commands=['start']) dp.register_message_handler(self._send_welcome, commands=["start"])
dp.register_message_handler(self._remove_user, commands=['stop']) dp.register_message_handler(self._remove_user, commands=["stop"])
dp.register_message_handler(self._send_help, commands=['help']) dp.register_message_handler(self._send_help, commands=["help"])
dp.register_message_handler(self._receive_message) dp.register_message_handler(self._receive_message)
return dp return dp
@ -42,30 +42,30 @@ class TelegramBot(Censor):
try: try:
self.bot = Bot(token=self.telegram_model.api_token) self.bot = Bot(token=self.telegram_model.api_token)
self.dp = self._create_dispatcher() self.dp = self._create_dispatcher()
logger.debug('Bot {0} starting.'.format(self.telegram_model.hood.name)) logger.debug("Bot {0} starting.".format(self.telegram_model.hood.name))
user = await self.bot.get_me() user = await self.bot.get_me()
if user.username: if user.username:
await self.telegram_model.update(username=user.username) await self.telegram_model.update(username=user.username)
await gather(self.dp.start_polling(), self._push()) await gather(self.dp.start_polling(), self._push())
except CancelledError: except CancelledError:
logger.debug( logger.debug(
'Bot {0} received Cancellation.'.format(self.telegram_model.hood.name) "Bot {0} received Cancellation.".format(self.telegram_model.hood.name)
) )
self.dp = None self.dp = None
raise raise
except exceptions.ValidationError: except exceptions.ValidationError:
logger.debug( logger.debug(
'Bot {0} has invalid auth token.'.format(self.telegram_model.hood.name) "Bot {0} has invalid auth token.".format(self.telegram_model.hood.name)
) )
await self.telegram_model.update(enabled=False) await self.telegram_model.update(enabled=False)
finally: finally:
logger.debug('Bot {0} stopped.'.format(self.telegram_model.hood.name)) logger.debug("Bot {0} stopped.".format(self.telegram_model.hood.name))
async def _push(self): async def _push(self):
while True: while True:
message = await self.receive() message = await self.receive()
logger.debug( logger.debug(
'Received message from censor ({0}): {1}'.format( "Received message from censor ({0}): {1}".format(
self.telegram_model.hood.name, message.text self.telegram_model.hood.name, message.text
) )
) )
@ -79,34 +79,34 @@ class TelegramBot(Censor):
await self.bot.send_message(user_id, message, disable_notification=False) await self.bot.send_message(user_id, message, disable_notification=False)
except exceptions.BotBlocked: except exceptions.BotBlocked:
logger.error( logger.error(
'Target [ID:{0}] ({1}): blocked by user'.format( "Target [ID:{0}] ({1}): blocked by user".format(
user_id, self.telegram_model.hood.name user_id, self.telegram_model.hood.name
) )
) )
except exceptions.ChatNotFound: except exceptions.ChatNotFound:
logger.error( logger.error(
'Target [ID:{0}] ({1}): invalid user ID'.format( "Target [ID:{0}] ({1}): invalid user ID".format(
user_id, self.telegram_model.hood.name user_id, self.telegram_model.hood.name
) )
) )
except exceptions.RetryAfter as e: except exceptions.RetryAfter as e:
logger.error( logger.error(
'Target [ID:{0}] ({1}): Flood limit is exceeded.'.format( "Target [ID:{0}] ({1}): Flood limit is exceeded.".format(
user_id, self.telegram_model.hood.name user_id, self.telegram_model.hood.name
) )
+ 'Sleep {0} seconds.'.format(e.timeout) + "Sleep {0} seconds.".format(e.timeout)
) )
await sleep(e.timeout) await sleep(e.timeout)
return await self._send_message(user_id, message) return await self._send_message(user_id, message)
except exceptions.UserDeactivated: except exceptions.UserDeactivated:
logger.error( logger.error(
'Target [ID:{0}] ({1}): user is deactivated'.format( "Target [ID:{0}] ({1}): user is deactivated".format(
user_id, self.telegram_model.hood.name user_id, self.telegram_model.hood.name
) )
) )
except exceptions.TelegramAPIError: except exceptions.TelegramAPIError:
logger.exception( logger.exception(
'Target [ID:{0}] ({1}): failed'.format( "Target [ID:{0}] ({1}): failed".format(
user_id, self.telegram_model.hood.name user_id, self.telegram_model.hood.name
) )
) )
@ -114,14 +114,14 @@ class TelegramBot(Censor):
async def _send_welcome(self, message: types.Message): async def _send_welcome(self, message: types.Message):
try: try:
if message.from_user.is_bot: if message.from_user.is_bot:
await message.reply('Error: Bots can not join here.') await message.reply("Error: Bots can not join here.")
return return
await TelegramUser.objects.create( await TelegramUser.objects.create(
user_id=message.from_user.id, bot=self.telegram_model user_id=message.from_user.id, bot=self.telegram_model
) )
await message.reply(self.telegram_model.welcome_message) await message.reply(self.telegram_model.welcome_message)
except IntegrityError: except IntegrityError:
await message.reply('Error: You are already registered.') await message.reply("Error: You are already registered.")
async def _remove_user(self, message: types.Message): async def _remove_user(self, message: types.Message):
try: try:
@ -129,19 +129,19 @@ class TelegramBot(Censor):
user_id=message.from_user.id, bot=self.telegram_model user_id=message.from_user.id, bot=self.telegram_model
) )
await telegram_user.delete() await telegram_user.delete()
await message.reply('You were removed successfully from this bot.') await message.reply("You were removed successfully from this bot.")
except NoMatch: except NoMatch:
await message.reply('Error: You are not subscribed to this bot.') await message.reply("Error: You are not subscribed to this bot.")
async def _send_help(self, message: types.Message): async def _send_help(self, message: types.Message):
if message.from_user.is_bot: if message.from_user.is_bot:
await message.reply('Error: Bots can\'t be helped.') await message.reply("Error: Bots can't be helped.")
return return
await message.reply('Send messages here to broadcast them to your hood') await message.reply("Send messages here to broadcast them to your hood")
async def _receive_message(self, message: types.Message): async def _receive_message(self, message: types.Message):
if not message.text: if not message.text:
await message.reply('Error: Only text messages are allowed.') await message.reply("Error: Only text messages are allowed.")
return return
await self.publish(Message(message.text)) await self.publish(Message(message.text))

View file

@ -17,7 +17,7 @@ class Telegram(Model):
enabled: Boolean() = True enabled: Boolean() = True
class Mapping(Mapping): class Mapping(Mapping):
table_name = 'telegrambots' table_name = "telegrambots"
class TelegramUser(Model): class TelegramUser(Model):
@ -27,4 +27,4 @@ class TelegramUser(Model):
bot: ForeignKey(Telegram) bot: ForeignKey(Telegram)
class Mapping(Mapping): class Mapping(Mapping):
table_name = 'telegramusers' table_name = "telegramusers"

View file

@ -21,9 +21,9 @@ logger = getLogger(__name__)
class BodyTelegram(BaseModel): class BodyTelegram(BaseModel):
api_token: str api_token: str
welcome_message: str = 'Welcome!' welcome_message: str = "Welcome!"
@validator('api_token') @validator("api_token")
def valid_api_token(cls, value): def valid_api_token(cls, value):
try: try:
check_token(value) check_token(value)
@ -48,9 +48,9 @@ telegram_callback_router = APIRouter()
@router.get( @router.get(
'/public', "/public",
# TODO response_model, # TODO response_model,
operation_id='get_telegrams_public', operation_id="get_telegrams_public",
) )
async def telegram_read_all_public(hood=Depends(get_hood_unauthorized)): async def telegram_read_all_public(hood=Depends(get_hood_unauthorized)):
telegrambots = await Telegram.objects.filter(hood=hood).all() telegrambots = await Telegram.objects.filter(hood=hood).all()
@ -62,27 +62,27 @@ async def telegram_read_all_public(hood=Depends(get_hood_unauthorized)):
@router.get( @router.get(
'/', "/",
# TODO response_model, # TODO response_model,
operation_id='get_telegrams', operation_id="get_telegrams",
) )
async def telegram_read_all(hood=Depends(get_hood)): async def telegram_read_all(hood=Depends(get_hood)):
return await Telegram.objects.filter(hood=hood).all() return await Telegram.objects.filter(hood=hood).all()
@router.get( @router.get(
'/{telegram_id}', "/{telegram_id}",
# TODO response_model, # TODO response_model,
operation_id='get_telegram', operation_id="get_telegram",
) )
async def telegram_read(telegram=Depends(get_telegram)): async def telegram_read(telegram=Depends(get_telegram)):
return telegram return telegram
@router.delete( @router.delete(
'/{telegram_id}', "/{telegram_id}",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
operation_id='delete_telegram', operation_id="delete_telegram",
) )
async def telegram_delete(telegram=Depends(get_telegram)): async def telegram_delete(telegram=Depends(get_telegram)):
spawner.stop(telegram) spawner.stop(telegram)
@ -93,10 +93,10 @@ async def telegram_delete(telegram=Depends(get_telegram)):
@router.post( @router.post(
'/', "/",
status_code=status.HTTP_201_CREATED, status_code=status.HTTP_201_CREATED,
# TODO response_model, # TODO response_model,
operation_id='create_telegram', operation_id="create_telegram",
) )
async def telegram_create( async def telegram_create(
response: Response, values: BodyTelegram, hood=Depends(get_hood) response: Response, values: BodyTelegram, hood=Depends(get_hood)
@ -104,17 +104,17 @@ async def telegram_create(
try: try:
telegram = await Telegram.objects.create(hood=hood, **values.__dict__) telegram = await Telegram.objects.create(hood=hood, **values.__dict__)
spawner.start(telegram) spawner.start(telegram)
response.headers['Location'] = str(telegram.id) response.headers["Location"] = str(telegram.id)
return telegram return telegram
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@router.put( @router.put(
'/{telegram_id}', "/{telegram_id}",
status_code=status.HTTP_202_ACCEPTED, status_code=status.HTTP_202_ACCEPTED,
# TODO response_model, # TODO response_model,
operation_id='update_telegram', operation_id="update_telegram",
) )
async def telegram_update(values: BodyTelegram, telegram=Depends(get_telegram)): async def telegram_update(values: BodyTelegram, telegram=Depends(get_telegram)):
try: try:
@ -127,20 +127,20 @@ async def telegram_update(values: BodyTelegram, telegram=Depends(get_telegram)):
@router.get( @router.get(
'/{telegram_id}/status', "/{telegram_id}/status",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
# TODO response_model, # TODO response_model,
operation_id='status_telegram', operation_id="status_telegram",
) )
async def telegram_status(telegram=Depends(get_telegram)): async def telegram_status(telegram=Depends(get_telegram)):
return {'status': spawner.get(telegram).status.name} return {"status": spawner.get(telegram).status.name}
@router.post( @router.post(
'/{telegram_id}/start', "/{telegram_id}/start",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
# TODO response_model, # TODO response_model,
operation_id='start_telegram', operation_id="start_telegram",
) )
async def telegram_start(telegram=Depends(get_telegram)): async def telegram_start(telegram=Depends(get_telegram)):
await telegram.update(enabled=True) await telegram.update(enabled=True)
@ -149,10 +149,10 @@ async def telegram_start(telegram=Depends(get_telegram)):
@router.post( @router.post(
'/{telegram_id}/stop', "/{telegram_id}/stop",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
# TODO response_model, # TODO response_model,
operation_id='stop_telegram', operation_id="stop_telegram",
) )
async def telegram_stop(telegram=Depends(get_telegram)): async def telegram_stop(telegram=Depends(get_telegram)):
await telegram.update(enabled=False) await telegram.update(enabled=False)

View file

@ -13,4 +13,4 @@ class Test(Model):
hood: ForeignKey(Hood) hood: ForeignKey(Hood)
class Mapping(Mapping): class Mapping(Mapping):
table_name = 'testapi' table_name = "testapi"

View file

@ -30,39 +30,39 @@ async def get_test(test_id: int, hood=Depends(get_hood)):
router = APIRouter() router = APIRouter()
@router.get('/') @router.get("/")
async def test_read_all(hood=Depends(get_hood)): async def test_read_all(hood=Depends(get_hood)):
return await Test.objects.filter(hood=hood).all() return await Test.objects.filter(hood=hood).all()
@router.post('/', status_code=status.HTTP_201_CREATED) @router.post("/", status_code=status.HTTP_201_CREATED)
async def test_create(response: Response, hood=Depends(get_hood)): async def test_create(response: Response, hood=Depends(get_hood)):
try: try:
test = await Test.objects.create(hood=hood) test = await Test.objects.create(hood=hood)
spawner.start(test) spawner.start(test)
response.headers['Location'] = str(test.id) response.headers["Location"] = str(test.id)
return test return test
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@router.get('/{test_id}') @router.get("/{test_id}")
async def test_read(test=Depends(get_test)): async def test_read(test=Depends(get_test)):
return test return test
@router.delete('/{test_id}', status_code=status.HTTP_204_NO_CONTENT) @router.delete("/{test_id}", status_code=status.HTTP_204_NO_CONTENT)
async def test_delete(test=Depends(get_test)): async def test_delete(test=Depends(get_test)):
spawner.stop(test) spawner.stop(test)
await test.delete() await test.delete()
@router.get('/{test_id}/messages/') @router.get("/{test_id}/messages/")
async def test_message_read_all(test=Depends(get_test)): async def test_message_read_all(test=Depends(get_test)):
return spawner.get(test).messages return spawner.get(test).messages
@router.post('/{test_id}/messages/') @router.post("/{test_id}/messages/")
async def test_message_create(message: BodyMessage, test=Depends(get_test)): async def test_message_create(message: BodyMessage, test=Depends(get_test)):
await spawner.get(test).publish(Message(message.text)) await spawner.get(test).publish(Message(message.text))
return {} return {}

View file

@ -33,52 +33,52 @@ class TwitterBot(Censor):
async def run(self): async def run(self):
try: try:
if not self.twitter_model.verified: if not self.twitter_model.verified:
raise ValueError('Oauth Handshake not completed') raise ValueError("Oauth Handshake not completed")
self.client = PeonyClient( self.client = PeonyClient(
consumer_key=config['twitter']['consumer_key'], consumer_key=config["twitter"]["consumer_key"],
consumer_secret=config['twitter']['consumer_secret'], consumer_secret=config["twitter"]["consumer_secret"],
access_token=self.twitter_model.access_token, access_token=self.twitter_model.access_token,
access_token_secret=self.twitter_model.access_token_secret, access_token_secret=self.twitter_model.access_token_secret,
) )
if self.twitter_model.mentions_since_id is None: if self.twitter_model.mentions_since_id is None:
logger.debug('since_id is None in model, fetch newest mention id') logger.debug("since_id is None in model, fetch newest mention id")
await self._poll_mentions() await self._poll_mentions()
if self.twitter_model.dms_since_id is None: if self.twitter_model.dms_since_id is None:
logger.debug('since_id is None in model, fetch newest dm id') logger.debug("since_id is None in model, fetch newest dm id")
await self._poll_direct_messages() await self._poll_direct_messages()
user = await self.client.user user = await self.client.user
if user.screen_name: if user.screen_name:
await self.twitter_model.update(username=user.screen_name) await self.twitter_model.update(username=user.screen_name)
logger.debug( logger.debug(
'Starting Twitter bot: {0}'.format(self.twitter_model.__dict__) "Starting Twitter bot: {0}".format(self.twitter_model.__dict__)
) )
await gather(self.poll(), self.push()) await gather(self.poll(), self.push())
except CancelledError: except CancelledError:
logger.debug( logger.debug(
'Bot {0} received Cancellation.'.format(self.twitter_model.hood.name) "Bot {0} received Cancellation.".format(self.twitter_model.hood.name)
) )
except exceptions.Unauthorized: except exceptions.Unauthorized:
logger.debug( logger.debug(
'Bot {0} has invalid auth token.'.format(self.twitter_model.hood.name) "Bot {0} has invalid auth token.".format(self.twitter_model.hood.name)
) )
await self.twitter_model.update(enabled=False) await self.twitter_model.update(enabled=False)
self.enabled = self.twitter_model.enabled self.enabled = self.twitter_model.enabled
except (KeyError, ValueError, exceptions.NotAuthenticated): except (KeyError, ValueError, exceptions.NotAuthenticated):
logger.warning('Missing consumer_keys for Twitter in your configuration.') logger.warning("Missing consumer_keys for Twitter in your configuration.")
await self.twitter_model.update(enabled=False) await self.twitter_model.update(enabled=False)
self.enabled = self.twitter_model.enabled self.enabled = self.twitter_model.enabled
finally: finally:
logger.debug('Bot {0} stopped.'.format(self.twitter_model.hood.name)) logger.debug("Bot {0} stopped.".format(self.twitter_model.hood.name))
async def poll(self): async def poll(self):
while True: while True:
dms = await self._poll_direct_messages() dms = await self._poll_direct_messages()
logger.debug( logger.debug(
'Polled dms ({0}): {1}'.format(self.twitter_model.hood.name, str(dms)) "Polled dms ({0}): {1}".format(self.twitter_model.hood.name, str(dms))
) )
mentions = await self._poll_mentions() mentions = await self._poll_mentions()
logger.debug( logger.debug(
'Polled mentions ({0}): {1}'.format( "Polled mentions ({0}): {1}".format(
self.twitter_model.hood.name, str(mentions) self.twitter_model.hood.name, str(mentions)
) )
) )
@ -135,7 +135,7 @@ class TwitterBot(Censor):
remove_indices.update(range(url.indices[0], url.indices[1] + 1)) remove_indices.update(range(url.indices[0], url.indices[1] + 1))
for symbol in entities.symbols: for symbol in entities.symbols:
remove_indices.update(range(symbol.indices[0], symbol.indices[1] + 1)) remove_indices.update(range(symbol.indices[0], symbol.indices[1] + 1))
filtered_text = '' filtered_text = ""
for index, character in enumerate(text): for index, character in enumerate(text):
if index not in remove_indices: if index not in remove_indices:
filtered_text += character filtered_text += character
@ -145,11 +145,11 @@ class TwitterBot(Censor):
while True: while True:
message = await self.receive() message = await self.receive()
logger.debug( logger.debug(
'Received message from censor ({0}): {1}'.format( "Received message from censor ({0}): {1}".format(
self.twitter_model.hood.name, message.text self.twitter_model.hood.name, message.text
) )
) )
if hasattr(message, 'twitter_mention_id'): if hasattr(message, "twitter_mention_id"):
await self._retweet(message.twitter_mention_id) await self._retweet(message.twitter_mention_id)
else: else:
await self._post_tweet(message.text) await self._post_tweet(message.text)

View file

@ -20,4 +20,4 @@ class Twitter(Model):
enabled: Boolean() = False enabled: Boolean() = False
class Mapping(Mapping): class Mapping(Mapping):
table_name = 'twitterbots' table_name = "twitterbots"

View file

@ -36,9 +36,9 @@ twitter_callback_router = APIRouter()
@router.get( @router.get(
'/public', "/public",
# TODO response_model, # TODO response_model,
operation_id='get_twitters_public', operation_id="get_twitters_public",
) )
async def twitter_read_all_public(hood=Depends(get_hood_unauthorized)): async def twitter_read_all_public(hood=Depends(get_hood_unauthorized)):
twitterbots = await Twitter.objects.filter(hood=hood).all() twitterbots = await Twitter.objects.filter(hood=hood).all()
@ -50,28 +50,28 @@ async def twitter_read_all_public(hood=Depends(get_hood_unauthorized)):
@router.get( @router.get(
'/', "/",
# TODO response_model, # TODO response_model,
operation_id='get_twitters', operation_id="get_twitters",
) )
async def twitter_read_all(hood=Depends(get_hood)): async def twitter_read_all(hood=Depends(get_hood)):
return await Twitter.objects.filter(hood=hood).all() return await Twitter.objects.filter(hood=hood).all()
@router.get( @router.get(
'/{twitter_id}', "/{twitter_id}",
# TODO response_model # TODO response_model
operation_id='get_twitter', operation_id="get_twitter",
) )
async def twitter_read(twitter=Depends(get_twitter)): async def twitter_read(twitter=Depends(get_twitter)):
return twitter return twitter
@router.delete( @router.delete(
'/{twitter_id}', "/{twitter_id}",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
# TODO response_model # TODO response_model
operation_id='delete_twitter', operation_id="delete_twitter",
) )
async def twitter_delete(twitter=Depends(get_twitter)): async def twitter_delete(twitter=Depends(get_twitter)):
spawner.stop(twitter) spawner.stop(twitter)
@ -80,20 +80,20 @@ async def twitter_delete(twitter=Depends(get_twitter)):
@router.get( @router.get(
'/{twitter_id}/status', "/{twitter_id}/status",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
# TODO response_model # TODO response_model
operation_id='status_twitter', operation_id="status_twitter",
) )
async def twitter_status(twitter=Depends(get_twitter)): async def twitter_status(twitter=Depends(get_twitter)):
return {'status': spawner.get(twitter).status.name} return {"status": spawner.get(twitter).status.name}
@router.post( @router.post(
'/{twitter_id}/start', "/{twitter_id}/start",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
# TODO response_model # TODO response_model
operation_id='start_twitter', operation_id="start_twitter",
) )
async def twitter_start(twitter=Depends(get_twitter)): async def twitter_start(twitter=Depends(get_twitter)):
await twitter.update(enabled=True) await twitter.update(enabled=True)
@ -102,10 +102,10 @@ async def twitter_start(twitter=Depends(get_twitter)):
@router.post( @router.post(
'/{twitter_id}/stop', "/{twitter_id}/stop",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
# TODO response_model # TODO response_model
operation_id='stop_twitter', operation_id="stop_twitter",
) )
async def twitter_stop(twitter=Depends(get_twitter)): async def twitter_stop(twitter=Depends(get_twitter)):
await twitter.update(enabled=False) await twitter.update(enabled=False)
@ -114,10 +114,10 @@ async def twitter_stop(twitter=Depends(get_twitter)):
@router.post( @router.post(
'/', "/",
status_code=status.HTTP_201_CREATED, status_code=status.HTTP_201_CREATED,
# TODO response_model # TODO response_model
operation_id='create_twitter', operation_id="create_twitter",
) )
async def twitter_create(response: Response, hood=Depends(get_hood)): async def twitter_create(response: Response, hood=Depends(get_hood)):
""" """
@ -129,20 +129,20 @@ async def twitter_create(response: Response, hood=Depends(get_hood)):
await corpse.delete() await corpse.delete()
# Create Twitter # Create Twitter
request_token = await get_oauth_token( request_token = await get_oauth_token(
config['twitter']['consumer_key'], config["twitter"]["consumer_key"],
config['twitter']['consumer_secret'], config["twitter"]["consumer_secret"],
callback_uri='{0}/dashboard/twitter-callback?hood={1}'.format( callback_uri="{0}/dashboard/twitter-callback?hood={1}".format(
config['frontend_url'], hood.id config["frontend_url"], hood.id
), ),
) )
if request_token['oauth_callback_confirmed'] != 'true': if request_token["oauth_callback_confirmed"] != "true":
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
twitter = await Twitter.objects.create( twitter = await Twitter.objects.create(
hood=hood, hood=hood,
access_token=request_token['oauth_token'], access_token=request_token["oauth_token"],
access_token_secret=request_token['oauth_token_secret'], access_token_secret=request_token["oauth_token_secret"],
) )
response.headers['Location'] = str(twitter.id) response.headers["Location"] = str(twitter.id)
return twitter return twitter
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@ -151,9 +151,9 @@ async def twitter_create(response: Response, hood=Depends(get_hood)):
@twitter_callback_router.get( @twitter_callback_router.get(
'/callback', "/callback",
# TODO response_model # TODO response_model
operation_id='callback_twitter', operation_id="callback_twitter",
) )
async def twitter_read_callback( async def twitter_read_callback(
oauth_token: str, oauth_verifier: str, hood=Depends(get_hood) oauth_token: str, oauth_verifier: str, hood=Depends(get_hood)
@ -161,15 +161,15 @@ async def twitter_read_callback(
try: try:
twitter = await Twitter.objects.filter(access_token=oauth_token).get() twitter = await Twitter.objects.filter(access_token=oauth_token).get()
access_token = await get_access_token( access_token = await get_access_token(
config['twitter']['consumer_key'], config["twitter"]["consumer_key"],
config['twitter']['consumer_secret'], config["twitter"]["consumer_secret"],
twitter.access_token, twitter.access_token,
twitter.access_token_secret, twitter.access_token_secret,
oauth_verifier, oauth_verifier,
) )
await twitter.update( await twitter.update(
access_token=access_token['oauth_token'], access_token=access_token["oauth_token"],
access_token_secret=access_token['oauth_token_secret'], access_token_secret=access_token["oauth_token_secret"],
verified=True, verified=True,
enabled=True, enabled=True,
) )

View file

@ -23,20 +23,20 @@ from kibicara.webapi.hoods.badwords import router as badwords_router
from kibicara.webapi.hoods.triggers import router as triggers_router from kibicara.webapi.hoods.triggers import router as triggers_router
router = APIRouter() router = APIRouter()
router.include_router(admin_router, prefix='/admin', tags=['admin']) router.include_router(admin_router, prefix="/admin", tags=["admin"])
hoods_router.include_router( hoods_router.include_router(
triggers_router, prefix='/{hood_id}/triggers', tags=['triggers'] triggers_router, prefix="/{hood_id}/triggers", tags=["triggers"]
) )
hoods_router.include_router( hoods_router.include_router(
badwords_router, prefix='/{hood_id}/badwords', tags=['badwords'] badwords_router, prefix="/{hood_id}/badwords", tags=["badwords"]
) )
hoods_router.include_router(test_router, prefix='/{hood_id}/test', tags=['test']) hoods_router.include_router(test_router, prefix="/{hood_id}/test", tags=["test"])
hoods_router.include_router( hoods_router.include_router(
telegram_router, prefix='/{hood_id}/telegram', tags=['telegram'] telegram_router, prefix="/{hood_id}/telegram", tags=["telegram"]
) )
hoods_router.include_router( hoods_router.include_router(
twitter_router, prefix='/{hood_id}/twitter', tags=['twitter'] twitter_router, prefix="/{hood_id}/twitter", tags=["twitter"]
) )
router.include_router(twitter_callback_router, prefix='/twitter', tags=['twitter']) router.include_router(twitter_callback_router, prefix="/twitter", tags=["twitter"])
hoods_router.include_router(email_router, prefix='/{hood_id}/email', tags=['email']) hoods_router.include_router(email_router, prefix="/{hood_id}/email", tags=["email"])
router.include_router(hoods_router, prefix='/hoods') router.include_router(hoods_router, prefix="/hoods")

View file

@ -37,10 +37,10 @@ class BodyEmail(BaseModel):
class BodyPassword(BaseModel): class BodyPassword(BaseModel):
password: str password: str
@validator('password') @validator("password")
def valid_password(cls, value): def valid_password(cls, value):
if len(value) < 8: if len(value) < 8:
raise ValueError('Password is too short') raise ValueError("Password is too short")
return value return value
@ -50,22 +50,22 @@ class BodyAdmin(BodyEmail, BodyPassword):
class BodyAccessToken(BaseModel): class BodyAccessToken(BaseModel):
access_token: str access_token: str
token_type: str = 'bearer' token_type: str = "bearer"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/api/admin/login') oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/login")
secret_box = SecretBox(bytes.fromhex(config['secret'])) secret_box = SecretBox(bytes.fromhex(config["secret"]))
def to_token(**kwargs): def to_token(**kwargs):
return secret_box.encrypt(dumps(kwargs), encoder=URLSafeBase64Encoder).decode( return secret_box.encrypt(dumps(kwargs), encoder=URLSafeBase64Encoder).decode(
'ascii' "ascii"
) )
def from_token(token): def from_token(token):
return loads( return loads(
secret_box.decrypt(token.encode('ascii'), encoder=URLSafeBase64Encoder) secret_box.decrypt(token.encode("ascii"), encoder=URLSafeBase64Encoder)
) )
@ -85,8 +85,8 @@ async def get_admin(access_token=Depends(oauth2_scheme)):
except (CryptoError, ValueError): except (CryptoError, ValueError):
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail='Invalid authentication credentials', detail="Invalid authentication credentials",
headers={'WWW-Authenticate': 'Bearer'}, headers={"WWW-Authenticate": "Bearer"},
) )
return admin return admin
@ -95,10 +95,10 @@ router = APIRouter()
@router.post( @router.post(
'/register/', "/register/",
status_code=status.HTTP_202_ACCEPTED, status_code=status.HTTP_202_ACCEPTED,
response_model=BaseModel, response_model=BaseModel,
operation_id='register', operation_id="register",
) )
async def admin_register(values: BodyAdmin): async def admin_register(values: BodyAdmin):
"""Sends an email with a confirmation link. """Sends an email with a confirmation link.
@ -107,28 +107,28 @@ async def admin_register(values: BodyAdmin):
- **password**: Password of new hood admin - **password**: Password of new hood admin
""" """
register_token = to_token(**values.__dict__) register_token = to_token(**values.__dict__)
logger.debug('register_token={0}'.format(register_token)) logger.debug("register_token={0}".format(register_token))
try: try:
admin = await Admin.objects.filter(email=values.email).all() admin = await Admin.objects.filter(email=values.email).all()
if admin: if admin:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
body = '{0}/confirm?token={1}'.format(config['frontend_url'], register_token) body = "{0}/confirm?token={1}".format(config["frontend_url"], register_token)
logger.debug(body) logger.debug(body)
email.send_email( email.send_email(
to=values.email, to=values.email,
subject='Confirm Account', subject="Confirm Account",
body=body, body=body,
) )
except (ConnectionRefusedError, SMTPException): except (ConnectionRefusedError, SMTPException):
logger.exception('Email sending failed') logger.exception("Email sending failed")
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY) raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY)
return {} return {}
@router.post( @router.post(
'/confirm/{register_token}', "/confirm/{register_token}",
response_model=BodyAccessToken, response_model=BodyAccessToken,
operation_id='confirm', operation_id="confirm",
) )
async def admin_confirm(register_token: str): async def admin_confirm(register_token: str):
"""Registration confirmation and account creation. """Registration confirmation and account creation.
@ -137,17 +137,17 @@ async def admin_confirm(register_token: str):
""" """
try: try:
values = from_token(register_token) values = from_token(register_token)
passhash = argon2.hash(values['password']) passhash = argon2.hash(values["password"])
await Admin.objects.create(email=values['email'], passhash=passhash) await Admin.objects.create(email=values["email"], passhash=passhash)
return BodyAccessToken(access_token=register_token) return BodyAccessToken(access_token=register_token)
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@router.post( @router.post(
'/login/', "/login/",
response_model=BodyAccessToken, response_model=BodyAccessToken,
operation_id='login', operation_id="login",
) )
async def admin_login(form_data: OAuth2PasswordRequestForm = Depends()): async def admin_login(form_data: OAuth2PasswordRequestForm = Depends()):
"""Get an access token. """Get an access token.
@ -160,17 +160,17 @@ async def admin_login(form_data: OAuth2PasswordRequestForm = Depends()):
except ValueError: except ValueError:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, status_code=status.HTTP_400_BAD_REQUEST,
detail='Incorrect email or password', detail="Incorrect email or password",
) )
token = to_token(email=form_data.username, password=form_data.password) token = to_token(email=form_data.username, password=form_data.password)
return BodyAccessToken(access_token=token) return BodyAccessToken(access_token=token)
@router.post( @router.post(
'/reset/', "/reset/",
status_code=status.HTTP_202_ACCEPTED, status_code=status.HTTP_202_ACCEPTED,
response_model=BaseModel, response_model=BaseModel,
operation_id='reset', operation_id="reset",
) )
async def admin_reset_password(values: BodyEmail): async def admin_reset_password(values: BodyEmail):
"""Sends an email with a password reset link. """Sends an email with a password reset link.
@ -179,41 +179,41 @@ async def admin_reset_password(values: BodyEmail):
- **password**: Password of new hood admin - **password**: Password of new hood admin
""" """
register_token = to_token(datetime=datetime.now().isoformat(), **values.__dict__) register_token = to_token(datetime=datetime.now().isoformat(), **values.__dict__)
logger.debug('register_token={0}'.format(register_token)) logger.debug("register_token={0}".format(register_token))
try: try:
admin = await Admin.objects.filter(email=values.email).all() admin = await Admin.objects.filter(email=values.email).all()
if not admin: if not admin:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
body = '{0}/password-reset?token={1}'.format( body = "{0}/password-reset?token={1}".format(
config['frontend_url'], register_token config["frontend_url"], register_token
) )
logger.debug(body) logger.debug(body)
email.send_email( email.send_email(
to=values.email, to=values.email,
subject='Reset your password', subject="Reset your password",
body=body, body=body,
) )
except (ConnectionRefusedError, SMTPException): except (ConnectionRefusedError, SMTPException):
logger.exception('Email sending failed') logger.exception("Email sending failed")
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY) raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY)
return {} return {}
@router.post( @router.post(
'/reset/{reset_token}', "/reset/{reset_token}",
response_model=BodyAccessToken, response_model=BodyAccessToken,
operation_id='confirm_reset', operation_id="confirm_reset",
) )
async def admin_confirm_reset(reset_token: str, values: BodyPassword): async def admin_confirm_reset(reset_token: str, values: BodyPassword):
try: try:
token_values = from_token(reset_token) token_values = from_token(reset_token)
if ( if (
datetime.fromisoformat(token_values['datetime']) + timedelta(hours=3) datetime.fromisoformat(token_values["datetime"]) + timedelta(hours=3)
< datetime.now() < datetime.now()
): ):
raise HTTPException(status_code=status.HTTP_410_GONE) raise HTTPException(status_code=status.HTTP_410_GONE)
passhash = argon2.hash(values.password) passhash = argon2.hash(values.password)
admins = await Admin.objects.filter(email=token_values['email']).all() admins = await Admin.objects.filter(email=token_values["email"]).all()
if len(admins) != 1: if len(admins) != 1:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
await admins[0].update(passhash=passhash) await admins[0].update(passhash=passhash)
@ -225,21 +225,21 @@ async def admin_confirm_reset(reset_token: str, values: BodyPassword):
@router.get( @router.get(
'/hoods/', "/hoods/",
# TODO response_model, # TODO response_model,
operation_id='get_hoods_admin', operation_id="get_hoods_admin",
) )
async def admin_hood_read_all(admin=Depends(get_admin)): async def admin_hood_read_all(admin=Depends(get_admin)):
"""Get a list of all hoods of a given admin.""" """Get a list of all hoods of a given admin."""
return ( return (
await AdminHoodRelation.objects.select_related('hood').filter(admin=admin).all() await AdminHoodRelation.objects.select_related("hood").filter(admin=admin).all()
) )
@router.get( @router.get(
'/', "/",
# TODO response_model, # TODO response_model,
operation_id='get_admin', operation_id="get_admin",
) )
async def admin_read(admin=Depends(get_admin)): async def admin_read(admin=Depends(get_admin)):
"""Get a list of all hoods of a given admin.""" """Get a list of all hoods of a given admin."""
@ -250,17 +250,17 @@ async def admin_read(admin=Depends(get_admin)):
@router.delete( @router.delete(
'/', "/",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
operation_id='delete_admin', operation_id="delete_admin",
) )
async def admin_delete(admin=Depends(get_admin)): async def admin_delete(admin=Depends(get_admin)):
hood_relations = ( hood_relations = (
await AdminHoodRelation.objects.select_related('hood').filter(admin=admin).all() await AdminHoodRelation.objects.select_related("hood").filter(admin=admin).all()
) )
for hood in hood_relations: for hood in hood_relations:
admins = ( admins = (
await AdminHoodRelation.objects.select_related('admin') await AdminHoodRelation.objects.select_related("admin")
.filter(hood=hood.id) .filter(hood=hood.id)
.all() .all()
) )

View file

@ -20,9 +20,9 @@ from kibicara.webapi.utils import delete_hood
class BodyHood(BaseModel): class BodyHood(BaseModel):
name: str name: str
landingpage: str = ''' landingpage: str = """
Default Landing Page Default Landing Page
''' """
async def get_hood_unauthorized(hood_id: int): async def get_hood_unauthorized(hood_id: int):
@ -39,7 +39,7 @@ async def get_hood(hood=Depends(get_hood_unauthorized), admin=Depends(get_admin)
except NoMatch: except NoMatch:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
headers={'WWW-Authenticate': 'Bearer'}, headers={"WWW-Authenticate": "Bearer"},
) )
return hood return hood
@ -48,10 +48,10 @@ router = APIRouter()
@router.get( @router.get(
'/', "/",
# TODO response_model, # TODO response_model,
operation_id='get_hoods', operation_id="get_hoods",
tags=['hoods'], tags=["hoods"],
) )
async def hood_read_all(): async def hood_read_all():
"""Get all existing hoods.""" """Get all existing hoods."""
@ -59,11 +59,11 @@ async def hood_read_all():
@router.post( @router.post(
'/', "/",
status_code=status.HTTP_201_CREATED, status_code=status.HTTP_201_CREATED,
# TODO response_model, # TODO response_model,
operation_id='create_hood', operation_id="create_hood",
tags=['hoods'], tags=["hoods"],
) )
async def hood_create(values: BodyHood, response: Response, admin=Depends(get_admin)): async def hood_create(values: BodyHood, response: Response, admin=Depends(get_admin)):
"""Creates a hood. """Creates a hood.
@ -77,19 +77,19 @@ async def hood_create(values: BodyHood, response: Response, admin=Depends(get_ad
spawner.start(hood) spawner.start(hood)
# Initialize Triggers to match all # Initialize Triggers to match all
await Trigger.objects.create(hood=hood, pattern='.') await Trigger.objects.create(hood=hood, pattern=".")
response.headers['Location'] = str(hood.id) response.headers["Location"] = str(hood.id)
return hood return hood
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@router.get( @router.get(
'/{hood_id}', "/{hood_id}",
# TODO response_model, # TODO response_model,
operation_id='get_hood', operation_id="get_hood",
tags=['hoods'], tags=["hoods"],
) )
async def hood_read(hood=Depends(get_hood_unauthorized)): async def hood_read(hood=Depends(get_hood_unauthorized)):
"""Get hood with id **hood_id**.""" """Get hood with id **hood_id**."""
@ -97,10 +97,10 @@ async def hood_read(hood=Depends(get_hood_unauthorized)):
@router.put( @router.put(
'/{hood_id}', "/{hood_id}",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
operation_id='update_hood', operation_id="update_hood",
tags=['hoods'], tags=["hoods"],
) )
async def hood_update(values: BodyHood, hood=Depends(get_hood)): async def hood_update(values: BodyHood, hood=Depends(get_hood)):
"""Updates hood with id **hood_id**. """Updates hood with id **hood_id**.
@ -113,10 +113,10 @@ async def hood_update(values: BodyHood, hood=Depends(get_hood)):
@router.delete( @router.delete(
'/{hood_id}', "/{hood_id}",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
operation_id='delete_hood', operation_id="delete_hood",
tags=['hoods'], tags=["hoods"],
) )
async def hood_delete(hood=Depends(get_hood)): async def hood_delete(hood=Depends(get_hood)):
"""Deletes hood with id **hood_id**.""" """Deletes hood with id **hood_id**."""

View file

@ -38,9 +38,9 @@ router = APIRouter()
@router.get( @router.get(
'/', "/",
# TODO response_model, # TODO response_model,
operation_id='get_badwords', operation_id="get_badwords",
) )
async def badword_read_all(hood=Depends(get_hood)): async def badword_read_all(hood=Depends(get_hood)):
"""Get all badwords of hood with id **hood_id**.""" """Get all badwords of hood with id **hood_id**."""
@ -48,10 +48,10 @@ async def badword_read_all(hood=Depends(get_hood)):
@router.post( @router.post(
'/', "/",
status_code=status.HTTP_201_CREATED, status_code=status.HTTP_201_CREATED,
# TODO response_model, # TODO response_model,
operation_id='create_badword', operation_id="create_badword",
) )
async def badword_create( async def badword_create(
values: BodyBadWord, response: Response, hood=Depends(get_hood) values: BodyBadWord, response: Response, hood=Depends(get_hood)
@ -63,7 +63,7 @@ async def badword_create(
try: try:
regex_compile(values.pattern) regex_compile(values.pattern)
badword = await BadWord.objects.create(hood=hood, **values.__dict__) badword = await BadWord.objects.create(hood=hood, **values.__dict__)
response.headers['Location'] = str(badword.id) response.headers["Location"] = str(badword.id)
return badword return badword
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@ -72,9 +72,9 @@ async def badword_create(
@router.get( @router.get(
'/{badword_id}', "/{badword_id}",
# TODO response_model, # TODO response_model,
operation_id='get_badword', operation_id="get_badword",
) )
async def badword_read(badword=Depends(get_badword)): async def badword_read(badword=Depends(get_badword)):
"""Reads badword with id **badword_id** for hood with id **hood_id**.""" """Reads badword with id **badword_id** for hood with id **hood_id**."""
@ -82,9 +82,9 @@ async def badword_read(badword=Depends(get_badword)):
@router.put( @router.put(
'/{badword_id}', "/{badword_id}",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
operation_id='update_badword', operation_id="update_badword",
) )
async def badword_update(values: BodyBadWord, badword=Depends(get_badword)): async def badword_update(values: BodyBadWord, badword=Depends(get_badword)):
"""Updates badword with id **badword_id** for hood with id **hood_id**. """Updates badword with id **badword_id** for hood with id **hood_id**.
@ -96,9 +96,9 @@ async def badword_update(values: BodyBadWord, badword=Depends(get_badword)):
@router.delete( @router.delete(
'/{badword_id}', "/{badword_id}",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
operation_id='delete_badword', operation_id="delete_badword",
) )
async def badword_delete(badword=Depends(get_badword)): async def badword_delete(badword=Depends(get_badword)):
"""Deletes badword with id **badword_id** for hood with id **hood_id**.""" """Deletes badword with id **badword_id** for hood with id **hood_id**."""

View file

@ -39,9 +39,9 @@ router = APIRouter()
@router.get( @router.get(
'/', "/",
# TODO response_model, # TODO response_model,
operation_id='get_triggers', operation_id="get_triggers",
) )
async def trigger_read_all(hood=Depends(get_hood)): async def trigger_read_all(hood=Depends(get_hood)):
"""Get all triggers of hood with id **hood_id**.""" """Get all triggers of hood with id **hood_id**."""
@ -49,10 +49,10 @@ async def trigger_read_all(hood=Depends(get_hood)):
@router.post( @router.post(
'/', "/",
status_code=status.HTTP_201_CREATED, status_code=status.HTTP_201_CREATED,
# TODO response_model, # TODO response_model,
operation_id='create_trigger', operation_id="create_trigger",
) )
async def trigger_create( async def trigger_create(
values: BodyTrigger, response: Response, hood=Depends(get_hood) values: BodyTrigger, response: Response, hood=Depends(get_hood)
@ -64,7 +64,7 @@ async def trigger_create(
try: try:
regex_compile(values.pattern) regex_compile(values.pattern)
trigger = await Trigger.objects.create(hood=hood, **values.__dict__) trigger = await Trigger.objects.create(hood=hood, **values.__dict__)
response.headers['Location'] = str(trigger.id) response.headers["Location"] = str(trigger.id)
return trigger return trigger
except IntegrityError: except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@ -73,9 +73,9 @@ async def trigger_create(
@router.get( @router.get(
'/{trigger_id}', "/{trigger_id}",
# TODO response_model, # TODO response_model,
operation_id='get_trigger', operation_id="get_trigger",
) )
async def trigger_read(trigger=Depends(get_trigger)): async def trigger_read(trigger=Depends(get_trigger)):
"""Reads trigger with id **trigger_id** for hood with id **hood_id**.""" """Reads trigger with id **trigger_id** for hood with id **hood_id**."""
@ -83,9 +83,9 @@ async def trigger_read(trigger=Depends(get_trigger)):
@router.put( @router.put(
'/{trigger_id}', "/{trigger_id}",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
operation_id='update_trigger', operation_id="update_trigger",
) )
async def trigger_update(values: BodyTrigger, trigger=Depends(get_trigger)): async def trigger_update(values: BodyTrigger, trigger=Depends(get_trigger)):
"""Updates trigger with id **trigger_id** for hood with id **hood_id**. """Updates trigger with id **trigger_id** for hood with id **hood_id**.
@ -97,9 +97,9 @@ async def trigger_update(values: BodyTrigger, trigger=Depends(get_trigger)):
@router.delete( @router.delete(
'/{trigger_id}', "/{trigger_id}",
status_code=status.HTTP_204_NO_CONTENT, status_code=status.HTTP_204_NO_CONTENT,
operation_id='delete_trigger', operation_id="delete_trigger",
) )
async def trigger_delete(trigger=Depends(get_trigger)): async def trigger_delete(trigger=Depends(get_trigger)):
"""Deletes trigger with id **trigger_id** for hood with id **hood_id**.""" """Deletes trigger with id **trigger_id** for hood with id **hood_id**."""

View file

@ -15,16 +15,16 @@ from kibicara.model import Mapping
from kibicara.webapi import router from kibicara.webapi import router
@fixture(scope='module') @fixture(scope="module")
def client(): def client():
Mapping.drop_all() Mapping.drop_all()
Mapping.create_all() Mapping.create_all()
app = FastAPI() app = FastAPI()
app.include_router(router, prefix='/api') app.include_router(router, prefix="/api")
return TestClient(app) return TestClient(app)
@fixture(scope='module') @fixture(scope="module")
def monkeymodule(): def monkeymodule():
from _pytest.monkeypatch import MonkeyPatch from _pytest.monkeypatch import MonkeyPatch
@ -33,96 +33,96 @@ def monkeymodule():
mpatch.undo() mpatch.undo()
@fixture(scope='module') @fixture(scope="module")
def receive_email(monkeymodule): def receive_email(monkeymodule):
mailbox = [] mailbox = []
def mock_send_email(to, subject, sender='kibicara', body=''): def mock_send_email(to, subject, sender="kibicara", body=""):
mailbox.append(dict(to=to, subject=subject, sender=sender, body=body)) mailbox.append(dict(to=to, subject=subject, sender=sender, body=body))
def mock_receive_email(): def mock_receive_email():
return mailbox.pop() return mailbox.pop()
monkeymodule.setattr(email, 'send_email', mock_send_email) monkeymodule.setattr(email, "send_email", mock_send_email)
return mock_receive_email return mock_receive_email
@fixture(scope='module') @fixture(scope="module")
def register_token(client, receive_email): def register_token(client, receive_email):
response = client.post( response = client.post(
'/api/admin/register/', json={'email': 'user', 'password': 'password'} "/api/admin/register/", json={"email": "user", "password": "password"}
) )
assert response.status_code == status.HTTP_202_ACCEPTED assert response.status_code == status.HTTP_202_ACCEPTED
return urlparse(receive_email()['body']).query.split('=', 1)[1] return urlparse(receive_email()["body"]).query.split("=", 1)[1]
@fixture(scope='module') @fixture(scope="module")
def register_confirmed(client, register_token): def register_confirmed(client, register_token):
response = client.post('/api/admin/confirm/{0}'.format(register_token)) response = client.post("/api/admin/confirm/{0}".format(register_token))
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
@fixture(scope='module') @fixture(scope="module")
def access_token(client, register_confirmed): def access_token(client, register_confirmed):
response = client.post( response = client.post(
'/api/admin/login/', data={'username': 'user', 'password': 'password'} "/api/admin/login/", data={"username": "user", "password": "password"}
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
return response.json()['access_token'] return response.json()["access_token"]
@fixture(scope='module') @fixture(scope="module")
def auth_header(access_token): def auth_header(access_token):
return {'Authorization': 'Bearer {0}'.format(access_token)} return {"Authorization": "Bearer {0}".format(access_token)}
@fixture(scope='function') @fixture(scope="function")
def hood_id(client, auth_header): def hood_id(client, auth_header):
response = client.post('/api/hoods/', json={'name': 'hood'}, headers=auth_header) response = client.post("/api/hoods/", json={"name": "hood"}, headers=auth_header)
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
hood_id = int(response.headers['Location']) hood_id = int(response.headers["Location"])
yield hood_id yield hood_id
client.delete('/api/hoods/{0}'.format(hood_id), headers=auth_header) client.delete("/api/hoods/{0}".format(hood_id), headers=auth_header)
@fixture(scope='function') @fixture(scope="function")
def trigger_id(client, hood_id, auth_header): def trigger_id(client, hood_id, auth_header):
response = client.post( response = client.post(
'/api/hoods/{0}/triggers/'.format(hood_id), "/api/hoods/{0}/triggers/".format(hood_id),
json={'pattern': 'test'}, json={"pattern": "test"},
headers=auth_header, headers=auth_header,
) )
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
trigger_id = int(response.headers['Location']) trigger_id = int(response.headers["Location"])
yield trigger_id yield trigger_id
client.delete( client.delete(
'/api/hoods/{0}/triggers/{1}'.format(hood_id, trigger_id), headers=auth_header "/api/hoods/{0}/triggers/{1}".format(hood_id, trigger_id), headers=auth_header
) )
@fixture(scope='function') @fixture(scope="function")
def badword_id(client, hood_id, auth_header): def badword_id(client, hood_id, auth_header):
response = client.post( response = client.post(
'/api/hoods/{0}/badwords/'.format(hood_id), "/api/hoods/{0}/badwords/".format(hood_id),
json={'pattern': ''}, json={"pattern": ""},
headers=auth_header, headers=auth_header,
) )
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
badword_id = int(response.headers['Location']) badword_id = int(response.headers["Location"])
yield badword_id yield badword_id
client.delete( client.delete(
'/api/hoods/{0}/badwords/{1}'.format(hood_id, badword_id), headers=auth_header "/api/hoods/{0}/badwords/{1}".format(hood_id, badword_id), headers=auth_header
) )
@fixture(scope='function') @fixture(scope="function")
def test_id(client, hood_id, auth_header): def test_id(client, hood_id, auth_header):
response = client.post( response = client.post(
'/api/hoods/{0}/test/'.format(hood_id), json={}, headers=auth_header "/api/hoods/{0}/test/".format(hood_id), json={}, headers=auth_header
) )
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
test_id = int(response.headers['Location']) test_id = int(response.headers["Location"])
yield test_id yield test_id
client.delete( client.delete(
'/api/hoods/{0}/test/{1}'.format(hood_id, test_id), headers=auth_header "/api/hoods/{0}/test/{1}".format(hood_id, test_id), headers=auth_header
) )

View file

@ -6,10 +6,10 @@ from fastapi import status
def test_hoods_unauthorized(client): def test_hoods_unauthorized(client):
response = client.get('/api/admin/hoods/') response = client.get("/api/admin/hoods/")
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_hoods_success(client, auth_header): def test_hoods_success(client, auth_header):
response = client.get('/api/admin/hoods/', headers=auth_header) response = client.get("/api/admin/hoods/", headers=auth_header)
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK

View file

@ -8,75 +8,75 @@ from fastapi import status
def test_hood_read_all(client): def test_hood_read_all(client):
response = client.get('/api/hoods/') response = client.get("/api/hoods/")
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
def test_hood_create_unauthorized(client, hood_id): def test_hood_create_unauthorized(client, hood_id):
response = client.post('/api/hoods/') response = client.post("/api/hoods/")
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_hood_read(client, hood_id): def test_hood_read(client, hood_id):
response = client.get('/api/hoods/{0}'.format(hood_id)) response = client.get("/api/hoods/{0}".format(hood_id))
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
def test_hood_update_unauthorized(client, hood_id): def test_hood_update_unauthorized(client, hood_id):
response = client.put('/api/hoods/{0}'.format(hood_id)) response = client.put("/api/hoods/{0}".format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_hood_delete_unauthorized(client, hood_id): def test_hood_delete_unauthorized(client, hood_id):
response = client.delete('/api/hoods/{0}'.format(hood_id)) response = client.delete("/api/hoods/{0}".format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_trigger_read_all_unauthorized(client, hood_id): def test_trigger_read_all_unauthorized(client, hood_id):
response = client.get('/api/hoods/{0}/triggers/'.format(hood_id)) response = client.get("/api/hoods/{0}/triggers/".format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_trigger_create_unauthorized(client, hood_id): def test_trigger_create_unauthorized(client, hood_id):
response = client.post('/api/hoods/{0}/triggers/'.format(hood_id)) response = client.post("/api/hoods/{0}/triggers/".format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_trigger_read_unauthorized(client, hood_id, trigger_id): def test_trigger_read_unauthorized(client, hood_id, trigger_id):
response = client.get('/api/hoods/{0}/triggers/{1}'.format(hood_id, trigger_id)) response = client.get("/api/hoods/{0}/triggers/{1}".format(hood_id, trigger_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_trigger_update_unauthorized(client, hood_id, trigger_id): def test_trigger_update_unauthorized(client, hood_id, trigger_id):
response = client.put('/api/hoods/{0}/triggers/{1}'.format(hood_id, trigger_id)) response = client.put("/api/hoods/{0}/triggers/{1}".format(hood_id, trigger_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_trigger_delete_unauthorized(client, hood_id, trigger_id): def test_trigger_delete_unauthorized(client, hood_id, trigger_id):
response = client.delete('/api/hoods/{0}/triggers/{1}'.format(hood_id, trigger_id)) response = client.delete("/api/hoods/{0}/triggers/{1}".format(hood_id, trigger_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_badword_read_all_unauthorized(client, hood_id): def test_badword_read_all_unauthorized(client, hood_id):
response = client.get('/api/hoods/{0}/badwords/'.format(hood_id)) response = client.get("/api/hoods/{0}/badwords/".format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_badword_create_unauthorized(client, hood_id): def test_badword_create_unauthorized(client, hood_id):
response = client.post('/api/hoods/{0}/badwords/'.format(hood_id)) response = client.post("/api/hoods/{0}/badwords/".format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_badword_read_unauthorized(client, hood_id, badword_id): def test_badword_read_unauthorized(client, hood_id, badword_id):
response = client.get('/api/hoods/{0}/badwords/{1}'.format(hood_id, badword_id)) response = client.get("/api/hoods/{0}/badwords/{1}".format(hood_id, badword_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_badword_update_unauthorized(client, hood_id, badword_id): def test_badword_update_unauthorized(client, hood_id, badword_id):
response = client.put('/api/hoods/{0}/badwords/{1}'.format(hood_id, badword_id)) response = client.put("/api/hoods/{0}/badwords/{1}".format(hood_id, badword_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_badword_delete_unauthorized(client, hood_id, badword_id): def test_badword_delete_unauthorized(client, hood_id, badword_id):
response = client.delete('/api/hoods/{0}/badwords/{1}'.format(hood_id, badword_id)) response = client.delete("/api/hoods/{0}/badwords/{1}".format(hood_id, badword_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -9,16 +9,16 @@ from fastapi import status
from pytest import fixture from pytest import fixture
@fixture(scope='function') @fixture(scope="function")
def email_row(client, hood_id, auth_header): def email_row(client, hood_id, auth_header):
response = client.post( response = client.post(
'/api/hoods/{0}/email/'.format(hood_id), "/api/hoods/{0}/email/".format(hood_id),
json={'name': 'kibicara-test'}, json={"name": "kibicara-test"},
headers=auth_header, headers=auth_header,
) )
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
email_id = int(response.headers['Location']) email_id = int(response.headers["Location"])
yield response.json() yield response.json()
client.delete( client.delete(
'/api/hoods/{0}/email/{1}'.format(hood_id, email_id), headers=auth_header "/api/hoods/{0}/email/{1}".format(hood_id, email_id), headers=auth_header
) )

View file

@ -15,49 +15,49 @@ from kibicara.webapi.admin import to_token
def test_email_subscribe_unsubscribe(client, hood_id, receive_email): def test_email_subscribe_unsubscribe(client, hood_id, receive_email):
response = client.post( response = client.post(
'/api/hoods/{0}/email/subscribe/'.format(hood_id), "/api/hoods/{0}/email/subscribe/".format(hood_id),
json={'email': 'test@localhost'}, json={"email": "test@localhost"},
) )
assert response.status_code == status.HTTP_202_ACCEPTED assert response.status_code == status.HTTP_202_ACCEPTED
mail = receive_email() mail = receive_email()
body = mail['body'] body = mail["body"]
confirm_url = findall( confirm_url = findall(
r'http[s]?://' r"http[s]?://"
+ r'(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', + r"(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+",
body, body,
)[0] )[0]
start = len('token=') start = len("token=")
response = client.post( response = client.post(
'/api/hoods/{0}/email/subscribe/confirm/{1}'.format( "/api/hoods/{0}/email/subscribe/confirm/{1}".format(
hood_id, urlparse(confirm_url).query[start:] hood_id, urlparse(confirm_url).query[start:]
) )
) )
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
response = client.post( response = client.post(
'/api/hoods/{0}/email/subscribe/confirm/{1}'.format( "/api/hoods/{0}/email/subscribe/confirm/{1}".format(
hood_id, urlparse(confirm_url).query[start:] hood_id, urlparse(confirm_url).query[start:]
) )
) )
assert response.status_code == status.HTTP_409_CONFLICT assert response.status_code == status.HTTP_409_CONFLICT
token = to_token(email=mail['to'], hood=hood_id) token = to_token(email=mail["to"], hood=hood_id)
response = client.delete( response = client.delete(
'/api/hoods/{0}/email/unsubscribe/{1}'.format(hood_id, token) "/api/hoods/{0}/email/unsubscribe/{1}".format(hood_id, token)
) )
assert response.status_code == status.HTTP_204_NO_CONTENT assert response.status_code == status.HTTP_204_NO_CONTENT
def test_email_message(client, hood_id, trigger_id, email_row): def test_email_message(client, hood_id, trigger_id, email_row):
body = { body = {
'text': 'test', "text": "test",
'author': 'test@localhost', "author": "test@localhost",
'secret': email_row['secret'], "secret": email_row["secret"],
} }
response = client.post('/api/hoods/{0}/email/messages/'.format(hood_id), json=body) response = client.post("/api/hoods/{0}/email/messages/".format(hood_id), json=body)
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
def test_email_send_mda(trigger_id, email_row): def test_email_send_mda(trigger_id, email_row):
skip('Only works if kibicara is listening on port 8000, and only sometimes') skip("Only works if kibicara is listening on port 8000, and only sometimes")
mail = """From test@example.com Tue Jun 16 15:33:19 2020 mail = """From test@example.com Tue Jun 16 15:33:19 2020
Return-path: <test@example.com> Return-path: <test@example.com>
Envelope-to: hood@localhost Envelope-to: hood@localhost
@ -85,6 +85,6 @@ test
--AqNPlAX243a8sip3B7kXv8UKD8wuti-- --AqNPlAX243a8sip3B7kXv8UKD8wuti--
""" """
proc = subprocess.run( proc = subprocess.run(
['kibicara_mda', 'hood'], stdout=subprocess.PIPE, input=mail, encoding='ascii' ["kibicara_mda", "hood"], stdout=subprocess.PIPE, input=mail, encoding="ascii"
) )
assert proc.returncode == 0 assert proc.returncode == 0

View file

@ -8,18 +8,18 @@ from fastapi import status
def test_email_create_unauthorized(client, hood_id): def test_email_create_unauthorized(client, hood_id):
response = client.post('/api/hoods/{0}/email/'.format(hood_id)) response = client.post("/api/hoods/{0}/email/".format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_email_delete_unauthorized(client, hood_id, email_row): def test_email_delete_unauthorized(client, hood_id, email_row):
response = client.delete( response = client.delete(
'/api/hoods/{0}/email/{1}'.format(hood_id, email_row['id']) "/api/hoods/{0}/email/{1}".format(hood_id, email_row["id"])
) )
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_email_message_unauthorized(client, hood_id, email_row): def test_email_message_unauthorized(client, hood_id, email_row):
body = {'text': 'test', 'author': 'author', 'secret': 'wrong'} body = {"text": "test", "author": "author", "secret": "wrong"}
response = client.post('/api/hoods/{0}/email/messages/'.format(hood_id), json=body) response = client.post("/api/hoods/{0}/email/messages/".format(hood_id), json=body)
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -8,15 +8,15 @@ from nacl.exceptions import CryptoError
def test_email_subscribe_empty(client, hood_id): def test_email_subscribe_empty(client, hood_id):
response = client.post('/api/hoods/{0}/email/subscribe/'.format(hood_id)) response = client.post("/api/hoods/{0}/email/subscribe/".format(hood_id))
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_email_subscribe_confirm_wrong_token(client, hood_id): def test_email_subscribe_confirm_wrong_token(client, hood_id):
try: try:
response = client.post( response = client.post(
'/api/hoods/{0}/email/subscribe/confirm/'.format(hood_id) "/api/hoods/{0}/email/subscribe/confirm/".format(hood_id)
+ 'asdfasdfasdfasdfasdfasdfasdfasdf' + "asdfasdfasdfasdfasdfasdfasdfasdf"
) )
assert response.status_code is not status.HTTP_201_CREATED assert response.status_code is not status.HTTP_201_CREATED
except CryptoError: except CryptoError:
@ -25,25 +25,25 @@ def test_email_subscribe_confirm_wrong_token(client, hood_id):
def test_email_subscribe_confirm_wrong_hood(client): def test_email_subscribe_confirm_wrong_hood(client):
response = client.delete( response = client.delete(
'/api/hoods/99999/email/unsubscribe/asdfasdfasdfasdfasdfasdfasdfasdf' "/api/hoods/99999/email/unsubscribe/asdfasdfasdfasdfasdfasdfasdfasdf"
) )
assert response.json()['detail'] == 'Not Found' assert response.json()["detail"] == "Not Found"
def test_email_message_wrong(client, hood_id, email_row): def test_email_message_wrong(client, hood_id, email_row):
body = { body = {
'text': '', "text": "",
'author': 'test@localhost', "author": "test@localhost",
'secret': email_row['secret'], "secret": email_row["secret"],
} }
response = client.post('/api/hoods/{0}/email/messages/'.format(hood_id), json=body) response = client.post("/api/hoods/{0}/email/messages/".format(hood_id), json=body)
assert response.status_code == status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS assert response.status_code == status.HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS
def test_email_unsubscribe_wrong_token(client, hood_id): def test_email_unsubscribe_wrong_token(client, hood_id):
try: try:
client.delete( client.delete(
'/api/hoods/{0}/email/unsubscribe/asdfasdfasdfasdfasdfasdfasdfasdf'.format( "/api/hoods/{0}/email/unsubscribe/asdfasdfasdfasdfasdfasdfasdfasdf".format(
hood_id hood_id
) )
) )
@ -53,6 +53,6 @@ def test_email_unsubscribe_wrong_token(client, hood_id):
def test_email_unsubscribe_wrong_hood(client): def test_email_unsubscribe_wrong_hood(client):
response = client.delete( response = client.delete(
'/api/hoods/99999/email/unsubscribe/asdfasdfasdfasdfasdfasdfasdfasdf' "/api/hoods/99999/email/unsubscribe/asdfasdfasdfasdfasdfasdfasdfasdf"
) )
assert response.json()['detail'] == 'Not Found' assert response.json()["detail"] == "Not Found"

View file

@ -9,13 +9,13 @@ from kibicara.model import Hood
from kibicara.platforms.telegram.model import Telegram from kibicara.platforms.telegram.model import Telegram
@fixture(scope='function') @fixture(scope="function")
def telegram(event_loop, hood_id, bot): def telegram(event_loop, hood_id, bot):
hood = event_loop.run_until_complete(Hood.objects.get(id=hood_id)) hood = event_loop.run_until_complete(Hood.objects.get(id=hood_id))
return event_loop.run_until_complete( return event_loop.run_until_complete(
Telegram.objects.create( Telegram.objects.create(
hood=hood, hood=hood,
api_token=bot['api_token'], api_token=bot["api_token"],
welcome_message=bot['welcome_message'], welcome_message=bot["welcome_message"],
) )
) )

View file

@ -10,16 +10,16 @@ from kibicara.platforms import telegram
from kibicara.platforms.telegram.model import Telegram from kibicara.platforms.telegram.model import Telegram
@fixture(scope='function') @fixture(scope="function")
def disable_spawner(monkeypatch): def disable_spawner(monkeypatch):
class DoNothing: class DoNothing:
def start(self, bot): def start(self, bot):
assert bot is not None assert bot is not None
monkeypatch.setattr(telegram.webapi, 'spawner', DoNothing()) monkeypatch.setattr(telegram.webapi, "spawner", DoNothing())
@mark.parametrize('body', [{'api_token': 'string', 'welcome_message': 'string'}]) @mark.parametrize("body", [{"api_token": "string", "welcome_message": "string"}])
def test_telegram_create_bot( def test_telegram_create_bot(
event_loop, event_loop,
client, client,
@ -32,27 +32,27 @@ def test_telegram_create_bot(
def check_token_mock(token): def check_token_mock(token):
return True return True
monkeypatch.setattr(telegram.webapi, 'check_token', check_token_mock) monkeypatch.setattr(telegram.webapi, "check_token", check_token_mock)
response = client.post( response = client.post(
'/api/hoods/{0}/telegram/'.format(hood_id), "/api/hoods/{0}/telegram/".format(hood_id),
json=body, json=body,
headers=auth_header, headers=auth_header,
) )
assert response.status_code == status.HTTP_201_CREATED assert response.status_code == status.HTTP_201_CREATED
bot_id = response.json()['id'] bot_id = response.json()["id"]
telegram_obj = event_loop.run_until_complete(Telegram.objects.get(id=bot_id)) telegram_obj = event_loop.run_until_complete(Telegram.objects.get(id=bot_id))
assert response.json()['api_token'] == body['api_token'] == telegram_obj.api_token assert response.json()["api_token"] == body["api_token"] == telegram_obj.api_token
assert ( assert (
response.json()['welcome_message'] response.json()["welcome_message"]
== body['welcome_message'] == body["welcome_message"]
== telegram_obj.welcome_message == telegram_obj.welcome_message
) )
assert response.json()['hood']['id'] == telegram_obj.hood.id assert response.json()["hood"]["id"] == telegram_obj.hood.id
assert telegram_obj.enabled assert telegram_obj.enabled
@mark.parametrize('body', [{'api_token': 'string', 'welcome_message': 'string'}]) @mark.parametrize("body", [{"api_token": "string", "welcome_message": "string"}])
def test_telegram_invalid_api_token( def test_telegram_invalid_api_token(
event_loop, event_loop,
client, client,
@ -63,7 +63,7 @@ def test_telegram_invalid_api_token(
body, body,
): ):
response = client.post( response = client.post(
'/api/hoods/{0}/telegram/'.format(hood_id), "/api/hoods/{0}/telegram/".format(hood_id),
json=body, json=body,
headers=auth_header, headers=auth_header,
) )
@ -71,12 +71,12 @@ def test_telegram_invalid_api_token(
def test_telegram_create_telegram_invalid_id(client, auth_header): def test_telegram_create_telegram_invalid_id(client, auth_header):
response = client.post('/api/hoods/1337/telegram/', headers=auth_header) response = client.post("/api/hoods/1337/telegram/", headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.post('/api/hoods/wrong/telegram/', headers=auth_header) response = client.post("/api/hoods/wrong/telegram/", headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_telegram_create_unauthorized(client, hood_id): def test_telegram_create_unauthorized(client, hood_id):
response = client.post('/api/hoods/{hood_id}/telegram/') response = client.post("/api/hoods/{hood_id}/telegram/")
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -10,7 +10,7 @@ from pytest import mark, raises
from kibicara.platforms.telegram.model import Telegram, TelegramUser from kibicara.platforms.telegram.model import Telegram, TelegramUser
@mark.parametrize('bot', [{'api_token': 'apitoken123', 'welcome_message': 'msg'}]) @mark.parametrize("bot", [{"api_token": "apitoken123", "welcome_message": "msg"}])
def test_telegram_delete_bot(client, event_loop, bot, telegram, auth_header): def test_telegram_delete_bot(client, event_loop, bot, telegram, auth_header):
event_loop.run_until_complete( event_loop.run_until_complete(
TelegramUser.objects.create(user_id=1234, bot=telegram.id) TelegramUser.objects.create(user_id=1234, bot=telegram.id)
@ -19,7 +19,7 @@ def test_telegram_delete_bot(client, event_loop, bot, telegram, auth_header):
TelegramUser.objects.create(user_id=5678, bot=telegram.id) TelegramUser.objects.create(user_id=5678, bot=telegram.id)
) )
response = client.delete( response = client.delete(
'/api/hoods/{0}/telegram/{1}'.format(telegram.hood.id, telegram.id), "/api/hoods/{0}/telegram/{1}".format(telegram.hood.id, telegram.id),
headers=auth_header, headers=auth_header,
) )
assert response.status_code == status.HTTP_204_NO_CONTENT assert response.status_code == status.HTTP_204_NO_CONTENT
@ -30,23 +30,23 @@ def test_telegram_delete_bot(client, event_loop, bot, telegram, auth_header):
def test_telegram_delete_bot_invalid_id(client, auth_header, hood_id): def test_telegram_delete_bot_invalid_id(client, auth_header, hood_id):
response = client.delete('/api/hoods/1337/telegram/123', headers=auth_header) response = client.delete("/api/hoods/1337/telegram/123", headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.delete('/api/hoods/wrong/telegram/123', headers=auth_header) response = client.delete("/api/hoods/wrong/telegram/123", headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
response = client.delete( response = client.delete(
'/api/hoods/{0}/telegram/7331'.format(hood_id), headers=auth_header "/api/hoods/{0}/telegram/7331".format(hood_id), headers=auth_header
) )
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.delete( response = client.delete(
'/api/hoods/{0}/telegram/wrong'.format(hood_id), headers=auth_header "/api/hoods/{0}/telegram/wrong".format(hood_id), headers=auth_header
) )
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@mark.parametrize('bot', [{'api_token': 'apitoken123', 'welcome_message': 'msg'}]) @mark.parametrize("bot", [{"api_token": "apitoken123", "welcome_message": "msg"}])
def test_telegram_delete_bot_unauthorized(client, bot, telegram): def test_telegram_delete_bot_unauthorized(client, bot, telegram):
response = client.delete( response = client.delete(
'/api/hoods/{0}/telegram/{1}'.format(telegram.hood.id, telegram.id) "/api/hoods/{0}/telegram/{1}".format(telegram.hood.id, telegram.id)
) )
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -7,36 +7,36 @@ from fastapi import status
from pytest import mark from pytest import mark
@mark.parametrize('bot', [{'api_token': 'apitoken123', 'welcome_message': 'msg'}]) @mark.parametrize("bot", [{"api_token": "apitoken123", "welcome_message": "msg"}])
def test_telegram_get_bot(client, auth_header, event_loop, bot, telegram): def test_telegram_get_bot(client, auth_header, event_loop, bot, telegram):
response = client.get( response = client.get(
'/api/hoods/{0}/telegram/{1}'.format(telegram.hood.id, telegram.id), "/api/hoods/{0}/telegram/{1}".format(telegram.hood.id, telegram.id),
headers=auth_header, headers=auth_header,
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
assert response.json()['id'] == telegram.id assert response.json()["id"] == telegram.id
assert response.json()['api_token'] == telegram.api_token assert response.json()["api_token"] == telegram.api_token
assert response.json()['welcome_message'] == telegram.welcome_message assert response.json()["welcome_message"] == telegram.welcome_message
def test_telegram_get_bot_invalid_id(client, auth_header, hood_id): def test_telegram_get_bot_invalid_id(client, auth_header, hood_id):
response = client.get('/api/hoods/1337/telegram/123', headers=auth_header) response = client.get("/api/hoods/1337/telegram/123", headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.get('/api/hoods/wrong/telegram/123', headers=auth_header) response = client.get("/api/hoods/wrong/telegram/123", headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
response = client.get( response = client.get(
'/api/hoods/{0}/telegram/7331'.format(hood_id), headers=auth_header "/api/hoods/{0}/telegram/7331".format(hood_id), headers=auth_header
) )
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.get( response = client.get(
'/api/hoods/{0}/telegram/wrong'.format(hood_id), headers=auth_header "/api/hoods/{0}/telegram/wrong".format(hood_id), headers=auth_header
) )
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@mark.parametrize('bot', [{'api_token': 'apitoken456', 'welcome_message': 'msg'}]) @mark.parametrize("bot", [{"api_token": "apitoken456", "welcome_message": "msg"}])
def test_telegram_get_bot_unauthorized(client, bot, telegram): def test_telegram_get_bot_unauthorized(client, bot, telegram):
response = client.get( response = client.get(
'/api/hoods/{0}/telegram/{1}'.format(telegram.hood.id, telegram.id) "/api/hoods/{0}/telegram/{1}".format(telegram.hood.id, telegram.id)
) )
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED

View file

@ -14,34 +14,34 @@ def test_telegram_get_bots(client, auth_header, event_loop, hood_id):
telegram0 = event_loop.run_until_complete( telegram0 = event_loop.run_until_complete(
Telegram.objects.create( Telegram.objects.create(
hood=hood, hood=hood,
api_token='api_token123', api_token="api_token123",
welcome_message='welcome_message123', welcome_message="welcome_message123",
) )
) )
telegram1 = event_loop.run_until_complete( telegram1 = event_loop.run_until_complete(
Telegram.objects.create( Telegram.objects.create(
hood=hood, hood=hood,
api_token='api_token456', api_token="api_token456",
welcome_message='welcome_message123', welcome_message="welcome_message123",
) )
) )
response = client.get( response = client.get(
'/api/hoods/{0}/telegram'.format(telegram0.hood.id), headers=auth_header "/api/hoods/{0}/telegram".format(telegram0.hood.id), headers=auth_header
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
assert response.json()[0]['id'] == telegram0.id assert response.json()[0]["id"] == telegram0.id
assert response.json()[0]['api_token'] == telegram0.api_token assert response.json()[0]["api_token"] == telegram0.api_token
assert response.json()[1]['id'] == telegram1.id assert response.json()[1]["id"] == telegram1.id
assert response.json()[1]['api_token'] == telegram1.api_token assert response.json()[1]["api_token"] == telegram1.api_token
def test_telegram_get_bots_invalid_id(client, auth_header, hood_id): def test_telegram_get_bots_invalid_id(client, auth_header, hood_id):
response = client.get('/api/hoods/1337/telegram', headers=auth_header) response = client.get("/api/hoods/1337/telegram", headers=auth_header)
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
response = client.get('/api/hoods/wrong/telegram', headers=auth_header) response = client.get("/api/hoods/wrong/telegram", headers=auth_header)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
def test_telegram_get_bots_unauthorized(client, hood_id): def test_telegram_get_bots_unauthorized(client, hood_id):
response = client.get('/api/hoods/{0}/telegram'.format(hood_id)) response = client.get("/api/hoods/{0}/telegram".format(hood_id))
assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.status_code == status.HTTP_401_UNAUTHORIZED