Refactoring.

This commit is contained in:
Thomas L 2018-03-28 17:36:35 +02:00
parent 890e720c91
commit 1dd75c10d5
13 changed files with 335 additions and 410 deletions

1
active_bots/__init__.py Normal file
View file

@ -0,0 +1 @@
from mastodonbot import MastodonBot

50
active_bots/mastodonbot.py Executable file
View file

@ -0,0 +1,50 @@
#!/usr/bin/env python3
from bot import Bot
import logging
from mastodon import Mastodon
import re
from report import Report
logger = logging.getLogger(__name__)
class MastodonBot(Bot):
def crawl(self, user):
"""
Crawl mentions from Mastodon.
:return: list of statuses
"""
mentions = []
m = Mastodon(*user.get_masto_credentials())
try:
notifications = m.notifications()
except: # mastodon.Mastodon.MastodonAPIError is unfortunately not in __init__.py
logger.error("Unknown Mastodon API Error.", exc_info=True)
return mentions
for status in notifications:
if (status['type'] == 'mention' and
status['status']['id'] > self.seen_toots):
# save state
self.seen_toots = status['status']['id']
self.save_last()
# add mention to mentions
text = re.sub(r'<[^>]*>', '', status['status']['content'])
text = re.sub(
"(?<=^|(?<=[^a-zA-Z0-9-_.]))@([A-Za-z]+[A-Za-z0-9-_]+)",
"", text)
mentions.append(Report(status['account']['acct'],
self,
text,
status['status']['id'],
status['status']['created_at']))
return mentions
def post(self, user, report):
m = Mastodon(*user.get_masto_credentials())
if report.source == self:
m.status_reblog(report.id)
else:
m.toot(report.text)

View file

@ -1,33 +1,11 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from bot import Bot
import logging import active_bots
import time
import sendmail
from db import DB
from config import config from config import config
from db import db
from mastodonbot import MastodonBot import logging
from twitterbot import TwitterBot import sendmail
from mailbot import Mailbot import time
from trigger import Trigger
def get_users(db):
user_rows = db.get_users()
users = {}
for row in user_rows:
users[row[0]] = []
return users
def init_bots(config, db, users):
for uid in users:
users[uid].append(Trigger(config, uid, db))
users[uid].append(MastodonBot(config, uid, db))
users[uid].append(TwitterBot(config, uid, db))
users[uid].append(Mailbot(config, uid, db))
return users
if __name__ == '__main__': if __name__ == '__main__':
@ -37,39 +15,28 @@ if __name__ == '__main__':
fh.setLevel(logging.DEBUG) fh.setLevel(logging.DEBUG)
logger.addHandler(fh) logger.addHandler(fh)
db = DB() bots = []
for ActiveBot in active_bots.__dict__.values():
if isinstance(ActiveBot, type) and issubclass(ActiveBot, Bot):
bots.append(ActiveBot())
while True: try:
# get a dictionary { uid : [ Bot objects ] } while True:
users = get_users(db) for user in db.active_users:
for bot in bots:
# initialize bots reports = bot.crawl(user)
users = init_bots(config, logger, db, users)
try:
for uid in users:
for bot in users[uid]:
reports = bot.crawl()
for status in reports: for status in reports:
if not users[uid][0].is_ok(status.text): if not user.is_appropriate(status):
continue continue
for bot2 in users[uid]: for bot in bots:
if bot == bot2: bot.post(user, status)
bot2.repost(status) time.sleep(60) # twitter rate limit >.<
else: except:
bot2.post(status) logger.error('Shutdown', exc_info=True)
time.sleep(60) # twitter rate limit >.< mailer = sendmail.Mailer(config)
except KeyboardInterrupt: try:
print("Good bye. Remember to restart the bot!") mailer.send('', config['web']['contact'],
'Ticketfrei Crash Report',
attachment=config['logging']['logpath'])
except: except:
logger.error('Shutdown', exc_info=True) logger.error('Mail sending failed', exc_info=True)
for uid in users:
for bot in users[uid]:
bot.save_last()
mailer = sendmail.Mailer(config)
try:
mailer.send('', config['web']['contact'],
'Ticketfrei Crash Report',
attachment=config['logging']['logpath'])
except:
logger.error('Mail sending failed', exc_info=True)

