keytracker/telegram/telegram_bot.py

440 lines
16 KiB
Python

from datetime import datetime
import logging
import sqlite3
import json
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, KeyboardButton, ReplyKeyboardMarkup
from telegram.ext import Updater, CommandHandler, MessageHandler, CallbackQueryHandler, Filters
from PIL import Image
from pyzbar.pyzbar import decode
import requests
import configparser
# Runtime settings are read from an INI-style file named "config", resolved
# relative to the current working directory.
config = configparser.ConfigParser()
config.read("config")

# Base URL the _api_* helpers append their endpoints to.
API_PATH = config["Api"]["Url"]
# Bot token handed to the Updater.
TELEGRAM_TOKEN = config["Telegram"]["Token"]

_default_section = config["Default"]
# File that received photos are downloaded to before QR decoding.
QR_IMAGE_PATH = _default_section["QrImagePath"]
# SQLite database file holding the per-chat state.
DB_PATH = _default_section["DbPath"]
# Text a scanned QR code must decode to in order to be accepted.
EXPECTED_QR_CODE_VALUE = _default_section["QrValue"]
# API token stored for a chat after a valid QR scan.
QR_CODE_API_TOKEN = _default_section["QrApiToken"]
# === MESSAGES ===
# User-facing texts (German) sent by the handlers below. Long texts are
# assembled from adjacent string literals inside parentheses.

# --- Greeting and help ---
GREETING = """Hey! \U0001F917
Schick mir einfach ein Foto von dem QR Code auf dem Schlüssel.
Um zu sehen wer gerade den Schlüssel hat tippe auf /keyholder
Für eine Übersicht der Befehle tippe auf /help
"""
HELP = """Hier eine Übersicht über die Befehle:
/start Mögliche Aktionen anzeigen
/claim Den Schlüssel übernehmen
/keyholder Anzeigen wer gerade den Schlüssel hat
/loeschen Deine Kontaktdaten löschen (sofern gespeichert)
/allesloeschen Alle zu dir gespeicherten Daten löschen
"""

# --- QR scan ---
SCAN_FAILED = (
    "Der QR-Scan ist leider fehlgeschlagen \U0001F641. "
    "Bitte probiere es nochmal und melde dich falls es "
    "wiederholt nicht funktioniert.\n"
    "Du kannst ansonsten den Code auch mit einer anderen App öffnen "
    "und dem Link folgen."
)
SCAN_INVALID = "Der QR-Code ist leider fehlerhaft"
SCAN_SUCCESS = "Scan erfolgreich!"

# --- Claiming and key holder lookup ---
CLAIM_SUCCESS = "Perfekt! Der Schlüssel ist jetzt auf dich übertragen"
USER_HAS_KEY = "Der Schlüssel ist aktuell auf dich eingetragen"
USER_HAS_KEY_NEED_PASS = (
    "Um mehr Details zu sehen musst du mir einmalig das Passwort "
    "dazu schicken"
)

# --- Password dialogue ---
NEED_PASS = (
    "Um zu sehen wer gerade den Schlüssel hat musst du mir einmalig "
    "das Password dazu schicken"
)
CHOICE_PASS_SEND = "Ich hab das Passwort"
CHOICE_PASS_CANCEL = "Abbrechen / Später"
REQUEST_PASS = (
    "Bitte schicke mir in der nächsten Nachricht das Passwort "
    "(Die Nachricht wird sofort danach wieder gelöscht)"
)
PASS_ERROR = "Überprüfen des Passworts fehlgeschlagen"
PASS_INVALID = "Das Passwort ist leider falsch"

# --- Start menu buttons ---
CHOICE_CLAIM = "Ich hab den Schlüssel"
CHOICE_KEYHOLDER = "Wer hat den Schlüssel"

