- Neue Routen: /login, /stats, /logout - Session-basierte Authentifizierung - Umfassende Statistiken: Seitenaufrufe, Suchvorgänge, Quellen - Environment-Variablen: ADMIN_PASSWORD, FLASK_SECRET_KEY - Docker-Integration mit docker-compose.yml - Responsive UI mit Charts und Aktivitätsprotokoll
324 lines
12 KiB
Python
324 lines
12 KiB
Python
from pathlib import Path
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from typing import Tuple, Dict, List
|
|
from flask import Flask, render_template, request, send_from_directory, session, redirect, url_for, flash
|
|
from functools import wraps
|
|
|
|
app = Flask(__name__)

# Key used by Flask to sign the session cookie. The fallback is a fixed string
# that is visible in the source, so it is only suitable for local development.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'dev-secret-key-change-in-production')

# Admin password from the environment; the fallback is a well-known default.
ADMIN_PASSWORD = os.environ.get('ADMIN_PASSWORD', 'admin123')

# Configure logging.
# Create the logs directory next to this file if it does not exist yet.
logs_dir = Path(__file__).parent / "logs"
logs_dir.mkdir(exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(logs_dir / 'app.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Make the use of insecure fallbacks visible instead of applying them silently.
if 'FLASK_SECRET_KEY' not in os.environ:
    logger.warning(
        "FLASK_SECRET_KEY ist nicht gesetzt - es wird der unsichere Entwicklungs-Schlüssel verwendet"
    )
if 'ADMIN_PASSWORD' not in os.environ:
    logger.warning(
        "ADMIN_PASSWORD ist nicht gesetzt - es wird das unsichere Standard-Passwort verwendet"
    )
|
|
|
|
def log_page_view(page: str, user_agent: str = None):
    """Log a page view; no IP address is passed in or written.

    Args:
        page: Label of the page or endpoint that was requested.
        user_agent: User-Agent header value. Only the first 100 characters
            are logged; a missing or empty value is logged as ``Unknown``.
    """
    user_agent_clean = user_agent[:100] if user_agent else 'Unknown'
    # The logging formatter adds the timestamp, so none is built here.
    logger.info("PAGE_VIEW: %s | User-Agent: %s", page, user_agent_clean)
|
|
|
|
def log_search_query(search_params: dict, user_agent: str = None):
    """Log a search request; no IP address is passed in or written.

    Args:
        search_params: Mapping read with the keys ``pos`` (sequence of
            strings), ``includes``, ``excludes``, ``use_ot`` and ``use_wf``.
            Missing keys fall back to empty values.
        user_agent: User-Agent header value. Only the first 100 characters
            are logged; a missing or empty value is logged as ``Unknown``.
    """
    def _single_line(value) -> str:
        # Escape CR/LF so user-supplied text cannot start a forged log entry.
        return str(value).replace('\r', '\\r').replace('\n', '\\n')

    user_agent_clean = _single_line(user_agent[:100]) if user_agent else 'Unknown'

    # Prepare the search parameters for logging.
    # NOTE: empty positions contribute nothing to the joined string, so the
    # same letter at different positions produces the same logged pattern.
    pos_str = _single_line(''.join(search_params.get('pos', [''] * 5)))
    includes = _single_line(search_params.get('includes', ''))
    excludes = _single_line(search_params.get('excludes', ''))

    sources = []
    if search_params.get('use_ot'):
        sources.append('OT')
    if search_params.get('use_wf'):
        sources.append('WF')

    logger.info(
        f"SEARCH: pos='{pos_str}' includes='{includes}' excludes='{excludes}' "
        f"sources={sources} | User-Agent: {user_agent_clean}"
    )
|
|
|
|
def login_required(f):
    """Decorator that restricts a view to sessions flagged as logged in.

    When the session has no truthy ``logged_in`` entry, the request is
    redirected to the ``login`` endpoint; otherwise the wrapped view is
    called with the original arguments.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if session.get('logged_in'):
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return guarded_view
|
|
|
|
def get_statistics():
    """Read the log files and aggregate them into usage statistics.

    The current ``app.log`` is scanned line by line for ``PAGE_VIEW:`` and
    ``SEARCH:`` entries. Compressed backups (``app_*.log.gz``) only add to
    the two totals. Read errors are logged and whatever was collected up to
    that point is returned.

    Returns:
        dict with the keys ``total_page_views``, ``total_searches``,
        ``page_views_by_page`` (page -> count), ``searches_by_source``
        (``OT`` / ``WF`` / ``Both`` -> count), ``recent_activity`` (up to
        the 10 most recent timestamped lines of ``app.log``, oldest first)
        and ``top_search_patterns`` (pattern -> count).
    """
    import gzip
    from collections import deque

    stats = {
        'total_page_views': 0,
        'total_searches': 0,
        'page_views_by_page': {},
        'searches_by_source': {'OT': 0, 'WF': 0, 'Both': 0},
        'recent_activity': [],
        'top_search_patterns': {},
    }
    page_counts = stats['page_views_by_page']
    source_counts = stats['searches_by_source']
    pattern_counts = stats['top_search_patterns']

    # Bounded buffer: once it holds 10 entries, appending drops the oldest,
    # so after a full pass it contains the newest lines rather than the first.
    recent = deque(maxlen=10)

    try:
        # Read the current log file.
        log_file = logs_dir / 'app.log'
        if log_file.exists():
            with open(log_file, 'r', encoding='utf-8') as f:
                for line in f:
                    if 'PAGE_VIEW:' in line:
                        stats['total_page_views'] += 1
                        # Extract the page label between the marker and " |".
                        if 'PAGE_VIEW: ' in line:
                            page = line.split('PAGE_VIEW: ')[1].split(' |')[0]
                            page_counts[page] = page_counts.get(page, 0) + 1

                    elif 'SEARCH:' in line:
                        stats['total_searches'] += 1
                        # Extract the sources list.
                        if 'sources=[' in line:
                            sources_part = line.split('sources=')[1].split(']')[0]
                            has_ot = 'OT' in sources_part
                            has_wf = 'WF' in sources_part
                            if has_ot and has_wf:
                                source_counts['Both'] += 1
                            elif has_ot:
                                source_counts['OT'] += 1
                            elif has_wf:
                                source_counts['WF'] += 1

                        # Extract the search pattern. Testing for the exact
                        # delimiter keeps the split below from raising on a
                        # line that contains "pos=" without the quote.
                        if "pos='" in line:
                            pos_part = line.split("pos='")[1].split("'")[0]
                            if pos_part:
                                pattern_counts[pos_part] = pattern_counts.get(pos_part, 0) + 1

                    # Remember the line for the recent-activity list.
                    timestamp, separator, _ = line.partition(' - ')
                    if separator and timestamp:
                        recent.append({
                            'timestamp': timestamp,
                            'line': line.strip(),
                        })

        # Also scan the compressed backups (totals only).
        for backup_file in logs_dir.glob("app_*.log.gz"):
            try:
                with gzip.open(backup_file, 'rt', encoding='utf-8') as f:
                    for line in f:
                        if 'PAGE_VIEW:' in line:
                            stats['total_page_views'] += 1
                        elif 'SEARCH:' in line:
                            stats['total_searches'] += 1
            except Exception as e:
                # Best effort: one unreadable backup must not hide the rest.
                logger.error(f"Fehler beim Lesen der Backup-Datei {backup_file}: {e}")

    except Exception as e:
        logger.error(f"Fehler beim Lesen der Statistiken: {e}")

    stats['recent_activity'] = list(recent)
    return stats
|
|
|
|
def cleanup_old_logs():
    """Rotate ``app.log`` after 7 days and delete backups older than 30 days.

    The age of ``app.log`` is taken from the timestamp of its first entry,
    because the file's modification time is refreshed by every log write.
    If that timestamp cannot be read, the modification time is used instead.
    A rotated log is gzip-compressed to ``app_<YYYYmmdd_HHMMSS>.log.gz`` and
    the live file is then emptied in place rather than deleted, since the
    logging FileHandler keeps it open. Errors are logged, never raised.
    """
    import gzip

    now = datetime.now()
    try:
        log_file = logs_dir / 'app.log'
        if log_file.exists():
            # Determine the age of the log.
            started_at = _first_log_entry_time(log_file)
            if started_at is None:
                started_at = datetime.fromtimestamp(log_file.stat().st_mtime)

            if now - started_at > timedelta(days=7):
                # Compress the log into a timestamped backup.
                backup_name = f"app_{now.strftime('%Y%m%d_%H%M%S')}.log.gz"
                backup_path = logs_dir / backup_name
                with open(log_file, 'rb') as f_in, gzip.open(backup_path, 'wb') as f_out:
                    f_out.writelines(f_in)

                # Empty the live file in place. Entries written between the
                # copy above and this truncation are lost.
                with open(log_file, 'w', encoding='utf-8'):
                    pass
                logger.info(f"Log-Datei komprimiert und gesichert: {backup_name}")

        # Delete old backups (older than 30 days by modification time).
        for backup_file in logs_dir.glob("app_*.log.gz"):
            backup_age = now - datetime.fromtimestamp(backup_file.stat().st_mtime)
            if backup_age > timedelta(days=30):
                backup_file.unlink()
                logger.info(f"Alte Backup-Datei gelöscht: {backup_file.name}")

    except Exception as e:
        logger.error(f"Fehler bei der Log-Bereinigung: {e}")


def _first_log_entry_time(log_file):
    """Return the timestamp of the first line of *log_file*, or ``None``.

    Assumes lines start with ``YYYY-MM-DD HH:MM:SS`` as produced by the
    ``%(asctime)s`` prefix of the configured log format.
    """
    try:
        with open(log_file, 'r', encoding='utf-8') as f:
            first_line = f.readline()
        return datetime.strptime(first_line[:19], '%Y-%m-%d %H:%M:%S')
    except (OSError, ValueError):
        # Unreadable, empty, or not starting with a timestamp.
        return None
|
|
|
|
|
|
def load_words() -> Tuple[List[str], Dict[str, List[str]]]:
    """Load the word list and the word-to-sources mapping from ``data/``.

    Both files are looked up in the ``data`` directory next to this module
    and are optional.

    Returns:
        ``(words, sources_map)``. ``words`` holds the lower-cased lines of
        ``words_de_5.txt`` that are exactly five alphabetic characters long.
        ``sources_map`` is the content of ``words_de_5_sources.json``, or an
        empty dict when the file is missing, unreadable, not valid JSON, or
        not a JSON object.
    """
    data_dir = Path(__file__).parent / "data"
    txt_path = data_dir / "words_de_5.txt"
    json_path = data_dir / "words_de_5_sources.json"

    words: List[str] = []
    sources_map: Dict[str, List[str]] = {}

    if txt_path.exists():
        with txt_path.open("r", encoding="utf-8") as f:
            for line in f:
                word = line.strip().lower()
                if len(word) == 5 and word.isalpha():
                    words.append(word)

    if json_path.exists():
        log = logging.getLogger(__name__)
        try:
            loaded = json.loads(json_path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # ValueError covers both invalid JSON and undecodable bytes.
            log.warning("Quellen-Datei %s konnte nicht gelesen werden: %s", json_path, exc)
        else:
            if isinstance(loaded, dict):
                sources_map = loaded
            else:
                # Callers use ``.get`` on the mapping, so reject other shapes.
                log.warning("Quellen-Datei %s enthält kein JSON-Objekt", json_path)

    return words, sources_map
|
|
|
|
|
|
def filter_words(words: List[str], position_letters: List[str], includes_text: str, excludes_text: str) -> List[str]:
    """Return the words that satisfy all position, include and exclude rules.

    Args:
        words: Candidate words, expected in lower case.
        position_letters: Required letter per index; falsy entries (empty
            string) leave that index unconstrained. Compared in lower case.
        includes_text: Text whose alphabetic characters must all occur in a
            word; other characters are ignored.
        excludes_text: Text whose alphabetic characters must not occur in a
            word; other characters are ignored.

    Returns:
        The matching words in their original order. A word that is too short
        to have a constrained index does not match.
    """
    required = {ch for ch in includes_text.lower() if ch.isalpha()}
    banned = {ch for ch in excludes_text.lower() if ch.isalpha()}
    # Keep only the constrained positions so the per-word check stays small.
    fixed = [(idx, ch.lower()) for idx, ch in enumerate(position_letters) if ch]

    results: List[str] = []
    for word in words:
        # Fixed positions; the length check avoids indexing past the word.
        if any(idx >= len(word) or word[idx] != ch for idx, ch in fixed):
            continue
        # Must contain every required letter.
        if not required.issubset(word):
            continue
        # Must contain none of the banned letters.
        if not banned.isdisjoint(word):
            continue
        results.append(word)
    return results
|
|
|
|
|
|
@app.route("/", methods=["GET", "POST"])
def index():
    """Render the search page and, for a POST, run the word search.

    A GET shows the empty form with the default source selection. A POST
    reads the five position fields, the include/exclude text and the source
    checkboxes, logs the query, filters the word list and then narrows the
    matches to words listed under at least one selected source.
    """
    # Invoked on every request; cleanup_old_logs checks the file ages itself.
    cleanup_old_logs()

    # Record the page view.
    agent = request.headers.get('User-Agent')
    log_page_view("index", agent)

    all_words, sources_map = load_words()

    # Values shown when no search has been submitted.
    results_display: List[str] | None = None
    pos: List[str] = [""] * 5
    includes: str = ""
    excludes: str = ""
    use_ot: bool = True
    use_wf: bool = False

    if request.method == "POST":
        form = request.form
        pos = [(form.get(f"pos{slot}") or "").strip().lower() for slot in range(1, 6)]
        includes = (form.get("includes") or "").strip()
        excludes = (form.get("excludes") or "").strip()
        # A checkbox counts as ticked when its field is present at all.
        use_ot = form.get("use_ot") is not None
        use_wf = form.get("use_wf") is not None

        # Record the search request.
        log_search_query(
            {
                'pos': pos,
                'includes': includes,
                'excludes': excludes,
                'use_ot': use_ot,
                'use_wf': use_wf,
            },
            agent,
        )

        # 1) Letter/position search across all words.
        matched = filter_words(all_words, pos, includes, excludes)

        # 2) The source filter only affects what is displayed.
        allowed = {tag for tag, enabled in (("ot", use_ot), ("wf", use_wf)) if enabled}
        if not allowed:
            # No source selected: show nothing (the search still ran).
            results_display = []
        else:
            results_display = [
                word
                for word in matched
                if any(src in allowed for src in sources_map.get(word, []))
            ]

    return render_template(
        "index.html",
        results=results_display,
        pos=pos,
        includes=includes,
        excludes=excludes,
        words_count=len(all_words),
        sources_map=sources_map,
        use_ot=use_ot,
        use_wf=use_wf,
        error_message=None,
    )
|
|
|
|
|
|
@app.route('/manifest.webmanifest')
def manifest_file():
    """Log the request and serve ``manifest.webmanifest`` from ``static``."""
    log_page_view("manifest", request.headers.get('User-Agent'))
    static_dir = Path(__file__).parent / 'static'
    return send_from_directory(
        static_dir,
        'manifest.webmanifest',
        mimetype='application/manifest+json',
    )
|
|
|
|
|
|
@app.route('/sw.js')
def service_worker():
    """Log the request and serve ``sw.js`` from ``static`` at the site root."""
    # The service worker has to be reachable at top level.
    log_page_view("service_worker", request.headers.get('User-Agent'))
    static_dir = Path(__file__).parent / 'static'
    return send_from_directory(
        static_dir,
        'sw.js',
        mimetype='application/javascript',
    )
|
|
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Login page for the admin dashboard.

    A GET renders the form. A POST compares the submitted ``password`` field
    with ``ADMIN_PASSWORD``; on a match the session is flagged as logged in
    and the user is redirected to the ``stats`` endpoint, otherwise an error
    is flashed and the form is rendered again.
    """
    import hmac

    if request.method == 'POST':
        password = request.form.get('password')
        logger.info("Login-Versuch")

        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments. A missing field never matches.
        password_ok = password is not None and hmac.compare_digest(
            password.encode('utf-8'), ADMIN_PASSWORD.encode('utf-8')
        )

        if password_ok:
            session['logged_in'] = True
            flash('Erfolgreich angemeldet!', 'success')
            logger.info("Login erfolgreich")
            return redirect(url_for('stats'))

        flash('Falsches Passwort!', 'error')
        # Neither the submitted nor the expected password is written to the log.
        logger.warning("Login fehlgeschlagen")

    return render_template('login.html')
|
|
|
|
|
|
@app.route('/logout')
def logout():
    """Clear the login flag, flash a confirmation and go to the index page."""
    # pop with a default, so logging out without a session is not an error.
    session.pop('logged_in', None)
    flash('Erfolgreich abgemeldet!', 'success')
    target = url_for('index')
    return redirect(target)
|
|
|
|
|
|
@app.route('/stats')
@login_required
def stats():
    """Statistics dashboard; only reachable for logged-in sessions."""
    agent = request.headers.get('User-Agent')
    log_page_view("stats", agent)
    return render_template('stats.html', stats=get_statistics())
|
|
|
|
|
|
if __name__ == "__main__":
    # Only report whether a password is configured, not how long it is.
    logger.info(f"App gestartet - ADMIN_PASSWORD gesetzt: {bool(ADMIN_PASSWORD)}")
    # Debug mode enables the interactive debugger, which can execute code
    # from the browser, so it must be requested explicitly via FLASK_DEBUG.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled)
|