162 lines
5.9 KiB
Python
162 lines
5.9 KiB
Python
from flask import Flask, render_template, request, jsonify, url_for, redirect, session
|
|
import pandas as pd
|
|
import os
|
|
import logging
|
|
import numpy as np
|
|
from datetime import datetime, timedelta
|
|
from dotenv import load_dotenv
|
|
import requests
|
|
from collections import defaultdict
|
|
import ipaddress
|
|
|
|
app = Flask(__name__, static_folder='static')
|
|
app.secret_key = 'your_secret_key' # Setzen Sie einen sicheren geheimen Schlüssel für die Session
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Version der Anwendung
|
|
VERSION = "1.0.6"
|
|
|
|
# Pfad zur CSV-Datei
|
|
CSV_FILE = "data/customers.csv"
|
|
|
|
# Lade Umgebungsvariablen
|
|
load_dotenv()
|
|
|
|
# Statisches Passwort aus der .env Datei
|
|
STATIC_PASSWORD = os.getenv('LOGIN_PASSWORD', 'changeme')
|
|
|
|
def clean_dataframe(df):
|
|
"""Konvertiert NaN-Werte in None für JSON-Kompatibilität"""
|
|
return df.replace({np.nan: None})
|
|
|
|
# CSV-Datei laden
|
|
def load_data():
|
|
try:
|
|
logger.info("Versuche CSV-Datei zu laden...")
|
|
if not os.path.exists(CSV_FILE):
|
|
logger.error(f"CSV-Datei '{CSV_FILE}' nicht gefunden!")
|
|
return None
|
|
|
|
# Lade CSV mit Komma als Trennzeichen
|
|
df = pd.read_csv(CSV_FILE, sep=',', encoding='utf-8', quotechar='"')
|
|
# Entferne Anführungszeichen aus den Spaltennamen
|
|
df.columns = df.columns.str.strip('"')
|
|
# Entferne Anführungszeichen aus den Werten
|
|
for col in df.columns:
|
|
if df[col].dtype == 'object':
|
|
df[col] = df[col].str.strip('"')
|
|
df = clean_dataframe(df)
|
|
logger.info(f"CSV-Datei erfolgreich geladen. {len(df)} Einträge gefunden.")
|
|
return df
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Laden der CSV-Datei: {str(e)}")
|
|
return None
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
# Versuche, die tatsächliche Client-IP aus dem X-Forwarded-For-Header zu erhalten
|
|
client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
|
|
allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '').split(',')
|
|
|
|
logger.info(f"Client-IP: {client_ip}")
|
|
logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}")
|
|
|
|
# Überprüfen, ob die IP-Adresse in einem der erlaubten Subnetze liegt
|
|
client_ip_obj = ipaddress.ip_address(client_ip)
|
|
for ip_range in allowed_ip_ranges:
|
|
try:
|
|
network = ipaddress.ip_network(ip_range.strip(), strict=False)
|
|
logger.info(f"Überprüfe Netzwerk: {network}")
|
|
if client_ip_obj in network:
|
|
logger.info("Client-IP ist im erlaubten Bereich.")
|
|
session['logged_in'] = True
|
|
return redirect(url_for('index'))
|
|
except ValueError:
|
|
logger.error(f"Ungültiges Netzwerkformat: {ip_range}")
|
|
|
|
if request.method == 'POST':
|
|
password = request.form.get('password')
|
|
if password == STATIC_PASSWORD:
|
|
session['logged_in'] = True
|
|
return redirect(url_for('index'))
|
|
else:
|
|
return render_template('login.html', error="Falsches Passwort")
|
|
return render_template('login.html')
|
|
|
|
@app.route('/')
|
|
def index():
|
|
if not session.get('logged_in'):
|
|
return redirect(url_for('login'))
|
|
return render_template('index.html')
|
|
|
|
@app.route('/search')
|
|
def search():
|
|
if not session.get('logged_in'):
|
|
return redirect(url_for('login'))
|
|
try:
|
|
# CSV-Datei laden
|
|
df = load_data()
|
|
if df is None:
|
|
return jsonify({"error": "Datenbank konnte nicht geladen werden"}), 500
|
|
|
|
# Suchparameter aus der URL holen
|
|
name = request.args.get('name', '').strip()
|
|
ort = request.args.get('ort', '').strip()
|
|
kundennummer = request.args.get('kundennummer', '').strip()
|
|
fachrichtung = request.args.get('fachrichtung', '').strip()
|
|
telefon = request.args.get('telefon', '').strip()
|
|
query = request.args.get('q', '').strip()
|
|
|
|
# Initialisiere die Maske für die Filterung
|
|
mask = pd.Series(True, index=df.index)
|
|
|
|
# Wenn eine allgemeine Suche angegeben ist
|
|
if query:
|
|
query_mask = (
|
|
df['Vorname'].str.contains(query, case=False, na=False) |
|
|
df['Nachname'].str.contains(query, case=False, na=False) |
|
|
df['Ort'].str.contains(query, case=False, na=False) |
|
|
df['Nummer'].astype(str).str.contains(query, case=False, na=False) |
|
|
df['Fachrichtung'].str.contains(query, case=False, na=False) |
|
|
df['Tel'].astype(str).str.contains(query, case=False, na=False)
|
|
)
|
|
mask &= query_mask
|
|
|
|
# Spezifische Suchkriterien anwenden
|
|
if name:
|
|
name_mask = (
|
|
df['Vorname'].str.contains(name, case=False, na=False) |
|
|
df['Nachname'].str.contains(name, case=False, na=False)
|
|
)
|
|
mask &= name_mask
|
|
|
|
if ort:
|
|
ort_mask = df['Ort'].str.contains(ort, case=False, na=False)
|
|
mask &= ort_mask
|
|
|
|
if kundennummer:
|
|
kundennummer_mask = df['Nummer'].astype(str).str.contains(kundennummer, case=False, na=False)
|
|
mask &= kundennummer_mask
|
|
|
|
if fachrichtung:
|
|
fachrichtung_mask = df['Fachrichtung'].str.contains(fachrichtung, case=False, na=False)
|
|
mask &= fachrichtung_mask
|
|
|
|
if telefon:
|
|
telefon_mask = df['Tel'].astype(str).str.contains(telefon, case=False, na=False)
|
|
mask &= telefon_mask
|
|
|
|
results = df[mask].to_dict('records')
|
|
logger.info(f"{len(results)} Ergebnisse gefunden")
|
|
|
|
return jsonify({
|
|
'results': results,
|
|
'total': len(results)
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei der Suche: {str(e)}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
if __name__ == '__main__':
|
|
app.run(debug=True, port=5001) |