202 lines
6.2 KiB
Python
202 lines
6.2 KiB
Python
import os
import time
from typing import Optional

import click
import deltachat
from deltachat.capi import lib as dclib
from deltachat.cutil import as_dc_charpointer
|
|
|
|
|
|
def get_control_group(ac: deltachat.account.Account):
    """Look up the bot's control group among the chats of an account.

    A chat qualifies when it reports exactly one contact, is protected, is a
    group and carries the name "verification bot control group".

    :param ac: the account whose chats are searched
    :return: the first qualifying chat, or None when there is none
    """
    wanted_name = "verification bot control group"
    # The conditions are evaluated in this order and short-circuit, so the
    # name is only requested for chats that passed the other checks.
    matches = (
        candidate
        for candidate in ac.get_chats()
        if candidate.num_contacts() == 1
        and candidate.is_protected()
        and candidate.is_group()
        and candidate.get_name() == wanted_name
    )
    return next(matches, None)
|
|
|
|
|
|
@click.command()
@click.option(
    "--db-path",
    type=str,
    help="The directory where the Delta Chat database should be stored",
    # expanduser() instead of os.getenv("HOME") + "...": the concatenation
    # raises a TypeError while this module is imported if HOME is not set.
    default=os.path.expanduser("~/.config/DeltaChat/verificationbot/db.sqlite"),
    show_default=True,
)
@click.option(
    "--from-backup",
    type=str,
    help="Specify backup file of your account to initialize account",
    # A single space serves as the "option not given" marker checked below.
    default=" ",
)
@click.option(
    "--verbose",
    "-v",
    type=bool,
    is_flag=True,
    help="Show Delta Chat log output",
    default=False,
)
@click.pass_context
def init(ctx, db_path, from_backup, verbose) -> None:
    """Set up the bot account and create its control group."""
    # NOTE: click prints the docstring above as this command's --help text,
    # so implementation notes are kept in comments.
    #
    # ctx: click context, used to abort with a usage error via ctx.fail().
    # db_path: path handed to deltachat.account.Account.
    # from_backup: backup file to import when the account is unconfigured.
    # verbose: when set, attach an FFIEventLogger to the account.
    ac = deltachat.account.Account(db_path)
    if verbose:
        ac.add_account_plugin(deltachat.events.FFIEventLogger(ac))

    if not ac.is_configured():
        if from_backup != " ":
            ac.import_all(from_backup)
        else:
            ctx.fail(
                "Account not configured. Please specify a backup file with --from-backup"
            )

    # ensure that the account owner has bcc_self enabled
    if ac.get_config("bcc_self") != "1":
        ctx.fail(
            "You need to enable 'Send Copy to Self' in the Advanced settings for this bot to work."
        )

    # some config defaults; see https://c.delta.chat/classdc__context__t.html#aff3b894f6cfca46cab5248fdffdf083d
    ac.set_config("bot", "1")
    # disable read receipts; we don't want the bot to send out read receipts if the actual account
    # owner didn't read the message yet
    ac.set_config("mdns_enabled", "0")
    # delete from device after 1 day to save disk space
    ac.set_config("delete_device_after", "86400")

    # create control group, unless one already exists
    if not get_control_group(ac):
        ac.start_io()
        control_group = ac.create_group_chat(
            "verification bot control group", verified=True
        )
        control_group.send_text(
            "This is the control group for the verification bot. Send /help for available commands."
        )
        # wait until message is sent out
        # NOTE(review): this polls without a timeout; it never returns if the
        # message is never reported as delivered.
        while not control_group.get_messages()[-1].is_out_delivered():
            time.sleep(0.1)

    click.secho(
        "Verification bot for %s was set up successfully. To start the bot, run:"
        % (ac.get_config("addr"),)
    )
    click.secho("")
    click.secho(" verificationbot run\n")
|
|
|
|
|
|
def process_command(
    ac: deltachat.Account, command: deltachat.Message
) -> deltachat.Message:
    """Answer one control-group message and return the reply that was sent.

    The reply text is picked by the checks below; when several of them match,
    the last matching one wins:

    * text containing ``/help``: a short usage hint,
    * text starting with ``OPENPGP4FPR:``: the text is checked as a QR code
      and the resulting state is reported back,
    * a message with an attachment: a notice that scanning QR code images is
      not supported.

    A message matching none of these is answered with a pointer to ``/help``.

    :param ac: deltachat account object of the bot
    :param command: the deltachat message to be handled
    :returns: the reply to the command; it quotes ``command`` and is sent to
        the chat ``command`` came from
    """
    # Fallback text: without it, a message matching no check below left
    # ``replytext`` unbound and the function crashed.
    replytext = "Unknown command. Send /help for available commands."

    if "/help" in command.text:
        replytext = (
            "Just send me screenshots of QR codes; "
            "I will scan them so others can join in on them."
        )

    if command.text.startswith("OPENPGP4FPR:"):
        try:
            qr = ac.check_qr(command.text)
        except ValueError:
            replytext = "Not a valid OPENPGP4FPR code"
        else:
            state = qr._dc_lot.state()
            # NOTE(review): 512 is presumably one of the DC_QR_* state
            # constants of the Delta Chat core -- confirm which one.
            if state == 512:
                dclib.dc_set_config_from_qr(
                    ac._dc_context, as_dc_charpointer(command.text)
                )
            replytext = str(state) + ". scanned " + command.text

    if command.filename:
        # The attachment is deliberately not processed. An earlier version
        # passed an open file object to ``ac.qr_join_chat``, which expects
        # the QR code text; decoding a QR code from an image is not
        # implemented here.
        replytext = "scanning QR codes is not supported right now."

    reply = deltachat.Message.new_empty(ac, "text")
    reply.set_text(replytext)
    reply.quote = command
    sent_id = dclib.dc_send_msg(ac._dc_context, command.chat.id, reply._dc_msg)
    # Sanity check: the id returned by the core must be the reply's own id.
    assert sent_id == reply.id
    return reply
