keytracker/server/server.py
2020-03-09 21:14:31 +01:00

164 lines
4 KiB
Python

from datetime import datetime
import sqlite3
import hashlib
import secrets
from flask import Flask, request, abort
from flask.json import jsonify
DB_PATH = "history.db"
PASS_FILE = "pass"
PASS_SALT = b"IgTp9iQH"
CLAIM_TOKEN_FILE = "claim_token"
app = Flask("Schluesselverfolgung")
@app.route("/auth", methods=["POST"])
def auth():
if "pass" in request.form:
hashed = hashlib.pbkdf2_hmac("sha512",
request.form["pass"].encode("utf-8"),
PASS_SALT,
10000)
with open(PASS_FILE, "rb") as f:
if hashed == f.read():
new_token = secrets.token_hex(16)
_add_token(new_token, "claim,query")
return new_token
else:
abort(401)
else:
abort(400)
@app.route("/claim", methods=["POST"])
def claim():
permissions = _check_token()
if "claim" not in permissions:
abort(403)
if "name" not in request.form or "contact" not in request.form:
abort(400)
name = request.form["name"]
contact = request.form["contact"]
claim_id = _add_claim(name, contact)
return claim_id
@app.route("/status/<cid>")
def status(cid):
return _get_claim_status(cid)
@app.route("/keyholder")
def keyholder():
permissions = _check_token()
if "query" not in permissions:
abort(403)
return jsonify(_get_keyholder())
def _init_db():
c = sqlite3.connect(DB_PATH)
c.execute("""
CREATE TABLE IF NOT EXISTS History (
Id INTEGER PRIMARY KEY,
CId TEXT NOT NULL,
Name TEXT NOT NULL,
Contact TEXT NOT NULL,
Timestamp INTEGER NOT NULL
)
""")
c.execute("""
CREATE TABLE IF NOT EXISTS Token (
Id INTEGER PRIMARY KEY,
Token TEXT NOT NULL UNIQUE,
Permissions TEXT NOT NULL,
Timestamp INTEGER NOT NULL
)
""")
with open(CLAIM_TOKEN_FILE) as f:
c.execute("""
INSERT OR IGNORE INTO Token (Token, Permissions, Timestamp)
VALUES (?,?,?)
""", (f.read(), "claim", datetime.now().timestamp()))
c.commit()
def _add_token(token, permissions):
c = sqlite3.connect(DB_PATH)
conn = c.cursor()
conn.execute("""
INSERT INTO Token (Token, Permissions, Timestamp)
VALUES (?,?,?)
""", (token, permissions, datetime.now().timestamp()))
c.commit()
def _check_token():
if "X-Auth-Token" in request.headers:
token = request.headers["X-Auth-Token"]
else:
abort(401)
print(repr(token))
c = sqlite3.connect(DB_PATH)
conn = c.cursor()
conn.execute("SELECT Permissions FROM Token WHERE Token=?", (token,))
row = conn.fetchone()
if row is None:
return set()
else:
return set(row[0].split(","))
def _add_claim(name, contact):
claim_id = secrets.token_hex(8)
c = sqlite3.connect(DB_PATH)
conn = c.cursor()
conn.execute("""
INSERT INTO History (CId, Name, Contact, Timestamp)
VALUES (?,?,?,?)
""", (claim_id, name, contact, datetime.now().timestamp()))
c.commit()
return claim_id
def _get_claim_status(claim_id):
c = sqlite3.connect(DB_PATH)
conn = c.cursor()
conn.execute("SELECT CId FROM History ORDER BY Timestamp DESC")
row = conn.fetchone()
if row is None:
return "unknown"
else:
if row[0] == claim_id:
return "latest"
else:
return "outdated"
def _get_keyholder():
c = sqlite3.connect(DB_PATH)
conn = c.cursor()
conn.execute("""
SELECT Name, Contact, Timestamp
FROM History ORDER BY Timestamp DESC LIMIT 3
""")
# Timestamp precision is one minute
keyholder = [{"name": row[0], "contact": row[1], "timestamp": int(row[2] // 60 * 60)}
for row in conn]
return keyholder
if __name__ == "__main__":
_init_db()
app.run()