# --- Contact handling ---
CLAIM_NO_TOKEN = (
    "Um den Schlüssel zu übernehmen musst du mindestens einmal ein "
    "Foto von dem QR Code gemacht haben oder das Passwort eingegeben"
)
REQUEST_CONTACT = (
    "Damit es möglich ist, dich zu kontaktieren, wenn du den Schlüssel "
    "hast, brauche ich einmalig deinen Kontakt"
)
SEND_CONTACT = "Meinen Kontakt senden"
CONTACT_STORED = (
    "Ich habe deine Kontaktdaten für die Zukunft gespeichert. "
    "Wenn du die Daten, die zu dir gespeichert sind wieder löschen "
    "möchtest tippe /loeschen um nur den Kontakt zu löschen oder "
    "/allesloeschen um alle Daten zu löschen."
)

# --- Deletion dialogue ---
CHOICE_DELETE_CONFIRM = "Wirklich löschen?"
CHOICE_DELETE_YES = "Löschen"
CHOICE_DELETE_NO = "Abbrechen"
DELETED_CONTACT = "Deine Kontaktdaten wurden gelöscht"
DELETED_ALL = "Ich habe alle Daten zu dir gelöscht"

# --- Key holder display ---
KEYHOLDER_EMPTY = "Keine Einträge"
# Placeholders, in order: name, contact, time of the claim.
KEYHOLDER_TEMPLATE = "Zurzeit hat den Schlüssel: {} (Kontakt: {}, übernommen {})"

# --- Generic ---
CHOICE = "Bitte wähle aus"
CHOICE_START = "Wähle eine Aktion aus"
# Placeholder: label of the pressed button.
YOU_CHOSE = "Du hast ausgewählt: {}"
INTERNAL_ERROR = "Tut mir Leid, leider ist ein interner Fehler aufgetreten"
def start(update, context):
    """Send the entry message matching the chat's stored state.

    A chat in state ``"start"`` receives the greeting. A chat in state
    ``"idle"`` receives an inline keyboard offering the claim and keyholder
    actions. In every other state nothing is sent.
    """
    chat_id = update.effective_chat.id
    state = _get_chat_data(chat_id)["state"]

    if state == "start":
        context.bot.send_message(chat_id=chat_id, text=GREETING)
        return
    if state != "idle":
        return

    action_row = [
        InlineKeyboardButton(CHOICE_CLAIM, callback_data="claim"),
        InlineKeyboardButton(CHOICE_KEYHOLDER, callback_data="keyholder"),
    ]
    context.bot.send_message(
        chat_id=chat_id,
        text=CHOICE_START,
        reply_markup=InlineKeyboardMarkup([action_row]),
    )
def show_help(update, context):
    """Send the command overview (``HELP``) to the chat the update came from."""
    target_chat = update.effective_chat.id
    context.bot.send_message(chat_id=target_chat, text=HELP)
def claim(update, context):
    """Register the chat's user as the current key holder.

    Only acts while the chat is in state ``"idle"``:

    * Without a stored token the user is told how to obtain one.
    * Without a stored name or contact the chat moves to ``"wait_contact"``
      and a keyboard requesting the user's contact is sent.
    * Otherwise the claim is posted through ``_api_claim``. On success the
      returned claim id is stored; on failure ``INTERNAL_ERROR`` is sent.
      ``start`` is invoked afterwards in both cases.
    """
    chat_id = update.effective_chat.id
    chat_data = _get_chat_data(chat_id)
    if chat_data["state"] != "idle":
        return

    if chat_data["token"] is None:
        context.bot.send_message(chat_id=chat_id, text=CLAIM_NO_TOKEN)
        return

    if chat_data["name"] is None or chat_data["contact"] is None:
        _update_chat_state(chat_id, "wait_contact")
        share_button = KeyboardButton(text=SEND_CONTACT, request_contact=True)
        contact_markup = ReplyKeyboardMarkup(
            [[share_button]],
            one_time_keyboard=True,
            resize_keyboard=True,
        )
        context.bot.send_message(
            chat_id=chat_id,
            text=REQUEST_CONTACT,
            reply_markup=contact_markup,
        )
        return

    claim_id = _api_claim(chat_data["token"], chat_data["name"], chat_data["contact"])
    if claim_id is None:
        context.bot.send_message(chat_id=chat_id, text=INTERNAL_ERROR)
    else:
        context.bot.send_message(chat_id=chat_id, text=CLAIM_SUCCESS)
        _update_chat_last_cid(chat_id, claim_id)
    # NOTE(review): assumed to run only after a claim attempt - confirm
    # against the intended flow.
    start(update, context)
