from flask import Flask, render_template, request, jsonify, url_for, redirect, session import pandas as pd import os import logging import numpy as np from datetime import datetime, timedelta from dotenv import load_dotenv import requests from collections import defaultdict import ipaddress app = Flask(__name__, static_folder='static') app.secret_key = 'your_secret_key' # Setzen Sie einen sicheren geheimen Schlüssel für die Session logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) # Version der Anwendung VERSION = "1.0.6" # Pfad zur CSV-Datei CSV_FILE = "data/customers.csv" # Lade Umgebungsvariablen load_dotenv() # Statisches Passwort aus der .env Datei STATIC_PASSWORD = os.getenv('LOGIN_PASSWORD', 'changeme') def clean_dataframe(df): """Konvertiert NaN-Werte in None für JSON-Kompatibilität""" return df.replace({np.nan: None}) # CSV-Datei laden def load_data(): try: logger.info("Versuche CSV-Datei zu laden...") if not os.path.exists(CSV_FILE): logger.error(f"CSV-Datei '{CSV_FILE}' nicht gefunden!") return None # Lade CSV mit Komma als Trennzeichen df = pd.read_csv(CSV_FILE, sep=',', encoding='utf-8', quotechar='"') # Entferne Anführungszeichen aus den Spaltennamen df.columns = df.columns.str.strip('"') # Entferne Anführungszeichen aus den Werten for col in df.columns: if df[col].dtype == 'object': df[col] = df[col].str.strip('"') df = clean_dataframe(df) logger.info(f"CSV-Datei erfolgreich geladen. {len(df)} Einträge gefunden.") return df except Exception as e: logger.error(f"Fehler beim Laden der CSV-Datei: {str(e)}") return None @app.route('/login', methods=['GET', 'POST']) def login(): # Versuche, die tatsächliche Client-IP aus dem X-Forwarded-For-Header zu erhalten client_ip = request.headers.get('X-Forwarded-For', request.remote_addr) allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '').split(',') logger.info(f"Client-IP: {client_ip}") logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}") # Überprüfen, ob die IP-Adresse in einem der erlaubten Subnetze liegt client_ip_obj = ipaddress.ip_address(client_ip) for ip_range in allowed_ip_ranges: try: network = ipaddress.ip_network(ip_range.strip(), strict=False) logger.info(f"Überprüfe Netzwerk: {network}") if client_ip_obj in network: logger.info("Client-IP ist im erlaubten Bereich.") session['logged_in'] = True return redirect(url_for('index')) except ValueError: logger.error(f"Ungültiges Netzwerkformat: {ip_range}") if request.method == 'POST': password = request.form.get('password') if password == STATIC_PASSWORD: session['logged_in'] = True return redirect(url_for('index')) else: return render_template('login.html', error="Falsches Passwort") return render_template('login.html') @app.route('/') def index(): if not session.get('logged_in'): return redirect(url_for('login')) return render_template('index.html') @app.route('/search') def search(): if not session.get('logged_in'): return redirect(url_for('login')) try: # CSV-Datei laden df = load_data() if df is None: return jsonify({"error": "Datenbank konnte nicht geladen werden"}), 500 # Suchparameter aus der URL holen name = request.args.get('name', '').strip() ort = request.args.get('ort', '').strip() kundennummer = request.args.get('kundennummer', '').strip() fachrichtung = request.args.get('fachrichtung', '').strip() telefon = request.args.get('telefon', '').strip() query = request.args.get('q', '').strip() # Initialisiere die Maske für die Filterung mask = pd.Series(True, index=df.index) # Wenn eine allgemeine Suche angegeben ist if query: query_mask = ( df['Vorname'].str.contains(query, case=False, na=False) | df['Nachname'].str.contains(query, case=False, na=False) | df['Ort'].str.contains(query, case=False, na=False) | df['Nummer'].astype(str).str.contains(query, case=False, na=False) | df['Fachrichtung'].str.contains(query, case=False, na=False) | df['Tel'].astype(str).str.contains(query, case=False, na=False) ) mask &= query_mask # Spezifische Suchkriterien anwenden if name: name_mask = ( df['Vorname'].str.contains(name, case=False, na=False) | df['Nachname'].str.contains(name, case=False, na=False) ) mask &= name_mask if ort: ort_mask = df['Ort'].str.contains(ort, case=False, na=False) mask &= ort_mask if kundennummer: kundennummer_mask = df['Nummer'].astype(str).str.contains(kundennummer, case=False, na=False) mask &= kundennummer_mask if fachrichtung: fachrichtung_mask = df['Fachrichtung'].str.contains(fachrichtung, case=False, na=False) mask &= fachrichtung_mask if telefon: telefon_mask = df['Tel'].astype(str).str.contains(telefon, case=False, na=False) mask &= telefon_mask results = df[mask].to_dict('records') logger.info(f"{len(results)} Ergebnisse gefunden") return jsonify({ 'results': results, 'total': len(results) }) except Exception as e: logger.error(f"Fehler bei der Suche: {str(e)}") return jsonify({"error": str(e)}), 500 if __name__ == '__main__': app.run(debug=True, port=5001)