keytracker/server/server.py

210 lines
5.5 KiB
Python

from datetime import datetime
import sqlite3
import hashlib
import secrets
from flask import Flask, request, abort
from flask.json import jsonify
import configparser
config = configparser.ConfigParser()
config.read("config")
PORT = config["Default"]["Port"]
DB_PATH = config["Default"]["DbPath"]
CORS_ORIGIN = None if config["Default"]["CorsOrigin"].lower() == "off" else config["Default"]["CorsOrigin"]
KEYS = {}
PASS_SALT = b"IgTp9iQH" # Static for now
app = Flask("Schluesseltracker")
@app.route("/auth", methods=["POST"])
def auth():
if "pass" in request.form:
key_id, permissions = _check_token()
hashed = hashlib.pbkdf2_hmac("sha256",
request.form["pass"].encode("utf-8"),
PASS_SALT,
10000).hex()
if key_id in KEYS:
if hashed == KEYS[key_id]["pass_hash"]:
new_token = "{}:{}".format(key_id, secrets.token_hex(12))
_add_token(new_token, "claim,query")
return new_token
else:
abort(401)
else:
abort(400)
else:
abort(400)
@app.route("/claim", methods=["POST"])
def claim():
key_id, permissions = _check_token()
if "claim" not in permissions:
abort(403)
if "name" not in request.form or "contact" not in request.form:
abort(400)
name = request.form["name"].strip()
contact = request.form["contact"].strip()
# These are arbitrary values but there to prevent sending an empty form
if len(name) < 2 or len(contact) < 2:
abort(400)
claim_id = _add_claim(key_id, name, contact)
return claim_id
@app.route("/status/<cid>")
def status(cid):
return _get_claim_status(cid)
@app.route("/keyname")
def keyname():
key_id, permissions = _check_token()
if key_id in KEYS:
return KEYS[key_id]["name"]
else:
return ""
@app.route("/keyholder")
def keyholder():
key_id, permissions = _check_token()
if "query" not in permissions:
abort(403)
return jsonify(_get_keyholder(key_id))
@app.after_request
def add_header(response):
if CORS_ORIGIN is not None:
response.headers['Access-Control-Allow-Origin'] = CORS_ORIGIN
response.headers['Access-Control-Allow-Headers'] = 'X-Auth-Token'
return response
def _init_db():
c = sqlite3.connect(DB_PATH)
c.execute("""
CREATE TABLE IF NOT EXISTS History (
Id INTEGER PRIMARY KEY,
KId TEXT NOT NULL,
CId TEXT NOT NULL,
Name TEXT NOT NULL,
Contact TEXT NOT NULL,
Timestamp INTEGER NOT NULL
)
""")
c.execute("""
CREATE TABLE IF NOT EXISTS Token (
Id INTEGER PRIMARY KEY,
Token TEXT NOT NULL UNIQUE,
Permissions TEXT NOT NULL,
Timestamp INTEGER NOT NULL
)
""")
for key_id, key_data in KEYS.items():
c.execute("""
INSERT OR IGNORE INTO Token (Token, Permissions, Timestamp)
VALUES (?,?,?)
""", (f"{key_id}:{key_data['claim_token']}", "claim", datetime.now().timestamp()))
c.commit()
def _add_token(token, permissions):
c = sqlite3.connect(DB_PATH)
conn = c.cursor()
conn.execute("""
INSERT INTO Token (Token, Permissions, Timestamp)
VALUES (?,?,?)
""", (token, permissions, datetime.now().timestamp()))
c.commit()
def _check_token():
if "X-Auth-Token" in request.headers:
token = request.headers["X-Auth-Token"]
else:
abort(401)
parts = token.split(":")
if len(parts) != 2 or len(parts[0]) == 0:
abort(400)
c = sqlite3.connect(DB_PATH)
conn = c.cursor()
conn.execute("SELECT Permissions FROM Token WHERE Token=?", (token,))
row = conn.fetchone()
if row is None:
return None, set()
else:
return parts[0], set(row[0].split(","))
def _add_claim(key_id, name, contact):
claim_id = secrets.token_hex(8)
c = sqlite3.connect(DB_PATH)
conn = c.cursor()
conn.execute("""
INSERT INTO History (CId, KId, Name, Contact, Timestamp)
VALUES (?,?,?,?,?)
""", (claim_id, key_id, name, contact, datetime.now().timestamp()))
c.commit()
return claim_id
def _get_claim_status(claim_id):
c = sqlite3.connect(DB_PATH)
conn = c.cursor()
conn.execute("SELECT KId FROM History WHERE CId=?", (claim_id,))
row = conn.fetchone()
if row is None:
return "unknown"
else:
key_id = row[0]
conn.execute("SELECT CId FROM History WHERE KId=? ORDER BY Timestamp DESC", (key_id,))
row = conn.fetchone()
if row[0] == claim_id:
return "latest"
else:
return "outdated"
def _get_keyholder(key_id):
c = sqlite3.connect(DB_PATH)
conn = c.cursor()
conn.execute("""
SELECT Name, Contact, Timestamp FROM History
WHERE KId=?
ORDER BY Timestamp DESC LIMIT 3
""", (key_id,))
# Timestamp precision is one minute
keyholder = [{"name": row[0], "contact": row[1], "timestamp": int(row[2] // 60 * 60)}
for row in conn]
return keyholder
if __name__ == "__main__":
for key_id in config["Keys"]:
parts = config["Keys"][key_id].split(";")
if len(parts) != 3:
print("Invalid key configuration")
exit(1)
KEYS[key_id] = {"name": parts[0], "claim_token": parts[1], "pass_hash": parts[2]}
_init_db()
app.run(port=PORT)