def keyholder(update, context):
    """Tell the user who currently holds the key, or ask for the password.

    * No stored token: the user is asked for the password.
    * The API lookup fails: ``INTERNAL_ERROR`` is sent, followed by ``start``.
    * The token may not view the holder (lookup yields no holder data): the
      user is told whether the key is registered to them and is asked for
      the password.
    * Otherwise the key holder message is sent, followed by ``start``.

    Works for both the /keyholder command and the inline-button callback;
    all replies are sent via ``context.bot`` because ``update.message`` is
    not set for callback queries.
    """
    chat_id = update.effective_chat.id
    chat_data = _get_chat_data(chat_id)
    # .get(): the data for a chat that was just created may lack these keys.
    token = chat_data.get("token")
    if token is None:
        context.bot.send_message(chat_id=chat_id, text=NEED_PASS)
        _request_password(context, chat_id)
        return

    user_is_keyholder, holder = _api_request_keyholder(token, chat_data.get("last_cid"))
    if user_is_keyholder is None:
        # Lookup failed: report it and stop instead of falling through to
        # the password prompt.
        context.bot.send_message(chat_id=chat_id, text=INTERNAL_ERROR)
        start(update, context)
        return

    if holder is None:
        if user_is_keyholder:
            context.bot.send_message(chat_id=chat_id, text=USER_HAS_KEY)
            context.bot.send_message(chat_id=chat_id, text=USER_HAS_KEY_NEED_PASS)
        else:
            context.bot.send_message(chat_id=chat_id, text=NEED_PASS)
        _request_password(context, chat_id)
        return

    context.bot.send_message(chat_id=chat_id, text=_get_keyholder_msg(chat_data))
    start(update, context)


def _request_password(context, chat_id):
    """Move the chat to ``"request_pw"`` and send the send/cancel buttons."""
    _update_chat_state(chat_id, "request_pw")
    password_row = [
        InlineKeyboardButton(CHOICE_PASS_SEND, callback_data="send"),
        InlineKeyboardButton(CHOICE_PASS_CANCEL, callback_data="cancel"),
    ]
    context.bot.send_message(
        chat_id=chat_id,
        text=CHOICE,
        reply_markup=InlineKeyboardMarkup([password_row]),
    )
def delete(update, context):
    """Start the confirmation dialogue with the ``"delete"`` scope."""
    _delete(update, context, delete_scope="delete")
def delete_all(update, context):
    """Start the confirmation dialogue with the ``"delete_all"`` scope."""
    _delete(update, context, delete_scope="delete_all")
def _delete(update, context, delete_scope):
    """Ask the user to confirm a deletion.

    ``delete_scope`` is stored as the chat's state and a yes/no inline
    keyboard is sent together with ``CHOICE_DELETE_CONFIRM``.
    """
    chat_id = update.effective_chat.id
    # Called for its side effect: the lookup inserts a row for chats that
    # have none yet, so the state update below has a row to modify.
    _get_chat_data(chat_id)
    _update_chat_state(chat_id, delete_scope)

    confirm_row = [
        InlineKeyboardButton(CHOICE_DELETE_YES, callback_data="yes"),
        InlineKeyboardButton(CHOICE_DELETE_NO, callback_data="no"),
    ]
    context.bot.send_message(
        chat_id=chat_id,
        text=CHOICE_DELETE_CONFIRM,
        reply_markup=InlineKeyboardMarkup([confirm_row]),
    )
