Refactoring.

master
Thomas L 2018-03-28 17:36:35 +02:00
parent fbf44525e3
commit e38a32d888
13 changed files with 335 additions and 410 deletions

1
active_bots/__init__.py Normal file
View File

@ -0,0 +1 @@
from mastodonbot import MastodonBot

50
active_bots/mastodonbot.py Executable file
View File

@ -0,0 +1,50 @@
#!/usr/bin/env python3
from bot import Bot
import logging
from mastodon import Mastodon
import re
from report import Report
logger = logging.getLogger(__name__)
class MastodonBot(Bot):
def crawl(self, user):
"""
Crawl mentions from Mastodon.
:return: list of statuses
"""
mentions = []
m = Mastodon(*user.get_masto_credentials())
try:
notifications = m.notifications()
except: # mastodon.Mastodon.MastodonAPIError is unfortunately not in __init__.py
logger.error("Unknown Mastodon API Error.", exc_info=True)
return mentions
for status in notifications:
if (status['type'] == 'mention' and
status['status']['id'] > self.seen_toots):
# save state
self.seen_toots = status['status']['id']
self.save_last()
# add mention to mentions
text = re.sub(r'<[^>]*>', '', status['status']['content'])
text = re.sub(
"(?<=^|(?<=[^a-zA-Z0-9-_.]))@([A-Za-z]+[A-Za-z0-9-_]+)",
"", text)
mentions.append(Report(status['account']['acct'],
self,
text,
status['status']['id'],
status['status']['created_at']))
return mentions
def post(self, user, report):
m = Mastodon(*user.get_masto_credentials())
if report.source == self:
m.status_reblog(report.id)
else:
m.toot(report.text)

View File

@ -1,33 +1,11 @@
#!/usr/bin/env python3
import logging
import time
import sendmail
from db import DB
from bot import Bot
import active_bots
from config import config
from mastodonbot import MastodonBot
from twitterbot import TwitterBot
from mailbot import Mailbot
from trigger import Trigger
def get_users(db):
user_rows = db.get_users()
users = {}
for row in user_rows:
users[row[0]] = []
return users
def init_bots(config, db, users):
for uid in users:
users[uid].append(Trigger(config, uid, db))
users[uid].append(MastodonBot(config, uid, db))
users[uid].append(TwitterBot(config, uid, db))
users[uid].append(Mailbot(config, uid, db))
return users
from db import db
import logging
import sendmail
import time
if __name__ == '__main__':
@ -37,39 +15,28 @@ if __name__ == '__main__':
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
db = DB()
bots = []
for ActiveBot in active_bots.__dict__.values():
if isinstance(ActiveBot, type) and issubclass(ActiveBot, Bot):
bots.append(ActiveBot())
while True:
# get a dictionary { uid : [ Bot objects ] }
users = get_users(db)
# initialize bots
users = init_bots(config, logger, db, users)
try:
for uid in users:
for bot in users[uid]:
reports = bot.crawl()
try:
while True:
for user in db.active_users:
for bot in bots:
reports = bot.crawl(user)
for status in reports:
if not users[uid][0].is_ok(status.text):
if not user.is_appropriate(status):
continue
for bot2 in users[uid]:
if bot == bot2:
bot2.repost(status)
else:
bot2.post(status)
time.sleep(60) # twitter rate limit >.<
except KeyboardInterrupt:
print("Good bye. Remember to restart the bot!")
for bot in bots:
bot.post(user, status)
time.sleep(60) # twitter rate limit >.<
except:
logger.error('Shutdown', exc_info=True)
mailer = sendmail.Mailer(config)
try:
mailer.send('', config['web']['contact'],
'Ticketfrei Crash Report',
attachment=config['logging']['logpath'])
except:
logger.error('Shutdown', exc_info=True)
for uid in users:
for bot in users[uid]:
bot.save_last()
mailer = sendmail.Mailer(config)
try:
mailer.send('', config['web']['contact'],
'Ticketfrei Crash Report',
attachment=config['logging']['logpath'])
except:
logger.error('Mail sending failed', exc_info=True)
logger.error('Mail sending failed', exc_info=True)

