diff --git a/app.py b/app.py index 0ddefff..008db72 100644 --- a/app.py +++ b/app.py @@ -8,14 +8,17 @@ from dotenv import load_dotenv import requests from collections import defaultdict import ipaddress +import csv +import sqlite3 +from functools import wraps app = Flask(__name__, static_folder='static') -app.secret_key = 'your_secret_key' # Setzen Sie einen sicheren geheimen Schlüssel für die Session -logging.basicConfig(level=logging.DEBUG) +app.secret_key = os.getenv('SECRET_KEY', 'default-secret-key') +logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Version der Anwendung -VERSION = "1.0.6" +VERSION = "1.2.1" # Pfad zur CSV-Datei CSV_FILE = "data/customers.csv" @@ -24,7 +27,124 @@ CSV_FILE = "data/customers.csv" load_dotenv() # Statisches Passwort aus der .env Datei -STATIC_PASSWORD = os.getenv('LOGIN_PASSWORD', 'changeme') +STATIC_PASSWORD = os.getenv('LOGIN_PASSWORD', 'default-password') +ALLOWED_IP_RANGES = os.getenv('ALLOWED_IP_RANGES', '').split(',') + +def init_db(): + """Initialisiert die SQLite-Datenbank und erstellt die notwendigen Tabellen.""" + conn = sqlite3.connect('customers.db') + c = conn.cursor() + + # Erstelle die Kunden-Tabelle + c.execute(''' + CREATE TABLE IF NOT EXISTS customers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + nummer TEXT, + name TEXT, + strasse TEXT, + plz TEXT, + ort TEXT, + telefon TEXT, + mobil TEXT, + email TEXT, + bemerkung TEXT + ) + ''') + + conn.commit() + conn.close() + +def import_csv(): + """Importiert die Daten aus der CSV-Datei in die SQLite-Datenbank.""" + conn = sqlite3.connect('customers.db') + c = conn.cursor() + + # Lösche bestehende Daten + c.execute('DELETE FROM customers') + + try: + # Lese die CSV-Datei mit pandas + df = pd.read_csv('data/customers.csv', sep=',', encoding='utf-8', quotechar='"') + + # Entferne Anführungszeichen aus den Spaltennamen + df.columns = df.columns.str.strip('"') + + # Entferne Anführungszeichen aus den Werten + for col in df.columns: + if df[col].dtype == 'object': + df[col] = df[col].str.strip('"') + + # Kombiniere Vorname und Nachname + df['name'] = df['Vorname'] + ' ' + df['Nachname'] + + # Importiere die Daten + for _, row in df.iterrows(): + c.execute(''' + INSERT INTO customers (nummer, name, strasse, plz, ort, telefon, mobil, email, bemerkung) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ''', ( + row['Nummer'], + row['name'], + row['Strasse'], + row['PLZ'], + row['Ort'], + row['Tel'], + row['Handy'], + row['mail'], + f"Fachrichtung: {row['Fachrichtung']}" + )) + + conn.commit() + logger.info('CSV-Daten erfolgreich in die Datenbank importiert') + except Exception as e: + logger.error(f'Fehler beim Import der CSV-Daten: {str(e)}') + raise + finally: + conn.close() + +def search_customers(search_params): + """Sucht Kunden in der Datenbank basierend auf den Suchparametern.""" + conn = sqlite3.connect('customers.db') + c = conn.cursor() + + # Erstelle die WHERE-Bedingungen basierend auf den Suchparametern + conditions = [] + params = [] + + if search_params.get('name'): + conditions.append('name LIKE ?') + params.append(f'%{search_params["name"]}%') + + if search_params.get('ort'): + conditions.append('ort LIKE ?') + params.append(f'%{search_params["ort"]}%') + + if search_params.get('nummer'): + conditions.append('nummer LIKE ?') + params.append(f'%{search_params["nummer"]}%') + + if search_params.get('plz'): + conditions.append('plz LIKE ?') + params.append(f'%{search_params["plz"]}%') + + # Erstelle die SQL-Abfrage + sql = 'SELECT * FROM customers' + if conditions: + sql += ' WHERE ' + ' AND '.join(conditions) + + # Führe die Abfrage aus + c.execute(sql, params) + results = c.fetchall() + + # Konvertiere die Ergebnisse in ein Dictionary + columns = ['id', 'nummer', 'name', 'strasse', 'plz', 'ort', 'telefon', 'mobil', 'email', 'bemerkung'] + customers = [] + for row in results: + customer = dict(zip(columns, row)) + customers.append(customer) + + conn.close() + return customers def clean_dataframe(df): """Konvertiert NaN-Werte in None für JSON-Kompatibilität""" @@ -61,6 +181,7 @@ def login(): logger.info(f"Client-IP: {client_ip}") logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}") + logger.info(f"Session Status: {session}") # Überprüfen, ob die IP-Adresse in einem der erlaubten Subnetze liegt client_ip_obj = ipaddress.ip_address(client_ip) @@ -71,23 +192,33 @@ def login(): if client_ip_obj in network: logger.info("Client-IP ist im erlaubten Bereich.") session['logged_in'] = True + session.permanent = True # Session bleibt bestehen return redirect(url_for('index')) except ValueError: logger.error(f"Ungültiges Netzwerkformat: {ip_range}") if request.method == 'POST': password = request.form.get('password') + logger.info(f"Login-Versuch mit Passwort: {'*' * len(password) if password else 'None'}") if password == STATIC_PASSWORD: session['logged_in'] = True + session.permanent = True # Session bleibt bestehen + logger.info("Login erfolgreich, Session gesetzt") return redirect(url_for('index')) else: + logger.warning("Falsches Passwort eingegeben") return render_template('login.html', error="Falsches Passwort") + + logger.info("Zeige Login-Seite") return render_template('login.html') @app.route('/') def index(): + logger.info(f"Index-Route aufgerufen. Session Status: {session}") if not session.get('logged_in'): + logger.info("Benutzer nicht eingeloggt, Weiterleitung zum Login") return redirect(url_for('login')) + allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '') client_ip = request.headers.get('X-Forwarded-For', request.remote_addr) logger.info(f"Client-IP: {client_ip}") @@ -96,71 +227,37 @@ def index(): @app.route('/search') def search(): - if not session.get('logged_in'): - return redirect(url_for('login')) try: - # CSV-Datei laden - df = load_data() - if df is None: - return jsonify({"error": "Datenbank konnte nicht geladen werden"}), 500 - - # Suchparameter aus der URL holen - name = request.args.get('name', '').strip() - ort = request.args.get('ort', '').strip() - kundennummer = request.args.get('kundennummer', '').strip() - fachrichtung = request.args.get('fachrichtung', '').strip() - telefon = request.args.get('telefon', '').strip() - query = request.args.get('q', '').strip() - - # Initialisiere die Maske für die Filterung - mask = pd.Series(True, index=df.index) + # Hole die Suchparameter aus der Anfrage + search_params = { + 'name': request.args.get('name', ''), + 'ort': request.args.get('ort', ''), + 'nummer': request.args.get('nummer', ''), + 'plz': request.args.get('plz', '') + } - # Wenn eine allgemeine Suche angegeben ist - if query: - query_mask = ( - df['Vorname'].str.contains(query, case=False, na=False) | - df['Nachname'].str.contains(query, case=False, na=False) | - df['Ort'].str.contains(query, case=False, na=False) | - df['Nummer'].astype(str).str.contains(query, case=False, na=False) | - df['Fachrichtung'].str.contains(query, case=False, na=False) | - df['Tel'].astype(str).str.contains(query, case=False, na=False) - ) - mask &= query_mask + # Führe die Suche in der Datenbank durch + results = search_customers(search_params) - # Spezifische Suchkriterien anwenden - if name: - name_mask = ( - df['Vorname'].str.contains(name, case=False, na=False) | - df['Nachname'].str.contains(name, case=False, na=False) - ) - mask &= name_mask + # Protokolliere die Anzahl der gefundenen Ergebnisse + logger.info(f'Suchergebnisse gefunden: {len(results)}') - if ort: - ort_mask = df['Ort'].str.contains(ort, case=False, na=False) - mask &= ort_mask - - if kundennummer: - kundennummer_mask = df['Nummer'].astype(str).str.contains(kundennummer, case=False, na=False) - mask &= kundennummer_mask - - if fachrichtung: - fachrichtung_mask = df['Fachrichtung'].str.contains(fachrichtung, case=False, na=False) - mask &= fachrichtung_mask - - if telefon: - telefon_mask = df['Tel'].astype(str).str.contains(telefon, case=False, na=False) - mask &= telefon_mask - - results = df[mask].to_dict('records') - logger.info(f"{len(results)} Ergebnisse gefunden") - - return jsonify({ - 'results': results, - 'total': len(results) - }) + return jsonify(results) except Exception as e: - logger.error(f"Fehler bei der Suche: {str(e)}") + logger.error(f'Fehler bei der Suche: {str(e)}') return jsonify({"error": str(e)}), 500 +def init_app(app): + """Initialisiert die Anwendung mit allen notwendigen Einstellungen.""" + with app.app_context(): + # Initialisiere die Datenbank + init_db() + # Importiere die CSV-Daten + import_csv() + logger.info("Anwendung erfolgreich initialisiert") + +# Initialisiere die App +init_app(app) + if __name__ == '__main__': app.run(debug=True, port=5001) \ No newline at end of file