def onmsg(update, context):
    """Handle a text message; in state ``"wait_pw"`` treat it as the password.

    The chat returns to ``"idle"`` and the text is checked via ``_api_auth``:

    * ``None`` (check failed): ``PASS_ERROR`` is sent.
    * ``False`` (rejected): ``PASS_INVALID`` is sent.
    * a token: it is stored and the key holder message is sent.

    The password message is deleted in every case, including when the check
    raises, and ``start`` is invoked afterwards. Text messages arriving in
    any other state are ignored.
    """
    chat_id = update.effective_chat.id
    chat_data = _get_chat_data(chat_id)
    if chat_data["state"] != "wait_pw":
        return

    # effective_message is also set for update kinds whose ``message``
    # attribute is None (for example edited messages).
    message = update.effective_message
    _update_chat_state(chat_id, "idle")
    try:
        token = _api_auth(message.text)
        if token is None:
            context.bot.send_message(chat_id=chat_id, text=PASS_ERROR)
        elif token is False:
            context.bot.send_message(chat_id=chat_id, text=PASS_INVALID)
        else:
            _update_chat_token(chat_id, token)
            # Keep the local copy in sync so the lookup uses the new token.
            chat_data["token"] = token
            context.bot.send_message(chat_id=chat_id, text=_get_keyholder_msg(chat_data))
    finally:
        # REQUEST_PASS promises that the password message is removed again;
        # honour that even when the check above raised.
        message.delete()
    start(update, context)
def oncallback(update, context):
    """Handle a press on one of the bot's inline keyboard buttons.

    The callback is acknowledged, the keyboard message is replaced by a note
    naming the chosen option, and the press is then interpreted according to
    the chat's stored state:

    * ``"idle"``: ``claim`` / ``keyholder`` start the matching action.
    * ``"request_pw"``: ``send`` moves the chat to ``"wait_pw"`` and asks for
      the password; anything else returns to ``"idle"`` and the start menu.
    * ``"delete"``: the chat returns to ``"idle"``; ``yes`` clears the stored
      contact first. The start menu is shown afterwards.
    * ``"delete_all"``: ``yes`` removes the chat's row; anything else
      returns the chat to ``"idle"``.

    Presses arriving in any other state are ignored.
    """
    chat_id = update.effective_chat.id
    chat_data = _get_chat_data(chat_id)
    query = update.callback_query
    # Acknowledge the press; clients keep showing a progress indicator on
    # the button until the callback query has been answered.
    query.answer()

    # Find the pressed button by its callback data. Stop at the first match
    # so the message is edited at most once.
    pressed = next(
        (
            button
            for row in query.message.reply_markup.inline_keyboard
            for button in row
            if button.callback_data == query.data
        ),
        None,
    )
    if pressed is not None:
        query.edit_message_text(text=YOU_CHOSE.format(pressed.text))

    state = chat_data["state"]
    if state == "idle":
        if query.data == "claim":
            claim(update, context)
        elif query.data == "keyholder":
            keyholder(update, context)
    elif state == "request_pw":
        if query.data == "send":
            _update_chat_state(chat_id, "wait_pw")
            context.bot.send_message(chat_id=chat_id, text=REQUEST_PASS)
        else:
            _update_chat_state(chat_id, "idle")
            start(update, context)
    elif state == "delete":
        _update_chat_state(chat_id, "idle")
        if query.data == "yes":
            _delete_contact(chat_id)
            context.bot.send_message(chat_id=chat_id, text=DELETED_CONTACT)
        start(update, context)
    elif state == "delete_all":
        if query.data == "yes":
            _update_chat_state(chat_id, "start")
            _delete_all(chat_id)
            context.bot.send_message(chat_id=chat_id, text=DELETED_ALL)
        else:
            _update_chat_state(chat_id, "idle")
