"""Schluesseltracker: a small Flask service that records who claimed a key.

Clients authenticate with an ``X-Auth-Token`` header of the form
``<key id>:<secret>``. Tokens and the claim history live in the SQLite
database at ``DB_PATH``; key definitions are read from the ``Keys`` section
of the ``config`` file when the module is run as a script.
"""
from contextlib import closing
from datetime import datetime
import sqlite3
import hashlib
import secrets
import configparser

from flask import Flask, request, abort
from flask.json import jsonify

config = configparser.ConfigParser()
config.read("config")

PORT = config["Default"]["Port"]
DB_PATH = config["Default"]["DbPath"]
# A CorsOrigin of "off" (any letter case) disables the CORS headers entirely.
CORS_ORIGIN = (
    None
    if config["Default"]["CorsOrigin"].lower() == "off"
    else config["Default"]["CorsOrigin"]
)
# key id -> {"name", "claim_token", "pass_hash"}; filled in by the script
# entry point at the bottom of this file.
KEYS = {}
PASS_SALT = b"IgTp9iQH"  # Static for now

app = Flask("Schluesseltracker")


@app.route("/auth", methods=["POST"])
def auth():
    """Exchange a key's password for a new token with claim and query rights.

    Expects a ``pass`` form field and a valid-looking ``X-Auth-Token`` header
    whose key id selects the entry in ``KEYS``.

    Returns the new token as the response body. Aborts with 400 when the
    ``pass`` field is missing or the token does not map to a known key, and
    with 401 when the password hash does not match.
    """
    if "pass" not in request.form:
        abort(400)

    key_id, _permissions = _check_token()
    if key_id not in KEYS:
        abort(400)

    hashed = hashlib.pbkdf2_hmac(
        "sha256", request.form["pass"].encode("utf-8"), PASS_SALT, 10000
    ).hex()
    expected = KEYS[key_id]["pass_hash"]
    # Constant-time comparison so response timing does not leak how many
    # leading characters of the hash matched. Compared as bytes because
    # compare_digest rejects non-ASCII str arguments.
    if not secrets.compare_digest(hashed.encode("utf-8"), expected.encode("utf-8")):
        abort(401)

    new_token = f"{key_id}:{secrets.token_hex(12)}"
    _add_token(new_token, "claim,query")
    return new_token


@app.route("/claim", methods=["POST"])
def claim():
    """Record that the person described by the form now holds the key.

    Requires the ``claim`` permission and the form fields ``name`` and
    ``contact``. Returns the id of the new claim. Aborts with 403 when the
    permission is missing and with 400 when a field is missing or too short.
    """
    key_id, permissions = _check_token()
    if "claim" not in permissions:
        abort(403)
    if "name" not in request.form or "contact" not in request.form:
        abort(400)

    name = request.form["name"].strip()
    contact = request.form["contact"].strip()
    # These are arbitrary values but there to prevent sending an empty form
    if len(name) < 2 or len(contact) < 2:
        abort(400)

    return _add_claim(key_id, name, contact)


# The view takes ``cid``, so the rule must declare it as a URL variable;
# without it Flask calls status() with no arguments and the request fails.
@app.route("/status/<cid>")
def status(cid):
    """Return "latest", "outdated" or "unknown" for the claim id ``cid``."""
    return _get_claim_status(cid)


@app.route("/keyname")
def keyname():
    """Return the configured name of the token's key, or "" if unknown."""
    key_id, _permissions = _check_token()
    if key_id in KEYS:
        return KEYS[key_id]["name"]
    return ""


@app.route("/keyholder")
def keyholder():
    """Return the most recent holders of the token's key as JSON.

    Requires the ``query`` permission; aborts with 403 otherwise.
    """
    key_id, permissions = _check_token()
    if "query" not in permissions:
        abort(403)
    return jsonify(_get_keyholder(key_id))


@app.after_request
def add_header(response):
    """Attach the CORS headers to every response unless CORS is disabled."""
    if CORS_ORIGIN is not None:
        response.headers['Access-Control-Allow-Origin'] = CORS_ORIGIN
        response.headers['Access-Control-Allow-Headers'] = 'X-Auth-Token'
    return response


