ticketfrei3/backend/src/kibicara/migratefromticketfrei.py

226 lines
8.6 KiB
Python

import sqlite3
import argparse
from time import time, sleep
from os import urandom
from tortoise import Tortoise
from tortoise.exceptions import DoesNotExist
import asyncio
from mastodon import Mastodon
from kibicara.model import Admin, Hood, IncludePattern, ExcludePattern
from kibicara.platforms.mastodon.model import MastodonInstance, MastodonAccount
from kibicara.platforms.telegram.model import Telegram, TelegramSubscriber
from kibicara.platforms.email.model import Email, EmailSubscriber
from kibicara.platforms.twitter.model import Twitter
class OldDataBase:
    """Small convenience wrapper around the sqlite3 connection of the old database.

    Keeps one connection (``conn``) and one cursor (``cur``) for the lifetime
    of the object.
    """

    def __init__(self, old_database_path):
        """Connect to the sqlite3 file at *old_database_path* and open a cursor."""
        connection = sqlite3.connect(old_database_path)
        self.conn = connection
        self.cur = connection.cursor()

    def execute(self, *args, **kwargs):
        """Forward all arguments to the cursor's ``execute`` and return its result."""
        cursor = self.cur
        return cursor.execute(*args, **kwargs)

    def commit(self):
        """Commit the connection, retrying while sqlite3 reports an OperationalError.

        After each failed attempt the method sleeps for 0.1 seconds; once more
        than 5 seconds have passed since the first attempt the error is
        re-raised.
        """
        began_at = time()
        committed = False
        while not committed:
            try:
                self.conn.commit()
            except sqlite3.OperationalError:
                # another thread may be writing, give it a chance to finish
                sleep(0.1)
                elapsed = time() - began_at
                if elapsed > 5:
                    # if it takes this long, something is wrong
                    raise
            else:
                committed = True

    def close(self):
        """Close the underlying sqlite3 connection."""
        self.conn.close()
