from pathlib import Path
import gzip
import json
import logging
import os
from datetime import datetime, timedelta
from typing import Tuple, Dict, List

from flask import Flask, render_template, request, send_from_directory, session, redirect, url_for, flash
from functools import wraps

app = Flask(__name__)
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'dev-secret-key-change-in-production')

# Admin password from an environment variable
ADMIN_PASSWORD = os.environ.get('ADMIN_PASSWORD', 'admin123')

# Configure logging
# Create the logs directory if it does not exist
logs_dir = Path(__file__).parent / "logs"
logs_dir.mkdir(exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(logs_dir / 'app.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def log_page_view(page: str, user_agent: str = None):
    """Logs page views without IP addresses."""
    user_agent_clean = user_agent[:100] if user_agent else 'Unknown'
    logger.info(f"PAGE_VIEW: {page} | User-Agent: {user_agent_clean}")


def log_search_query(search_params: dict, user_agent: str = None):
    """Logs search queries without IP addresses."""
    user_agent_clean = user_agent[:100] if user_agent else 'Unknown'

    # Prepare the search parameters for logging
    pos_str = ''.join(search_params.get('pos', [''] * 5))
    includes = search_params.get('includes', '')
    excludes = search_params.get('excludes', '')
    sources = []
    if search_params.get('use_ot'):
        sources.append('OT')
    if search_params.get('use_wf'):
        sources.append('WF')

    logger.info(f"SEARCH: pos='{pos_str}' includes='{includes}' excludes='{excludes}' sources={sources} | User-Agent: {user_agent_clean}")
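
# Illustrative examples (made-up values) of the lines the helpers above write to
# logs/app.log, given the basicConfig format defined at module level:
#   2024-01-01 12:00:00,000 - INFO - PAGE_VIEW: index | User-Agent: Mozilla/5.0 ...
#   2024-01-01 12:00:05,000 - INFO - SEARCH: pos='t' includes='s' excludes='u' sources=['OT'] | User-Agent: Mozilla/5.0 ...
# get_statistics() below parses exactly these markers: 'PAGE_VIEW: ', 'SEARCH:',
# 'sources=[' and "pos='".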
def login_required(f):
    """Decorator for password-protected routes."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not session.get('logged_in'):
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function


def get_statistics():
    """Reads and analyses the log files for statistics."""
    stats = {
        'total_page_views': 0,
        'total_searches': 0,
        'page_views_by_page': {},
        'searches_by_source': {'OT': 0, 'WF': 0, 'Both': 0},
        'recent_activity': [],
        'top_search_patterns': {}
    }

    try:
        # Read the current log file
        log_file = logs_dir / 'app.log'
        if log_file.exists():
            with open(log_file, 'r', encoding='utf-8') as f:
                for line in f:
                    if 'PAGE_VIEW:' in line:
                        stats['total_page_views'] += 1
                        # Extract the page name
                        if 'PAGE_VIEW: ' in line:
                            page = line.split('PAGE_VIEW: ')[1].split(' |')[0]
                            stats['page_views_by_page'][page] = stats['page_views_by_page'].get(page, 0) + 1
                    elif 'SEARCH:' in line:
                        stats['total_searches'] += 1
                        # Extract the sources
                        if 'sources=[' in line:
                            sources_part = line.split('sources=')[1].split(']')[0]
                            if 'OT' in sources_part and 'WF' in sources_part:
                                stats['searches_by_source']['Both'] += 1
                            elif 'OT' in sources_part:
                                stats['searches_by_source']['OT'] += 1
                            elif 'WF' in sources_part:
                                stats['searches_by_source']['WF'] += 1
                        # Extract the search pattern
                        if 'pos=' in line:
                            pos_part = line.split('pos=\'')[1].split('\'')[0]
                            if pos_part:
                                stats['top_search_patterns'][pos_part] = stats['top_search_patterns'].get(pos_part, 0) + 1
                    # Collect activity entries; trimmed to the most recent 10 below
                    timestamp = line.split(' - ')[0] if ' - ' in line else ''
                    if timestamp:
                        stats['recent_activity'].append({
                            'timestamp': timestamp,
                            'line': line.strip()
                        })
            # Keep only the 10 most recent entries
            stats['recent_activity'] = stats['recent_activity'][-10:]

        # Also scan the compressed backup files
        for backup_file in logs_dir.glob("app_*.log.gz"):
            try:
                with gzip.open(backup_file, 'rt', encoding='utf-8') as f:
                    for line in f:
                        if 'PAGE_VIEW:' in line:
                            stats['total_page_views'] += 1
                        elif 'SEARCH:' in line:
                            stats['total_searches'] += 1
            except Exception as e:
                logger.error(f"Fehler beim Lesen der Backup-Datei {backup_file}: {e}")

    except Exception as e:
        logger.error(f"Fehler beim Lesen der Statistiken: {e}")

    return stats


def cleanup_old_logs():
    """Rotates log files older than 7 days."""
    try:
        log_file = logs_dir / 'app.log'
        if log_file.exists():
            # Check the age of the file
            file_age = datetime.now() - datetime.fromtimestamp(log_file.stat().st_mtime)
            if file_age > timedelta(days=7):
                # Compress and rename the log file
                backup_name = f"app_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log.gz"
                backup_path = logs_dir / backup_name

                # Compress with gzip
                with open(log_file, 'rb') as f_in:
                    with gzip.open(backup_path, 'wb') as f_out:
                        f_out.writelines(f_in)

                # Delete the old log file
                log_file.unlink()
                logger.info(f"Log-Datei komprimiert und gesichert: {backup_name}")

        # Delete old backup files (older than 30 days)
        for backup_file in logs_dir.glob("app_*.log.gz"):
            backup_age = datetime.now() - datetime.fromtimestamp(backup_file.stat().st_mtime)
            if backup_age > timedelta(days=30):
                backup_file.unlink()
                logger.info(f"Alte Backup-Datei gelöscht: {backup_file.name}")

    except Exception as e:
        logger.error(f"Fehler bei der Log-Bereinigung: {e}")


def load_words() -> Tuple[List[str], Dict[str, List[str]]]:
    data_dir = Path(__file__).parent / "data"
    txt_path = data_dir / "words_de_5.txt"
    json_path = data_dir / "words_de_5_sources.json"

    words: List[str] = []
    sources_map: Dict[str, List[str]] = {}

    if txt_path.exists():
        with txt_path.open("r", encoding="utf-8") as f:
            for line in f:
                word = line.strip().lower()
                if len(word) == 5 and word.isalpha():
                    words.append(word)

    if json_path.exists():
        try:
            sources_map = json.loads(json_path.read_text(encoding="utf-8"))
        except Exception:
            sources_map = {}

    return words, sources_map


def filter_words(words: List[str], position_letters: List[str], includes_text: str, excludes_text: str) -> List[str]:
    results: List[str] = []
    includes_letters = [ch for ch in includes_text.lower() if ch.isalpha()]
    excludes_letters = [ch for ch in excludes_text.lower() if ch.isalpha()]

    for word in words:
        # fixed positions
        if any(ch and word[idx] != ch for idx, ch in enumerate(position_letters)):
            continue
        # must contain
        if not all(ch in word for ch in includes_letters):
            continue
        # must not contain
        if any(ch in word for ch in excludes_letters):
            continue
        results.append(word)

    return results
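
# Illustrative example (made-up word list) of how filter_words combines the three
# filters: first letter fixed to 't', must contain 's', must not contain 'u':
#   filter_words(["tisch", "stuhl", "tasse"], ["t", "", "", "", ""], "s", "u")
#   -> ["tisch", "tasse"]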
@app.route("/", methods=["GET", "POST"])
def index():
    # Check on every request whether the log file needs to be rotated
    # (cleanup_old_logs only acts on files older than 7 days)
    cleanup_old_logs()

    # Log the page view
    log_page_view("index", request.headers.get('User-Agent'))

    all_words, sources_map = load_words()

    results_display: List[str] | None = None
    pos: List[str] = ["", "", "", "", ""]
    includes: str = ""
    excludes: str = ""
    use_ot: bool = True
    use_wf: bool = False

    if request.method == "POST":
        pos = [
            (request.form.get("pos1") or "").strip().lower(),
            (request.form.get("pos2") or "").strip().lower(),
            (request.form.get("pos3") or "").strip().lower(),
            (request.form.get("pos4") or "").strip().lower(),
            (request.form.get("pos5") or "").strip().lower(),
        ]
        includes = (request.form.get("includes") or "").strip()
        excludes = (request.form.get("excludes") or "").strip()
        use_ot = request.form.get("use_ot") is not None
        use_wf = request.form.get("use_wf") is not None

        # If no source is selected, enable OpenThesaurus by default
        if not use_ot and not use_wf:
            use_ot = True

        # Log the search query
        search_params = {
            'pos': pos,
            'includes': includes,
            'excludes': excludes,
            'use_ot': use_ot,
            'use_wf': use_wf
        }
        log_search_query(search_params, request.headers.get('User-Agent'))

        # 1) Letter/position search over all words
        matched = filter_words(all_words, pos, includes, excludes)

        # 2) Apply the source filter only to the result view
        allowed = set()
        if use_ot:
            allowed.add("ot")
        if use_wf:
            allowed.add("wf")

        if allowed:
            results_display = [w for w in matched if any(src in allowed for src in sources_map.get(w, []))]
        else:
            # No source selected -> empty result list (the search itself still ran)
            results_display = []

    return render_template(
        "index.html",
        results=results_display,
        pos=pos,
        includes=includes,
        excludes=excludes,
        words_count=len(all_words),
        sources_map=sources_map,
        use_ot=use_ot,
        use_wf=use_wf,
        error_message=None,
    )


@app.route('/manifest.webmanifest')
def manifest_file():
    log_page_view("manifest", request.headers.get('User-Agent'))
    return send_from_directory(Path(__file__).parent / 'static', 'manifest.webmanifest', mimetype='application/manifest+json')


@app.route('/sw.js')
def service_worker():
    # The service worker has to be served from the top level
    log_page_view("service_worker", request.headers.get('User-Agent'))
    return send_from_directory(Path(__file__).parent / 'static', 'sw.js', mimetype='application/javascript')


@app.route('/screenshot.png')
def screenshot_image():
    """Serves the OpenGraph/Twitter preview image from the project root."""
    log_page_view("screenshot", request.headers.get('User-Agent'))
    return send_from_directory(Path(__file__).parent, 'screenshot.png', mimetype='image/png')


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Login page for the admin dashboard."""
    if request.method == 'POST':
        password = request.form.get('password')
        logger.info(f"Login-Versuch: Passwort-Länge: {len(password) if password else 0}, ADMIN_PASSWORD gesetzt: {bool(ADMIN_PASSWORD)}")

        if password == ADMIN_PASSWORD:
            session['logged_in'] = True
            flash('Erfolgreich angemeldet!', 'success')
            logger.info("Login erfolgreich")
            return redirect(url_for('stats'))
        else:
            flash('Falsches Passwort!', 'error')
            # Do not log the submitted or the expected password in clear text
            logger.warning("Login fehlgeschlagen - falsches Passwort")

    return render_template('login.html')


@app.route('/logout')
def logout():
    """Logout route."""
    session.pop('logged_in', None)
    flash('Erfolgreich abgemeldet!', 'success')
    return redirect(url_for('index'))


@app.route('/stats')
@login_required
def stats():
    """Statistics dashboard (password-protected)."""
    log_page_view("stats", request.headers.get('User-Agent'))
    statistics = get_statistics()
    return render_template('stats.html', stats=statistics)


if __name__ == "__main__":
    logger.info(f"App gestartet - ADMIN_PASSWORD gesetzt: {bool(ADMIN_PASSWORD)}, Länge: {len(ADMIN_PASSWORD) if ADMIN_PASSWORD else 0}")
    app.run(debug=True)
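
# Illustrative example of supplying the secrets read at the top of this module via
# environment variables before starting the development server (the file name
# app.py is an assumption, not confirmed by this module):
#
#   export FLASK_SECRET_KEY='<random secret>'
#   export ADMIN_PASSWORD='<admin password>'
#   python app.py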