"""Flask application that serves a searchable customer directory.

On start-up the SQLite database is rebuilt from the CSV exports found in
``data/`` (MEDISOFT and MEDICONSULT).  Access is granted either by a static
password or automatically for clients inside the configured IP ranges.
"""

import csv
import hmac
import ipaddress
import logging
import os
import sqlite3
from collections import defaultdict
from datetime import datetime, timedelta
from functools import wraps

import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv
from flask import Flask, render_template, request, jsonify, url_for, redirect, session

# Load the .env file BEFORE any os.getenv() call below; otherwise values that
# only exist in .env (e.g. SECRET_KEY) silently fall back to their defaults.
load_dotenv()

app = Flask(__name__, static_folder='static')
app.secret_key = os.getenv('SECRET_KEY', 'default-secret-key')

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Application version
VERSION = "1.2.1"

# Path to the SQLite database file
DB_FILE = 'data/customers.db'

# Static login password (from the environment / .env file)
STATIC_PASSWORD = os.getenv('LOGIN_PASSWORD', 'default-password')
ALLOWED_IP_RANGES = os.getenv('ALLOWED_IP_RANGES', '').split(',')

# NOTE(security): the fallbacks above are publicly known values.  They are
# kept for backward compatibility, but running with them is unsafe.
if app.secret_key == 'default-secret-key':
    logger.warning("SECRET_KEY ist nicht gesetzt - unsicherer Standardwert wird verwendet")
if STATIC_PASSWORD == 'default-password':
    logger.warning("LOGIN_PASSWORD ist nicht gesetzt - unsicherer Standardwert wird verwendet")

# Columns that receive a single-column index in init_db().
_INDEXED_COLUMNS = (
    'nummer', 'name', 'strasse', 'plz', 'ort',
    'telefon', 'mobil', 'email', 'fachrichtung', 'tag',
)

# Columns matched by the free-text search, in bind-parameter order.
_SEARCH_COLUMNS = (
    'name', 'nummer', 'strasse', 'plz', 'ort',
    'telefon', 'mobil', 'email', 'fachrichtung',
)

# Columns returned by /search, in SELECT order (also the JSON keys).
_RESULT_COLUMNS = (
    'nummer', 'name', 'strasse', 'plz', 'ort',
    'telefon', 'mobil', 'email', 'fachrichtung',
)

# (csv path, value stored in customers.tag, label used in log messages)
_CSV_SOURCES = (
    ('data/customers.csv', 'medisoft', 'MEDISOFT'),
    ('data/customers_snk.csv', 'mediconsult', 'MEDICONSULT'),
)

# CSV/DataFrame columns in the order expected by _INSERT_CUSTOMER_SQL.
_CSV_INSERT_COLUMNS = (
    'name', 'Nummer', 'Strasse', 'PLZ', 'Ort',
    'Tel', 'Handy', 'mail', 'Fachrichtung',
)

_INSERT_CUSTOMER_SQL = '''
    INSERT INTO customers
        (name, nummer, strasse, plz, ort, telefon, mobil, email, fachrichtung, tag)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
'''


def get_db_connection():
    """Open a new SQLite connection (20 s busy timeout, rows as sqlite3.Row)."""
    conn = sqlite3.connect(DB_FILE, timeout=20)
    conn.row_factory = sqlite3.Row
    return conn


def init_db():
    """Create the ``customers`` table and its search indexes if missing.

    Raises:
        Exception: any database error is logged and re-raised.
    """
    conn = get_db_connection()
    c = conn.cursor()
    try:
        # Customer table
        c.execute('''
            CREATE TABLE IF NOT EXISTS customers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                nummer TEXT,
                name TEXT,
                strasse TEXT,
                plz TEXT,
                ort TEXT,
                telefon TEXT,
                mobil TEXT,
                email TEXT,
                bemerkung TEXT,
                fachrichtung TEXT,
                tag TEXT
            )
        ''')

        # One index per searchable column.  The column names come from a
        # module constant, never from user input, so formatting is safe.
        for column in _INDEXED_COLUMNS:
            c.execute(
                f'CREATE INDEX IF NOT EXISTS idx_customers_{column} ON customers({column})'
            )

        # Composite index for the most common search combination
        c.execute('CREATE INDEX IF NOT EXISTS idx_customers_name_ort ON customers(name, ort)')

        conn.commit()
        logger.info('Datenbank initialisiert')
    except Exception as e:
        logger.error(f'Fehler bei der Datenbankinitialisierung: {str(e)}')
        raise
    finally:
        conn.close()


