mailcow-scripts/create-mailcow-accounts.py

121 lines
4.3 KiB
Python
Executable File

import os
import time
import requests as r
class MailcowError(Exception):
    """Raised when the mailcow API reports that an operation failed.

    The methods of :class:`MailcowConnection` raise this with the decoded
    JSON response as its argument. It is defined here because this file
    raises it but neither defines nor imports it anywhere else.
    """


class MailcowConnection:
    """Class to manage requests to the mailcow instance.

    :param mailcow_endpoint: the URL to the mailcow API; request paths are
        appended to it verbatim, so it has to end with a slash
    :param mailcow_token: the access token to the mailcow API
    """

    def __init__(self, mailcow_endpoint, mailcow_token):
        self.mailcow_endpoint = mailcow_endpoint
        # Sent as HTTP headers with every request.
        self.auth = {"X-API-Key": mailcow_token}

    @staticmethod
    def _raise_unless_success(data):
        """Raise MailcowError unless ``data`` is a list whose first entry has type "success".

        :param data: the decoded JSON body of an add/delete response
        :raises MailcowError: if the response is not a non-empty list or its
            first entry is not a dict with ``"type": "success"``
        """
        first = data[0] if isinstance(data, list) and data else None
        if not isinstance(first, dict) or first.get("type") != "success":
            raise MailcowError(data)

    def add_user_mailcow(self, addr, password, token, quota=0):
        """HTTP Request to add a user to the mailcow instance.

        :param addr: the email address of the new account
        :param password: the password of the new account
        :param token: the mailadm token used for account creation
        :param quota: the maximum mailbox storage in MB. default: unlimited
        :raises MailcowError: if the response does not report success
        """
        url = self.mailcow_endpoint + "add/mailbox"
        addr_parts = addr.split("@")
        payload = {
            "local_part": addr_parts[0],
            "domain": addr_parts[1],
            "quota": quota,
            "password": password,
            "password2": password,
            "active": True,
            "force_pw_update": False,
            "tls_enforce_in": False,
            "tls_enforce_out": False,
            # The tag lets MailcowUser recover the token from the mailbox later.
            "tags": ["mailadm:" + token],
        }
        result = r.post(url, json=payload, headers=self.auth, timeout=30)
        # Decode the body once instead of re-parsing it for every check.
        self._raise_unless_success(result.json())

    def del_user_mailcow(self, addr):
        """HTTP Request to delete a user from the mailcow instance.

        :param addr: the email account to be deleted
        :raises MailcowError: if the response does not report success
        """
        url = self.mailcow_endpoint + "delete/mailbox"
        result = r.post(url, json=[addr], headers=self.auth, timeout=30)
        self._raise_unless_success(result.json())

    def get_user(self, addr):
        """HTTP Request to get a specific mailcow user (not only mailadm-generated ones).

        :param addr: the email address of the account to look up
        :return: a MailcowUser, or None if the API answered with an empty object
        :raises MailcowError: if the response is an object of type "error"
        """
        url = self.mailcow_endpoint + "get/mailbox/" + addr
        result = r.get(url, headers=self.auth, timeout=30)
        data = result.json()
        if data == {}:
            return None
        if isinstance(data, dict) and data.get("type") == "error":
            raise MailcowError(data)
        return MailcowUser(data)

    def get_user_list(self):
        """HTTP Request to get all mailcow users (not only mailadm-generated ones).

        Prints how long the request took.

        :return: a list of MailcowUser objects; empty if the API answered with
            an empty object
        :raises MailcowError: if the response is an object of type "error"
        """
        url = self.mailcow_endpoint + "get/mailbox/all"
        begin = time.time()
        result = r.get(url, headers=self.auth, timeout=30)
        end = time.time()
        print("Request took %.2f seconds." % (end - begin,))
        data = result.json()
        if data == {}:
            return []
        if isinstance(data, dict) and data.get("type") == "error":
            raise MailcowError(data)
        return [MailcowUser(user) for user in data]
class MailcowUser(object):
    """A mailbox as described by one JSON object from the mailcow API.

    Attributes set by the constructor:

    * ``json`` - the dict that was passed in
    * ``addr`` - the value of the "username" key
    * ``quota`` - the value of the "quota" key
    * ``last_seen`` - the current time minus the "last_imap_login" value, or
      None if that key is missing
    * ``token`` - the text after "mailadm:" in the first tag that starts with
      that prefix. The attribute is NOT set when there is no such tag, so
      read it with ``getattr(user, "token", default)``.

    :param json: the decoded JSON object describing the mailbox
    """

    def __init__(self, json):
        self.json = json
        self.addr = json.get("username")
        self.quota = json.get("quota")
        last_login = json.get("last_imap_login")
        # Without a login timestamp there is nothing to subtract from.
        self.last_seen = None if last_login is None else time.time() - last_login
        prefix = "mailadm:"
        for tag in json.get("tags", []):
            if tag.startswith(prefix):
                # Slice the prefix off; str.strip() would treat it as a set of
                # characters and also eat matching characters at the end of
                # the token (e.g. "benchmark:add" would become "benchmark").
                self.token = tag[len(prefix):]
                break
if __name__ == "__main__":
    # Benchmark script: remove the accounts left over from a previous run,
    # then create 4000 fresh ones and report how long that takes.
    # All settings are read from the environment.
    api_endpoint = os.getenv("MAILCOW_ENDPOINT", "https://dc.develcow.de/api/v1/")
    api_token = os.getenv("MAILCOW_TOKEN")
    mail_domain = os.getenv("MAIL_DOMAIN", "x.testrun.org")
    connection = MailcowConnection(api_endpoint, api_token)

    print("Cleaning up first...")
    # Accounts from earlier runs are recognised by "benchmark" in their token;
    # users without a token attribute are skipped.
    leftovers = [
        user
        for user in connection.get_user_list()
        if "benchmark" in getattr(user, "token", "")
    ]
    for user in leftovers:
        connection.del_user_mailcow(user.addr)
    print("Cleaned up %s users." % (len(leftovers),))

    # Blocks until the operator presses enter; ctrl+c aborts the run.
    input("Creating 4000 users - continue? (if no, press ctrl+c) ")

    total_accounts = 4000
    started = time.time()
    batch_started = started
    for index in range(total_accounts):
        # Progress report before every 100th account, including the very first.
        if index % 100 == 0:
            batch_seconds = time.time() - batch_started
            print("Created %s accounts; the last 100 accounts took %.2f seconds" % (index, batch_seconds))
            batch_started = time.time()
        new_addr = "bmark%s@%s" % (index, mail_domain)
        connection.add_user_mailcow(new_addr, "asdf1234", "benchmark:add")
    print("Created %s accounts; in total it took %.2f seconds" % (total_accounts, time.time() - started))