8
bot.py Normal file
View file

@ -0,0 +1,8 @@
class Bot(object):
# returns a list of Report objects
def crawl(user):
pass
# post/boost Report object
def post(user, report):
pass

194
db.py
View file

@ -1,10 +1,8 @@
from bottle import redirect, request from config import config
from functools import wraps
from inspect import Signature
import jwt import jwt
import logging import logging
from os import urandom from os import urandom
from pylibscrypt import scrypt_mcf, scrypt_mcf_check from pylibscrypt import scrypt_mcf
import sqlite3 import sqlite3
from user import User from user import User
@ -19,156 +17,142 @@ class DB(object):
self.create() self.create()
self.secret = urandom(32) self.secret = urandom(32)
def execute(self, *args, **kwargs):
return self.cur.execute(*args, **kwargs)
def commit(self):
self.conn.commit()
def close(self):
self.conn.close()
def create(self): def create(self):
# init db # init db
self.cur.executescript(''' self.cur.executescript('''
CREATE TABLE IF NOT EXISTS user ( CREATE TABLE IF NOT EXISTS user (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
email TEXT,
passhash TEXT, passhash TEXT,
enabled INTEGER DEFAULT 1 enabled INTEGER DEFAULT 1
); );
CREATE TABLE IF NOT EXISTS twitter_request_tokens ( CREATE TABLE IF NOT EXISTS email (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER, user_id INTEGER,
request_token TEXT, email TEXT,
FOREIGN KEY(user_id) REFERENCES user(id) FOREIGN KEY(user_id) REFERENCES user(id)
); );
CREATE TABLE IF NOT EXISTS twitter_accounts ( CREATE TABLE IF NOT EXISTS triggerpatterns (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER, user_id INTEGER,
client_id TEXT, pattern TEXT,
client_secret TEXT,
FOREIGN KEY(user_id) REFERENCES user(id) FOREIGN KEY(user_id) REFERENCES user(id)
); );
CREATE TABLE IF NOT EXISTS trigger_good ( CREATE TABLE IF NOT EXISTS badwords (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER, user_id INTEGER,
words TEXT, word TEXT,
FOREIGN KEY(user_id) REFERENCES user(id)
);
CREATE TABLE IF NOT EXISTS trigger_bad (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
words TEXT,
FOREIGN KEY(user_id) REFERENCES user(id) FOREIGN KEY(user_id) REFERENCES user(id)
); );
CREATE TABLE IF NOT EXISTS mastodon_instances ( CREATE TABLE IF NOT EXISTS mastodon_instances (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
instance TEXT, instance TEXT,
client_id TEXT, client_id TEXT,
client_secret TEXT client_secret TEXT
); );
CREATE TABLE IF NOT EXISTS mastodon_accounts ( CREATE TABLE IF NOT EXISTS mastodon_accounts (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER, user_id INTEGER,
access_token TEXT, access_token TEXT,
instance_id TEXT, instance_id INTEGER,
active INTEGER, active INTEGER,
FOREIGN KEY(user_id) REFERENCES user(id), FOREIGN KEY(user_id) REFERENCES user(id),
FOREIGN KEY(instance_id) REFERENCES mastodon_instances(id) FOREIGN KEY(instance_id) REFERENCES mastodon_instances(id)
); );
CREATE TABLE IF NOT EXISTS seen_toots ( CREATE TABLE IF NOT EXISTS seen_toots (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER, user_id INTEGER,
mastodon_accounts_id INTEGER, mastodon_accounts_id INTEGER,
toot_id TEXT, toot_id TEXT,
FOREIGN KEY(user_id) REFERENCES user(id), FOREIGN KEY(user_id) REFERENCES user(id),
FOREIGN KEY(mastodon_accounts_id) FOREIGN KEY(mastodon_accounts_id)
REFERENCES mastodon_accounts(id) REFERENCES mastodon_accounts(id)
); );
CREATE TABLE IF NOT EXISTS mail ( CREATE TABLE IF NOT EXISTS twitter_request_tokens (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER, user_id INTEGER,
email TEXT, request_token TEXT,
FOREIGN KEY(user_id) REFERENCES user(id)
);
CREATE TABLE IF NOT EXISTS twitter_accounts (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
client_id TEXT,
client_secret TEXT,
active INTEGER, active INTEGER,
FOREIGN KEY(user_id) REFERENCES user(id) FOREIGN KEY(user_id) REFERENCES user(id)
); );
CREATE TABLE IF NOT EXISTS seen_mails (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
mail_id INTEGER,
mail_date INTEGER,
FOREIGN KEY(user_id) REFERENCES user(id),
FOREIGN KEY(mail_id) REFERENCES mail(id)
);
CREATE TABLE IF NOT EXISTS seen_tweets ( CREATE TABLE IF NOT EXISTS seen_tweets (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER, user_id INTEGER,
twitter_accounts_id INTEGER, twitter_accounts_id INTEGER,
tweet_id TEXT, tweet_id TEXT,
FOREIGN KEY(user_id) REFERENCES user(id) FOREIGN KEY(user_id) REFERENCES user(id)
);
CREATE TABLE IF NOT EXISTS mail (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
email TEXT,
active INTEGER,
FOREIGN KEY(user_id) REFERENCES user(id)
FOREIGN KEY(twitter_accounts_id) FOREIGN KEY(twitter_accounts_id)
REFERENCES twitter_accounts(id) REFERENCES twitter_accounts(id)
); );
CREATE TABLE IF NOT EXISTS seen_mails (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
mail_id INTEGER,
mail_date INTEGER,
FOREIGN KEY(user_id) REFERENCES user(id)
FOREIGN KEY(mail_id) REFERENCES mail(id)
);
''') ''')
def token(self, email, password): def user_token(self, email, password):
return jwt.encode({ return jwt.encode({
'email': email, 'email': email,
'passhash': scrypt_mcf(password.encode('utf-8')).decode('ascii') 'passhash': scrypt_mcf(
password.encode('utf-8')
).decode('ascii')
}, self.secret).decode('ascii') }, self.secret).decode('ascii')
def register(self, token): def confirm(self, token):
json = jwt.decode(token, self.secret) try:
# create user json = jwt.decode(token, self.secret)
self.cur.execute("INSERT INTO user (email, passhash) VALUES(?, ?);", except jwt.DecodeError:
(json['email'], json['passhash'])) return None # invalid token
self.conn.commit() if 'passhash' in json.keys():
return User(self, self.cur.lastrowid) # create user
self.execute("INSERT INTO user (passhash) VALUES(?, ?);",
def authenticate(self, email, password): (json['passhash'], ))
# check email/password uid = self.cur.lastrowid
self.cur.execute("SELECT id, passhash FROM user WHERE email=?;", else:
(email, )) uid = json['uid']
row = self.cur.fetchone() self.execute("INSERT INTO email (user_id, email) VALUES(?, ?);",
if not row: (uid, json['email']))
return None # No user with this email self.commit()
if not scrypt_mcf_check(row[1].encode('ascii'), return User(uid)
password.encode('utf-8')):
return None # Wrong passphrase
return User(self, row[0])
def by_email(self, email): def by_email(self, email):
self.cur.execute("SELECT id FROM user WHERE email=?;", (email, )) self.execute("SELECT user_id FROM email WHERE email=?;", (email, ))
row = self.cur.fetchone() try:
if not row: uid, = self.cur.fetchone()
except TypeError:
return None return None
return User(self, row[0]) return User(uid)
def close(self): @property
self.conn.close() def active_users(self):
self.execute("SELECT id FROM user WHERE enabled=1;")
def get_users(self): return [User(uid) for uid, in self.cur.fetchall()]
self.cur.execute("SELECT id FROM user WHERE enabled=1;")
return self.cur.fetchall()
class DBPlugin(object): db = DB(config['database']['db_path'])
name = 'DBPlugin'
api = 2
def __init__(self, dbfile, loginpage):
self.db = DB(dbfile)
self.loginpage = loginpage
def close(self):
self.db.close()
def apply(self, callback, route):
uservar = route.config.get('user', None)
dbvar = route.config.get('db', None)
signature = Signature.from_callable(route.callback)
@wraps(callback)
def wrapper(*args, **kwargs):
if uservar and uservar in signature.parameters:
uid = request.get_cookie('uid', secret=self.db.secret)
if uid is None:
return redirect(self.loginpage)
kwargs[uservar] = User(self.db, uid)
if dbvar and dbvar in signature.parameters:
kwargs[dbvar] = self.db
return callback(*args, **kwargs)
return wrapper

