from datetime import datetime import sqlite3 import hashlib import secrets from flask import Flask, request, abort from flask.json import jsonify DB_PATH = "history.db" PASS_FILE = "pass" PASS_SALT = b"IgTp9iQH" CLAIM_TOKEN_FILE = "claim_token" CORS_ORIGIN = "*" app = Flask("Schluesselverfolgung") @app.route("/auth", methods=["POST"]) def auth(): if "pass" in request.form: hashed = hashlib.pbkdf2_hmac("sha512", request.form["pass"].encode("utf-8"), PASS_SALT, 10000) with open(PASS_FILE, "rb") as f: if hashed == f.read(): new_token = secrets.token_hex(16) _add_token(new_token, "claim,query") return new_token else: abort(401) else: abort(400) @app.route("/claim", methods=["POST"]) def claim(): permissions = _check_token() if "claim" not in permissions: abort(403) if "name" not in request.form or "contact" not in request.form: abort(400) name = request.form["name"].strip() contact = request.form["contact"].strip() # These are arbitrary values but there to prevent sending an empty form if len(name) < 3 or len(contact) < 3: abort(400) claim_id = _add_claim(name, contact) return claim_id @app.route("/status/") def status(cid): return _get_claim_status(cid) @app.route("/keyholder") def keyholder(): permissions = _check_token() if "query" not in permissions: abort(403) return jsonify(_get_keyholder()) @app.after_request def add_header(response): if CORS_ORIGIN is not None: response.headers['Access-Control-Allow-Origin'] = CORS_ORIGIN response.headers['Access-Control-Allow-Headers'] = 'X-Auth-Token' return response def _init_db(): c = sqlite3.connect(DB_PATH) c.execute(""" CREATE TABLE IF NOT EXISTS History ( Id INTEGER PRIMARY KEY, CId TEXT NOT NULL, Name TEXT NOT NULL, Contact TEXT NOT NULL, Timestamp INTEGER NOT NULL ) """) c.execute(""" CREATE TABLE IF NOT EXISTS Token ( Id INTEGER PRIMARY KEY, Token TEXT NOT NULL UNIQUE, Permissions TEXT NOT NULL, Timestamp INTEGER NOT NULL ) """) with open(CLAIM_TOKEN_FILE) as f: c.execute(""" INSERT OR IGNORE INTO Token (Token, Permissions, Timestamp) VALUES (?,?,?) """, (f.read(), "claim", datetime.now().timestamp())) c.commit() def _add_token(token, permissions): c = sqlite3.connect(DB_PATH) conn = c.cursor() conn.execute(""" INSERT INTO Token (Token, Permissions, Timestamp) VALUES (?,?,?) """, (token, permissions, datetime.now().timestamp())) c.commit() def _check_token(): if "X-Auth-Token" in request.headers: token = request.headers["X-Auth-Token"] else: abort(401) c = sqlite3.connect(DB_PATH) conn = c.cursor() conn.execute("SELECT Permissions FROM Token WHERE Token=?", (token,)) row = conn.fetchone() if row is None: return set() else: return set(row[0].split(",")) def _add_claim(name, contact): claim_id = secrets.token_hex(8) c = sqlite3.connect(DB_PATH) conn = c.cursor() conn.execute(""" INSERT INTO History (CId, Name, Contact, Timestamp) VALUES (?,?,?,?) """, (claim_id, name, contact, datetime.now().timestamp())) c.commit() return claim_id def _get_claim_status(claim_id): c = sqlite3.connect(DB_PATH) conn = c.cursor() conn.execute("SELECT CId FROM History ORDER BY Timestamp DESC") row = conn.fetchone() if row is None: return "unknown" else: if row[0] == claim_id: return "latest" else: return "outdated" def _get_keyholder(): c = sqlite3.connect(DB_PATH) conn = c.cursor() conn.execute(""" SELECT Name, Contact, Timestamp FROM History ORDER BY Timestamp DESC LIMIT 3 """) # Timestamp precision is one minute keyholder = [{"name": row[0], "contact": row[1], "timestamp": int(row[2] // 60 * 60)} for row in conn] return keyholder if __name__ == "__main__": _init_db() app.run()