def onimg(update, context):
    """Handle a photo: decode it as a QR code and, if valid, start a claim.

    The last entry of the message's photo list is downloaded to
    ``QR_IMAGE_PATH`` and decoded with ``_read_qr``:

    * Nothing decoded: ``SCAN_FAILED`` is sent.
    * Decoded text differs from ``EXPECTED_QR_CODE_VALUE``: ``SCAN_INVALID``
      is sent, followed by the decoded text.
    * Decoded text matches: ``QR_CODE_API_TOKEN`` is stored unless the chat
      already has a token, the chat is set to ``"idle"``, ``SCAN_SUCCESS``
      is sent and ``claim`` is invoked.
    """
    chat_id = update.effective_chat.id
    chat_data = _get_chat_data(chat_id)

    photo_file = update.message.photo[-1].get_file()
    photo_file.download(QR_IMAGE_PATH)
    qr_text = _read_qr(QR_IMAGE_PATH)

    if qr_text is None:
        context.bot.send_message(chat_id=chat_id, text=SCAN_FAILED)
        return

    if qr_text != EXPECTED_QR_CODE_VALUE:
        context.bot.send_message(chat_id=chat_id, text=SCAN_INVALID)
        context.bot.send_message(chat_id=chat_id, text=qr_text)
        return

    # An existing token is kept; the QR token only fills an empty slot.
    if chat_data.get("token") is None:
        _update_chat_token(chat_id, QR_CODE_API_TOKEN)
    # NOTE(review): the reset to "idle" is assumed to apply to every valid
    # scan, not only when the token was newly stored - confirm.
    _update_chat_state(chat_id, "idle")
    context.bot.send_message(chat_id=chat_id, text=SCAN_SUCCESS)
    claim(update, context)
def oncontact(update, context):
    """Handle a shared contact while the chat is in state ``"wait_contact"``.

    The chat returns to ``"idle"``, a display name is built from the
    contact's first and (optional) last name, and the key is claimed via
    ``_api_claim`` using the contact's phone number. On success the claim id
    is stored; on failure ``INTERNAL_ERROR`` is sent. If the chat had no
    stored name or contact, both are saved and ``CONTACT_STORED`` is sent.
    ``start`` is invoked at the end. Contacts arriving in any other state
    are ignored.
    """
    chat_id = update.effective_chat.id
    chat_data = _get_chat_data(chat_id)
    if chat_data["state"] != "wait_contact":
        return

    _update_chat_state(chat_id, "idle")
    contact = update.message.contact
    last_name = "" if contact.last_name is None else contact.last_name
    name = "{} {}".format(contact.first_name, last_name).strip()
    contact_str = contact.phone_number

    claim_id = _api_claim(chat_data["token"], name, contact_str)
    if claim_id is None:
        context.bot.send_message(chat_id=chat_id, text=INTERNAL_ERROR)
    else:
        context.bot.send_message(chat_id=chat_id, text=CLAIM_SUCCESS)
        _update_chat_last_cid(chat_id, claim_id)

    # NOTE(review): storing the contact is assumed to be independent of the
    # claim result - confirm.
    if chat_data["name"] is None or chat_data["contact"] is None:
        _update_chat_contact(chat_id, name, contact_str)
        context.bot.send_message(chat_id=chat_id, text=CONTACT_STORED)
    start(update, context)
def _init_db():
    """Create the ``Chats`` table in the database at ``DB_PATH`` if missing.

    The connection is closed again before returning.
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        connection.execute("""
            CREATE TABLE IF NOT EXISTS Chats (
                Id INTEGER PRIMARY KEY,
                ChatId INTEGER NOT NULL,
                ChatState TEXT NOT NULL,
                Token TEXT,
                Name TEXT,
                Contact TEXT,
                LastCId TEXT
            )
        """)
        connection.commit()
    finally:
        connection.close()
def _get_chat_data(chat_id):
    """Return the stored data of a chat, creating its row on first contact.

    Args:
        chat_id: Telegram chat id used as lookup key (``ChatId`` column).

    Returns:
        A dict with the keys ``state``, ``token``, ``name``, ``contact`` and
        ``last_cid``. For a chat without a row, a row with state ``"start"``
        is inserted and all other values are ``None``, so callers can index
        every key regardless of whether the chat is new.
    """
    keys = ("state", "token", "name", "contact", "last_cid")
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT ChatState, Token, Name, Contact, LastCId
            FROM Chats WHERE ChatId=?
        """, (chat_id,))
        row = cursor.fetchone()
        if row is None:
            cursor.execute("INSERT INTO Chats (ChatId, ChatState) VALUES (?,?)",
                           (chat_id, "start"))
            connection.commit()
            # Mirror what the freshly inserted row contains.
            row = ("start", None, None, None, None)
    finally:
        connection.close()
    return dict(zip(keys, row))
