Add Telegram bot (Still needs a lot of cleaning/refactoring)
This commit is contained in:
parent
f98437c636
commit
c3fd3beaea
1 changed files with 440 additions and 0 deletions
440
telegram/telegram_bot.py
Normal file
440
telegram/telegram_bot.py
Normal file
|
@ -0,0 +1,440 @@
|
|||
from datetime import datetime
|
||||
import logging
|
||||
import sqlite3
|
||||
import json
|
||||
|
||||
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, KeyboardButton, ReplyKeyboardMarkup
|
||||
from telegram.ext import Updater, CommandHandler, MessageHandler, CallbackQueryHandler, Filters
|
||||
from PIL import Image
|
||||
from pyzbar.pyzbar import decode
|
||||
import requests
|
||||
|
||||
import configparser
|
||||
config = configparser.ConfigParser()
|
||||
config.read("config")
|
||||
|
||||
API_PATH = config["Api"]["Url"]
|
||||
TELEGRAM_TOKEN = config["Telegram"]["Token"]
|
||||
QR_IMAGE_PATH = config["Default"]["QrImagePath"]
|
||||
DB_PATH = config["Default"]["DbPath"]
|
||||
EXPECTED_QR_CODE_VALUE = config["Default"]["QrValue"]
|
||||
QR_CODE_API_TOKEN = config["Default"]["QrApiToken"]
|
||||
|
||||
|
||||
# === MESSAGES ===
|
||||
GREETING = """Hey! \U0001F917
|
||||
Schick mir einfach ein Foto von dem QR Code auf dem Schlüssel.
|
||||
Um zu sehen wer gerade den Schlüssel hat tippe auf /keyholder
|
||||
Für eine Übersicht der Befehle tippe auf /help
|
||||
"""
|
||||
|
||||
HELP = """Hier eine Übersicht über die Befehle:
|
||||
/start Mögliche Aktionen anzeigen
|
||||
/claim Den Schlüssel übernehmen
|
||||
/keyholder Anzeigen wer gerade den Schlüssel hat
|
||||
/loeschen Deine Kontaktdaten löschen (sofern gespeichert)
|
||||
/allesloeschen Alle zu dir gespeicherten Daten löschen
|
||||
"""
|
||||
|
||||
SCAN_FAILED = "Der QR-Scan ist leider fehlgeschlagen \U0001F641. "\
|
||||
"Bitte probiere es nochmal und melde dich falls es "\
|
||||
"wiederholt nicht funktioniert.\n"\
|
||||
"Du kannst ansonsten den Code auch mit einer anderen App öffnen "\
|
||||
"und dem Link folgen."
|
||||
SCAN_INVALID = "Der QR-Code ist leider fehlerhaft"
|
||||
SCAN_SUCCESS = "Scan erfolgreich!"
|
||||
|
||||
CLAIM_SUCCESS = "Perfekt! Der Schlüssel ist jetzt auf dich übertragen"
|
||||
|
||||
USER_HAS_KEY = "Der Schlüssel ist aktuell auf dich eingetragen"
|
||||
USER_HAS_KEY_NEED_PASS = "Um mehr Details zu sehen musst du mir einmalig das Passwort "\
|
||||
"dazu schicken"
|
||||
NEED_PASS = "Um zu sehen wer gerade den Schlüssel hat musst du mir einmalig "\
|
||||
"das Password dazu schicken"
|
||||
CHOICE_PASS_SEND = "Ich hab das Passwort"
|
||||
CHOICE_PASS_CANCEL = "Abbrechen / Später"
|
||||
REQUEST_PASS = "Bitte schicke mir in der nächsten Nachricht das Passwort "\
|
||||
"(Die Nachricht wird sofort danach wieder gelöscht)"
|
||||
PASS_ERROR = "Überprüfen des Passworts fehlgeschlagen"
|
||||
PASS_INVALID = "Das Passwort ist leider falsch"
|
||||
|
||||
CHOICE_CLAIM = "Ich hab den Schlüssel"
|
||||
CHOICE_KEYHOLDER = "Wer hat den Schlüssel"
|
||||
|
||||
CLAIM_NO_TOKEN = "Um den Schlüssel zu übernehmen musst du mindestens einmal ein "\
|
||||
"Foto von dem QR Code gemacht haben oder das Passwort eingegeben"
|
||||
REQUEST_CONTACT = "Damit es möglich ist, dich zu kontaktieren, wenn du den Schlüssel "\
|
||||
"hast, brauche ich einmalig deinen Kontakt"
|
||||
SEND_CONTACT = "Meinen Kontakt senden"
|
||||
CONTACT_STORED = "Ich habe deine Kontaktdaten für die Zukunft gespeichert. "\
|
||||
"Wenn du die Daten, die zu dir gespeichert sind wieder löschen "\
|
||||
"möchtest tippe /loeschen um nur den Kontakt zu löschen oder "\
|
||||
"/allesloeschen um alle Daten zu löschen."
|
||||
CHOICE_DELETE_CONFIRM = "Wirklich löschen?"
|
||||
CHOICE_DELETE_YES = "Löschen"
|
||||
CHOICE_DELETE_NO = "Abbrechen"
|
||||
DELETED_CONTACT = "Deine Kontaktdaten wurden gelöscht"
|
||||
DELETED_ALL = "Ich habe alle Daten zu dir gelöscht"
|
||||
|
||||
KEYHOLDER_EMPTY = "Keine Einträge"
|
||||
KEYHOLDER_TEMPLATE = "Zurzeit hat den Schlüssel: {} (Kontakt: {}, übernommen {})"
|
||||
|
||||
CHOICE = "Bitte wähle aus"
|
||||
CHOICE_START = "Wähle eine Aktion aus"
|
||||
YOU_CHOSE = "Du hast ausgewählt: {}"
|
||||
INTERNAL_ERROR = "Tut mir Leid, leider ist ein interner Fehler aufgetreten"
|
||||
|
||||
|
||||
def start(update, context):
|
||||
chat_id = update.effective_chat.id
|
||||
chat_data = _get_chat_data(chat_id)
|
||||
if chat_data["state"] == "start":
|
||||
context.bot.send_message(chat_id=chat_id, text=GREETING)
|
||||
elif chat_data["state"] == "idle":
|
||||
keyboard = [[InlineKeyboardButton(CHOICE_CLAIM, callback_data='claim'),
|
||||
InlineKeyboardButton(CHOICE_KEYHOLDER, callback_data='keyholder')]]
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
context.bot.send_message(chat_id=chat_id, text=CHOICE_START, reply_markup=reply_markup)
|
||||
|
||||
|
||||
def show_help(update, context):
|
||||
context.bot.send_message(chat_id=update.effective_chat.id, text=HELP)
|
||||
|
||||
|
||||
def claim(update, context):
|
||||
chat_id = update.effective_chat.id
|
||||
chat_data = _get_chat_data(chat_id)
|
||||
if chat_data["state"] == "idle":
|
||||
if chat_data["token"] is None:
|
||||
context.bot.send_message(chat_id=chat_id, text=CLAIM_NO_TOKEN)
|
||||
elif chat_data["name"] is None or chat_data["contact"] is None:
|
||||
_update_chat_state(chat_id, "wait_contact")
|
||||
|
||||
contact_keyboard = KeyboardButton(text=SEND_CONTACT, request_contact=True)
|
||||
reply_markup = ReplyKeyboardMarkup([[contact_keyboard]],
|
||||
one_time_keyboard=True,
|
||||
resize_keyboard=True)
|
||||
context.bot.send_message(chat_id=chat_id,
|
||||
text=REQUEST_CONTACT,
|
||||
reply_markup=reply_markup)
|
||||
|
||||
else:
|
||||
claim_id = _api_claim(chat_data["token"], chat_data["name"], chat_data["contact"])
|
||||
if claim_id is None:
|
||||
context.bot.send_message(chat_id=chat_id, text=INTERNAL_ERROR)
|
||||
else:
|
||||
context.bot.send_message(chat_id=chat_id, text=CLAIM_SUCCESS)
|
||||
_update_chat_last_cid(chat_id, claim_id)
|
||||
start(update, context)
|
||||
|
||||
|
||||
|
||||
def keyholder(update, context):
|
||||
chat_id = update.effective_chat.id
|
||||
chat_data = _get_chat_data(chat_id)
|
||||
if chat_data["token"] is None:
|
||||
context.bot.send_message(chat_id=chat_id, text=NEED_PASS)
|
||||
_update_chat_state(chat_id, "request_pw")
|
||||
keyboard = [[InlineKeyboardButton(CHOICE_PASS_SEND, callback_data='send'),
|
||||
InlineKeyboardButton(CHOICE_PASS_CANCEL, callback_data='cancel')]]
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
context.bot.send_message(chat_id=chat_id, text=CHOICE, reply_markup=reply_markup)
|
||||
else:
|
||||
user_is_keyholder, keyholder = _api_request_keyholder(chat_data["token"],
|
||||
chat_data["last_cid"])
|
||||
if user_is_keyholder is None:
|
||||
context.bot.send_message(chat_id=chat_id, text=INTERNAL_ERROR)
|
||||
|
||||
if keyholder is None:
|
||||
if user_is_keyholder:
|
||||
context.bot.send_message(chat_id=chat_id, text=USER_HAS_KEY)
|
||||
context.bot.send_message(chat_id=chat_id, text=USER_HAS_KEY_NEED_PASS)
|
||||
else:
|
||||
context.bot.send_message(chat_id=chat_id, text=NEED_PASS)
|
||||
_update_chat_state(chat_id, "request_pw")
|
||||
keyboard = [[InlineKeyboardButton(CHOICE_PASS_SEND, callback_data='send'),
|
||||
InlineKeyboardButton(CHOICE_PASS_CANCEL, callback_data='cancel')]]
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
update.message.reply_text(CHOICE, reply_markup=reply_markup)
|
||||
else:
|
||||
keyholder_msg = _get_keyholder_msg(chat_data)
|
||||
context.bot.send_message(chat_id=chat_id, text=keyholder_msg)
|
||||
start(update, context)
|
||||
|
||||
|
||||
def delete(update, context):
|
||||
_delete(update, context, "delete")
|
||||
|
||||
def delete_all(update, context):
|
||||
_delete(update, context, "delete_all")
|
||||
|
||||
def _delete(update, context, delete_scope):
|
||||
chat_id = update.effective_chat.id
|
||||
chat_data = _get_chat_data(chat_id)
|
||||
|
||||
_update_chat_state(chat_id, delete_scope)
|
||||
keyboard = [[InlineKeyboardButton(CHOICE_DELETE_YES, callback_data='yes'),
|
||||
InlineKeyboardButton(CHOICE_DELETE_NO, callback_data='no')]]
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
context.bot.send_message(chat_id=chat_id, text=CHOICE_DELETE_CONFIRM, reply_markup=reply_markup)
|
||||
|
||||
|
||||
def onmsg(update, context):
|
||||
chat_id = update.effective_chat.id
|
||||
chat_data = _get_chat_data(chat_id)
|
||||
|
||||
if chat_data["state"] == "wait_pw":
|
||||
_update_chat_state(chat_id, "idle")
|
||||
pw = update.message.text
|
||||
token = _api_auth(pw)
|
||||
if token is None:
|
||||
context.bot.send_message(chat_id=chat_id, text=PASS_ERROR)
|
||||
elif token is False:
|
||||
context.bot.send_message(chat_id=chat_id, text=PASS_INVALID)
|
||||
else:
|
||||
_update_chat_token(chat_id, token)
|
||||
chat_data["token"] = token
|
||||
keyholder_msg = _get_keyholder_msg(chat_data)
|
||||
context.bot.send_message(chat_id=chat_id, text=keyholder_msg)
|
||||
|
||||
update.message.delete()
|
||||
start(update, context)
|
||||
|
||||
|
||||
def oncallback(update, context):
|
||||
chat_id = update.effective_chat.id
|
||||
chat_data = _get_chat_data(chat_id)
|
||||
query = update.callback_query
|
||||
|
||||
for row in query.message.reply_markup["inline_keyboard"]:
|
||||
for entry in row:
|
||||
if entry["callback_data"] == query.data:
|
||||
query.edit_message_text(text=YOU_CHOSE.format(entry["text"]))
|
||||
|
||||
if chat_data["state"] == "idle":
|
||||
if query.data == "claim":
|
||||
claim(update, context)
|
||||
elif query.data == "keyholder":
|
||||
keyholder(update, context)
|
||||
elif chat_data["state"] == "request_pw":
|
||||
if query.data == "send":
|
||||
_update_chat_state(chat_id, "wait_pw")
|
||||
context.bot.send_message(chat_id=chat_id, text=REQUEST_PASS)
|
||||
else:
|
||||
_update_chat_state(chat_id, "idle")
|
||||
start(update, context)
|
||||
elif chat_data["state"] == "delete":
|
||||
_update_chat_state(chat_id, "idle")
|
||||
if query.data == "yes":
|
||||
_delete_contact(chat_id)
|
||||
context.bot.send_message(chat_id=chat_id, text=DELETED_CONTACT)
|
||||
start(update, context)
|
||||
elif chat_data["state"] == "delete_all":
|
||||
if query.data == "yes":
|
||||
_update_chat_state(chat_id, "start")
|
||||
_delete_all(chat_id)
|
||||
context.bot.send_message(chat_id=chat_id, text=DELETED_ALL)
|
||||
else:
|
||||
_update_chat_state(chat_id, "idle")
|
||||
|
||||
|
||||
|
||||
def onimg(update, context):
|
||||
chat_id = update.effective_chat.id
|
||||
chat_data = _get_chat_data(chat_id)
|
||||
|
||||
f = update.message.photo[-1].get_file()
|
||||
f.download(QR_IMAGE_PATH)
|
||||
result = _read_qr(QR_IMAGE_PATH)
|
||||
|
||||
if result is None:
|
||||
context.bot.send_message(chat_id=chat_id, text=SCAN_FAILED)
|
||||
else:
|
||||
if result == EXPECTED_QR_CODE_VALUE:
|
||||
token = QR_CODE_API_TOKEN
|
||||
if "token" not in chat_data or chat_data["token"] is None:
|
||||
_update_chat_token(chat_id, token)
|
||||
_update_chat_state(chat_id, "idle")
|
||||
context.bot.send_message(chat_id=chat_id, text=SCAN_SUCCESS)
|
||||
claim(update, context)
|
||||
else:
|
||||
context.bot.send_message(chat_id=chat_id, text=SCAN_INVALID)
|
||||
context.bot.send_message(chat_id=chat_id, text=result)
|
||||
|
||||
|
||||
def oncontact(update, context):
|
||||
chat_id = update.effective_chat.id
|
||||
chat_data = _get_chat_data(chat_id)
|
||||
|
||||
if chat_data["state"] == "wait_contact":
|
||||
_update_chat_state(chat_id, "idle")
|
||||
contact = update.message.contact
|
||||
name = "{} {}".format(contact.first_name, contact.last_name if contact.last_name is not None else "").strip()
|
||||
#if "phone_number" in contact:
|
||||
contact_str = contact.phone_number
|
||||
#elif "username" in contact:
|
||||
# contact_str = contact.username
|
||||
#else:
|
||||
# contact_str = contact.user_id
|
||||
|
||||
claim_id = _api_claim(chat_data["token"], name, contact_str)
|
||||
if claim_id is None:
|
||||
context.bot.send_message(chat_id=chat_id, text=INTERNAL_ERROR)
|
||||
else:
|
||||
context.bot.send_message(chat_id=chat_id, text=CLAIM_SUCCESS)
|
||||
_update_chat_last_cid(chat_id, claim_id)
|
||||
|
||||
if chat_data["name"] is None or chat_data["contact"] is None:
|
||||
_update_chat_contact(chat_id, name, contact_str)
|
||||
context.bot.send_message(chat_id=chat_id, text=CONTACT_STORED)
|
||||
start(update, context)
|
||||
|
||||
|
||||
|
||||
def _init_db():
|
||||
c = sqlite3.connect(DB_PATH)
|
||||
c.execute("""
|
||||
CREATE TABLE IF NOT EXISTS Chats (
|
||||
Id INTEGER PRIMARY KEY,
|
||||
ChatId INTEGER NOT NULL,
|
||||
ChatState TEXT NOT NULL,
|
||||
Token TEXT,
|
||||
Name TEXT,
|
||||
Contact TEXT,
|
||||
LastCId TEXT
|
||||
)
|
||||
""")
|
||||
|
||||
|
||||
def _get_chat_data(chat_id):
|
||||
c = sqlite3.connect(DB_PATH)
|
||||
conn = c.cursor()
|
||||
conn.execute("""
|
||||
SELECT ChatState, Token, Name, Contact, LastCId
|
||||
FROM Chats WHERE ChatId=?
|
||||
""", (chat_id,))
|
||||
row = conn.fetchone()
|
||||
if row is None:
|
||||
default = {"state": "start"}
|
||||
conn.execute("INSERT INTO Chats (ChatId, ChatState) VALUES (?,?)",
|
||||
(chat_id, default["state"]))
|
||||
c.commit()
|
||||
return default
|
||||
else:
|
||||
return {
|
||||
"state": row[0],
|
||||
"token": row[1],
|
||||
"name": row[2],
|
||||
"contact": row[3],
|
||||
"last_cid": row[4],
|
||||
}
|
||||
|
||||
|
||||
def _update_chat_state(chat_id, state):
|
||||
_update_chat_data(chat_id, "ChatState", state)
|
||||
|
||||
def _update_chat_token(chat_id, token):
|
||||
_update_chat_data(chat_id, "Token", token)
|
||||
|
||||
def _update_chat_contact(chat_id, name, contact):
|
||||
_update_chat_data(chat_id, "Name", name)
|
||||
_update_chat_data(chat_id, "Contact", contact)
|
||||
|
||||
def _update_chat_last_cid(chat_id, last_cid):
|
||||
_update_chat_data(chat_id, "LastCId", last_cid)
|
||||
|
||||
def _update_chat_data(chat_id, column, data):
|
||||
c = sqlite3.connect(DB_PATH)
|
||||
conn = c.cursor()
|
||||
conn.execute("UPDATE Chats SET " + column + "=? WHERE ChatId=?", (data, chat_id))
|
||||
c.commit()
|
||||
|
||||
def _delete_contact(chat_id):
|
||||
_update_chat_contact(chat_id, None, None)
|
||||
|
||||
def _delete_all(chat_id):
|
||||
c = sqlite3.connect(DB_PATH)
|
||||
conn = c.cursor()
|
||||
conn.execute("DELETE FROM Chats WHERE ChatId=?", (chat_id,))
|
||||
c.commit()
|
||||
|
||||
|
||||
def _read_qr(path):
|
||||
try:
|
||||
return decode(Image.open(path))[-1].data.decode("utf-8")
|
||||
except:
|
||||
return None
|
||||
|
||||
def _get_keyholder_msg(chat_data):
|
||||
user_is_keyholder, keyholder = _api_request_keyholder(chat_data["token"], chat_data["last_cid"])
|
||||
if user_is_keyholder is None:
|
||||
return INTERNAL_ERROR
|
||||
elif user_is_keyholder == True:
|
||||
return USER_HAS_KEY
|
||||
else:
|
||||
if keyholder is None or len(keyholder) == 0:
|
||||
return KEYHOLDER_EMPTY
|
||||
else:
|
||||
return KEYHOLDER_TEMPLATE.format(keyholder[0]["name"],
|
||||
keyholder[0]["contact"],
|
||||
str(datetime.fromtimestamp(keyholder[0]["timestamp"])))
|
||||
|
||||
|
||||
def _api_auth(pw):
|
||||
r = requests.post(API_PATH + "/auth", data={"pass": pw})
|
||||
if r.status_code == 200:
|
||||
return r.text
|
||||
elif r.status_code == 401:
|
||||
return False
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def _api_claim(token, name, contact):
|
||||
r = requests.post(API_PATH + "/claim",
|
||||
data={"name": name, "contact": contact},
|
||||
headers={"X-Auth-Token": token})
|
||||
if r.status_code == 200:
|
||||
return r.text
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def _api_request_keyholder(token, last_cid=None):
|
||||
# The user might not be allowed to view who has the key, but with the last_cid
|
||||
# it is at least possible to check whether it is the user or someone else
|
||||
user_is_keyholder = False
|
||||
if last_cid is not None:
|
||||
r = requests.get(API_PATH + f"/status/{last_cid}")
|
||||
if r.status_code == 200 and r.text == "latest":
|
||||
user_is_keyholder = True
|
||||
|
||||
r = requests.get(API_PATH + "/keyholder", headers={"X-Auth-Token": token})
|
||||
if r.status_code == 200:
|
||||
return user_is_keyholder, json.loads(r.text)
|
||||
elif r.status_code == 403:
|
||||
return user_is_keyholder, None
|
||||
else:
|
||||
return None, None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
level=logging.INFO)
|
||||
_init_db()
|
||||
|
||||
updater = Updater(token=TELEGRAM_TOKEN, use_context=True)
|
||||
dispatcher = updater.dispatcher
|
||||
|
||||
dispatcher.add_handler(CommandHandler("start", start))
|
||||
dispatcher.add_handler(CommandHandler("help", show_help))
|
||||
dispatcher.add_handler(CommandHandler("claim", claim))
|
||||
dispatcher.add_handler(CommandHandler("keyholder", keyholder))
|
||||
dispatcher.add_handler(CommandHandler("loeschen", delete))
|
||||
dispatcher.add_handler(CommandHandler("allesloeschen", delete_all))
|
||||
dispatcher.add_handler(MessageHandler(Filters.text, onmsg))
|
||||
dispatcher.add_handler(MessageHandler(Filters.contact, oncontact))
|
||||
dispatcher.add_handler(MessageHandler(Filters.photo, onimg))
|
||||
dispatcher.add_handler(CallbackQueryHandler(oncallback))
|
||||
|
||||
updater.start_polling()
|
Loading…
Reference in a new issue