8
bot.py Normal file
View File

@ -0,0 +1,8 @@
class Bot(object):
# returns a list of Report objects
def crawl(user):
pass
# post/boost Report object
def post(user, report):
pass

194
db.py
View File

@ -1,10 +1,8 @@
from bottle import redirect, request
from functools import wraps
from inspect import Signature
from config import config
import jwt
import logging
from os import urandom
from pylibscrypt import scrypt_mcf, scrypt_mcf_check
from pylibscrypt import scrypt_mcf
import sqlite3
from user import User
@ -19,156 +17,142 @@ class DB(object):
self.create()
self.secret = urandom(32)
def execute(self, *args, **kwargs):
return self.cur.execute(*args, **kwargs)
def commit(self):
self.conn.commit()
def close(self):
self.conn.close()
def create(self):
# init db
self.cur.executescript('''
CREATE TABLE IF NOT EXISTS user (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
email TEXT,
passhash TEXT,
enabled INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS twitter_request_tokens (
CREATE TABLE IF NOT EXISTS email (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
request_token TEXT,
user_id INTEGER,
email TEXT,
FOREIGN KEY(user_id) REFERENCES user(id)
);
CREATE TABLE IF NOT EXISTS twitter_accounts (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
client_id TEXT,
client_secret TEXT,
CREATE TABLE IF NOT EXISTS triggerpatterns (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
pattern TEXT,
FOREIGN KEY(user_id) REFERENCES user(id)
);
CREATE TABLE IF NOT EXISTS trigger_good (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
words TEXT,
FOREIGN KEY(user_id) REFERENCES user(id)
);
CREATE TABLE IF NOT EXISTS trigger_bad (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
words TEXT,
CREATE TABLE IF NOT EXISTS badwords (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
word TEXT,
FOREIGN KEY(user_id) REFERENCES user(id)
);
CREATE TABLE IF NOT EXISTS mastodon_instances (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
instance TEXT,
client_id TEXT,
client_secret TEXT
);
CREATE TABLE IF NOT EXISTS mastodon_accounts (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
access_token TEXT,
instance_id TEXT,
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
access_token TEXT,
instance_id INTEGER,
active INTEGER,
FOREIGN KEY(user_id) REFERENCES user(id),
FOREIGN KEY(instance_id) REFERENCES mastodon_instances(id)
);
CREATE TABLE IF NOT EXISTS seen_toots (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
mastodon_accounts_id INTEGER,
toot_id TEXT,
toot_id TEXT,
FOREIGN KEY(user_id) REFERENCES user(id),
FOREIGN KEY(mastodon_accounts_id)
REFERENCES mastodon_accounts(id)
);
CREATE TABLE IF NOT EXISTS mail (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
email TEXT,
CREATE TABLE IF NOT EXISTS twitter_request_tokens (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
request_token TEXT,
FOREIGN KEY(user_id) REFERENCES user(id)
);
CREATE TABLE IF NOT EXISTS twitter_accounts (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
client_id TEXT,
client_secret TEXT,
active INTEGER,
FOREIGN KEY(user_id) REFERENCES user(id)
);
CREATE TABLE IF NOT EXISTS seen_mails (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
mail_id INTEGER,
mail_date INTEGER,
FOREIGN KEY(user_id) REFERENCES user(id),
FOREIGN KEY(mail_id) REFERENCES mail(id)
);
CREATE TABLE IF NOT EXISTS seen_tweets (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
twitter_accounts_id INTEGER,
tweet_id TEXT,
FOREIGN KEY(user_id) REFERENCES user(id)
);
CREATE TABLE IF NOT EXISTS mail (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
email TEXT,
active INTEGER,
FOREIGN KEY(user_id) REFERENCES user(id)
FOREIGN KEY(twitter_accounts_id)
REFERENCES twitter_accounts(id)
);
CREATE TABLE IF NOT EXISTS seen_mails (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
user_id INTEGER,
mail_id INTEGER,
mail_date INTEGER,
FOREIGN KEY(user_id) REFERENCES user(id)
FOREIGN KEY(mail_id) REFERENCES mail(id)
);
''')
def token(self, email, password):
def user_token(self, email, password):
return jwt.encode({
'email': email,
'passhash': scrypt_mcf(password.encode('utf-8')).decode('ascii')
'email': email,
'passhash': scrypt_mcf(
password.encode('utf-8')
).decode('ascii')
}, self.secret).decode('ascii')
def register(self, token):
json = jwt.decode(token, self.secret)
# create user
self.cur.execute("INSERT INTO user (email, passhash) VALUES(?, ?);",
(json['email'], json['passhash']))
self.conn.commit()
return User(self, self.cur.lastrowid)
def authenticate(self, email, password):
# check email/password
self.cur.execute("SELECT id, passhash FROM user WHERE email=?;",
(email, ))
row = self.cur.fetchone()
if not row:
return None # No user with this email
if not scrypt_mcf_check(row[1].encode('ascii'),
password.encode('utf-8')):
return None # Wrong passphrase
return User(self, row[0])
def confirm(self, token):
try:
json = jwt.decode(token, self.secret)
except jwt.DecodeError:
return None # invalid token
if 'passhash' in json.keys():
# create user
self.execute("INSERT INTO user (passhash) VALUES(?, ?);",
(json['passhash'], ))
uid = self.cur.lastrowid
else:
uid = json['uid']
self.execute("INSERT INTO email (user_id, email) VALUES(?, ?);",
(uid, json['email']))
self.commit()
return User(uid)
def by_email(self, email):
self.cur.execute("SELECT id FROM user WHERE email=?;", (email, ))
row = self.cur.fetchone()
if not row:
self.execute("SELECT user_id FROM email WHERE email=?;", (email, ))
try:
uid, = self.cur.fetchone()
except TypeError:
return None
return User(self, row[0])
return User(uid)
def close(self):
self.conn.close()
def get_users(self):
self.cur.execute("SELECT id FROM user WHERE enabled=1;")
return self.cur.fetchall()
@property
def active_users(self):
self.execute("SELECT id FROM user WHERE enabled=1;")
return [User(uid) for uid, in self.cur.fetchall()]
class DBPlugin(object):
name = 'DBPlugin'
api = 2
def __init__(self, dbfile, loginpage):
self.db = DB(dbfile)
self.loginpage = loginpage
def close(self):
self.db.close()
def apply(self, callback, route):
uservar = route.config.get('user', None)
dbvar = route.config.get('db', None)
signature = Signature.from_callable(route.callback)
@wraps(callback)
def wrapper(*args, **kwargs):
if uservar and uservar in signature.parameters:
uid = request.get_cookie('uid', secret=self.db.secret)
if uid is None:
return redirect(self.loginpage)
kwargs[uservar] = User(self.db, uid)
if dbvar and dbvar in signature.parameters:
kwargs[dbvar] = self.db
return callback(*args, **kwargs)
return wrapper
db = DB(config['database']['db_path'])