class Main:
    """Entry point that migrates a ticketfrei2 database into a ticketfrei3 one.

    Instantiating the class parses the command line, opens the old sqlite3
    database and runs the complete migration; there is no separate "run" step.
    """

    def __init__(self):
        """Parse ``sys.argv`` and migrate the old database into the new one."""
        args = self._build_parser().parse_args()
        # open old database
        self.old_db = OldDataBase(args.old_database_path)
        try:
            # open new database
            asyncio.run(self.new_database(args.new_database_path))
        finally:
            # release the old sqlite3 connection even if the migration failed
            self.old_db.close()

    @staticmethod
    def _build_parser():
        """Return the argument parser for the migration command line."""
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "old_database_path", help="path to the ticketfrei2 sqlite3 database"
        )
        parser.add_argument(
            "new_database_path",
            help="path to the ticketfrei3 sqlite3 database",
            # A positional argument is mandatory unless nargs="?" is given;
            # without it argparse never applies the default below.
            nargs="?",
            default="ticketfrei3.sqlite",
        )
        return parser

    async def new_database(self, new_database_path):
        """Create the new database schema and copy all data from the old database.

        Args:
            new_database_path: path of the sqlite3 file used in the Tortoise
                ``sqlite://`` database URL.
        """
        await Tortoise.init(
            db_url=f"sqlite://{new_database_path}",
            modules={
                "models": [
                    "kibicara.model",
                    "kibicara.platforms.email.model",
                    "kibicara.platforms.mastodon.model",
                    "kibicara.platforms.telegram.model",
                    "kibicara.platforms.test.model",
                    "kibicara.platforms.twitter.model",
                ]
            },
        )
        try:
            await Tortoise.generate_schemas()
            # read table per table and write it to new database.
            await self._migrate_mastodon_instances()
            old_users = self.old_db.execute("SELECT * FROM user;").fetchall()
            for user in old_users:
                await self._migrate_user(user)
        finally:
            # close the connections even when a migration step raised
            await Tortoise.close_connections()

    async def _migrate_mastodon_instances(self):
        """Create a MastodonInstance for every row of the old mastodon_instances table."""
        old_mastodon_instances = self.old_db.execute(
            "SELECT * FROM mastodon_instances"
        ).fetchall()
        for instance in old_mastodon_instances:
            # columns are read by position from the old table
            url = instance[1]
            client_id = instance[2]
            client_secret = instance[3]
            await MastodonInstance.create(
                name=url, client_id=client_id, client_secret=client_secret
            )
            print(f"Created Mastodon Instance: {url}")

    async def _migrate_user(self, user):
        """Migrate one row of the old user table together with all its accounts.

        Inactive users and users whose e-mail address already belongs to an
        Admin in the new database are skipped.
        """
        user_id = user[0]
        user_passhash = user[1]
        user_enabled = user[2]
        if user_enabled == 0:
            print(f"skipping user {user_id}, inactive")
            # actually skip the user, as the message announces
            return
        email = self.old_db.execute(
            "SELECT email FROM email WHERE user_id=?", (user_id,)
        ).fetchone()[0]
        try:
            await Admin.get(email=email)
        except DoesNotExist:
            admin = await Admin.create(email=email, passhash=user_passhash)
        else:
            return  # skip if the user already exists.
        city = self.old_db.execute(
            "SELECT * FROM cities WHERE user_id=?", (user_id,)
        ).fetchone()
        city_name = city[2]
        city_markdown = city[3]
        hood = await Hood.create(name=city_name, landingpage=city_markdown)
        await hood.admins.add(admin)
        print()
        print(
            f"Migrated user {user_id} with email {email} for the city {city_name} "
            "with the following accounts:"
        )
        await self._migrate_patterns(user_id, hood)
        await self._migrate_mastodon_account(user_id, hood)
        await self._migrate_telegram(user_id, hood)
        await self._migrate_email(user_id, hood, city_name)
        await self._migrate_twitter(user_id, hood)

    async def _migrate_patterns(self, user_id, hood):
        """Turn the user's trigger patterns and bad words into hood patterns.

        Each line of the stored text becomes one IncludePattern (trigger
        patterns) or one ExcludePattern (bad words).
        """
        patterns = self.old_db.execute(
            "SELECT patterns FROM triggerpatterns WHERE user_id=?", (user_id,)
        ).fetchone()[0]
        for pattern in patterns.splitlines():
            await IncludePattern.create(hood=hood, pattern=pattern)
        badwords = self.old_db.execute(
            "SELECT words FROM badwords WHERE user_id=?", (user_id,)
        ).fetchone()[0]
        for badword in badwords.splitlines():
            await ExcludePattern.create(hood=hood, pattern=badword)

    async def _migrate_mastodon_account(self, user_id, hood):
        """Migrate the user's Mastodon account, if any, and dismiss its notifications."""
        mastodon_account = self.old_db.execute(
            "SELECT * FROM mastodon_accounts WHERE user_id=?", (user_id,)
        ).fetchone()
        if not mastodon_account:
            return
        instance_url = self.old_db.execute(
            "SELECT instance FROM mastodon_instances WHERE id=?",
            (mastodon_account[3],),
        ).fetchone()[0]
        new_instance = await MastodonInstance.get(name=instance_url)
        access_token = mastodon_account[2]
        mastodon_enabled = mastodon_account[4]
        await MastodonAccount.create(
            hood=hood,
            instance=new_instance,
            access_token=access_token,
            enabled=mastodon_enabled,
        )
        await dismiss_notifications(hood)
        # NOTE(review): this prints the access token to stdout.
        print(f"Mastodon: {instance_url}, {access_token}")

    async def _migrate_telegram(self, user_id, hood):
        """Migrate the user's Telegram bot and its subscribers, if an API key is set."""
        telegram_account = self.old_db.execute(
            "SELECT apikey, active FROM telegram_accounts WHERE user_id=?",
            (user_id,),
        ).fetchone()
        # no row at all, or an empty/NULL api key: nothing to migrate
        if not telegram_account or not telegram_account[0]:
            return
        telegram_apikey = telegram_account[0]
        telegram_enabled = telegram_account[1]
        telegram = await Telegram.create(
            hood=hood,
            api_token=telegram_apikey,
            enabled=telegram_enabled,
            welcome_message="",
        )
        telegram_subscribers = self.old_db.execute(
            "SELECT subscriber_id FROM telegram_subscribers WHERE user_id=?",
            (user_id,),
        ).fetchall()
        for subscriber in telegram_subscribers:
            subscriber_id = subscriber[0]
            await TelegramSubscriber.create(bot=telegram, user_id=subscriber_id)
        print(f"Telegram: {len(telegram_subscribers)} subscribers")

    async def _migrate_email(self, user_id, hood, city_name):
        """Create an Email bot and its subscribers if the user has a mailing list."""
        mail_subscribers = self.old_db.execute(
            "SELECT email FROM mailinglist WHERE user_id=?", (user_id,)
        ).fetchall()
        # `is not []` compared identity with a fresh list and was always true;
        # test for emptiness instead
        if not mail_subscribers:
            return
        email_name = f"kibicara-{city_name}"
        await Email.create(hood=hood, name=email_name, secret=urandom(32).hex())
        for subscriber in mail_subscribers:
            subscriber_email = subscriber[0]
            await EmailSubscriber.create(email=subscriber_email, hood=hood)
        print(f"E-Mail: {len(mail_subscribers)} subscribers")

    async def _migrate_twitter(self, user_id, hood):
        """Migrate the user's Twitter account, if any."""
        twitter = self.old_db.execute(
            "SELECT * FROM twitter_accounts WHERE user_id=?", (user_id,)
        ).fetchone()
        if not twitter:
            return
        client_id = twitter[2]
        client_secret = twitter[3]
        # NOTE(review): raises TypeError when the user has no row in
        # seen_tweets; confirm whether that can happen in real data.
        seen_tweet = self.old_db.execute(
            "SELECT tweet_id FROM seen_tweets WHERE user_id=?", (user_id,)
        ).fetchone()[0]
        await Twitter.create(
            hood=hood,
            dms_since_id=1,  # didn't work in ticketfrei2 anyway
            mentions_since_id=seen_tweet,
            access_token=client_id,
            access_token_secret=client_secret,
            username=" ",
            verified=False,
            enabled=True,
        )
        print(f"Twitter: last ID: {seen_tweet}")
async def dismiss_notifications(hood):
    """Dismiss the notifications returned for the hood's Mastodon account.

    Looks up the MastodonAccount belonging to *hood*, builds a Mastodon
    client from the account's instance credentials and access token, then
    dismisses every notification returned by one ``notifications()`` call and
    prints how many were dismissed.
    """
    mastodon_account = await MastodonAccount.get(hood=hood)
    # the related instance holds the client credentials and the base URL
    await mastodon_account.fetch_related("instance")
    instance = mastodon_account.instance
    client = Mastodon(
        client_id=instance.client_id,
        client_secret=instance.client_secret,
        api_base_url=instance.name,
        access_token=mastodon_account.access_token,
    )
    pending = client.notifications()
    for notification in pending:
        client.notifications_dismiss(notification["id"])
    print(f"Dismissed {len(pending)} notifications on Mastodon.")