from flask import Flask, render_template, request, jsonify, url_for, redirect, session import pandas as pd import os import logging import numpy as np from datetime import datetime, timedelta from dotenv import load_dotenv import requests from collections import defaultdict import ipaddress import csv import sqlite3 from functools import wraps app = Flask(__name__, static_folder='static') app.secret_key = os.getenv('SECRET_KEY', 'default-secret-key') logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Version der Anwendung VERSION = "1.2.1" # Pfad zur CSV-Datei CSV_FILE = "data/customers.csv" # Lade Umgebungsvariablen load_dotenv() # Statisches Passwort aus der .env Datei STATIC_PASSWORD = os.getenv('LOGIN_PASSWORD', 'default-password') ALLOWED_IP_RANGES = os.getenv('ALLOWED_IP_RANGES', '').split(',') def init_db(): """Initialisiert die SQLite-Datenbank und erstellt die notwendigen Tabellen.""" conn = sqlite3.connect('customers.db') c = conn.cursor() # Erstelle die Kunden-Tabelle c.execute(''' CREATE TABLE IF NOT EXISTS customers ( id INTEGER PRIMARY KEY AUTOINCREMENT, nummer TEXT, name TEXT, strasse TEXT, plz TEXT, ort TEXT, telefon TEXT, mobil TEXT, email TEXT, bemerkung TEXT ) ''') conn.commit() conn.close() def import_csv(): """Importiert die Daten aus der CSV-Datei in die SQLite-Datenbank.""" conn = sqlite3.connect('customers.db') c = conn.cursor() # Lösche bestehende Daten c.execute('DELETE FROM customers') try: # Lese die CSV-Datei mit pandas df = pd.read_csv('data/customers.csv', sep=',', encoding='utf-8', quotechar='"') # Entferne Anführungszeichen aus den Spaltennamen df.columns = df.columns.str.strip('"') # Entferne Anführungszeichen aus den Werten for col in df.columns: if df[col].dtype == 'object': df[col] = df[col].str.strip('"') # Kombiniere Vorname und Nachname df['name'] = df['Vorname'] + ' ' + df['Nachname'] # Importiere die Daten for _, row in df.iterrows(): c.execute(''' INSERT INTO customers (nummer, name, strasse, plz, ort, telefon, mobil, email, bemerkung) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( row['Nummer'], row['name'], row['Strasse'], row['PLZ'], row['Ort'], row['Tel'], row['Handy'], row['mail'], f"Fachrichtung: {row['Fachrichtung']}" )) conn.commit() logger.info('CSV-Daten erfolgreich in die Datenbank importiert') except Exception as e: logger.error(f'Fehler beim Import der CSV-Daten: {str(e)}') raise finally: conn.close() def search_customers(search_params): """Sucht Kunden in der Datenbank basierend auf den Suchparametern.""" conn = sqlite3.connect('customers.db') c = conn.cursor() # Erstelle die WHERE-Bedingungen basierend auf den Suchparametern conditions = [] params = [] if search_params.get('name'): conditions.append('name LIKE ?') params.append(f'%{search_params["name"]}%') if search_params.get('ort'): conditions.append('ort LIKE ?') params.append(f'%{search_params["ort"]}%') if search_params.get('nummer'): conditions.append('nummer LIKE ?') params.append(f'%{search_params["nummer"]}%') if search_params.get('plz'): conditions.append('plz LIKE ?') params.append(f'%{search_params["plz"]}%') # Erstelle die SQL-Abfrage sql = 'SELECT * FROM customers' if conditions: sql += ' WHERE ' + ' AND '.join(conditions) # Führe die Abfrage aus c.execute(sql, params) results = c.fetchall() # Konvertiere die Ergebnisse in ein Dictionary columns = ['id', 'nummer', 'name', 'strasse', 'plz', 'ort', 'telefon', 'mobil', 'email', 'bemerkung'] customers = [] for row in results: customer = dict(zip(columns, row)) customers.append(customer) conn.close() return customers def clean_dataframe(df): """Konvertiert NaN-Werte in None für JSON-Kompatibilität""" return df.replace({np.nan: None}) # CSV-Datei laden def load_data(): try: logger.info("Versuche CSV-Datei zu laden...") if not os.path.exists(CSV_FILE): logger.error(f"CSV-Datei '{CSV_FILE}' nicht gefunden!") return None # Lade CSV mit Komma als Trennzeichen df = pd.read_csv(CSV_FILE, sep=',', encoding='utf-8', quotechar='"') # Entferne Anführungszeichen aus den Spaltennamen df.columns = df.columns.str.strip('"') # Entferne Anführungszeichen aus den Werten for col in df.columns: if df[col].dtype == 'object': df[col] = df[col].str.strip('"') df = clean_dataframe(df) logger.info(f"CSV-Datei erfolgreich geladen. {len(df)} Einträge gefunden.") return df except Exception as e: logger.error(f"Fehler beim Laden der CSV-Datei: {str(e)}") return None @app.route('/login', methods=['GET', 'POST']) def login(): # Versuche, die tatsächliche Client-IP aus dem X-Forwarded-For-Header zu erhalten client_ip = request.headers.get('X-Forwarded-For', request.remote_addr) allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '').split(',') logger.info(f"Client-IP: {client_ip}") logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}") logger.info(f"Session Status: {session}") # Überprüfen, ob die IP-Adresse in einem der erlaubten Subnetze liegt client_ip_obj = ipaddress.ip_address(client_ip) for ip_range in allowed_ip_ranges: try: network = ipaddress.ip_network(ip_range.strip(), strict=False) logger.info(f"Überprüfe Netzwerk: {network}") if client_ip_obj in network: logger.info("Client-IP ist im erlaubten Bereich.") session['logged_in'] = True session.permanent = True # Session bleibt bestehen return redirect(url_for('index')) except ValueError: logger.error(f"Ungültiges Netzwerkformat: {ip_range}") if request.method == 'POST': password = request.form.get('password') logger.info(f"Login-Versuch mit Passwort: {'*' * len(password) if password else 'None'}") if password == STATIC_PASSWORD: session['logged_in'] = True session.permanent = True # Session bleibt bestehen logger.info("Login erfolgreich, Session gesetzt") return redirect(url_for('index')) else: logger.warning("Falsches Passwort eingegeben") return render_template('login.html', error="Falsches Passwort") logger.info("Zeige Login-Seite") return render_template('login.html') @app.route('/') def index(): logger.info(f"Index-Route aufgerufen. Session Status: {session}") if not session.get('logged_in'): logger.info("Benutzer nicht eingeloggt, Weiterleitung zum Login") return redirect(url_for('login')) allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '') client_ip = request.headers.get('X-Forwarded-For', request.remote_addr) logger.info(f"Client-IP: {client_ip}") logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}") return render_template('index.html', allowed_ip_ranges=allowed_ip_ranges) @app.route('/search') def search(): try: # Hole die Suchparameter aus der Anfrage search_params = { 'name': request.args.get('name', ''), 'ort': request.args.get('ort', ''), 'nummer': request.args.get('nummer', ''), 'plz': request.args.get('plz', '') } # Führe die Suche in der Datenbank durch results = search_customers(search_params) # Protokolliere die Anzahl der gefundenen Ergebnisse logger.info(f'Suchergebnisse gefunden: {len(results)}') return jsonify(results) except Exception as e: logger.error(f'Fehler bei der Suche: {str(e)}') return jsonify({"error": str(e)}), 500 def init_app(app): """Initialisiert die Anwendung mit allen notwendigen Einstellungen.""" with app.app_context(): # Initialisiere die Datenbank init_db() # Importiere die CSV-Daten import_csv() logger.info("Anwendung erfolgreich initialisiert") # Initialisiere die App init_app(app) if __name__ == '__main__': app.run(debug=True, port=5001)