[telegram] Add start, stop, status, update REST endpoints and /stop command

This commit is contained in:
Cathy Hu 2020-07-17 15:07:24 +02:00
parent fe3fc6485c
commit 2aab9d3273
3 changed files with 88 additions and 11 deletions

View file

@ -3,11 +3,13 @@
# SPDX-License-Identifier: 0BSD
from aiogram import Bot, Dispatcher, exceptions, types
from asyncio import gather, sleep
from asyncio import gather, sleep, CancelledError
from kibicara.config import config
from kibicara.platformapi import Censor, Message, Spawner
from kibicara.platforms.telegram.model import Telegram, TelegramUser
from logging import getLogger
from ormantic.exceptions import NoMatch
from sqlite3 import IntegrityError
logger = getLogger(__name__)
@ -17,15 +19,37 @@ class TelegramBot(Censor):
def __init__(self, telegram_model):
super().__init__(telegram_model.hood)
self.telegram_model = telegram_model
try:
self.bot = Bot(token=telegram_model.api_token)
self.dp = Dispatcher(self.bot)
self.dp.register_message_handler(self._send_welcome, commands=['start'])
self.dp.register_message_handler(self._receive_message)
self.dp = self._create_dispatcher()
except exceptions.ValidationError as e:
self.telegram_model.enabled = False
finally:
self.enabled = self.telegram_model.enabled
def _create_dispatcher(self):
dp = Dispatcher(self.bot)
dp.register_message_handler(self._send_welcome, commands=['start'])
dp.register_message_handler(self._remove_user, commands=['stop'])
dp.register_message_handler(self._receive_message)
return dp
async def run(self):
await gather(self.dp.start_polling(), self.push())
try:
if not self.dp:
self.dp = self._create_dispatcher()
logger.debug(f'Bot {self.telegram_model.hood.name} starting.')
await gather(self.dp.start_polling(), self._push())
except CancelledError:
logger.debug(f'Bot {self.telegram_model.hood.name} received Cancellation.')
self.dp = None
raise
except exceptions.ValidationError:
logger.debug(f'Bot {self.telegram_model.hood.name} has invalid auth token.')
finally:
logger.debug(f'Bot {self.telegram_model.hood.name} stopped.')
async def push(self):
async def _push(self):
while True:
message = await self.receive()
logger.debug(
@ -79,6 +103,16 @@ class TelegramBot(Censor):
except IntegrityError:
await message.reply('Error: You are already registered.')
async def _remove_user(self, message: types.Message):
try:
telegram_user = await TelegramUser.objects.get(
user_id=message.from_user.id, bot=self.telegram_model
)
await telegram_user.delete()
await message.reply('You were removed successfully from this bot.')
except NoMatch:
await message.reply('Error: You are not subscribed to this bot.')
async def _receive_message(self, message: types.Message):
if not message.text:
await message.reply('Error: Only text messages are allowed.')

View file

@ -9,8 +9,9 @@ from ormantic import Boolean, Integer, ForeignKey, Model, Text
class Telegram(Model):
id: Integer(primary_key=True) = None
hood: ForeignKey(Hood)
api_token: Text()
api_token: Text(unique=True)
welcome_message: Text()
enabled: Boolean() = True
class Mapping(Mapping):
table_name = 'telegrambots'

View file

@ -2,15 +2,17 @@
#
# SPDX-License-Identifier: 0BSD
from aiogram.bot.api import check_token
from aiogram import exceptions
from fastapi import APIRouter, Depends, HTTPException, Response, status
from kibicara.config import config
from kibicara.platforms.telegram.bot import spawner
from kibicara.platforms.telegram.model import Telegram
from kibicara.webapi.hoods import get_hood
from logging import getLogger
from sqlite3 import IntegrityError
from sqlite3 import IntegrityError, OperationalError
from ormantic.exceptions import NoMatch
from pydantic import BaseModel
from pydantic import BaseModel, validator
logger = getLogger(__name__)
@ -20,6 +22,14 @@ class BodyTelegram(BaseModel):
api_token: str
welcome_message: str = 'Welcome!'
@validator('api_token')
def valid_api_token(cls, value):
try:
check_token(value)
return value
except exceptions.ValidationError as e:
raise ValueError(e)
async def get_telegram(telegram_id: int, hood=Depends(get_hood)):
try:
@ -49,7 +59,9 @@ async def telegram_delete(telegram=Depends(get_telegram)):
@router.post('/', status_code=status.HTTP_201_CREATED)
async def telegram_create(values: BodyTelegram, hood=Depends(get_hood)):
async def telegram_create(
response: Response, values: BodyTelegram, hood=Depends(get_hood)
):
try:
telegram = await Telegram.objects.create(hood=hood, **values.__dict__)
spawner.start(telegram)
@ -57,3 +69,33 @@ async def telegram_create(values: BodyTelegram, hood=Depends(get_hood)):
return telegram
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@router.put('/{telegram_id}', status_code=status.HTTP_202_ACCEPTED)
async def telegram_update(values: BodyTelegram, telegram=Depends(get_telegram)):
try:
spawner.stop(telegram)
await telegram.update(**values.__dict__)
spawner.start(telegram)
return telegram
except IntegrityError:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@router.get('/{telegram_id}/status', status_code=status.HTTP_200_OK)
async def telegram_status(telegram=Depends(get_telegram)):
return {'status': spawner.get(telegram).status.name}
@router.post('/{telegram_id}/start', status_code=status.HTTP_200_OK)
async def telegram_start(telegram=Depends(get_telegram)):
await telegram.update(enabled=True)
spawner.get(telegram).start()
return {}
@router.post('/{telegram_id}/stop', status_code=status.HTTP_200_OK)
async def telegram_stop(telegram=Depends(get_telegram)):
await telegram.update(enabled=False)
spawner.get(telegram).stop()
return {}