View file

@ -1,10 +1,11 @@
import bottle import bottle
from bottle import get, post, redirect, request, response, view from bottle import get, post, redirect, request, response, view
from config import config from config import config
from db import DBPlugin from db import db
import logging import logging
import tweepy import tweepy
import sendmail import sendmail
from session import SessionPlugin
import smtplib import smtplib
from mastodon import Mastodon from mastodon import Mastodon
@ -18,9 +19,9 @@ def propaganda():
pass pass
@post('/register', db='db') @post('/register')
@view('template/register.tpl') @view('template/register.tpl')
def register_post(db): def register_post():
email = request.forms.get('email', '') email = request.forms.get('email', '')
password = request.forms.get('pass', '') password = request.forms.get('pass', '')
password_repeat = request.forms.get('pass-repeat', '') password_repeat = request.forms.get('pass-repeat', '')
@ -29,7 +30,7 @@ def register_post(db):
if db.by_email(email): if db.by_email(email):
return dict(error='Email address already in use.') return dict(error='Email address already in use.')
# send confirmation mail # send confirmation mail
confirm_link = request.url + "/../confirm/" + db.token(email, password) confirm_link = request.url + "/../confirm/" + db.user_token(email, password)
send_confirmation_mail(confirm_link, email) send_confirmation_mail(confirm_link, email)
return dict(info='Confirmation mail sent.') return dict(info='Confirmation mail sent.')
@ -43,33 +44,33 @@ def send_confirmation_mail(confirm_link, email):
return "Please enter a valid E-Mail address." return "Please enter a valid E-Mail address."
@get('/confirm/<token>', db='db') @get('/confirm/<token>')
@view('template/propaganda.tpl') @view('template/propaganda.tpl')
def confirm(db, token): def confirm(token):
# create db-entry # create db-entry
if db.register(token): if db.confirm(token):
# :todo show info "Account creation successful." # :todo show info "Account creation successful."
return redirect('/settings') return redirect('/settings')
return dict(error='Account creation failed.') return dict(error='Email confirmation failed.')
@post('/login', db='db') @post('/login')
@view('template/login.tpl') @view('template/login.tpl')
def login_post(db): def login_post():
# check login # check login
if db.authenticate(request.forms.get('email', ''), if db.by_email(request.forms.get('email', '')) \
request.forms.get('pass', '')): .check_password(request.forms.get('pass', '')):
return redirect('/settings') return redirect('/settings')
return dict(error='Authentication failed.') return dict(error='Authentication failed.')
@get('/settings', user='user') @get('/settings')
@view('template/settings.tpl') @view('template/settings.tpl')
def settings(user): def settings(user):
return user.state() return user.state()
@get('/api/state', user='user') @get('/api/state')
def api_enable(user): def api_enable(user):
return user.state() return user.state()
@ -87,7 +88,7 @@ def logout():
return redirect('/') return redirect('/')
@get('/login/twitter', user='user') @get('/login/twitter')
def login_twitter(user): def login_twitter(user):
""" """
Starts the twitter OAuth authentication process. Starts the twitter OAuth authentication process.
@ -107,7 +108,7 @@ def login_twitter(user):
return bottle.redirect(redirect_url) return bottle.redirect(redirect_url)
@get('/login/twitter/callback', user="user") @get('/login/twitter/callback')
def twitter_callback(user): def twitter_callback(user):
""" """
Gets the callback Gets the callback
@ -126,7 +127,7 @@ def twitter_callback(user):
return bottle.redirect("/settings") return bottle.redirect("/settings")
@post('/login/mastodon', user="user") @post('/login/mastodon')
def login_mastodon(user): def login_mastodon(user):
""" """
Starts the mastodon OAuth authentication process. Starts the mastodon OAuth authentication process.
@ -152,10 +153,9 @@ def login_mastodon(user):
return dict(error='Login to Mastodon failed.') return dict(error='Login to Mastodon failed.')
application = bottle.default_app()
bottle.install(SessionPlugin('/'))
if __name__ == '__main__': if __name__ == '__main__':
# testing only # testing only
bottle.install(DBPlugin(':memory:', '/'))
bottle.run(host='localhost', port=8080) bottle.run(host='localhost', port=8080)
else:
bottle.install(DBPlugin(config['database']['db_path'], '/'))
application = bottle.default_app()