def _load_customer_rows(csv_path, tag):
    """Read one customer CSV and return a list of INSERT parameter tuples.

    Every column is read as text so that leading zeros (postal codes, phone
    numbers) survive and columns with gaps are not turned into floats.
    Surrounding whitespace and stray double quotes are stripped from headers
    and values; missing values become ``None`` (SQL NULL).
    """
    df = pd.read_csv(csv_path, encoding='utf-8', dtype=str)
    df.columns = df.columns.str.strip().str.replace('"', '', regex=False)
    for column in df.columns:
        df[column] = df[column].str.strip().str.replace('"', '', regex=False)

    # A missing first or last name must not wipe out the whole name.
    df['name'] = (df['Vorname'].fillna('') + ' ' + df['Nachname'].fillna('')).str.strip()

    records = df[list(_CSV_INSERT_COLUMNS)].itertuples(index=False, name=None)
    return [
        tuple(None if pd.isna(value) else value for value in record) + (tag,)
        for record in records
    ]


def import_csv():
    """Replace the contents of ``customers`` with the data from the CSV files.

    Existing rows are deleted first.  A missing CSV file is logged as a
    warning and skipped.  Nothing is committed if any import step fails.

    Raises:
        Exception: any read or database error is logged and re-raised.
    """
    conn = None
    try:
        conn = get_db_connection()
        c = conn.cursor()

        # Remove existing data
        c.execute('DELETE FROM customers')

        for csv_path, tag, label in _CSV_SOURCES:
            if not os.path.exists(csv_path):
                logger.warning(f"{label} CSV-Datei nicht gefunden")
                continue
            logger.info(f"Importiere {label}-Daten...")
            c.executemany(_INSERT_CUSTOMER_SQL, _load_customer_rows(csv_path, tag))

        conn.commit()
        logger.info("CSV-Daten erfolgreich in die Datenbank importiert")
    except Exception as e:
        logger.error(f"Fehler beim Importieren der CSV-Datei: {str(e)}")
        raise
    finally:
        if conn:
            conn.close()


def clean_dataframe(df):
    """Return *df* with NaN values replaced by None (JSON compatibility)."""
    return df.replace({np.nan: None})


def _get_client_ip():
    """Return the client IP for the current request as a string (or None).

    ``X-Forwarded-For`` may hold a comma-separated chain of addresses.  The
    right-most entry is the one appended by the proxy closest to this
    application, so it is the only one a client cannot forge when exactly one
    trusted reverse proxy sits in front of the app.

    NOTE(security): without a trusted proxy this header is fully
    client-controlled; consider werkzeug's ProxyFix for such deployments.
    """
    forwarded_for = request.headers.get('X-Forwarded-For', '')
    hops = [hop.strip() for hop in forwarded_for.split(',') if hop.strip()]
    return hops[-1] if hops else request.remote_addr


def _is_client_ip_allowed(client_ip):
    """Return True if *client_ip* lies inside one of ALLOWED_IP_RANGES.

    Unparsable addresses and malformed or empty range entries never grant
    access; malformed ranges are logged and skipped.
    """
    try:
        client_ip_obj = ipaddress.ip_address(client_ip)
    except ValueError:
        logger.warning(f"Ungültige Client-IP: {client_ip}")
        return False

    for ip_range in os.getenv('ALLOWED_IP_RANGES', '').split(','):
        ip_range = ip_range.strip()
        if not ip_range:
            # An unset variable yields [''] after split(); nothing to check.
            continue
        try:
            network = ipaddress.ip_network(ip_range, strict=False)
        except ValueError:
            logger.error(f"Ungültiges Netzwerkformat: {ip_range}")
            continue
        logger.info(f"Überprüfe Netzwerk: {network}")
        if client_ip_obj in network:
            return True
    return False