def _update_chat_state(chat_id, state):
    """Store ``state`` in the chat's ``ChatState`` column."""
    _update_chat_data(chat_id, column="ChatState", data=state)
def _update_chat_token(chat_id, token):
    """Store ``token`` in the chat's ``Token`` column."""
    _update_chat_data(chat_id, column="Token", data=token)
def _update_chat_contact(chat_id, name, contact):
    """Store name and contact of the chat, as two updates in that order."""
    for column, value in (("Name", name), ("Contact", contact)):
        _update_chat_data(chat_id, column, value)
def _update_chat_last_cid(chat_id, last_cid):
    """Store ``last_cid`` in the chat's ``LastCId`` column."""
    _update_chat_data(chat_id, column="LastCId", data=last_cid)
def _update_chat_data(chat_id, column, data):
    """Set one column of the chat's row in the ``Chats`` table.

    Args:
        chat_id: Telegram chat id selecting the row (``ChatId`` column).
        column: One of ``ChatState``, ``Token``, ``Name``, ``Contact`` or
            ``LastCId``.
        data: New value; bound as an SQL parameter.

    Raises:
        ValueError: If ``column`` is not one of the columns listed above.
    """
    # A column name cannot be bound as an SQL parameter and is interpolated
    # into the statement, so only the known data columns are accepted.
    if column not in ("ChatState", "Token", "Name", "Contact", "LastCId"):
        raise ValueError("unknown Chats column: {!r}".format(column))

    connection = sqlite3.connect(DB_PATH)
    try:
        connection.execute("UPDATE Chats SET " + column + "=? WHERE ChatId=?",
                           (data, chat_id))
        connection.commit()
    finally:
        connection.close()
def _delete_contact(chat_id):
    """Clear the chat's stored name and contact by setting both to ``None``."""
    _update_chat_contact(chat_id, name=None, contact=None)
