publickey-bot/src/keyserver_bot/hooks.py

114 lines
3.6 KiB
Python

#!/usr/bin/env python3
import pgpy
from deltachat_rpc_client import events, run_bot_cli, EventType, Message
from email_validator import validate_email, EmailNotValidError
from keyserver_bot.wkd import request_from_wkd
from keyserver_bot.koo import request_from_koo
from keyserver_bot.vcard import construct_vcard, save_vcard
# Registry of event handlers: the functions below attach themselves with
# @hooks.on(...), and main() hands the whole collection to run_bot_cli().
hooks = events.HookCollection()
# Usage text shown to users: sent as a greeting once a secure-join completes
# and used as the prefix of the "not an email address" reply.
HELP_MSG = (
    "If you send me an email address, "
    "I will fetch its public PGP key "
    "(from WKD and keys.openpgp.org) "
    "and send you a contact you can encrypt to."
)
@hooks.on(events.NewMessage)
def command(event):
    """Reply to an incoming message with a vCard carrying a PGP public key.

    ``/generate-invite`` is answered with the account's QR code, and the
    "end-to-end encrypted" notice text is ignored. Any other message text is
    treated as an email address (optionally ``Name <addr>``). If the message
    has an attachment, it is parsed as a PGP public key; otherwise the key is
    looked up by email address.

    :param event: the NewMessage event; only ``event.message_snapshot`` is used
    :return: the result of the ``send_text``/``send_file`` call, or None
    """
    snapshot = event.message_snapshot
    if snapshot.text == "/generate-invite":
        return snapshot.chat.send_text(snapshot.chat.account.get_qr_code())
    if snapshot.text == "Messages are guaranteed to be end-to-end encrypted from now on.":
        return

    email = snapshot.text
    public_key = None
    display_name = None
    if snapshot.get("file"):
        public_key, email_from_uid, display_name = import_key_from_attachment(snapshot.file)
        # An address typed into the message takes precedence over the key's UID.
        email = email if email else email_from_uid

    # Validate exactly once and keep the result: re-validating the already
    # normalised address would discard the display name of "Name <addr>" input.
    try:
        validated = validate_email(email, allow_display_name=True)
    except EmailNotValidError:
        return snapshot.chat.send_text(HELP_MSG + f"\n\n{email} is not an email address :/")
    # ascii_email is None when the local part is not ASCII; fall back to the
    # local part plus the ASCII form of the domain so the lookups below still
    # receive a usable string.
    email = validated.ascii_email or f"{validated.local_part}@{validated.ascii_domain}"
    if not display_name:
        display_name = validated.display_name

    if not public_key:
        public_key = request_key_by_email(email)
    if not public_key:
        return snapshot.chat.send_text(f"Sorry, I could not find a key for {email}.")

    vcard = construct_vcard(email, public_key, display_name)
    # "/" is permitted in an email local part but must not act as a path
    # separator in the file name.
    safe_name = email.replace("/", "_")
    vcard_path = f"/tmp/{safe_name}.vcf"
    save_vcard(vcard_path, vcard)
    return snapshot.chat.send_file(vcard_path)
def import_key_from_attachment(file_path: str) -> "tuple[str, str, str]":
    """Import a PGP public key from an attachment.

    :param file_path: the path to the attachment
    :return: a tuple with the public key, the email address, and a displayname.
        All three are empty strings if the file cannot be parsed as a key;
        the email address and displayname are empty strings if the key has no
        user ID containing a valid email address.
    """
    try:
        key = pgpy.PGPKey.from_file(file_path)[0]
    except (AttributeError, ValueError, pgpy.errors.PGPError):
        # ValueError: presumably what PGPy raises for data that is not a PGP
        # key at all - TODO confirm against the installed PGPy version.
        # The caller unpacks three values, so the failure result has three too.
        return "", "", ""

    # Keep only the payload lines of the armored key: drop blank lines,
    # "Comment: " headers and the "-----BEGIN/END ...-----" markers.
    public_key = "".join(
        line
        for line in str(key).splitlines()
        if line and not line.startswith(("Comment: ", "-----"))
    )

    uid = key.get_uid("")
    if uid is None or not uid.userid:
        return public_key, "", ""
    try:
        validated = validate_email(uid.userid, allow_display_name=True)
    except EmailNotValidError:
        # The key itself is still usable; the caller can take the address
        # from the message text instead.
        return public_key, "", ""
    return public_key, validated.ascii_email or "", validated.display_name or ""
def request_key_by_email(email) -> str:
    """Look up the public key for an email address.

    Tries WKD on the address's domain, then WKD on the ``openpgpkey.``
    subdomain, and finally keys.openpgp.org.

    :param email: the email address to look up; must contain an "@"
    :return: the first non-empty WKD result, otherwise whatever
        request_from_koo returns
    """
    domain = email.split("@")[1]
    for wkd_host in (domain, f"openpgpkey.{domain}"):
        found = request_from_wkd(email, wkd_host)
        if found:
            return found
    return request_from_koo(email)
@hooks.on(events.RawEvent)
def cleanup(event):
    """React to raw events: wipe finished conversations, greet new contacts.

    Every event is printed. When a message was delivered or failed, the data
    belonging to that message is removed via delete_data(). When a
    secure-join reports progress 1000, the help text is sent to the contact.

    :param event: the raw event; ``kind`` and ``account`` are read, plus
        ``msg_id`` or ``progress``/``contact_id`` depending on the kind
    """
    print(event)
    kind = event.kind

    if kind in (EventType.MSG_DELIVERED, EventType.MSG_FAILED):
        snapshot = Message(event.account, event.msg_id).get_snapshot()
        delete_data(snapshot)

    # NOTE(review): progress 1000 presumably marks a completed join - confirm.
    if kind == EventType.SECUREJOIN_INVITER_PROGRESS and event.progress == 1000:
        joiner = event.account.create_contact(event.contact_id)
        joiner.create_chat().send_text(HELP_MSG)
def delete_data(msg):
    """Delete the chat a message belongs to, then every contact of that chat.

    :param msg: an object whose ``chat`` offers get_contacts() and delete()
    """
    chat = msg.chat
    # The member list is fetched before the chat is deleted, as in the
    # original order of operations.
    members = chat.get_contacts()
    chat.delete()
    for contact in members:
        contact.delete()
def main():
    """Start the bot by passing the registered hooks to run_bot_cli."""
    run_bot_cli(hooks)


# Only start the bot when executed as a script, not on import.
if __name__ == "__main__":
    main()