|
|
|
|
|
|
def get_next_message_from_group(
    group: deltachat.Chat, last_message_id: int
) -> Optional[deltachat.Message]:
    """Get next message from a Delta Chat group from the ID of the last message.

    :param group: the specific Delta Chat group
    :param last_message_id: the ID of the last known message
    :return: the message that directly follows the one with
        ``last_message_id``; None if that message is the last one in the
        group or is not part of the group at all
    """
    # Fetch the list once so the search runs on a single, consistent
    # snapshot instead of re-reading the chat for every comparison.
    messages = list(group.get_messages())
    # Pairing each message with its successor stops one short of the end, so
    # the last message can never trigger an out-of-range lookup.
    for current, following in zip(messages, messages[1:]):
        if current.id == last_message_id:
            return following
    return None
|
|
|
|
|
|
@click.command()
@click.option(
    "--db-path",
    type=str,
    help="The directory where the Delta Chat database is stored",
    # expanduser() instead of os.getenv("HOME") + "...": the concatenation
    # raises a TypeError while this module is imported if HOME is not set.
    default=os.path.expanduser("~/.config/DeltaChat/verificationbot/db.sqlite"),
    show_default=True,
)
@click.option(
    "--verbose",
    "-v",
    type=bool,
    is_flag=True,
    help="Show Delta Chat log output",
    default=False,
)
@click.pass_context
def run(ctx, db_path, verbose):
    """Run the bot and answer commands sent to its control group."""
    # NOTE: click prints the docstring above as this command's --help text,
    # so implementation notes are kept in comments.
    #
    # ctx: click context, used to abort with a usage error via ctx.fail().
    # db_path: path handed to deltachat.account.Account.
    # verbose: when set, attach an FFIEventLogger to the account.
    ac = deltachat.account.Account(db_path)
    if verbose:
        ac.add_account_plugin(deltachat.events.FFIEventLogger(ac))
    if not ac.is_configured():
        ctx.fail(
            "Please run this first to initialize the bot:\n\n verificationbot init\n"
        )

    control_group = get_control_group(ac)
    if control_group is None:
        # Without this check the lookup below failed with an AttributeError.
        ctx.fail(
            "No control group found. "
            "Please run this first to initialize the bot:\n\n verificationbot init\n"
        )

    # Everything already in the control group counts as handled; None means
    # the group holds no messages yet.
    messages = control_group.get_messages()
    last_command_id = messages[-1].id if messages else None

    ac.start_io()
    try:
        while True:
            control_group = get_control_group(ac)
            messages = control_group.get_messages() if control_group else []
            if not messages or messages[-1].id == last_command_id:
                # nothing new (or the group is currently empty/unavailable)
                time.sleep(0.1)
                continue
            print(messages[-1].id)

            next_message = None
            if last_command_id is not None:
                next_message = get_next_message_from_group(
                    control_group, last_command_id
                )
            if next_message is None:
                # The last handled message is not in the group (any more), so
                # there is nothing to continue from. Resume with the newest
                # message rather than replaying the whole history.
                if last_command_id is not None:
                    click.secho(
                        "Lost track of the last handled message; "
                        "continuing with the newest one.",
                        err=True,
                    )
                next_message = messages[-1]

            if next_message.quote:
                last_command_id = next_message.id
                continue  # skip replies, they could be ours

            try:
                process_command(ac, next_message)
            finally:
                # advance even if handling failed
                last_command_id = next_message.id
    finally:
        click.secho("Exiting.")
|
|
|
|
|
|
@click.command(
    cls=click.Group, context_settings={"help_option_names": ["-h", "--help"]}
)
@click.version_option()
@click.pass_context
def cli_main(ctx):
    """Delta Chat verification bot: set up the bot account and run it."""
    # The docstring above is what click prints as the top-level --help text.
    # It used to describe an unrelated e-mail account admin tool.
    # The function body is intentionally empty: this object only acts as the
    # command group that the subcommands get attached to.
|
|
|
|
|
|
# Attach the subcommands to the top-level command group, in the same order
# as before (init first, then run).
for _subcommand in (init, run):
    cli_main.add_command(_subcommand)


if __name__ == "__main__":
    # Executed directly as a script: hand control to the click command group.
    cli_main()
|