from flask import Flask, render_template, request, jsonify, url_for, redirect, session import pandas as pd import os import logging import numpy as np from datetime import datetime, timedelta from dotenv import load_dotenv import requests from collections import defaultdict import ipaddress import csv import sqlite3 from functools import wraps app = Flask(__name__, static_folder='static') app.secret_key = os.getenv('SECRET_KEY', 'default-secret-key') logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Version der Anwendung VERSION = "1.2.1" # Pfad zur CSV-Datei CSV_FILE = 'data/customers.csv' # Pfad zur Datenbank DB_FILE = 'data/customers.db' # Lade Umgebungsvariablen load_dotenv() # Statisches Passwort aus der .env Datei STATIC_PASSWORD = os.getenv('LOGIN_PASSWORD', 'default-password') ALLOWED_IP_RANGES = os.getenv('ALLOWED_IP_RANGES', '').split(',') def init_db(): """Initialisiert die SQLite-Datenbank mit der notwendigen Tabelle.""" conn = sqlite3.connect(DB_FILE) c = conn.cursor() # Erstelle die Tabelle mit Indizes c.execute(''' CREATE TABLE IF NOT EXISTS customers ( id INTEGER PRIMARY KEY AUTOINCREMENT, nummer TEXT, name TEXT, strasse TEXT, plz TEXT, ort TEXT, telefon TEXT, mobil TEXT, email TEXT, bemerkung TEXT ) ''') # Erstelle Indizes für alle Suchfelder c.execute('CREATE INDEX IF NOT EXISTS idx_customers_nummer ON customers(nummer)') c.execute('CREATE INDEX IF NOT EXISTS idx_customers_name ON customers(name)') c.execute('CREATE INDEX IF NOT EXISTS idx_customers_strasse ON customers(strasse)') c.execute('CREATE INDEX IF NOT EXISTS idx_customers_plz ON customers(plz)') c.execute('CREATE INDEX IF NOT EXISTS idx_customers_ort ON customers(ort)') c.execute('CREATE INDEX IF NOT EXISTS idx_customers_telefon ON customers(telefon)') c.execute('CREATE INDEX IF NOT EXISTS idx_customers_mobil ON customers(mobil)') c.execute('CREATE INDEX IF NOT EXISTS idx_customers_email ON customers(email)') # Erstelle einen zusammengesetzten Index für die häufigste Suchkombination c.execute('CREATE INDEX IF NOT EXISTS idx_customers_name_ort ON customers(name, ort)') conn.commit() conn.close() logger.info('Datenbank initialisiert') def import_csv(): """Importiert die Daten aus der CSV-Datei in die SQLite-Datenbank.""" conn = sqlite3.connect(DB_FILE) c = conn.cursor() # Lösche bestehende Daten c.execute('DELETE FROM customers') try: # Lese die CSV-Datei mit pandas df = pd.read_csv(CSV_FILE, sep=',', encoding='utf-8', quotechar='"') # Entferne Anführungszeichen aus den Spaltennamen df.columns = df.columns.str.strip('"') # Entferne Anführungszeichen aus den Werten for col in df.columns: if df[col].dtype == 'object': df[col] = df[col].str.strip('"') # Kombiniere Vorname und Nachname df['name'] = df['Vorname'] + ' ' + df['Nachname'] # Importiere die Daten for _, row in df.iterrows(): c.execute(''' INSERT INTO customers (nummer, name, strasse, plz, ort, telefon, mobil, email, bemerkung) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( row['Nummer'], row['name'], row['Strasse'], row['PLZ'], row['Ort'], row['Tel'], row['Handy'], row['mail'], f"Fachrichtung: {row['Fachrichtung']}" )) conn.commit() logger.info('CSV-Daten erfolgreich in die Datenbank importiert') except Exception as e: logger.error(f'Fehler beim Import der CSV-Daten: {str(e)}') raise finally: conn.close() def search_customers(search_params): """Sucht nach Kunden basierend auf den Suchparametern.""" conn = sqlite3.connect(DB_FILE) c = conn.cursor() try: # Baue die SQL-Abfrage dynamisch auf query = "SELECT * FROM customers WHERE 1=1" params = [] # Allgemeine Suche über alle Felder if search_params.get('q'): search_term = f"%{search_params['q']}%" query += " AND (name LIKE ? OR ort LIKE ? OR nummer LIKE ? OR telefon LIKE ? OR mobil LIKE ? OR email LIKE ? OR bemerkung LIKE ?)" params.extend([search_term] * 7) # Spezifische Suche für einzelne Felder if search_params.get('name'): query += " AND name LIKE ?" params.append(f"%{search_params['name']}%") if search_params.get('ort'): query += " AND ort LIKE ?" params.append(f"%{search_params['ort']}%") if search_params.get('nummer'): query += " AND nummer LIKE ?" params.append(f"%{search_params['nummer']}%") if search_params.get('plz'): query += " AND plz LIKE ?" params.append(f"%{search_params['plz']}%") # Führe die Abfrage aus c.execute(query, params) results = c.fetchall() # Formatiere die Ergebnisse customers = [] for row in results: customer = { 'id': row[0], 'nummer': row[1], 'name': row[2], 'strasse': row[3], 'plz': row[4], 'ort': row[5], 'telefon': row[6], 'mobil': row[7], 'email': row[8], 'bemerkung': row[9] } customers.append(customer) return customers except Exception as e: logger.error(f"Fehler bei der Kundensuche: {str(e)}") raise finally: conn.close() def clean_dataframe(df): """Konvertiert NaN-Werte in None für JSON-Kompatibilität""" return df.replace({np.nan: None}) # CSV-Datei laden def load_data(): try: logger.info("Versuche CSV-Datei zu laden...") if not os.path.exists(CSV_FILE): logger.error(f"CSV-Datei '{CSV_FILE}' nicht gefunden!") return None # Lade CSV mit Komma als Trennzeichen df = pd.read_csv(CSV_FILE, sep=',', encoding='utf-8', quotechar='"') # Entferne Anführungszeichen aus den Spaltennamen df.columns = df.columns.str.strip('"') # Entferne Anführungszeichen aus den Werten for col in df.columns: if df[col].dtype == 'object': df[col] = df[col].str.strip('"') df = clean_dataframe(df) logger.info(f"CSV-Datei erfolgreich geladen. {len(df)} Einträge gefunden.") return df except Exception as e: logger.error(f"Fehler beim Laden der CSV-Datei: {str(e)}") return None @app.route('/login', methods=['GET', 'POST']) def login(): # Versuche, die tatsächliche Client-IP aus dem X-Forwarded-For-Header zu erhalten client_ip = request.headers.get('X-Forwarded-For', request.remote_addr) allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '').split(',') logger.info(f"Client-IP: {client_ip}") logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}") logger.info(f"Session Status: {session}") # Überprüfen, ob die IP-Adresse in einem der erlaubten Subnetze liegt client_ip_obj = ipaddress.ip_address(client_ip) for ip_range in allowed_ip_ranges: try: network = ipaddress.ip_network(ip_range.strip(), strict=False) logger.info(f"Überprüfe Netzwerk: {network}") if client_ip_obj in network: logger.info("Client-IP ist im erlaubten Bereich.") session['logged_in'] = True session.permanent = True # Session bleibt bestehen return redirect(url_for('index')) except ValueError: logger.error(f"Ungültiges Netzwerkformat: {ip_range}") if request.method == 'POST': password = request.form.get('password') logger.info(f"Login-Versuch mit Passwort: {'*' * len(password) if password else 'None'}") if password == STATIC_PASSWORD: session['logged_in'] = True session.permanent = True # Session bleibt bestehen logger.info("Login erfolgreich, Session gesetzt") return redirect(url_for('index')) else: logger.warning("Falsches Passwort eingegeben") return render_template('login.html', error="Falsches Passwort") logger.info("Zeige Login-Seite") return render_template('login.html') @app.route('/') def index(): logger.info(f"Index-Route aufgerufen. Session Status: {session}") if not session.get('logged_in'): logger.info("Benutzer nicht eingeloggt, Weiterleitung zum Login") return redirect(url_for('login')) allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '') client_ip = request.headers.get('X-Forwarded-For', request.remote_addr) logger.info(f"Client-IP: {client_ip}") logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}") return render_template('index.html', allowed_ip_ranges=allowed_ip_ranges) @app.route('/search') def search(): try: # Hole die Suchparameter aus der Anfrage search_params = { 'name': request.args.get('name', ''), 'ort': request.args.get('ort', ''), 'nummer': request.args.get('nummer', ''), 'plz': request.args.get('plz', ''), 'telefon': request.args.get('telefon', ''), 'email': request.args.get('email', ''), 'q': request.args.get('q', '') } # Führe die Suche in der Datenbank durch results = search_customers(search_params) # Protokolliere die Anzahl der gefundenen Ergebnisse logger.info(f'Suchergebnisse gefunden: {len(results)}') return jsonify(results) except Exception as e: logger.error(f'Fehler bei der Suche: {str(e)}') return jsonify({"error": str(e)}), 500 def init_app(app): """Initialisiert die Anwendung mit allen notwendigen Einstellungen.""" with app.app_context(): # Initialisiere die Datenbank init_db() # Importiere die CSV-Daten import_csv() logger.info("Anwendung erfolgreich initialisiert") # Initialisiere die App init_app(app) if __name__ == '__main__': app.run(debug=True, port=5001)