def _delete_all(chat_id):
    """Delete every ``Chats`` row belonging to ``chat_id``.

    The connection is closed again before returning.
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        connection.execute("DELETE FROM Chats WHERE ChatId=?", (chat_id,))
        connection.commit()
    finally:
        connection.close()
def _read_qr(path):
    """Decode the QR code contained in the image file at ``path``.

    Returns:
        The UTF-8 decoded data of the last symbol found by ``decode``, or
        ``None`` when the image cannot be processed or holds no symbol.
    """
    try:
        # The context manager releases the image file once decoding is done.
        with Image.open(path) as image:
            symbols = decode(image)
        if not symbols:
            return None
        return symbols[-1].data.decode("utf-8")
    except Exception:
        # Best effort: callers treat None as "scan failed". Unlike a bare
        # ``except`` this lets KeyboardInterrupt and SystemExit propagate.
        logging.getLogger(__name__).warning("Could not decode QR image %s", path,
                                            exc_info=True)
        return None
def _get_keyholder_msg(chat_data):
    """Build the text describing who currently holds the key.

    Queries ``_api_request_keyholder`` with the chat's token and last claim
    id and returns:

    * ``INTERNAL_ERROR`` when the lookup failed,
    * ``USER_HAS_KEY`` when the chat's own claim is the latest one,
    * ``KEYHOLDER_EMPTY`` when no holder entries were returned,
    * otherwise ``KEYHOLDER_TEMPLATE`` filled with name, contact and the
      formatted timestamp of the first entry.
    """
    user_is_keyholder, holders = _api_request_keyholder(
        chat_data["token"],
        chat_data["last_cid"],
    )
    if user_is_keyholder is None:
        return INTERNAL_ERROR
    if user_is_keyholder:
        return USER_HAS_KEY
    if holders is None or len(holders) == 0:
        return KEYHOLDER_EMPTY

    current = holders[0]
    holder_name = current["name"]
    holder_contact = current["contact"]
    claimed_at = str(datetime.fromtimestamp(current["timestamp"]))
    return KEYHOLDER_TEMPLATE.format(holder_name, holder_contact, claimed_at)
def _api_auth(pw, timeout=10):
    """Exchange the password for an API token via ``POST /auth``.

    Args:
        pw: Password entered by the user.
        timeout: Seconds to wait for the API before giving up.

    Returns:
        The response text (the token) on HTTP 200, ``False`` on HTTP 401
        (password rejected), and ``None`` for any other status or when the
        request itself fails.
    """
    try:
        response = requests.post(API_PATH + "/auth", data={"pass": pw}, timeout=timeout)
    except requests.RequestException:
        # Network problems are reported like any other failed check. The
        # password is deliberately kept out of the log message.
        logging.getLogger(__name__).warning("Auth request failed", exc_info=True)
        return None

    if response.status_code == 200:
        return response.text
    if response.status_code == 401:
        return False
    return None
def _api_claim(token, name, contact, timeout=10):
    """Register a claim for the key via ``POST /claim``.

    Args:
        token: API token sent in the ``X-Auth-Token`` header.
        name: Name of the person claiming the key.
        contact: Contact detail stored with the claim.
        timeout: Seconds to wait for the API before giving up.

    Returns:
        The response text (used by callers as claim id) on HTTP 200,
        otherwise ``None``; also ``None`` when the request itself fails.
    """
    try:
        response = requests.post(API_PATH + "/claim",
                                 data={"name": name, "contact": contact},
                                 headers={"X-Auth-Token": token},
                                 timeout=timeout)
    except requests.RequestException:
        # Callers already handle None as "internal error".
        logging.getLogger(__name__).warning("Claim request failed", exc_info=True)
        return None

    if response.status_code == 200:
        return response.text
    return None
def _api_request_keyholder(token, last_cid=None, timeout=10):
    """Ask the API who holds the key and whether it is this chat's user.

    The user might not be allowed to view who has the key, but with
    ``last_cid`` it is at least possible to check whether it is the user or
    someone else.

    Args:
        token: API token sent in the ``X-Auth-Token`` header.
        last_cid: Id of the chat's most recent claim, if any.
        timeout: Seconds to wait for each API request before giving up.

    Returns:
        A tuple ``(user_is_keyholder, keyholder)``:

        * HTTP 200: the flag and the decoded JSON body,
        * HTTP 403: the flag and ``None``,
        * anything else, a failed request or an undecodable body:
          ``(None, None)``.
    """
    log = logging.getLogger(__name__)
    user_is_keyholder = False
    try:
        if last_cid is not None:
            status = requests.get(API_PATH + f"/status/{last_cid}", timeout=timeout)
            if status.status_code == 200 and status.text == "latest":
                user_is_keyholder = True
        response = requests.get(API_PATH + "/keyholder",
                                headers={"X-Auth-Token": token},
                                timeout=timeout)
    except requests.RequestException:
        # Callers already handle (None, None) as "internal error".
        log.warning("Keyholder request failed", exc_info=True)
        return None, None

    if response.status_code == 200:
        try:
            return user_is_keyholder, json.loads(response.text)
        except ValueError:
            log.warning("Keyholder response is not valid JSON", exc_info=True)
            return None, None
    if response.status_code == 403:
        return user_is_keyholder, None
    return None, None
if __name__ == "__main__":
    logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                        level=logging.INFO)
    _init_db()
    updater = Updater(token=TELEGRAM_TOKEN, use_context=True)
    dispatcher = updater.dispatcher
    # Registration order is kept: command handlers first, then the generic
    # text, contact and photo handlers, then the inline-button callback.
    for handler in (
        CommandHandler("start", start),
        CommandHandler("help", show_help),
        CommandHandler("claim", claim),
        CommandHandler("keyholder", keyholder),
        CommandHandler("loeschen", delete),
        CommandHandler("allesloeschen", delete_all),
        MessageHandler(Filters.text, onmsg),
        MessageHandler(Filters.contact, oncontact),
        MessageHandler(Filters.photo, onimg),
        CallbackQueryHandler(oncallback),
    ):
        dispatcher.add_handler(handler)
    updater.start_polling()
    # Block the main thread until a stop signal arrives so the updater is
    # shut down cleanly instead of being left to its worker threads.
    updater.idle()