activate and deactivate a chat
This commit is contained in:
parent
67a94e390a
commit
09feb1a61c
|
@ -19,3 +19,53 @@ def remind_user(user: deltachat.Contact):
|
||||||
file_path = get_file_path(messages)
|
file_path = get_file_path(messages)
|
||||||
lines = get_lines_from_file(file_path)
|
lines = get_lines_from_file(file_path)
|
||||||
print(lines)
|
print(lines)
|
||||||
|
|
||||||
|
|
||||||
|
def activate_chat(msg: deltachat.Message):
|
||||||
|
"""Activate a Chat after the user sent /start"""
|
||||||
|
messages = [message.text for message in msg.chat.get_messages()]
|
||||||
|
file_path = get_file_path(messages)
|
||||||
|
if not file_path:
|
||||||
|
reply(
|
||||||
|
msg.chat,
|
||||||
|
"You first need to send me a file path with the /file command.",
|
||||||
|
quote=msg,
|
||||||
|
)
|
||||||
|
msg.mark_seen()
|
||||||
|
return
|
||||||
|
reply(msg.chat, f"I will send you daily reminders from the file {file_path}.")
|
||||||
|
msg.chat.set_ephemeral_timer(60 * 60 * 24)
|
||||||
|
remind_user(msg.get_sender_contact())
|
||||||
|
|
||||||
|
|
||||||
|
def handle_incoming_message(msg: deltachat.Message):
|
||||||
|
print(str(msg.chat.id), msg.text)
|
||||||
|
if "/start" in msg.text:
|
||||||
|
activate_chat(msg)
|
||||||
|
elif "/stop" in msg.text:
|
||||||
|
msg.chat.set_ephemeral_timer(0)
|
||||||
|
reply(msg.chat, "I will stop sending you messages now.", quote=msg)
|
||||||
|
msg.mark_seen()
|
||||||
|
|
||||||
|
|
||||||
|
def reply(
|
||||||
|
chat: deltachat.Chat,
|
||||||
|
text: str,
|
||||||
|
quote: deltachat.Message = None,
|
||||||
|
attachment: str = None,
|
||||||
|
):
|
||||||
|
"""Reply to a message from a user.
|
||||||
|
|
||||||
|
:param chat: the chat where the user's message appeared
|
||||||
|
:param text: the text of the reply
|
||||||
|
:param quote: (optional) if you want it as a reply to a specific message
|
||||||
|
:param attachment: the file path of an attachment
|
||||||
|
"""
|
||||||
|
print(str(chat.id), text)
|
||||||
|
our_reply = deltachat.message.Message.new_empty(chat.account, "text")
|
||||||
|
our_reply.set_text(text)
|
||||||
|
if quote:
|
||||||
|
our_reply.quote = quote
|
||||||
|
if attachment:
|
||||||
|
our_reply.set_file(attachment)
|
||||||
|
chat.send_msg(our_reply)
|
||||||
|
|
|
@ -2,7 +2,7 @@ import time
|
||||||
|
|
||||||
import deltachat
|
import deltachat
|
||||||
|
|
||||||
from remember_remember_bot.commands import remind_user
|
from remember_remember_bot.commands import remind_user, handle_incoming_message
|
||||||
from remember_remember_bot.util import check_new_day, update_day
|
from remember_remember_bot.util import check_new_day, update_day
|
||||||
|
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ def loop(ac: deltachat.Account):
|
||||||
user.get_chat()
|
user.get_chat()
|
||||||
): # does this return None if there is no existing chat?
|
): # does this return None if there is no existing chat?
|
||||||
remind_user(user)
|
remind_user(user)
|
||||||
print(current_day)
|
|
||||||
current_day = update_day()
|
current_day = update_day()
|
||||||
|
for msg in ac.get_fresh_messages():
|
||||||
|
handle_incoming_message(msg)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
|
@ -23,6 +23,13 @@ def chat_is_active(messages: [str]) -> bool:
|
||||||
active = True
|
active = True
|
||||||
if "/stop" in message_text:
|
if "/stop" in message_text:
|
||||||
active = False
|
active = False
|
||||||
|
if "I will stop sending you messages now." in message_text:
|
||||||
|
active = False
|
||||||
|
if (
|
||||||
|
"You first need to send me a file path with the /file command."
|
||||||
|
in message_text
|
||||||
|
):
|
||||||
|
active = False
|
||||||
return active
|
return active
|
||||||
|
|
||||||
|
|
||||||
|
@ -36,6 +43,7 @@ def get_file_path(messages: [str]) -> str:
|
||||||
for message_text in messages:
|
for message_text in messages:
|
||||||
if "file" in message_text:
|
if "file" in message_text:
|
||||||
file_path = message_text.split("file ", 1)[1]
|
file_path = message_text.split("file ", 1)[1]
|
||||||
|
if not get_lines_from_file(file_path) == [f"File not found: {file_path}"]:
|
||||||
return file_path
|
return file_path
|
||||||
|
|
||||||
|
|
||||||
|
@ -45,6 +53,9 @@ def get_lines_from_file(file_path: str) -> [str]:
|
||||||
:param file_path: the path to the file with the user's reminders
|
:param file_path: the path to the file with the user's reminders
|
||||||
:return: a list with all the user's reminders
|
:return: a list with all the user's reminders
|
||||||
"""
|
"""
|
||||||
|
try:
|
||||||
with open(file_path, "r", encoding="utf-8") as f:
|
with open(file_path, "r", encoding="utf-8") as f:
|
||||||
lines = f.readlines()
|
lines = f.readlines()
|
||||||
|
except (FileNotFoundError, TypeError):
|
||||||
|
return [f"File not found: {file_path}"]
|
||||||
return [line.strip("\n") for line in lines]
|
return [line.strip("\n") for line in lines]
|
||||||
|
|
10
tests/conftest.py
Normal file
10
tests/conftest.py
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def tmp_file_path(request, tmpdir):
|
||||||
|
if request.param:
|
||||||
|
path = str(tmpdir) + "/" + str(request.param)
|
||||||
|
with open(path, "w+", encoding="utf-8") as f:
|
||||||
|
f.write("test")
|
||||||
|
return path
|
|
@ -20,6 +20,15 @@ def test_update_day():
|
||||||
(["/start"], True),
|
(["/start"], True),
|
||||||
(["/start", "/stop"], False),
|
(["/start", "/stop"], False),
|
||||||
(["/start", "/stop", "Let's get this party started"], False),
|
(["/start", "/stop", "Let's get this party started"], False),
|
||||||
|
(
|
||||||
|
[
|
||||||
|
"/start",
|
||||||
|
"Message deletion timer is set to 1 day by",
|
||||||
|
"Message deletion timer is disabled by",
|
||||||
|
"I will stop sending you messages now.",
|
||||||
|
],
|
||||||
|
False,
|
||||||
|
),
|
||||||
(["/start", "/stop", "/start"], True),
|
(["/start", "/stop", "/start"], True),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -28,13 +37,19 @@ def test_chat_is_active(messages, active):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
("messages", "file_path"),
|
"messages, tmp_file_path",
|
||||||
[
|
[
|
||||||
(["/file /home/user/test"], "/home/user/test"),
|
(["/file %file_path%"], "test1"),
|
||||||
(["/file /home/user/test", "/stop"], "/home/user/test"),
|
(["/file %file_path%", "/stop"], "test2"),
|
||||||
(["I stored your reminders at the file /home/user/test"], "/home/user/test"),
|
(["I stored your reminders at the file %file_path%"], "test3"),
|
||||||
(["/start", "/stop", "/start"], None),
|
(["/start", "/stop", "/start"], None),
|
||||||
],
|
],
|
||||||
|
indirect=["tmp_file_path"],
|
||||||
)
|
)
|
||||||
def test_get_file_path(messages: [], file_path: str):
|
def test_get_file_path(messages: [], tmp_file_path: str):
|
||||||
assert get_file_path(messages) == file_path
|
if tmp_file_path:
|
||||||
|
for listitem in range(len(messages)):
|
||||||
|
messages[listitem] = messages[listitem].replace(
|
||||||
|
"%file_path%", tmp_file_path
|
||||||
|
)
|
||||||
|
assert get_file_path(messages) == tmp_file_path
|
||||||
|
|
Loading…
Reference in a new issue