382 lines
14 KiB
Python
382 lines
14 KiB
Python
from flask import Flask, render_template, request, jsonify, url_for, redirect, session
|
|
import pandas as pd
|
|
import os
|
|
import logging
|
|
import numpy as np
|
|
from datetime import datetime, timedelta
|
|
from dotenv import load_dotenv
|
|
import requests
|
|
from collections import defaultdict
|
|
import ipaddress
|
|
import csv
|
|
import sqlite3
|
|
from functools import wraps
|
|
|
|
app = Flask(__name__, static_folder='static')
|
|
app.secret_key = os.getenv('SECRET_KEY', 'default-secret-key')
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Version der Anwendung
|
|
VERSION = "1.2.1"
|
|
|
|
# Pfad zur CSV-Datei
|
|
CSV_FILE = 'data/customers.csv'
|
|
|
|
# Pfad zur Datenbank
|
|
DB_FILE = 'data/customers.db'
|
|
|
|
# Lade Umgebungsvariablen
|
|
load_dotenv()
|
|
|
|
# Statisches Passwort aus der .env Datei
|
|
STATIC_PASSWORD = os.getenv('LOGIN_PASSWORD', 'default-password')
|
|
ALLOWED_IP_RANGES = os.getenv('ALLOWED_IP_RANGES', '').split(',')
|
|
|
|
def init_db():
|
|
"""Initialisiert die SQLite-Datenbank mit der notwendigen Tabelle."""
|
|
conn = sqlite3.connect(DB_FILE)
|
|
c = conn.cursor()
|
|
|
|
# Erstelle die Kunden-Tabelle
|
|
c.execute('''
|
|
CREATE TABLE IF NOT EXISTS customers (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
nummer TEXT,
|
|
name TEXT,
|
|
strasse TEXT,
|
|
plz TEXT,
|
|
ort TEXT,
|
|
telefon TEXT,
|
|
mobil TEXT,
|
|
email TEXT,
|
|
bemerkung TEXT,
|
|
fachrichtung TEXT
|
|
)
|
|
''')
|
|
|
|
# Erstelle die Tags-Tabelle
|
|
c.execute('''
|
|
CREATE TABLE IF NOT EXISTS tags (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT UNIQUE NOT NULL
|
|
)
|
|
''')
|
|
|
|
# Erstelle die Verbindungstabelle für Kunden-Tags
|
|
c.execute('''
|
|
CREATE TABLE IF NOT EXISTS customer_tags (
|
|
customer_id INTEGER,
|
|
tag_id INTEGER,
|
|
PRIMARY KEY (customer_id, tag_id),
|
|
FOREIGN KEY (customer_id) REFERENCES customers (id),
|
|
FOREIGN KEY (tag_id) REFERENCES tags (id)
|
|
)
|
|
''')
|
|
|
|
# Erstelle Indizes für alle Suchfelder
|
|
c.execute('CREATE INDEX IF NOT EXISTS idx_customers_nummer ON customers(nummer)')
|
|
c.execute('CREATE INDEX IF NOT EXISTS idx_customers_name ON customers(name)')
|
|
c.execute('CREATE INDEX IF NOT EXISTS idx_customers_strasse ON customers(strasse)')
|
|
c.execute('CREATE INDEX IF NOT EXISTS idx_customers_plz ON customers(plz)')
|
|
c.execute('CREATE INDEX IF NOT EXISTS idx_customers_ort ON customers(ort)')
|
|
c.execute('CREATE INDEX IF NOT EXISTS idx_customers_telefon ON customers(telefon)')
|
|
c.execute('CREATE INDEX IF NOT EXISTS idx_customers_mobil ON customers(mobil)')
|
|
c.execute('CREATE INDEX IF NOT EXISTS idx_customers_email ON customers(email)')
|
|
c.execute('CREATE INDEX IF NOT EXISTS idx_customers_fachrichtung ON customers(fachrichtung)')
|
|
|
|
# Erstelle einen zusammengesetzten Index für die häufigste Suchkombination
|
|
c.execute('CREATE INDEX IF NOT EXISTS idx_customers_name_ort ON customers(name, ort)')
|
|
|
|
# Erstelle Standard-Tags, falls sie noch nicht existieren
|
|
c.execute('INSERT OR IGNORE INTO tags (name) VALUES (?)', ('medisoft',))
|
|
c.execute('INSERT OR IGNORE INTO tags (name) VALUES (?)', ('mediconsult',))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
logger.info('Datenbank initialisiert')
|
|
|
|
def import_csv():
|
|
"""Importiert die Daten aus der CSV-Datei in die SQLite-Datenbank."""
|
|
conn = sqlite3.connect(DB_FILE)
|
|
c = conn.cursor()
|
|
|
|
# Lösche bestehende Daten
|
|
c.execute('DELETE FROM customer_tags')
|
|
c.execute('DELETE FROM customers')
|
|
|
|
try:
|
|
# Lese die CSV-Datei mit pandas
|
|
df = pd.read_csv(CSV_FILE, sep=',', encoding='utf-8', quotechar='"')
|
|
|
|
# Entferne Anführungszeichen aus den Spaltennamen
|
|
df.columns = df.columns.str.strip('"')
|
|
|
|
# Entferne Anführungszeichen aus den Werten
|
|
for col in df.columns:
|
|
if df[col].dtype == 'object':
|
|
df[col] = df[col].str.strip('"')
|
|
|
|
# Kombiniere Vorname und Nachname
|
|
df['name'] = df['Vorname'] + ' ' + df['Nachname']
|
|
|
|
# Hole die ID des medisoft Tags
|
|
c.execute('SELECT id FROM tags WHERE name = ?', ('medisoft',))
|
|
medisoft_tag_id = c.fetchone()[0]
|
|
|
|
# Importiere die Daten
|
|
for _, row in df.iterrows():
|
|
c.execute('''
|
|
INSERT INTO customers (nummer, name, strasse, plz, ort, telefon, mobil, email, bemerkung, fachrichtung)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
''', (
|
|
row['Nummer'],
|
|
row['name'],
|
|
row['Strasse'],
|
|
row['PLZ'],
|
|
row['Ort'],
|
|
row['Tel'],
|
|
row['Handy'],
|
|
row['mail'],
|
|
f"Fachrichtung: {row['Fachrichtung']}",
|
|
row['Fachrichtung']
|
|
))
|
|
|
|
# Hole die ID des eingefügten Kunden
|
|
customer_id = c.lastrowid
|
|
|
|
# Füge das medisoft Tag hinzu
|
|
c.execute('''
|
|
INSERT INTO customer_tags (customer_id, tag_id)
|
|
VALUES (?, ?)
|
|
''', (customer_id, medisoft_tag_id))
|
|
|
|
conn.commit()
|
|
logger.info('CSV-Daten erfolgreich in die Datenbank importiert')
|
|
except Exception as e:
|
|
logger.error(f'Fehler beim Import der CSV-Daten: {str(e)}')
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
def search_customers(search_params):
|
|
"""Sucht nach Kunden basierend auf den Suchparametern."""
|
|
# Prüfe, ob alle Suchfelder leer sind
|
|
if not any([
|
|
search_params.get('q'),
|
|
search_params.get('name'),
|
|
search_params.get('ort'),
|
|
search_params.get('nummer'),
|
|
search_params.get('plz'),
|
|
search_params.get('telefon'),
|
|
search_params.get('email'),
|
|
search_params.get('fachrichtung')
|
|
]):
|
|
return []
|
|
|
|
conn = sqlite3.connect(DB_FILE)
|
|
c = conn.cursor()
|
|
|
|
try:
|
|
# Baue die SQL-Abfrage dynamisch auf
|
|
query = """
|
|
SELECT c.*, GROUP_CONCAT(t.name) as tags
|
|
FROM customers c
|
|
LEFT JOIN customer_tags ct ON c.id = ct.customer_id
|
|
LEFT JOIN tags t ON ct.tag_id = t.id
|
|
WHERE 1=1
|
|
"""
|
|
params = []
|
|
|
|
# Allgemeine Suche über alle Felder
|
|
if search_params.get('q'):
|
|
search_term = f"%{search_params['q']}%"
|
|
operator = search_params.get('operator', 'or').upper()
|
|
|
|
if operator == 'AND':
|
|
# Bei UND-Verknüpfung müssen alle Begriffe in mindestens einem Feld vorkommen
|
|
terms = search_params['q'].split()
|
|
conditions = []
|
|
for term in terms:
|
|
term = f"%{term}%"
|
|
conditions.append("(c.name LIKE ? OR c.ort LIKE ? OR c.nummer LIKE ? OR c.telefon LIKE ? OR c.mobil LIKE ? OR c.email LIKE ? OR c.bemerkung LIKE ? OR c.fachrichtung LIKE ?)")
|
|
params.extend([term] * 8)
|
|
query += " AND " + " AND ".join(conditions)
|
|
else:
|
|
# Bei ODER-Verknüpfung (Standard) muss mindestens ein Begriff in einem Feld vorkommen
|
|
query += " AND (c.name LIKE ? OR c.ort LIKE ? OR c.nummer LIKE ? OR c.telefon LIKE ? OR c.mobil LIKE ? OR c.email LIKE ? OR c.bemerkung LIKE ? OR c.fachrichtung LIKE ?)"
|
|
params.extend([search_term] * 8)
|
|
|
|
# Spezifische Suche für einzelne Felder
|
|
if search_params.get('name'):
|
|
query += " AND c.name LIKE ?"
|
|
params.append(f"%{search_params['name']}%")
|
|
|
|
if search_params.get('ort'):
|
|
query += " AND c.ort LIKE ?"
|
|
params.append(f"%{search_params['ort']}%")
|
|
|
|
if search_params.get('nummer'):
|
|
query += " AND c.nummer LIKE ?"
|
|
params.append(f"%{search_params['nummer']}%")
|
|
|
|
if search_params.get('plz'):
|
|
query += " AND c.plz LIKE ?"
|
|
params.append(f"%{search_params['plz']}%")
|
|
|
|
if search_params.get('fachrichtung'):
|
|
query += " AND c.fachrichtung LIKE ?"
|
|
params.append(f"%{search_params['fachrichtung']}%")
|
|
|
|
query += " GROUP BY c.id"
|
|
|
|
# Führe die Abfrage aus
|
|
c.execute(query, params)
|
|
results = c.fetchall()
|
|
|
|
# Formatiere die Ergebnisse
|
|
customers = []
|
|
for row in results:
|
|
customer = {
|
|
'id': row[0],
|
|
'nummer': row[1],
|
|
'name': row[2],
|
|
'strasse': row[3],
|
|
'plz': row[4],
|
|
'ort': row[5],
|
|
'telefon': row[6],
|
|
'mobil': row[7],
|
|
'email': row[8],
|
|
'bemerkung': row[9],
|
|
'fachrichtung': row[10],
|
|
'tags': row[11].split(',') if row[11] else []
|
|
}
|
|
customers.append(customer)
|
|
|
|
return customers
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei der Kundensuche: {str(e)}")
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
def clean_dataframe(df):
|
|
"""Konvertiert NaN-Werte in None für JSON-Kompatibilität"""
|
|
return df.replace({np.nan: None})
|
|
|
|
# CSV-Datei laden
|
|
def load_data():
|
|
try:
|
|
logger.info("Versuche CSV-Datei zu laden...")
|
|
if not os.path.exists(CSV_FILE):
|
|
logger.error(f"CSV-Datei '{CSV_FILE}' nicht gefunden!")
|
|
return None
|
|
|
|
# Lade CSV mit Komma als Trennzeichen
|
|
df = pd.read_csv(CSV_FILE, sep=',', encoding='utf-8', quotechar='"')
|
|
# Entferne Anführungszeichen aus den Spaltennamen
|
|
df.columns = df.columns.str.strip('"')
|
|
# Entferne Anführungszeichen aus den Werten
|
|
for col in df.columns:
|
|
if df[col].dtype == 'object':
|
|
df[col] = df[col].str.strip('"')
|
|
df = clean_dataframe(df)
|
|
logger.info(f"CSV-Datei erfolgreich geladen. {len(df)} Einträge gefunden.")
|
|
return df
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Laden der CSV-Datei: {str(e)}")
|
|
return None
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
# Versuche, die tatsächliche Client-IP aus dem X-Forwarded-For-Header zu erhalten
|
|
client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
|
|
allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '').split(',')
|
|
|
|
logger.info(f"Client-IP: {client_ip}")
|
|
logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}")
|
|
logger.info(f"Session Status: {session}")
|
|
|
|
# Überprüfen, ob die IP-Adresse in einem der erlaubten Subnetze liegt
|
|
client_ip_obj = ipaddress.ip_address(client_ip)
|
|
for ip_range in allowed_ip_ranges:
|
|
try:
|
|
network = ipaddress.ip_network(ip_range.strip(), strict=False)
|
|
logger.info(f"Überprüfe Netzwerk: {network}")
|
|
if client_ip_obj in network:
|
|
logger.info("Client-IP ist im erlaubten Bereich.")
|
|
session['logged_in'] = True
|
|
session.permanent = True # Session bleibt bestehen
|
|
return redirect(url_for('index'))
|
|
except ValueError:
|
|
logger.error(f"Ungültiges Netzwerkformat: {ip_range}")
|
|
|
|
if request.method == 'POST':
|
|
password = request.form.get('password')
|
|
logger.info(f"Login-Versuch mit Passwort: {'*' * len(password) if password else 'None'}")
|
|
if password == STATIC_PASSWORD:
|
|
session['logged_in'] = True
|
|
session.permanent = True # Session bleibt bestehen
|
|
logger.info("Login erfolgreich, Session gesetzt")
|
|
return redirect(url_for('index'))
|
|
else:
|
|
logger.warning("Falsches Passwort eingegeben")
|
|
return render_template('login.html', error="Falsches Passwort")
|
|
|
|
logger.info("Zeige Login-Seite")
|
|
return render_template('login.html')
|
|
|
|
@app.route('/')
|
|
def index():
|
|
logger.info(f"Index-Route aufgerufen. Session Status: {session}")
|
|
if not session.get('logged_in'):
|
|
logger.info("Benutzer nicht eingeloggt, Weiterleitung zum Login")
|
|
return redirect(url_for('login'))
|
|
|
|
allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '')
|
|
client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
|
|
logger.info(f"Client-IP: {client_ip}")
|
|
logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}")
|
|
return render_template('index.html', allowed_ip_ranges=allowed_ip_ranges)
|
|
|
|
@app.route('/search')
|
|
def search():
|
|
try:
|
|
# Hole die Suchparameter aus der Anfrage
|
|
search_params = {
|
|
'name': request.args.get('name', ''),
|
|
'ort': request.args.get('ort', ''),
|
|
'nummer': request.args.get('nummer', ''),
|
|
'plz': request.args.get('plz', ''),
|
|
'telefon': request.args.get('telefon', ''),
|
|
'email': request.args.get('email', ''),
|
|
'q': request.args.get('q', ''),
|
|
'fachrichtung': request.args.get('fachrichtung', ''),
|
|
'operator': request.args.get('operator', 'or')
|
|
}
|
|
|
|
# Führe die Suche in der Datenbank durch
|
|
results = search_customers(search_params)
|
|
|
|
# Protokolliere die Anzahl der gefundenen Ergebnisse
|
|
logger.info(f'Suchergebnisse gefunden: {len(results)}')
|
|
|
|
return jsonify(results)
|
|
except Exception as e:
|
|
logger.error(f'Fehler bei der Suche: {str(e)}')
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
def init_app(app):
|
|
"""Initialisiert die Anwendung mit allen notwendigen Einstellungen."""
|
|
with app.app_context():
|
|
# Initialisiere die Datenbank
|
|
init_db()
|
|
# Importiere die CSV-Daten
|
|
import_csv()
|
|
logger.info("Anwendung erfolgreich initialisiert")
|
|
|
|
# Initialisiere die App
|
|
init_app(app)
|
|
|
|
if __name__ == '__main__':
|
|
app.run(debug=True, port=5001) |