def _init_db():
    """Create the tables if needed and register each key's claim token.

    Existing tokens are left untouched (``INSERT OR IGNORE``).
    """
    # closing() releases the connection; the inner ``db`` context commits on
    # success and rolls back if an exception escapes.
    with closing(sqlite3.connect(DB_PATH)) as db, db:
        db.execute("""
            CREATE TABLE IF NOT EXISTS History (
                Id INTEGER PRIMARY KEY,
                KId TEXT NOT NULL,
                CId TEXT NOT NULL,
                Name TEXT NOT NULL,
                Contact TEXT NOT NULL,
                Timestamp INTEGER NOT NULL
            )
        """)
        db.execute("""
            CREATE TABLE IF NOT EXISTS Token (
                Id INTEGER PRIMARY KEY,
                Token TEXT NOT NULL UNIQUE,
                Permissions TEXT NOT NULL,
                Timestamp INTEGER NOT NULL
            )
        """)
        for key_id, key_data in KEYS.items():
            db.execute("""
                INSERT OR IGNORE INTO Token (Token, Permissions, Timestamp) VALUES (?,?,?)
            """, (f"{key_id}:{key_data['claim_token']}", "claim", datetime.now().timestamp()))


def _add_token(token, permissions):
    """Store ``token`` with its comma-separated ``permissions`` string."""
    with closing(sqlite3.connect(DB_PATH)) as db, db:
        db.execute("""
            INSERT INTO Token (Token, Permissions, Timestamp) VALUES (?,?,?)
        """, (token, permissions, datetime.now().timestamp()))


def _check_token():
    """Validate the request's ``X-Auth-Token`` header.

    Returns a ``(key_id, permissions)`` tuple, where ``permissions`` is a set
    of permission names. A well-formed token that is not in the database
    yields ``(None, set())``. Aborts with 401 when the header is missing and
    with 400 when the token is not of the form ``<key id>:<secret>``.
    """
    if "X-Auth-Token" not in request.headers:
        abort(401)
    token = request.headers["X-Auth-Token"]

    parts = token.split(":")
    if len(parts) != 2 or len(parts[0]) == 0:
        abort(400)

    with closing(sqlite3.connect(DB_PATH)) as db:
        row = db.execute(
            "SELECT Permissions FROM Token WHERE Token=?", (token,)
        ).fetchone()

    if row is None:
        return None, set()
    return parts[0], set(row[0].split(","))


def _add_claim(key_id, name, contact):
    """Insert a claim for ``key_id`` into the history and return its new id."""
    claim_id = secrets.token_hex(8)
    with closing(sqlite3.connect(DB_PATH)) as db, db:
        db.execute("""
            INSERT INTO History (CId, KId, Name, Contact, Timestamp) VALUES (?,?,?,?,?)
        """, (claim_id, key_id, name, contact, datetime.now().timestamp()))
    return claim_id


def _get_claim_status(claim_id):
    """Classify a claim relative to the newest claim for the same key.

    Returns "unknown" when no claim has this id, "latest" when it is the
    most recent claim for its key, and "outdated" otherwise.
    """
    with closing(sqlite3.connect(DB_PATH)) as db:
        row = db.execute(
            "SELECT KId FROM History WHERE CId=?", (claim_id,)
        ).fetchone()
        if row is None:
            return "unknown"

        key_id = row[0]
        # Only the newest row is needed, so let SQLite stop after one.
        newest = db.execute(
            "SELECT CId FROM History WHERE KId=? ORDER BY Timestamp DESC LIMIT 1",
            (key_id,),
        ).fetchone()

    if newest[0] == claim_id:
        return "latest"
    return "outdated"


def _get_keyholder(key_id):
    """Return up to three of the newest claims for ``key_id``, newest first.

    Each entry is a dict with the keys ``name``, ``contact`` and
    ``timestamp``.
    """
    with closing(sqlite3.connect(DB_PATH)) as db:
        rows = db.execute("""
            SELECT Name, Contact, Timestamp FROM History WHERE KId=? ORDER BY Timestamp DESC LIMIT 3
        """, (key_id,)).fetchall()

    # Timestamp precision is one minute
    return [
        {"name": name, "contact": contact, "timestamp": int(timestamp // 60 * 60)}
        for name, contact, timestamp in rows
    ]


if __name__ == "__main__":
    # Each entry in the Keys section is "<name>;<claim token>;<password hash>".
    for key_id in config["Keys"]:
        parts = config["Keys"][key_id].split(";")
        if len(parts) != 3:
            print("Invalid key configuration")
            # SystemExit instead of the site-provided exit() builtin, which
            # is not guaranteed to exist in every interpreter setup.
            raise SystemExit(1)
        KEYS[key_id] = {"name": parts[0], "claim_token": parts[1], "pass_hash": parts[2]}
    _init_db()
    app.run(port=PORT)