Version 1.2.1: Verbesserte CSV-Import-Funktionalität und Login-Fix

This commit is contained in:
2025-03-18 12:54:31 +01:00
parent 00bb197620
commit 0b1ad7b2c5

223
app.py
View File

@@ -8,14 +8,17 @@ from dotenv import load_dotenv
import requests import requests
from collections import defaultdict from collections import defaultdict
import ipaddress import ipaddress
import csv
import sqlite3
from functools import wraps
app = Flask(__name__, static_folder='static') app = Flask(__name__, static_folder='static')
app.secret_key = 'your_secret_key' # Setzen Sie einen sicheren geheimen Schlüssel für die Session app.secret_key = os.getenv('SECRET_KEY', 'default-secret-key')
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Version der Anwendung # Version der Anwendung
VERSION = "1.0.6" VERSION = "1.2.1"
# Pfad zur CSV-Datei # Pfad zur CSV-Datei
CSV_FILE = "data/customers.csv" CSV_FILE = "data/customers.csv"
@@ -24,7 +27,124 @@ CSV_FILE = "data/customers.csv"
load_dotenv() load_dotenv()
# Statisches Passwort aus der .env Datei # Statisches Passwort aus der .env Datei
STATIC_PASSWORD = os.getenv('LOGIN_PASSWORD', 'changeme') STATIC_PASSWORD = os.getenv('LOGIN_PASSWORD', 'default-password')
ALLOWED_IP_RANGES = os.getenv('ALLOWED_IP_RANGES', '').split(',')
def init_db():
"""Initialisiert die SQLite-Datenbank und erstellt die notwendigen Tabellen."""
conn = sqlite3.connect('customers.db')
c = conn.cursor()
# Erstelle die Kunden-Tabelle
c.execute('''
CREATE TABLE IF NOT EXISTS customers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
nummer TEXT,
name TEXT,
strasse TEXT,
plz TEXT,
ort TEXT,
telefon TEXT,
mobil TEXT,
email TEXT,
bemerkung TEXT
)
''')
conn.commit()
conn.close()
def import_csv():
"""Importiert die Daten aus der CSV-Datei in die SQLite-Datenbank."""
conn = sqlite3.connect('customers.db')
c = conn.cursor()
# Lösche bestehende Daten
c.execute('DELETE FROM customers')
try:
# Lese die CSV-Datei mit pandas
df = pd.read_csv('data/customers.csv', sep=',', encoding='utf-8', quotechar='"')
# Entferne Anführungszeichen aus den Spaltennamen
df.columns = df.columns.str.strip('"')
# Entferne Anführungszeichen aus den Werten
for col in df.columns:
if df[col].dtype == 'object':
df[col] = df[col].str.strip('"')
# Kombiniere Vorname und Nachname
df['name'] = df['Vorname'] + ' ' + df['Nachname']
# Importiere die Daten
for _, row in df.iterrows():
c.execute('''
INSERT INTO customers (nummer, name, strasse, plz, ort, telefon, mobil, email, bemerkung)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
row['Nummer'],
row['name'],
row['Strasse'],
row['PLZ'],
row['Ort'],
row['Tel'],
row['Handy'],
row['mail'],
f"Fachrichtung: {row['Fachrichtung']}"
))
conn.commit()
logger.info('CSV-Daten erfolgreich in die Datenbank importiert')
except Exception as e:
logger.error(f'Fehler beim Import der CSV-Daten: {str(e)}')
raise
finally:
conn.close()
def search_customers(search_params):
"""Sucht Kunden in der Datenbank basierend auf den Suchparametern."""
conn = sqlite3.connect('customers.db')
c = conn.cursor()
# Erstelle die WHERE-Bedingungen basierend auf den Suchparametern
conditions = []
params = []
if search_params.get('name'):
conditions.append('name LIKE ?')
params.append(f'%{search_params["name"]}%')
if search_params.get('ort'):
conditions.append('ort LIKE ?')
params.append(f'%{search_params["ort"]}%')
if search_params.get('nummer'):
conditions.append('nummer LIKE ?')
params.append(f'%{search_params["nummer"]}%')
if search_params.get('plz'):
conditions.append('plz LIKE ?')
params.append(f'%{search_params["plz"]}%')
# Erstelle die SQL-Abfrage
sql = 'SELECT * FROM customers'
if conditions:
sql += ' WHERE ' + ' AND '.join(conditions)
# Führe die Abfrage aus
c.execute(sql, params)
results = c.fetchall()
# Konvertiere die Ergebnisse in ein Dictionary
columns = ['id', 'nummer', 'name', 'strasse', 'plz', 'ort', 'telefon', 'mobil', 'email', 'bemerkung']
customers = []
for row in results:
customer = dict(zip(columns, row))
customers.append(customer)
conn.close()
return customers
def clean_dataframe(df): def clean_dataframe(df):
"""Konvertiert NaN-Werte in None für JSON-Kompatibilität""" """Konvertiert NaN-Werte in None für JSON-Kompatibilität"""
@@ -61,6 +181,7 @@ def login():
logger.info(f"Client-IP: {client_ip}") logger.info(f"Client-IP: {client_ip}")
logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}") logger.info(f"Erlaubte IP-Bereiche: {allowed_ip_ranges}")
logger.info(f"Session Status: {session}")
# Überprüfen, ob die IP-Adresse in einem der erlaubten Subnetze liegt # Überprüfen, ob die IP-Adresse in einem der erlaubten Subnetze liegt
client_ip_obj = ipaddress.ip_address(client_ip) client_ip_obj = ipaddress.ip_address(client_ip)
@@ -71,23 +192,33 @@ def login():
if client_ip_obj in network: if client_ip_obj in network:
logger.info("Client-IP ist im erlaubten Bereich.") logger.info("Client-IP ist im erlaubten Bereich.")
session['logged_in'] = True session['logged_in'] = True
session.permanent = True # Session bleibt bestehen
return redirect(url_for('index')) return redirect(url_for('index'))
except ValueError: except ValueError:
logger.error(f"Ungültiges Netzwerkformat: {ip_range}") logger.error(f"Ungültiges Netzwerkformat: {ip_range}")
if request.method == 'POST': if request.method == 'POST':
password = request.form.get('password') password = request.form.get('password')
logger.info(f"Login-Versuch mit Passwort: {'*' * len(password) if password else 'None'}")
if password == STATIC_PASSWORD: if password == STATIC_PASSWORD:
session['logged_in'] = True session['logged_in'] = True
session.permanent = True # Session bleibt bestehen
logger.info("Login erfolgreich, Session gesetzt")
return redirect(url_for('index')) return redirect(url_for('index'))
else: else:
logger.warning("Falsches Passwort eingegeben")
return render_template('login.html', error="Falsches Passwort") return render_template('login.html', error="Falsches Passwort")
logger.info("Zeige Login-Seite")
return render_template('login.html') return render_template('login.html')
@app.route('/') @app.route('/')
def index(): def index():
logger.info(f"Index-Route aufgerufen. Session Status: {session}")
if not session.get('logged_in'): if not session.get('logged_in'):
logger.info("Benutzer nicht eingeloggt, Weiterleitung zum Login")
return redirect(url_for('login')) return redirect(url_for('login'))
allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '') allowed_ip_ranges = os.getenv('ALLOWED_IP_RANGES', '')
client_ip = request.headers.get('X-Forwarded-For', request.remote_addr) client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
logger.info(f"Client-IP: {client_ip}") logger.info(f"Client-IP: {client_ip}")
@@ -96,71 +227,37 @@ def index():
@app.route('/search') @app.route('/search')
def search(): def search():
if not session.get('logged_in'):
return redirect(url_for('login'))
try: try:
# CSV-Datei laden # Hole die Suchparameter aus der Anfrage
df = load_data() search_params = {
if df is None: 'name': request.args.get('name', ''),
return jsonify({"error": "Datenbank konnte nicht geladen werden"}), 500 'ort': request.args.get('ort', ''),
'nummer': request.args.get('nummer', ''),
'plz': request.args.get('plz', '')
}
# Suchparameter aus der URL holen # Führe die Suche in der Datenbank durch
name = request.args.get('name', '').strip() results = search_customers(search_params)
ort = request.args.get('ort', '').strip()
kundennummer = request.args.get('kundennummer', '').strip()
fachrichtung = request.args.get('fachrichtung', '').strip()
telefon = request.args.get('telefon', '').strip()
query = request.args.get('q', '').strip()
# Initialisiere die Maske für die Filterung # Protokolliere die Anzahl der gefundenen Ergebnisse
mask = pd.Series(True, index=df.index) logger.info(f'Suchergebnisse gefunden: {len(results)}')
# Wenn eine allgemeine Suche angegeben ist return jsonify(results)
if query:
query_mask = (
df['Vorname'].str.contains(query, case=False, na=False) |
df['Nachname'].str.contains(query, case=False, na=False) |
df['Ort'].str.contains(query, case=False, na=False) |
df['Nummer'].astype(str).str.contains(query, case=False, na=False) |
df['Fachrichtung'].str.contains(query, case=False, na=False) |
df['Tel'].astype(str).str.contains(query, case=False, na=False)
)
mask &= query_mask
# Spezifische Suchkriterien anwenden
if name:
name_mask = (
df['Vorname'].str.contains(name, case=False, na=False) |
df['Nachname'].str.contains(name, case=False, na=False)
)
mask &= name_mask
if ort:
ort_mask = df['Ort'].str.contains(ort, case=False, na=False)
mask &= ort_mask
if kundennummer:
kundennummer_mask = df['Nummer'].astype(str).str.contains(kundennummer, case=False, na=False)
mask &= kundennummer_mask
if fachrichtung:
fachrichtung_mask = df['Fachrichtung'].str.contains(fachrichtung, case=False, na=False)
mask &= fachrichtung_mask
if telefon:
telefon_mask = df['Tel'].astype(str).str.contains(telefon, case=False, na=False)
mask &= telefon_mask
results = df[mask].to_dict('records')
logger.info(f"{len(results)} Ergebnisse gefunden")
return jsonify({
'results': results,
'total': len(results)
})
except Exception as e: except Exception as e:
logger.error(f"Fehler bei der Suche: {str(e)}") logger.error(f'Fehler bei der Suche: {str(e)}')
return jsonify({"error": str(e)}), 500 return jsonify({"error": str(e)}), 500
def init_app(app):
"""Initialisiert die Anwendung mit allen notwendigen Einstellungen."""
with app.app_context():
# Initialisiere die Datenbank
init_db()
# Importiere die CSV-Daten
import_csv()
logger.info("Anwendung erfolgreich initialisiert")
# Initialisiere die App
init_app(app)
if __name__ == '__main__': if __name__ == '__main__':
app.run(debug=True, port=5001) app.run(debug=True, port=5001)