Version 1.2.2: Verbesserte Telefonnummern-Formatierung und Dokumentation

This commit is contained in:
2025-03-18 13:47:41 +01:00
parent 68a2db28a1
commit d5954eac89
6 changed files with 125 additions and 57 deletions

125
app.py
View File

@@ -21,7 +21,10 @@ logger = logging.getLogger(__name__)
VERSION = "1.2.1"
# Pfad zur CSV-Datei
CSV_FILE = "data/customers.csv"
CSV_FILE = 'data/customers.csv'
# Pfad zur Datenbank
DB_FILE = 'data/customers.db'
# Lade Umgebungsvariablen
load_dotenv()
@@ -31,11 +34,11 @@ STATIC_PASSWORD = os.getenv('LOGIN_PASSWORD', 'default-password')
ALLOWED_IP_RANGES = os.getenv('ALLOWED_IP_RANGES', '').split(',')
def init_db():
"""Initialisiert die SQLite-Datenbank und erstellt die notwendigen Tabellen."""
conn = sqlite3.connect('customers.db')
"""Initialisiert die SQLite-Datenbank mit der notwendigen Tabelle."""
conn = sqlite3.connect(DB_FILE)
c = conn.cursor()
# Erstelle die Kunden-Tabelle
# Erstelle die Tabelle mit Indizes
c.execute('''
CREATE TABLE IF NOT EXISTS customers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -51,12 +54,20 @@ def init_db():
)
''')
# Erstelle Indizes für häufig durchsuchte Spalten
c.execute('CREATE INDEX IF NOT EXISTS idx_name ON customers(name)')
c.execute('CREATE INDEX IF NOT EXISTS idx_ort ON customers(ort)')
c.execute('CREATE INDEX IF NOT EXISTS idx_nummer ON customers(nummer)')
c.execute('CREATE INDEX IF NOT EXISTS idx_telefon ON customers(telefon)')
c.execute('CREATE INDEX IF NOT EXISTS idx_email ON customers(email)')
conn.commit()
conn.close()
logger.info('Datenbank initialisiert')
def import_csv():
"""Importiert die Daten aus der CSV-Datei in die SQLite-Datenbank."""
conn = sqlite3.connect('customers.db')
conn = sqlite3.connect(DB_FILE)
c = conn.cursor()
# Lösche bestehende Daten
@@ -64,7 +75,7 @@ def import_csv():
try:
# Lese die CSV-Datei mit pandas
df = pd.read_csv('data/customers.csv', sep=',', encoding='utf-8', quotechar='"')
df = pd.read_csv(CSV_FILE, sep=',', encoding='utf-8', quotechar='"')
# Entferne Anführungszeichen aus den Spaltennamen
df.columns = df.columns.str.strip('"')
@@ -103,48 +114,65 @@ def import_csv():
conn.close()
def search_customers(search_params):
"""Sucht Kunden in der Datenbank basierend auf den Suchparametern."""
conn = sqlite3.connect('customers.db')
"""Sucht nach Kunden basierend auf den Suchparametern."""
conn = sqlite3.connect(DB_FILE)
c = conn.cursor()
# Erstelle die WHERE-Bedingungen basierend auf den Suchparametern
conditions = []
params = []
if search_params.get('name'):
conditions.append('name LIKE ?')
params.append(f'%{search_params["name"]}%')
if search_params.get('ort'):
conditions.append('ort LIKE ?')
params.append(f'%{search_params["ort"]}%')
if search_params.get('nummer'):
conditions.append('nummer LIKE ?')
params.append(f'%{search_params["nummer"]}%')
if search_params.get('plz'):
conditions.append('plz LIKE ?')
params.append(f'%{search_params["plz"]}%')
# Erstelle die SQL-Abfrage
sql = 'SELECT * FROM customers'
if conditions:
sql += ' WHERE ' + ' AND '.join(conditions)
# Führe die Abfrage aus
c.execute(sql, params)
results = c.fetchall()
# Konvertiere die Ergebnisse in ein Dictionary
columns = ['id', 'nummer', 'name', 'strasse', 'plz', 'ort', 'telefon', 'mobil', 'email', 'bemerkung']
customers = []
for row in results:
customer = dict(zip(columns, row))
customers.append(customer)
conn.close()
return customers
try:
# Baue die SQL-Abfrage dynamisch auf
query = "SELECT * FROM customers WHERE 1=1"
params = []
# Allgemeine Suche über alle Felder
if search_params.get('q'):
search_term = f"%{search_params['q']}%"
query += " AND (name LIKE ? OR ort LIKE ? OR nummer LIKE ? OR telefon LIKE ? OR mobil LIKE ? OR email LIKE ? OR bemerkung LIKE ?)"
params.extend([search_term] * 7)
# Spezifische Suche für einzelne Felder
if search_params.get('name'):
query += " AND name LIKE ?"
params.append(f"%{search_params['name']}%")
if search_params.get('ort'):
query += " AND ort LIKE ?"
params.append(f"%{search_params['ort']}%")
if search_params.get('nummer'):
query += " AND nummer LIKE ?"
params.append(f"%{search_params['nummer']}%")
if search_params.get('plz'):
query += " AND plz LIKE ?"
params.append(f"%{search_params['plz']}%")
# Führe die Abfrage aus
c.execute(query, params)
results = c.fetchall()
# Formatiere die Ergebnisse
customers = []
for row in results:
customer = {
'id': row[0],
'nummer': row[1],
'name': row[2],
'strasse': row[3],
'plz': row[4],
'ort': row[5],
'telefon': row[6],
'mobil': row[7],
'email': row[8],
'bemerkung': row[9]
}
customers.append(customer)
return customers
except Exception as e:
logger.error(f"Fehler bei der Kundensuche: {str(e)}")
raise
finally:
conn.close()
def clean_dataframe(df):
"""Konvertiert NaN-Werte in None für JSON-Kompatibilität"""
@@ -233,7 +261,10 @@ def search():
'name': request.args.get('name', ''),
'ort': request.args.get('ort', ''),
'nummer': request.args.get('nummer', ''),
'plz': request.args.get('plz', '')
'plz': request.args.get('plz', ''),
'telefon': request.args.get('telefon', ''),
'email': request.args.get('email', ''),
'q': request.args.get('q', '')
}
# Führe die Suche in der Datenbank durch