View file

@ -1,97 +0,0 @@
#!/usr/bin/env python3
import logging
import mastodon
import re
import report
from user import User
logger = logging.getLogger(__name__)
class MastodonBot(object):
def __init__(self, uid, db):
self.user = User(db, uid)
client_id, client_secret, access_token, instance_url = \
self.user.get_masto_credentials()
self.m = mastodon.Mastodon(
client_id=client_id,
client_secret=client_secret,
access_token=access_token,
api_base_url=instance_url
)
# load state
try:
self.seen_toots = self.user.get_seen_toot()
except TypeError:
self.seen_toots = 0
def save_last(self):
self.user.save_seen_toot(self.seen_toots)
def crawl(self):
"""
Crawl mentions from Mastodon.
:return: list of statuses
"""
mentions = []
try:
notifications = self.m.notifications()
except: # mastodon.Mastodon.MastodonAPIError is unfortunately not in __init__.py
logger.error("Unknown Mastodon API Error.", exc_info=True)
return mentions
for status in notifications:
if (status['type'] == 'mention' and
status['status']['id'] > self.seen_toots):
# save state
self.seen_toots = status['status']['id']
self.save_last()
# add mention to mentions
text = re.sub(r'<[^>]*>', '', status['status']['content'])
text = re.sub(
"(?<=^|(?<=[^a-zA-Z0-9-_.]))@([A-Za-z]+[A-Za-z0-9-_]+)",
"", text)
mentions.append(report.Report(status['account']['acct'],
"mastodon",
text,
status['status']['id'],
status['status']['created_at']))
return mentions
def repost(self, mention):
"""
Retoots a mention.
:param mention: (report.Report object)
"""
logger.info('Boosting toot from %s' % (
mention.format()))
self.m.status_reblog(mention.id)
def post(self, report):
"""
Toots a report from other sources.
:param report: (report.Report object)
"""
toot = report.format()
self.m.toot(toot)
def flow(self, trigger, reports=()):
# toot external provided messages
for report in reports:
self.post(report)
# boost mentions
retoots = []
for mention in self.crawl():
if not trigger.is_ok(mention.text):
continue
self.repost(mention)
retoots.append(mention)
# return mentions for mirroring
return retoots

