from datetime import datetime import logging import sqlite3 import json from telegram import InlineKeyboardButton, InlineKeyboardMarkup, KeyboardButton, ReplyKeyboardMarkup from telegram.ext import Updater, CommandHandler, MessageHandler, CallbackQueryHandler, Filters from PIL import Image from pyzbar.pyzbar import decode import requests import configparser config = configparser.ConfigParser() config.read("config") API_PATH = config["Api"]["Url"] TELEGRAM_TOKEN = config["Telegram"]["Token"] QR_IMAGE_PATH = config["Default"]["QrImagePath"] DB_PATH = config["Default"]["DbPath"] EXPECTED_QR_CODE_VALUE = config["Default"]["QrValue"] QR_CODE_API_TOKEN = config["Default"]["QrApiToken"] # === MESSAGES === GREETING = """Hey! \U0001F917 Schick mir einfach ein Foto von dem QR Code auf dem Schlüssel. Um zu sehen wer gerade den Schlüssel hat tippe auf /keyholder Für eine Übersicht der Befehle tippe auf /help """ HELP = """Hier eine Übersicht über die Befehle: /start Mögliche Aktionen anzeigen /claim Den Schlüssel übernehmen /keyholder Anzeigen wer gerade den Schlüssel hat /loeschen Deine Kontaktdaten löschen (sofern gespeichert) /allesloeschen Alle zu dir gespeicherten Daten löschen """ SCAN_FAILED = "Der QR-Scan ist leider fehlgeschlagen \U0001F641. "\ "Bitte probiere es nochmal und melde dich falls es "\ "wiederholt nicht funktioniert.\n"\ "Du kannst ansonsten den Code auch mit einer anderen App öffnen "\ "und dem Link folgen." SCAN_INVALID = "Der QR-Code ist leider fehlerhaft" SCAN_SUCCESS = "Scan erfolgreich!" CLAIM_SUCCESS = "Perfekt! Der Schlüssel ist jetzt auf dich übertragen" USER_HAS_KEY = "Der Schlüssel ist aktuell auf dich eingetragen" USER_HAS_KEY_NEED_PASS = "Um mehr Details zu sehen musst du mir einmalig das Passwort "\ "dazu schicken" NEED_PASS = "Um zu sehen wer gerade den Schlüssel hat musst du mir einmalig "\ "das Password dazu schicken" CHOICE_PASS_SEND = "Ich hab das Passwort" CHOICE_PASS_CANCEL = "Abbrechen / Später" REQUEST_PASS = "Bitte schicke mir in der nächsten Nachricht das Passwort "\ "(Die Nachricht wird sofort danach wieder gelöscht)" PASS_ERROR = "Überprüfen des Passworts fehlgeschlagen" PASS_INVALID = "Das Passwort ist leider falsch" CHOICE_CLAIM = "Ich hab den Schlüssel" CHOICE_KEYHOLDER = "Wer hat den Schlüssel" CLAIM_NO_TOKEN = "Um den Schlüssel zu übernehmen musst du mindestens einmal ein "\ "Foto von dem QR Code gemacht haben oder das Passwort eingegeben" REQUEST_CONTACT = "Damit es möglich ist, dich zu kontaktieren, wenn du den Schlüssel "\ "hast, brauche ich einmalig deinen Kontakt" SEND_CONTACT = "Meinen Kontakt senden" CONTACT_STORED = "Ich habe deine Kontaktdaten für die Zukunft gespeichert. "\ "Wenn du die Daten, die zu dir gespeichert sind wieder löschen "\ "möchtest tippe /loeschen um nur den Kontakt zu löschen oder "\ "/allesloeschen um alle Daten zu löschen." CHOICE_DELETE_CONFIRM = "Wirklich löschen?" CHOICE_DELETE_YES = "Löschen" CHOICE_DELETE_NO = "Abbrechen" DELETED_CONTACT = "Deine Kontaktdaten wurden gelöscht" DELETED_ALL = "Ich habe alle Daten zu dir gelöscht" KEYHOLDER_EMPTY = "Keine Einträge" KEYHOLDER_TEMPLATE = "Zurzeit hat den Schlüssel: {} (Kontakt: {}, übernommen {})" CHOICE = "Bitte wähle aus" CHOICE_START = "Wähle eine Aktion aus" YOU_CHOSE = "Du hast ausgewählt: {}" INTERNAL_ERROR = "Tut mir Leid, leider ist ein interner Fehler aufgetreten" def start(update, context): chat_id = update.effective_chat.id chat_data = _get_chat_data(chat_id) if chat_data["state"] == "start": context.bot.send_message(chat_id=chat_id, text=GREETING) elif chat_data["state"] == "idle": keyboard = [[InlineKeyboardButton(CHOICE_CLAIM, callback_data='claim'), InlineKeyboardButton(CHOICE_KEYHOLDER, callback_data='keyholder')]] reply_markup = InlineKeyboardMarkup(keyboard) context.bot.send_message(chat_id=chat_id, text=CHOICE_START, reply_markup=reply_markup) def show_help(update, context): context.bot.send_message(chat_id=update.effective_chat.id, text=HELP) def claim(update, context): chat_id = update.effective_chat.id chat_data = _get_chat_data(chat_id) if chat_data["state"] == "idle": if chat_data["token"] is None: context.bot.send_message(chat_id=chat_id, text=CLAIM_NO_TOKEN) elif chat_data["name"] is None or chat_data["contact"] is None: _update_chat_state(chat_id, "wait_contact") contact_keyboard = KeyboardButton(text=SEND_CONTACT, request_contact=True) reply_markup = ReplyKeyboardMarkup([[contact_keyboard]], one_time_keyboard=True, resize_keyboard=True) context.bot.send_message(chat_id=chat_id, text=REQUEST_CONTACT, reply_markup=reply_markup) else: claim_id = _api_claim(chat_data["token"], chat_data["name"], chat_data["contact"]) if claim_id is None: context.bot.send_message(chat_id=chat_id, text=INTERNAL_ERROR) else: context.bot.send_message(chat_id=chat_id, text=CLAIM_SUCCESS) _update_chat_last_cid(chat_id, claim_id) start(update, context) def keyholder(update, context): chat_id = update.effective_chat.id chat_data = _get_chat_data(chat_id) if chat_data["token"] is None: context.bot.send_message(chat_id=chat_id, text=NEED_PASS) _update_chat_state(chat_id, "request_pw") keyboard = [[InlineKeyboardButton(CHOICE_PASS_SEND, callback_data='send'), InlineKeyboardButton(CHOICE_PASS_CANCEL, callback_data='cancel')]] reply_markup = InlineKeyboardMarkup(keyboard) context.bot.send_message(chat_id=chat_id, text=CHOICE, reply_markup=reply_markup) else: user_is_keyholder, keyholder = _api_request_keyholder(chat_data["token"], chat_data["last_cid"]) if user_is_keyholder is None: context.bot.send_message(chat_id=chat_id, text=INTERNAL_ERROR) if keyholder is None: if user_is_keyholder: context.bot.send_message(chat_id=chat_id, text=USER_HAS_KEY) context.bot.send_message(chat_id=chat_id, text=USER_HAS_KEY_NEED_PASS) else: context.bot.send_message(chat_id=chat_id, text=NEED_PASS) _update_chat_state(chat_id, "request_pw") keyboard = [[InlineKeyboardButton(CHOICE_PASS_SEND, callback_data='send'), InlineKeyboardButton(CHOICE_PASS_CANCEL, callback_data='cancel')]] reply_markup = InlineKeyboardMarkup(keyboard) update.message.reply_text(CHOICE, reply_markup=reply_markup) else: keyholder_msg = _get_keyholder_msg(chat_data) context.bot.send_message(chat_id=chat_id, text=keyholder_msg) start(update, context) def delete(update, context): _delete(update, context, "delete") def delete_all(update, context): _delete(update, context, "delete_all") def _delete(update, context, delete_scope): chat_id = update.effective_chat.id chat_data = _get_chat_data(chat_id) _update_chat_state(chat_id, delete_scope) keyboard = [[InlineKeyboardButton(CHOICE_DELETE_YES, callback_data='yes'), InlineKeyboardButton(CHOICE_DELETE_NO, callback_data='no')]] reply_markup = InlineKeyboardMarkup(keyboard) context.bot.send_message(chat_id=chat_id, text=CHOICE_DELETE_CONFIRM, reply_markup=reply_markup) def onmsg(update, context): chat_id = update.effective_chat.id chat_data = _get_chat_data(chat_id) if chat_data["state"] == "wait_pw": _update_chat_state(chat_id, "idle") pw = update.message.text token = _api_auth(pw) if token is None: context.bot.send_message(chat_id=chat_id, text=PASS_ERROR) elif token is False: context.bot.send_message(chat_id=chat_id, text=PASS_INVALID) else: _update_chat_token(chat_id, token) chat_data["token"] = token keyholder_msg = _get_keyholder_msg(chat_data) context.bot.send_message(chat_id=chat_id, text=keyholder_msg) update.message.delete() start(update, context) def oncallback(update, context): chat_id = update.effective_chat.id chat_data = _get_chat_data(chat_id) query = update.callback_query for row in query.message.reply_markup["inline_keyboard"]: for entry in row: if entry["callback_data"] == query.data: query.edit_message_text(text=YOU_CHOSE.format(entry["text"])) if chat_data["state"] == "idle": if query.data == "claim": claim(update, context) elif query.data == "keyholder": keyholder(update, context) elif chat_data["state"] == "request_pw": if query.data == "send": _update_chat_state(chat_id, "wait_pw") context.bot.send_message(chat_id=chat_id, text=REQUEST_PASS) else: _update_chat_state(chat_id, "idle") start(update, context) elif chat_data["state"] == "delete": _update_chat_state(chat_id, "idle") if query.data == "yes": _delete_contact(chat_id) context.bot.send_message(chat_id=chat_id, text=DELETED_CONTACT) start(update, context) elif chat_data["state"] == "delete_all": if query.data == "yes": _update_chat_state(chat_id, "start") _delete_all(chat_id) context.bot.send_message(chat_id=chat_id, text=DELETED_ALL) else: _update_chat_state(chat_id, "idle") def onimg(update, context): chat_id = update.effective_chat.id chat_data = _get_chat_data(chat_id) f = update.message.photo[-1].get_file() f.download(QR_IMAGE_PATH) result = _read_qr(QR_IMAGE_PATH) if result is None: context.bot.send_message(chat_id=chat_id, text=SCAN_FAILED) else: if result == EXPECTED_QR_CODE_VALUE: token = QR_CODE_API_TOKEN if "token" not in chat_data or chat_data["token"] is None: _update_chat_token(chat_id, token) _update_chat_state(chat_id, "idle") context.bot.send_message(chat_id=chat_id, text=SCAN_SUCCESS) claim(update, context) else: context.bot.send_message(chat_id=chat_id, text=SCAN_INVALID) context.bot.send_message(chat_id=chat_id, text=result) def oncontact(update, context): chat_id = update.effective_chat.id chat_data = _get_chat_data(chat_id) if chat_data["state"] == "wait_contact": _update_chat_state(chat_id, "idle") contact = update.message.contact name = "{} {}".format(contact.first_name, contact.last_name if contact.last_name is not None else "").strip() #if "phone_number" in contact: contact_str = contact.phone_number #elif "username" in contact: # contact_str = contact.username #else: # contact_str = contact.user_id claim_id = _api_claim(chat_data["token"], name, contact_str) if claim_id is None: context.bot.send_message(chat_id=chat_id, text=INTERNAL_ERROR) else: context.bot.send_message(chat_id=chat_id, text=CLAIM_SUCCESS) _update_chat_last_cid(chat_id, claim_id) if chat_data["name"] is None or chat_data["contact"] is None: _update_chat_contact(chat_id, name, contact_str) context.bot.send_message(chat_id=chat_id, text=CONTACT_STORED) start(update, context) def _init_db(): c = sqlite3.connect(DB_PATH) c.execute(""" CREATE TABLE IF NOT EXISTS Chats ( Id INTEGER PRIMARY KEY, ChatId INTEGER NOT NULL, ChatState TEXT NOT NULL, Token TEXT, Name TEXT, Contact TEXT, LastCId TEXT ) """) def _get_chat_data(chat_id): c = sqlite3.connect(DB_PATH) conn = c.cursor() conn.execute(""" SELECT ChatState, Token, Name, Contact, LastCId FROM Chats WHERE ChatId=? """, (chat_id,)) row = conn.fetchone() if row is None: default = {"state": "start"} conn.execute("INSERT INTO Chats (ChatId, ChatState) VALUES (?,?)", (chat_id, default["state"])) c.commit() return default else: return { "state": row[0], "token": row[1], "name": row[2], "contact": row[3], "last_cid": row[4], } def _update_chat_state(chat_id, state): _update_chat_data(chat_id, "ChatState", state) def _update_chat_token(chat_id, token): _update_chat_data(chat_id, "Token", token) def _update_chat_contact(chat_id, name, contact): _update_chat_data(chat_id, "Name", name) _update_chat_data(chat_id, "Contact", contact) def _update_chat_last_cid(chat_id, last_cid): _update_chat_data(chat_id, "LastCId", last_cid) def _update_chat_data(chat_id, column, data): c = sqlite3.connect(DB_PATH) conn = c.cursor() conn.execute("UPDATE Chats SET " + column + "=? WHERE ChatId=?", (data, chat_id)) c.commit() def _delete_contact(chat_id): _update_chat_contact(chat_id, None, None) def _delete_all(chat_id): c = sqlite3.connect(DB_PATH) conn = c.cursor() conn.execute("DELETE FROM Chats WHERE ChatId=?", (chat_id,)) c.commit() def _read_qr(path): try: return decode(Image.open(path))[-1].data.decode("utf-8") except: return None def _get_keyholder_msg(chat_data): user_is_keyholder, keyholder = _api_request_keyholder(chat_data["token"], chat_data["last_cid"]) if user_is_keyholder is None: return INTERNAL_ERROR elif user_is_keyholder == True: return USER_HAS_KEY else: if keyholder is None or len(keyholder) == 0: return KEYHOLDER_EMPTY else: return KEYHOLDER_TEMPLATE.format(keyholder[0]["name"], keyholder[0]["contact"], str(datetime.fromtimestamp(keyholder[0]["timestamp"]))) def _api_auth(pw): r = requests.post(API_PATH + "/auth", data={"pass": pw}) if r.status_code == 200: return r.text elif r.status_code == 401: return False else: return None def _api_claim(token, name, contact): r = requests.post(API_PATH + "/claim", data={"name": name, "contact": contact}, headers={"X-Auth-Token": token}) if r.status_code == 200: return r.text else: return None def _api_request_keyholder(token, last_cid=None): # The user might not be allowed to view who has the key, but with the last_cid # it is at least possible to check whether it is the user or someone else user_is_keyholder = False if last_cid is not None: r = requests.get(API_PATH + f"/status/{last_cid}") if r.status_code == 200 and r.text == "latest": user_is_keyholder = True r = requests.get(API_PATH + "/keyholder", headers={"X-Auth-Token": token}) if r.status_code == 200: return user_is_keyholder, json.loads(r.text) elif r.status_code == 403: return user_is_keyholder, None else: return None, None if __name__ == "__main__": logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO) _init_db() updater = Updater(token=TELEGRAM_TOKEN, use_context=True) dispatcher = updater.dispatcher dispatcher.add_handler(CommandHandler("start", start)) dispatcher.add_handler(CommandHandler("help", show_help)) dispatcher.add_handler(CommandHandler("claim", claim)) dispatcher.add_handler(CommandHandler("keyholder", keyholder)) dispatcher.add_handler(CommandHandler("loeschen", delete)) dispatcher.add_handler(CommandHandler("allesloeschen", delete_all)) dispatcher.add_handler(MessageHandler(Filters.text, onmsg)) dispatcher.add_handler(MessageHandler(Filters.contact, oncontact)) dispatcher.add_handler(MessageHandler(Filters.photo, onimg)) dispatcher.add_handler(CallbackQueryHandler(oncallback)) updater.start_polling()