- Werkzeug ProxyFix für X-Forwarded-* Header - nginx-proxy.example.conf mit korrekten proxy_set_header - README: Hinweise zu 502-Beseitigung hinter nginx Made-with: Cursor
289 lines
11 KiB
Python
289 lines
11 KiB
Python
import math
|
|
import os
|
|
import time
|
|
import logging
|
|
import sqlite3
|
|
import uuid
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from flask import Flask, render_template, request, session, send_from_directory, g, redirect, url_for
|
|
from flask_bootstrap import Bootstrap
|
|
|
|
app = Flask(__name__)
|
|
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
|
|
app.config['SECRET_KEY'] = 'j69ol5mcHLsEtLg4Y/+myd9wWD4pp56E'
|
|
|
|
# setup logging
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s %(levelname)s %(process)d ---- %(threadName)s '
|
|
'%(module)s : %(funcName)s {%(pathname)s:%(lineno)d} %(message)s','%Y-%m-%dT%H:%M:%SZ')
|
|
handler = logging.StreamHandler()
|
|
handler.setFormatter(formatter)
|
|
app.logger.setLevel(logging.DEBUG)
|
|
app.logger.addHandler(handler)
|
|
|
|
Bootstrap(app)
|
|
|
|
version = "1.1.0/2026-02-24"
|
|
|
|
def get_db_connection():
|
|
conn = sqlite3.connect('kasse.db')
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
def init_db():
|
|
conn = get_db_connection()
|
|
conn.execute('''
|
|
CREATE TABLE IF NOT EXISTS instances (
|
|
id TEXT PRIMARY KEY,
|
|
password TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
''')
|
|
try:
|
|
conn.execute('ALTER TABLE instances ADD COLUMN password TEXT')
|
|
except sqlite3.OperationalError:
|
|
pass # Column already exists
|
|
|
|
conn.execute('''
|
|
CREATE TABLE IF NOT EXISTS products (
|
|
instance_id TEXT,
|
|
position INTEGER,
|
|
name TEXT,
|
|
price REAL,
|
|
icon TEXT,
|
|
color_class TEXT,
|
|
FOREIGN KEY (instance_id) REFERENCES instances (id),
|
|
PRIMARY KEY (instance_id, position)
|
|
)
|
|
''')
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
init_db()
|
|
|
|
app.logger.info('Starting erdbeerhannah ' + version)
|
|
|
|
@app.before_request
|
|
def before_request():
|
|
g.request_start_time = time.time()
|
|
g.request_time = lambda: "%.4fs" % (time.time() - g.request_start_time)
|
|
|
|
@app.after_request
|
|
def add_header(r):
|
|
r.headers["Cache-Control"] = "no-store, max-age=0"
|
|
return r
|
|
|
|
@app.route("/", methods=["GET", "POST"])
|
|
def landing():
|
|
if request.method == "POST":
|
|
password = request.form.get('password', '')
|
|
instance_id = str(uuid.uuid4())
|
|
conn = get_db_connection()
|
|
conn.execute('INSERT INTO instances (id, password) VALUES (?, ?)', (instance_id, generate_password_hash(password)))
|
|
default_products = [
|
|
(instance_id, 1, "500g Erdbeeren", 4.9, "🍓", "btn-primary"),
|
|
(instance_id, 2, "Marmelade groß", 4.8, "🫙🫙", "btn-danger"),
|
|
(instance_id, 3, "Marmelade klein", 3.3, "🫙", "btn-danger"),
|
|
(instance_id, 4, "500g Kirschen", 5.0, "🍒", "btn-warning"),
|
|
(instance_id, 5, "500g Himbeeren", 4.5, "🫐", "btn-warning"),
|
|
(instance_id, 6, "Tragetasche", 0.2, "🛍️", "btn-success")
|
|
]
|
|
conn.executemany('INSERT INTO products (instance_id, position, name, price, icon, color_class) VALUES (?, ?, ?, ?, ?, ?)', default_products)
|
|
conn.commit()
|
|
conn.close()
|
|
return redirect(url_for('admin', instance_id=instance_id))
|
|
return render_template("landing.html", version=version)
|
|
|
|
@app.route("/<instance_id>", methods=["GET", "POST"])
|
|
def index(instance_id):
|
|
conn = get_db_connection()
|
|
instance = conn.execute('SELECT * FROM instances WHERE id = ?', (instance_id,)).fetchone()
|
|
if not instance:
|
|
conn.close()
|
|
return "Instance not found", 404
|
|
|
|
products_rows = conn.execute('SELECT * FROM products WHERE instance_id = ? ORDER BY position', (instance_id,)).fetchall()
|
|
conn.close()
|
|
|
|
products = {row['position']: dict(row) for row in products_rows}
|
|
|
|
session_prefix = f"kasse_{instance_id}_"
|
|
|
|
gesamtwert = session.get(f'{session_prefix}gesamtwert', 0.0)
|
|
change = session.get(f'{session_prefix}change', "0.00")
|
|
givenfloat = session.get(f'{session_prefix}given', 0.0)
|
|
items = {p: session.get(f'{session_prefix}item{p}', 0) for p in products}
|
|
|
|
background = "bg-white"
|
|
|
|
if request.method == "POST":
|
|
action = request.form.get('action')
|
|
position = request.form.get('position', type=int)
|
|
|
|
if action == "reset":
|
|
gesamtwert = 0.0
|
|
change = "0.00"
|
|
givenfloat = 0.0
|
|
items = {p: 0 for p in products}
|
|
background = "bg-white"
|
|
elif action == "calculate_change":
|
|
given = request.form.get('given', "0", type=float)
|
|
givenfloat = given
|
|
change_val = givenfloat - gesamtwert
|
|
change = f"{change_val:.2f}"
|
|
background = "bg-danger" if change_val < 0 else "bg-white"
|
|
elif position and position in products:
|
|
price = products[position]['price']
|
|
gesamtwert += price
|
|
items[position] += 1
|
|
|
|
session[f'{session_prefix}gesamtwert'] = round(gesamtwert, 2)
|
|
session[f'{session_prefix}change'] = change
|
|
session[f'{session_prefix}given'] = round(givenfloat, 2)
|
|
for p in products:
|
|
session[f'{session_prefix}item{p}'] = items[p]
|
|
|
|
# Update our local variables after processing so they reflect what is rendered
|
|
gesamtwert = round(gesamtwert, 2)
|
|
|
|
n = len(products)
|
|
grid_cols = math.ceil(math.sqrt(n)) if n else 1
|
|
grid_rows = math.ceil(n / grid_cols) if n else 1
|
|
|
|
return render_template("index.html",
|
|
instance_id=instance_id,
|
|
products=products,
|
|
grid_cols=grid_cols,
|
|
grid_rows=grid_rows,
|
|
gesamtwert=f"{gesamtwert:.2f}",
|
|
change=change,
|
|
given=f"{givenfloat:.2f}" if givenfloat > 0 else "0",
|
|
items=items,
|
|
background=background,
|
|
version=version
|
|
)
|
|
|
|
@app.route("/<instance_id>/admin", methods=["GET", "POST"])
|
|
def admin(instance_id):
|
|
conn = get_db_connection()
|
|
instance = conn.execute('SELECT * FROM instances WHERE id = ?', (instance_id,)).fetchone()
|
|
if not instance:
|
|
conn.close()
|
|
return "Instance not found", 404
|
|
|
|
auth_key = f'admin_auth_{instance_id}'
|
|
|
|
# Check if instance has no password (empty or None)
|
|
stored_password = instance['password']
|
|
has_no_password = (
|
|
stored_password is None or
|
|
check_password_hash(stored_password, '')
|
|
)
|
|
|
|
# Handle Login Submission
|
|
if request.method == "POST" and 'admin_password' in request.form:
|
|
entered = request.form['admin_password']
|
|
if has_no_password and entered == '':
|
|
session[auth_key] = True
|
|
conn.close()
|
|
return redirect(url_for('admin', instance_id=instance_id))
|
|
elif not has_no_password and check_password_hash(stored_password, entered):
|
|
session[auth_key] = True
|
|
conn.close()
|
|
return redirect(url_for('admin', instance_id=instance_id))
|
|
else:
|
|
conn.close()
|
|
return render_template("login.html", instance_id=instance_id, error="Falsches Passwort", version=version)
|
|
|
|
# Require Authentication
|
|
if not session.get(auth_key):
|
|
conn.close()
|
|
return render_template("login.html", instance_id=instance_id, error=None, version=version)
|
|
|
|
if request.method == "POST":
|
|
action = request.form.get('action')
|
|
|
|
if action == "delete":
|
|
conn.execute('DELETE FROM products WHERE instance_id = ?', (instance_id,))
|
|
conn.execute('DELETE FROM instances WHERE id = ?', (instance_id,))
|
|
conn.commit()
|
|
conn.close()
|
|
session.pop(auth_key, None)
|
|
return redirect(url_for('landing'))
|
|
|
|
for key in request.form:
|
|
if key.startswith('delete_') and request.form[key]:
|
|
pos = int(key.split('_')[1])
|
|
conn.execute('DELETE FROM products WHERE instance_id = ? AND position = ?',
|
|
(instance_id, pos))
|
|
conn.commit()
|
|
conn.close()
|
|
return redirect(url_for('admin', instance_id=instance_id))
|
|
|
|
if action == "add_product":
|
|
name = request.form.get('add_name', '').strip()
|
|
price_str = request.form.get('add_price', '').replace(',', '.').strip()
|
|
try:
|
|
price = float(price_str) if price_str else None
|
|
except ValueError:
|
|
price = None
|
|
icon = request.form.get('add_icon', '🛒').strip() or '🛒'
|
|
color = request.form.get('add_color', 'btn-primary')
|
|
if name and price is not None:
|
|
max_pos = conn.execute(
|
|
'SELECT COALESCE(MAX(position), 0) FROM products WHERE instance_id = ?',
|
|
(instance_id,)
|
|
).fetchone()[0]
|
|
conn.execute(
|
|
'INSERT INTO products (instance_id, position, name, price, icon, color_class) VALUES (?, ?, ?, ?, ?, ?)',
|
|
(instance_id, max_pos + 1, name, price, icon, color)
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
return redirect(url_for('admin', instance_id=instance_id))
|
|
|
|
# Update existing products
|
|
products_rows = conn.execute(
|
|
'SELECT position FROM products WHERE instance_id = ?', (instance_id,)
|
|
).fetchall()
|
|
for row in products_rows:
|
|
pos = row['position']
|
|
name = request.form.get(f'name_{pos}')
|
|
price_str = request.form.get(f'price_{pos}', '').replace(',', '.').strip()
|
|
try:
|
|
price = float(price_str) if price_str else None
|
|
except ValueError:
|
|
price = None
|
|
icon = request.form.get(f'icon_{pos}')
|
|
color = request.form.get(f'color_{pos}')
|
|
if name is not None and price is not None:
|
|
conn.execute('''
|
|
UPDATE products SET name = ?, price = ?, icon = ?, color_class = ?
|
|
WHERE instance_id = ? AND position = ?
|
|
''', (name, price, icon, color, instance_id, pos))
|
|
conn.commit()
|
|
conn.close()
|
|
return redirect(url_for('index', instance_id=instance_id))
|
|
|
|
products_rows = conn.execute('SELECT * FROM products WHERE instance_id = ? ORDER BY position', (instance_id,)).fetchall()
|
|
conn.close()
|
|
|
|
products = {row['position']: dict(row) for row in products_rows}
|
|
return render_template("admin.html", instance_id=instance_id, products=products, version=version)
|
|
|
|
@app.route('/favicon.ico')
|
|
def favicon():
|
|
return send_from_directory(os.path.join(app.root_path, 'static'),
|
|
'favicon.ico', mimetype='image/vnd.microsoft.icon')
|
|
|
|
@app.route('/manifest.json')
|
|
def manifest():
|
|
return send_from_directory('static', 'manifest.json')
|
|
|
|
@app.route('/sw.js')
|
|
def service_worker():
|
|
return send_from_directory('static', 'sw.js', mimetype='application/javascript')
|
|
|
|
if __name__ == "__main__":
|
|
app.run(debug=True, host='127.0.0.1') |