View file

@ -23,15 +23,3 @@ class Report(object):
self.text = text self.text = text
self.timestamp = timestamp self.timestamp = timestamp
self.id = id self.id = id
def format(self):
"""
Format the report for bot.post()
:rtype: string
:return: toot: text to be tooted, e.g. "There are
uniformed controllers in the U2 at Opernhaus."
"""
# strng = self.author + ": " + self.text # deprecated;
# we don't want to put the names of people too public.
return self.text

28
session.py Normal file
View file

@ -0,0 +1,28 @@
from bottle import redirect, request
from db import db
from functools import wraps
from inspect import Signature
from user import User
class SessionPlugin(object):
name = 'SessionPlugin'
keyword = 'user'
api = 2
def __init__(self, loginpage):
self.loginpage = loginpage
def apply(self, callback, route):
if self.keyword in Signature.from_callable(route.callback).parameters:
@wraps(callback)
def wrapper(*args, **kwargs):
uid = request.get_cookie('uid', secret=db.secret)
if uid is None:
return redirect(self.loginpage)
kwargs[self.keyword] = User(db, uid)
return callback(*args, **kwargs)
return wrapper
else:
return callback

View file

@ -1,67 +0,0 @@
#!/usr/bin/env python
import re
from user import User
class Trigger(object):
"""
This class provides a filter to test a string against.
"""
def __init__(self, config, uid, db):
self.config = config
self.db = db
self.user = User(db, uid)
# load goodlists
self.goodlist = []
raw = self.user.get_trigger_words("trigger_good")
print(raw)
print(type(raw))
for pattern in raw:
pattern = pattern.strip()
if pattern:
self.goodlist.append(re.compile(pattern, re.IGNORECASE))
# load blacklists
self.blacklist = set()
raw = self.user.get_trigger_words("trigger_bad")
for word in raw:
word = word.strip()
if word:
self.blacklist.add(word)
def is_ok(self, message):
"""
checks if a string contains no bad words and at least 1 good word.
:param message: A given string. Tweet or Toot, cleaned from html.
:return: If the string passes the test
"""
for pattern in self.goodlist:
if pattern.search(message) is not None:
break
else:
# no pattern matched
return False
for word in message.lower().split():
if word in self.blacklist:
return False
return True
"""
if __name__ == "__main__":
import prepare
config = prepare.get_config()
print("testing the trigger")
trigger = Trigger(config)
print("Printing words which trigger the bot:")
for i in trigger.goodlist:
print(i)
print()
print("Printing words which block a bot:")
for i in trigger.blacklist:
print(i)
"""