def login_required(view):
    """Decorator for JSON endpoints: require a login or a whitelisted IP.

    Requests that are neither logged in nor from an allowed IP range get a
    401 JSON response instead of the view's result.
    """
    @wraps(view)
    def wrapped(*args, **kwargs):
        if session.get('logged_in') or _is_client_ip_allowed(_get_client_ip()):
            return view(*args, **kwargs)
        return jsonify({'error': 'Nicht angemeldet'}), 401
    return wrapped


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Log the client in by IP whitelist or password, else show the form."""
    client_ip = _get_client_ip()
    allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '').split(',')

    logger.info(f"Client-IP: {client_ip}")
    logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}")
    logger.info(f"Session Status: {session}")

    # Clients inside an allowed subnet are logged in without a password.
    if _is_client_ip_allowed(client_ip):
        logger.info("Client-IP ist im erlaubten Bereich.")
        session['logged_in'] = True
        session.permanent = True  # keep the session alive
        return redirect(url_for('index'))

    if request.method == 'POST':
        password = request.form.get('password')
        logger.info(f"Login-Versuch mit Passwort: {'*' * len(password) if password else 'None'}")

        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        password_ok = password is not None and hmac.compare_digest(
            password.encode('utf-8'), STATIC_PASSWORD.encode('utf-8')
        )
        if password_ok:
            session['logged_in'] = True
            session.permanent = True  # keep the session alive
            logger.info("Login erfolgreich, Session gesetzt")
            return redirect(url_for('index'))

        logger.warning("Falsches Passwort eingegeben")
        return render_template('login.html', error="Falsches Passwort")

    logger.info("Zeige Login-Seite")
    return render_template('login.html')


@app.route('/')
def index():
    """Render the search page; redirect to the login page if not logged in."""
    logger.info(f"Index-Route aufgerufen. Session Status: {session}")
    if not session.get('logged_in'):
        logger.info("Benutzer nicht eingeloggt, Weiterleitung zum Login")
        return redirect(url_for('login'))

    allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '')
    client_ip = _get_client_ip()

    logger.info(f"Client-IP: {client_ip}")
    logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}")

    return render_template('index.html', allowed_ip_ranges=allowed_ip_ranges)


@app.route('/search', methods=['GET', 'POST'])
@login_required
def search_customers():
    """Search customers and return the matches as a JSON list.

    POST (JSON body): ``query`` is matched as one substring against all
    search columns; ``tag`` selects the data source (default ``medisoft``,
    ``all`` disables the filter).

    GET (query string): ``q`` is split on whitespace and the terms are
    combined with ``operator`` (``and`` / anything else = or).  ``name``,
    ``ort``, ``nummer``, ``plz`` and ``fachrichtung`` are additional
    per-column substring filters; ``tag`` works as for POST.

    Returns:
        A JSON array of customer objects ordered by name, or
        ``{"error": ...}`` with status 500 on failure.
    """
    conn = None
    try:
        # Per-column filters and the operator only exist for GET requests.
        name = ort = nummer = plz = fachrichtung = ''
        operator = 'or'

        if request.method == 'POST':
            # A missing or non-object JSON body is treated as "no criteria".
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                data = {}
            query = data.get('query', '')
            tag = data.get('tag', 'medisoft')
            # POST matches the whole query string as a single term.
            terms = [query] if query else []
        else:
            query = request.args.get('q', '')
            name = request.args.get('name', '')
            ort = request.args.get('ort', '')
            nummer = request.args.get('nummer', '')
            plz = request.args.get('plz', '')
            fachrichtung = request.args.get('fachrichtung', '')
            operator = request.args.get('operator', 'or')
            tag = request.args.get('tag', 'medisoft')
            terms = query.split()

        # Base query; formatted_tag is the display form of the source tag.
        select_list = ', '.join(f'c.{column}' for column in _RESULT_COLUMNS)
        sql = f'''
            SELECT {select_list}, c.tag,
                   CASE
                       WHEN c.tag = 'medisoft' THEN 'MEDISOFT'
                       WHEN c.tag = 'mediconsult' THEN 'MEDICONSULT'
                       ELSE c.tag
                   END as formatted_tag
            FROM customers c
            WHERE 1=1
        '''
        conditions = []
        params = []

        # Free-text terms: each term may match any search column.  An empty
        # term list (e.g. q=" ") must not emit an empty "()" group.
        if terms:
            term_clause = '(' + ' OR '.join(
                f'c.{column} LIKE ?' for column in _SEARCH_COLUMNS
            ) + ')'
            for term in terms:
                params.extend([f'%{term}%'] * len(_SEARCH_COLUMNS))
            joiner = ' AND ' if operator == 'and' else ' OR '
            conditions.append('(' + joiner.join([term_clause] * len(terms)) + ')')

        # Per-column substring filters (always empty for POST).
        column_filters = (
            ('name', name),
            ('ort', ort),
            ('nummer', nummer),
            ('plz', plz),
            ('fachrichtung', fachrichtung),
        )
        for column, value in column_filters:
            if value:
                conditions.append(f'c.{column} LIKE ?')
                params.append(f'%{value}%')

        if conditions:
            sql += ' AND ' + ' AND '.join(conditions)

        # Restrict to one data source unless 'all' was requested.
        if tag != 'all':
            sql += ' AND c.tag = ?'
            params.append(tag)

        sql += ' ORDER BY c.name'

        conn = get_db_connection()
        c = conn.cursor()
        c.execute(sql, params)

        formatted_results = []
        for row in c.fetchall():
            # zip() stops after the result columns; the trailing raw tag and
            # formatted_tag columns are handled explicitly.
            customer = dict(zip(_RESULT_COLUMNS, row))
            customer['tag'] = row['formatted_tag']  # expose the display form
            formatted_results.append(customer)

        return jsonify(formatted_results)

    except Exception as e:
        logger.error(f"Fehler bei der Suche: {str(e)}")
        return jsonify({'error': str(e)}), 500
    finally:
        if conn:
            conn.close()


def init_app(app):
    """Rebuild the database from the CSV files inside the app context.

    Ensures ``data/`` exists, deletes an existing database file (a failure to
    delete is logged but not fatal), recreates the schema and imports the CSV
    data.

    Raises:
        Exception: any initialisation error is logged and re-raised.
    """
    with app.app_context():
        try:
            # Make sure the data directory exists
            os.makedirs('data', exist_ok=True)

            # Remove the old database so every start begins from the CSVs
            if os.path.exists(DB_FILE):
                try:
                    os.remove(DB_FILE)
                    logger.info(f"Alte Datenbank {DB_FILE} wurde gelöscht")
                except Exception as e:
                    logger.error(f"Fehler beim Löschen der alten Datenbank: {str(e)}")

            # Create the schema
            init_db()

            # Import the CSV data
            import_csv()

            logger.info("Anwendung erfolgreich initialisiert")
        except Exception as e:
            logger.error(f"Fehler bei der Initialisierung: {str(e)}")
            raise


# Initialise the app at import time
init_app(app)

if __name__ == '__main__':
    # NOTE(security): debug mode exposes the interactive Werkzeug debugger.
    # It stays on by default for backward compatibility; set FLASK_DEBUG=0 to
    # turn it off.
    debug_enabled = os.getenv('FLASK_DEBUG', '1').strip().lower() not in ('0', 'false', 'no', 'off')
    app.run(debug=debug_enabled, port=5001)