View File

@ -1,10 +1,11 @@
import bottle
from bottle import get, post, redirect, request, response, view
from config import config
from db import DBPlugin
from db import db
import logging
import tweepy
import sendmail
from session import SessionPlugin
import smtplib
from mastodon import Mastodon
@ -18,9 +19,9 @@ def propaganda():
pass
@post('/register', db='db')
@post('/register')
@view('template/register.tpl')
def register_post(db):
def register_post():
email = request.forms.get('email', '')
password = request.forms.get('pass', '')
password_repeat = request.forms.get('pass-repeat', '')
@ -29,7 +30,7 @@ def register_post(db):
if db.by_email(email):
return dict(error='Email address already in use.')
# send confirmation mail
confirm_link = request.url + "/../confirm/" + db.token(email, password)
confirm_link = request.url + "/../confirm/" + db.user_token(email, password)
send_confirmation_mail(confirm_link, email)
return dict(info='Confirmation mail sent.')
@ -43,33 +44,33 @@ def send_confirmation_mail(confirm_link, email):
return "Please enter a valid E-Mail address."
@get('/confirm/<token>', db='db')
@get('/confirm/<token>')
@view('template/propaganda.tpl')
def confirm(db, token):
def confirm(token):
# create db-entry
if db.register(token):
if db.confirm(token):
# :todo show info "Account creation successful."
return redirect('/settings')
return dict(error='Account creation failed.')
return dict(error='Email confirmation failed.')
@post('/login', db='db')
@post('/login')
@view('template/login.tpl')
def login_post(db):
def login_post():
# check login
if db.authenticate(request.forms.get('email', ''),
request.forms.get('pass', '')):
if db.by_email(request.forms.get('email', '')) \
.check_password(request.forms.get('pass', '')):
return redirect('/settings')
return dict(error='Authentication failed.')
@get('/settings', user='user')
@get('/settings')
@view('template/settings.tpl')
def settings(user):
return user.state()
@get('/api/state', user='user')
@get('/api/state')
def api_enable(user):
return user.state()
@ -87,7 +88,7 @@ def logout():
return redirect('/')
@get('/login/twitter', user='user')
@get('/login/twitter')
def login_twitter(user):
"""
Starts the twitter OAuth authentication process.
@ -107,7 +108,7 @@ def login_twitter(user):
return bottle.redirect(redirect_url)
@get('/login/twitter/callback', user="user")
@get('/login/twitter/callback')
def twitter_callback(user):
"""
Gets the callback
@ -126,7 +127,7 @@ def twitter_callback(user):
return bottle.redirect("/settings")
@post('/login/mastodon', user="user")
@post('/login/mastodon')
def login_mastodon(user):
"""
Starts the mastodon OAuth authentication process.
@ -152,10 +153,9 @@ def login_mastodon(user):
return dict(error='Login to Mastodon failed.')
application = bottle.default_app()
bottle.install(SessionPlugin('/'))
if __name__ == '__main__':
# testing only
bottle.install(DBPlugin(':memory:', '/'))
bottle.run(host='localhost', port=8080)
else:
bottle.install(DBPlugin(config['database']['db_path'], '/'))
application = bottle.default_app()

View File

@ -1,97 +0,0 @@
#!/usr/bin/env python3
import logging
import mastodon
import re
import report
from user import User
logger = logging.getLogger(__name__)
class MastodonBot(object):
def __init__(self, uid, db):
self.user = User(db, uid)
client_id, client_secret, access_token, instance_url = \
self.user.get_masto_credentials()
self.m = mastodon.Mastodon(
client_id=client_id,
client_secret=client_secret,
access_token=access_token,
api_base_url=instance_url
)
# load state
try:
self.seen_toots = self.user.get_seen_toot()
except TypeError:
self.seen_toots = 0
def save_last(self):
self.user.save_seen_toot(self.seen_toots)
def crawl(self):
"""
Crawl mentions from Mastodon.
:return: list of statuses
"""
mentions = []
try:
notifications = self.m.notifications()
except: # mastodon.Mastodon.MastodonAPIError is unfortunately not in __init__.py
logger.error("Unknown Mastodon API Error.", exc_info=True)
return mentions
for status in notifications:
if (status['type'] == 'mention' and
status['status']['id'] > self.seen_toots):
# save state
self.seen_toots = status['status']['id']
self.save_last()
# add mention to mentions
text = re.sub(r'<[^>]*>', '', status['status']['content'])
text = re.sub(
"(?<=^|(?<=[^a-zA-Z0-9-_.]))@([A-Za-z]+[A-Za-z0-9-_]+)",
"", text)
mentions.append(report.Report(status['account']['acct'],
"mastodon",
text,
status['status']['id'],
status['status']['created_at']))
return mentions
def repost(self, mention):
"""
Retoots a mention.
:param mention: (report.Report object)
"""
logger.info('Boosting toot from %s' % (
mention.format()))
self.m.status_reblog(mention.id)
def post(self, report):
"""
Toots a report from other sources.
:param report: (report.Report object)
"""
toot = report.format()
self.m.toot(toot)
def flow(self, trigger, reports=()):
# toot external provided messages
for report in reports:
self.post(report)
# boost mentions
retoots = []
for mention in self.crawl():
if not trigger.is_ok(mention.text):
continue
self.repost(mention)
retoots.append(mention)
# return mentions for mirroring
return retoots

View File

@ -23,15 +23,3 @@ class Report(object):
self.text = text
self.timestamp = timestamp
self.id = id
def format(self):
"""
Format the report for bot.post()
:rtype: string
:return: toot: text to be tooted, e.g. "There are
uniformed controllers in the U2 at Opernhaus."
"""
# strng = self.author + ": " + self.text # deprecated;
# we don't want to put the names of people too public.
return self.text

28
session.py Normal file
View File

@ -0,0 +1,28 @@
from bottle import redirect, request
from db import db
from functools import wraps
from inspect import Signature
from user import User
class SessionPlugin(object):
name = 'SessionPlugin'
keyword = 'user'
api = 2
def __init__(self, loginpage):
self.loginpage = loginpage
def apply(self, callback, route):
if self.keyword in Signature.from_callable(route.callback).parameters:
@wraps(callback)
def wrapper(*args, **kwargs):
uid = request.get_cookie('uid', secret=db.secret)
if uid is None:
return redirect(self.loginpage)
kwargs[self.keyword] = User(db, uid)
return callback(*args, **kwargs)
return wrapper
else:
return callback

View File

@ -1,67 +0,0 @@
#!/usr/bin/env python
import re
from user import User
class Trigger(object):
"""
This class provides a filter to test a string against.
"""
def __init__(self, config, uid, db):
self.config = config
self.db = db
self.user = User(db, uid)
# load goodlists
self.goodlist = []
raw = self.user.get_trigger_words("trigger_good")
print(raw)
print(type(raw))
for pattern in raw:
pattern = pattern.strip()
if pattern:
self.goodlist.append(re.compile(pattern, re.IGNORECASE))
# load blacklists
self.blacklist = set()
raw = self.user.get_trigger_words("trigger_bad")
for word in raw:
word = word.strip()
if word:
self.blacklist.add(word)
def is_ok(self, message):
"""
checks if a string contains no bad words and at least 1 good word.
:param message: A given string. Tweet or Toot, cleaned from html.
:return: If the string passes the test
"""
for pattern in self.goodlist:
if pattern.search(message) is not None:
break
else:
# no pattern matched
return False
for word in message.lower().split():
if word in self.blacklist:
return False
return True
"""
if __name__ == "__main__":
import prepare
config = prepare.get_config()
print("testing the trigger")
trigger = Trigger(config)
print("Printing words which trigger the bot:")
for i in trigger.goodlist:
print(i)
print()
print("Printing words which block a bot:")
for i in trigger.blacklist:
print(i)
"""

159
user.py
View File

@ -1,100 +1,163 @@
from bottle import response
from db import db
import jwt
from mastodon import Mastodon
from pylibscrypt import scrypt_mcf, scrypt_mcf_check
class User(object):
def __init__(self, db, uid):
def __init__(self, uid):
# set cookie
response.set_cookie('uid', uid, secret=db.secret, path='/')
self.db = db
self.uid = uid
def check_password(self, password):
db.execute("SELECT passhash FROM user WHERE id=?;", (self.uid, ))
passhash, = db.cur.fetchone()
return scrypt_mcf_check(passhash.encode('ascii'),
password.encode('utf-8'))
def password(self, password):
passhash = scrypt_mcf(password.encode('utf-8')).decode('ascii')
db.execute("UPDATE user SET passhash=? WHERE id=?;",
(passhash, self.uid))
db.commit()
password = property(None, password) # setter only, can't read back
@property
def enabled(self):
db.execute("SELECT enabled FROM user WHERE user_id=?;", (self.uid, ))
return bool(db.cur.fetchone()[0])
@enabled.setter
def enabled(self, enabled):
db.execute("UPDATE user SET enabled=? WHERE id=?",
(1 if enabled else 0, self.uid))
db.commit()
@property
def emails(self):
db.execute("SELECT email FROM email WHERE user_id=?;", (self.uid, ))
return (*db.cur.fetchall(), )
def delete_email(self, email):
db.execute("SELECT COUNT(*) FROM email WHERE user_id=?", (self.uid, ))
if db.cur.fetchone()[0] == 1:
return False # don't allow to delete last email
db.execute("DELETE FROM email WHERE user_id=? AND email=?;",
(self.uid, email))
db.commit()
return True
def email_token(self, email):
return jwt.encode({
'email': email,
'uid': self.uid
}, self.secret).decode('ascii')
def is_appropriate(self, report):
db.execute("SELECT pattern FROM triggerpatterns WHERE user_id=?;",
(self.uid, ))
for pattern, in db.cur.fetchall():
if pattern.search(report.text) is not None:
break
else:
# no pattern matched
return False
db.execute("SELECT word FROM badwords WHERE user_id=?;",
(self.uid, ))
badwords = [word.lower() for word, in db.cur.fetchall()]
for word in report.text.lower().split():
if word in badwords:
return False
return True
def get_masto_credentials(self):
self.db.cur.execute("SELECT access_token, instance_id FROM mastodon_accounts WHERE user_id = ? AND active = 1;",
(self.uid, ))
row = self.db.cur.fetchone()
self.db.cur.execute("SELECT instance, client_id, client_secret FROM mastodon_instances WHERE id = ?;",
(row[1], ))
instance = self.db.cur.fetchone()
db.execute("SELECT access_token, instance_id FROM mastodon_accounts WHERE user_id = ? AND active = 1;",
(self.uid, ))
row = db.cur.fetchone()
db.execute("SELECT instance, client_id, client_secret FROM mastodon_instances WHERE id = ?;",
(row[1], ))
instance = db.cur.fetchone()
return instance[1], instance[2], row[0], instance[0]
def get_seen_toot(self):
self.db.cur.execute("SELECT toot_id FROM seen_toots WHERE user_id = ?;",
(self.uid, ))
return self.db.cur.fetchone()[0]
db.execute("SELECT toot_id FROM seen_toots WHERE user_id = ?;",
(self.uid, ))
return db.cur.fetchone()[0]
def save_seen_toot(self, toot_id):
self.db.cur.execute("UPDATE seen_toots SET toot_id = ? WHERE user_id = ?;",
(toot_id, self.uid))
db.execute("UPDATE seen_toots SET toot_id = ? WHERE user_id = ?;",
(toot_id, self.uid))
def get_seen_tweet(self):
self.db.cur.execute("SELECT tweet_id FROM seen_tweets WHERE user_id = ?;",
(self.uid, ))
return self.db.cur.fetchone()[0]
db.execute("SELECT tweet_id FROM seen_tweets WHERE user_id = ?;",
(self.uid, ))
return db.cur.fetchone()[0]
def save_seen_tweet(self, tweet_id):
self.db.cur.execute("UPDATE seen_tweets SET tweet_id = ? WHERE user_id = ?;",
(tweet_id, self.uid))
db.execute("UPDATE seen_tweets SET tweet_id = ? WHERE user_id = ?;",
(tweet_id, self.uid))
def get_mail(self):
self.db.cur.execute("SELECT email FROM mail WHERE user_id = ?;", (self.uid, ))
db.execute("SELECT email FROM mail WHERE user_id = ?;", (self.uid, ))
def get_seen_mail(self):
self.db.cur.execute("SELECT mail_date FROM seen_mails WHERE user_id = ?;", (self.uid, ))
return self.db.cur.fetchone()[0]
db.execute("SELECT mail_date FROM seen_mails WHERE user_id = ?;", (self.uid, ))
return db.cur.fetchone()[0]
def save_seen_mail(self, mail_date):
self.db.cur.execute("UPDATE seen_mail SET mail_date = ? WHERE user_id = ?;",
(mail_date, self.uid))
db.execute("UPDATE seen_mail SET mail_date = ? WHERE user_id = ?;",
(mail_date, self.uid))
def get_trigger_words(self, table):
self.db.cur.execute("SELECT words FROM ? WHERE user_id = ?;", (table, self.uid,))
return self.db.cur.fetchone()[0]
db.execute("SELECT words FROM ? WHERE user_id = ?;", (table, self.uid,))
return db.cur.fetchone()[0]
def state(self):
return dict(foo='bar')
def save_request_token(self, token):
self.db.cur.execute("INSERT INTO twitter_request_tokens(user_id, request_token) VALUES(?, ?);",
(self.uid, token))
self.db.conn.commit()
db.execute("INSERT INTO twitter_request_tokens(user_id, request_token) VALUES(?, ?);",
(self.uid, token))
db.commit()
def get_request_token(self):
self.db.cur.execute("SELECT request_token FROM twitter_request_tokens WHERE user_id = ?;", (id,))
request_token = self.db.cur.fetchone()[0]
self.db.cur.execute("DELETE FROM twitter_request_tokens WHERE user_id = ?;", (id,))
self.db.conn.commit()
db.execute("SELECT request_token FROM twitter_request_tokens WHERE user_id = ?;", (id,))
request_token = db.cur.fetchone()[0]
db.execute("DELETE FROM twitter_request_tokens WHERE user_id = ?;", (id,))
db.commit()
return request_token
def save_twitter_token(self, access_token, access_token_secret):
self.db.cur.execute(
db.execute(
"INSERT INTO twitter_accounts(user_id, access_token_key, access_token_secret) VALUES(?, ?, ?);",
(id, access_token, access_token_secret))
self.db.conn.commit()
db.commit()
def get_twitter_token(self):
self.db.cur.execute("SELECT access_token, access_token_secret FROM twitter_accouts WHERE user_id = ?;",
(self.uid, ))
return self.db.cur.fetchall()
db.execute("SELECT access_token, access_token_secret FROM twitter_accouts WHERE user_id = ?;",
(self.uid, ))
return db.cur.fetchall()
def get_mastodon_app_keys(self, instance):
self.db.cur.execute("SELECT client_id, client_secret FROM mastodon_instances WHERE instance = ?;", (instance, ))
db.execute("SELECT client_id, client_secret FROM mastodon_instances WHERE instance = ?;", (instance, ))
try:
row = self.db.cur.fetchone()
row = db.cur.fetchone()
client_id = row[0]
client_secret = row[1]
return client_id, client_secret
except TypeError:
app_name = "ticketfrei" + str(self.db.secret)[0:4]
app_name = "ticketfrei" + str(db.secret)[0:4]
client_id, client_secret = Mastodon.create_app(app_name, api_base_url=instance)
self.db.cur.execute("INSERT INTO mastodon_instances(instance, client_id, client_secret) VALUES(?, ?, ?);",
(instance, client_id, client_secret))
self.db.conn.commit()
db.execute("INSERT INTO mastodon_instances(instance, client_id, client_secret) VALUES(?, ?, ?);",
(instance, client_id, client_secret))
db.commit()
return client_id, client_secret
def save_masto_token(self, access_token, instance):
self.db.cur.execute("SELECT id FROM mastodon_instances WHERE instance = ?;", (instance, ))
instance_id = self.db.cur.fetchone()[0]
self.db.cur.execute("INSERT INTO mastodon_accounts(user_id, access_token, instance_id, active) "
"VALUES(?, ?, ?, ?);", (self.uid, access_token, instance_id, 1))
self.db.conn.commit()
db.execute("SELECT id FROM mastodon_instances WHERE instance = ?;", (instance, ))
instance_id = db.cur.fetchone()[0]
db.execute("INSERT INTO mastodon_accounts(user_id, access_token, instance_id, active) "
"VALUES(?, ?, ?, ?);", (self.uid, access_token, instance_id, 1))
db.commit()