159
user.py
View file

@ -1,100 +1,163 @@
from bottle import response from bottle import response
from db import db
import jwt
from mastodon import Mastodon from mastodon import Mastodon
from pylibscrypt import scrypt_mcf, scrypt_mcf_check
class User(object): class User(object):
def __init__(self, db, uid): def __init__(self, uid):
# set cookie # set cookie
response.set_cookie('uid', uid, secret=db.secret, path='/') response.set_cookie('uid', uid, secret=db.secret, path='/')
self.db = db
self.uid = uid self.uid = uid
def check_password(self, password):
db.execute("SELECT passhash FROM user WHERE id=?;", (self.uid, ))
passhash, = db.cur.fetchone()
return scrypt_mcf_check(passhash.encode('ascii'),
password.encode('utf-8'))
def password(self, password):
passhash = scrypt_mcf(password.encode('utf-8')).decode('ascii')
db.execute("UPDATE user SET passhash=? WHERE id=?;",
(passhash, self.uid))
db.commit()
password = property(None, password) # setter only, can't read back
@property
def enabled(self):
db.execute("SELECT enabled FROM user WHERE user_id=?;", (self.uid, ))
return bool(db.cur.fetchone()[0])
@enabled.setter
def enabled(self, enabled):
db.execute("UPDATE user SET enabled=? WHERE id=?",
(1 if enabled else 0, self.uid))
db.commit()
@property
def emails(self):
db.execute("SELECT email FROM email WHERE user_id=?;", (self.uid, ))
return (*db.cur.fetchall(), )
def delete_email(self, email):
db.execute("SELECT COUNT(*) FROM email WHERE user_id=?", (self.uid, ))
if db.cur.fetchone()[0] == 1:
return False # don't allow to delete last email
db.execute("DELETE FROM email WHERE user_id=? AND email=?;",
(self.uid, email))
db.commit()
return True
def email_token(self, email):
return jwt.encode({
'email': email,
'uid': self.uid
}, self.secret).decode('ascii')
def is_appropriate(self, report):
db.execute("SELECT pattern FROM triggerpatterns WHERE user_id=?;",
(self.uid, ))
for pattern, in db.cur.fetchall():
if pattern.search(report.text) is not None:
break
else:
# no pattern matched
return False
db.execute("SELECT word FROM badwords WHERE user_id=?;",
(self.uid, ))
badwords = [word.lower() for word, in db.cur.fetchall()]
for word in report.text.lower().split():
if word in badwords:
return False
return True
def get_masto_credentials(self): def get_masto_credentials(self):
self.db.cur.execute("SELECT access_token, instance_id FROM mastodon_accounts WHERE user_id = ? AND active = 1;", db.execute("SELECT access_token, instance_id FROM mastodon_accounts WHERE user_id = ? AND active = 1;",
(self.uid, )) (self.uid, ))
row = self.db.cur.fetchone() row = db.cur.fetchone()
self.db.cur.execute("SELECT instance, client_id, client_secret FROM mastodon_instances WHERE id = ?;", db.execute("SELECT instance, client_id, client_secret FROM mastodon_instances WHERE id = ?;",
(row[1], )) (row[1], ))
instance = self.db.cur.fetchone() instance = db.cur.fetchone()
return instance[1], instance[2], row[0], instance[0] return instance[1], instance[2], row[0], instance[0]
def get_seen_toot(self): def get_seen_toot(self):
self.db.cur.execute("SELECT toot_id FROM seen_toots WHERE user_id = ?;", db.execute("SELECT toot_id FROM seen_toots WHERE user_id = ?;",
(self.uid, )) (self.uid, ))
return self.db.cur.fetchone()[0] return db.cur.fetchone()[0]
def save_seen_toot(self, toot_id): def save_seen_toot(self, toot_id):
self.db.cur.execute("UPDATE seen_toots SET toot_id = ? WHERE user_id = ?;", db.execute("UPDATE seen_toots SET toot_id = ? WHERE user_id = ?;",
(toot_id, self.uid)) (toot_id, self.uid))
def get_seen_tweet(self): def get_seen_tweet(self):
self.db.cur.execute("SELECT tweet_id FROM seen_tweets WHERE user_id = ?;", db.execute("SELECT tweet_id FROM seen_tweets WHERE user_id = ?;",
(self.uid, )) (self.uid, ))
return self.db.cur.fetchone()[0] return db.cur.fetchone()[0]
def save_seen_tweet(self, tweet_id): def save_seen_tweet(self, tweet_id):
self.db.cur.execute("UPDATE seen_tweets SET tweet_id = ? WHERE user_id = ?;", db.execute("UPDATE seen_tweets SET tweet_id = ? WHERE user_id = ?;",
(tweet_id, self.uid)) (tweet_id, self.uid))
def get_mail(self): def get_mail(self):
self.db.cur.execute("SELECT email FROM mail WHERE user_id = ?;", (self.uid, )) db.execute("SELECT email FROM mail WHERE user_id = ?;", (self.uid, ))
def get_seen_mail(self): def get_seen_mail(self):
self.db.cur.execute("SELECT mail_date FROM seen_mails WHERE user_id = ?;", (self.uid, )) db.execute("SELECT mail_date FROM seen_mails WHERE user_id = ?;", (self.uid, ))
return self.db.cur.fetchone()[0] return db.cur.fetchone()[0]
def save_seen_mail(self, mail_date): def save_seen_mail(self, mail_date):
self.db.cur.execute("UPDATE seen_mail SET mail_date = ? WHERE user_id = ?;", db.execute("UPDATE seen_mail SET mail_date = ? WHERE user_id = ?;",
(mail_date, self.uid)) (mail_date, self.uid))
def get_trigger_words(self, table): def get_trigger_words(self, table):
self.db.cur.execute("SELECT words FROM ? WHERE user_id = ?;", (table, self.uid,)) db.execute("SELECT words FROM ? WHERE user_id = ?;", (table, self.uid,))
return self.db.cur.fetchone()[0] return db.cur.fetchone()[0]
def state(self): def state(self):
return dict(foo='bar') return dict(foo='bar')
def save_request_token(self, token): def save_request_token(self, token):
self.db.cur.execute("INSERT INTO twitter_request_tokens(user_id, request_token) VALUES(?, ?);", db.execute("INSERT INTO twitter_request_tokens(user_id, request_token) VALUES(?, ?);",
(self.uid, token)) (self.uid, token))
self.db.conn.commit() db.commit()
def get_request_token(self): def get_request_token(self):
self.db.cur.execute("SELECT request_token FROM twitter_request_tokens WHERE user_id = ?;", (id,)) db.execute("SELECT request_token FROM twitter_request_tokens WHERE user_id = ?;", (id,))
request_token = self.db.cur.fetchone()[0] request_token = db.cur.fetchone()[0]
self.db.cur.execute("DELETE FROM twitter_request_tokens WHERE user_id = ?;", (id,)) db.execute("DELETE FROM twitter_request_tokens WHERE user_id = ?;", (id,))
self.db.conn.commit() db.commit()
return request_token return request_token
def save_twitter_token(self, access_token, access_token_secret): def save_twitter_token(self, access_token, access_token_secret):
self.db.cur.execute( db.execute(
"INSERT INTO twitter_accounts(user_id, access_token_key, access_token_secret) VALUES(?, ?, ?);", "INSERT INTO twitter_accounts(user_id, access_token_key, access_token_secret) VALUES(?, ?, ?);",
(id, access_token, access_token_secret)) (id, access_token, access_token_secret))
self.db.conn.commit() db.commit()
def get_twitter_token(self): def get_twitter_token(self):
self.db.cur.execute("SELECT access_token, access_token_secret FROM twitter_accouts WHERE user_id = ?;", db.execute("SELECT access_token, access_token_secret FROM twitter_accouts WHERE user_id = ?;",
(self.uid, )) (self.uid, ))
return self.db.cur.fetchall() return db.cur.fetchall()
def get_mastodon_app_keys(self, instance): def get_mastodon_app_keys(self, instance):
self.db.cur.execute("SELECT client_id, client_secret FROM mastodon_instances WHERE instance = ?;", (instance, )) db.execute("SELECT client_id, client_secret FROM mastodon_instances WHERE instance = ?;", (instance, ))
try: try:
row = self.db.cur.fetchone() row = db.cur.fetchone()
client_id = row[0] client_id = row[0]
client_secret = row[1] client_secret = row[1]
return client_id, client_secret return client_id, client_secret
except TypeError: except TypeError:
app_name = "ticketfrei" + str(self.db.secret)[0:4] app_name = "ticketfrei" + str(db.secret)[0:4]
client_id, client_secret = Mastodon.create_app(app_name, api_base_url=instance) client_id, client_secret = Mastodon.create_app(app_name, api_base_url=instance)
self.db.cur.execute("INSERT INTO mastodon_instances(instance, client_id, client_secret) VALUES(?, ?, ?);", db.execute("INSERT INTO mastodon_instances(instance, client_id, client_secret) VALUES(?, ?, ?);",
(instance, client_id, client_secret)) (instance, client_id, client_secret))
self.db.conn.commit() db.commit()
return client_id, client_secret return client_id, client_secret
def save_masto_token(self, access_token, instance): def save_masto_token(self, access_token, instance):
self.db.cur.execute("SELECT id FROM mastodon_instances WHERE instance = ?;", (instance, )) db.execute("SELECT id FROM mastodon_instances WHERE instance = ?;", (instance, ))
instance_id = self.db.cur.fetchone()[0] instance_id = db.cur.fetchone()[0]
self.db.cur.execute("INSERT INTO mastodon_accounts(user_id, access_token, instance_id, active) " db.execute("INSERT INTO mastodon_accounts(user_id, access_token, instance_id, active) "
"VALUES(?, ?, ?, ?);", (self.uid, access_token, instance_id, 1)) "VALUES(?, ?, ?, ?);", (self.uid, access_token, instance_id, 1))
self.db.conn.commit() db.commit()