Add grouped inventory goals with import completion notifications.

Players can create named goal groups, set absolute item targets from inventory, track progress in a new Goals tab, and get banners when uploads complete goals or entire groups.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
2026-06-19 22:25:42 +02:00
parent 82b47f9df1
commit f5b5541555
8 changed files with 965 additions and 5 deletions
+317
View File
@@ -57,6 +57,21 @@ def init_db(conn: sqlite3.Connection) -> None:
FOREIGN KEY (snapshot_id) REFERENCES snapshots(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_snapshots_exported ON snapshots(exported_at);
CREATE TABLE IF NOT EXISTS goal_groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS goals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_id INTEGER REFERENCES goal_groups(id) ON DELETE SET NULL,
item_key TEXT NOT NULL UNIQUE,
item_name TEXT NOT NULL,
category TEXT NOT NULL,
target_qty INTEGER NOT NULL,
created_at TEXT NOT NULL,
completed_at TEXT
);
""")
conn.commit()
@@ -143,6 +158,8 @@ def import_save(
)
conn.commit()
goals_result = check_goals_after_import(snapshot_id, conn=conn, db_path=db_path)
if own_conn:
conn.close()
@@ -151,6 +168,7 @@ def import_save(
"snapshot_id": snapshot_id,
"import_report": import_report,
"import_summary": meta.get("import_summary"),
**goals_result,
}
@@ -342,3 +360,302 @@ def inventory_timeline(db_path: Path | str = DEFAULT_DB) -> dict[str, Any]:
series[key][idx] = row["qty"]
return {"snapshots": snapshots, "series": series}
def _latest_snapshot_id(conn: sqlite3.Connection) -> int | None:
row = conn.execute(
"SELECT id FROM snapshots ORDER BY exported_at DESC, id DESC LIMIT 1"
).fetchone()
return row["id"] if row else None
def _inventory_qty_for_snapshot(conn: sqlite3.Connection, snapshot_id: int, item_key: str) -> int:
row = conn.execute(
"SELECT qty FROM inventory_snapshots WHERE snapshot_id = ? AND item_key = ?",
(snapshot_id, item_key),
).fetchone()
return row["qty"] if row else 0
def _goal_row_to_dict(row: sqlite3.Row, current_qty: int) -> dict[str, Any]:
target = row["target_qty"]
progress_pct = min(100, int(current_qty / target * 100)) if target > 0 else 0
return {
"id": row["id"],
"group_id": row["group_id"],
"group_name": row["group_name"],
"item_key": row["item_key"],
"item_name": row["item_name"],
"category": row["category"],
"target_qty": target,
"current_qty": current_qty,
"progress_pct": progress_pct,
"completed_at": row["completed_at"],
"created_at": row["created_at"],
}
def list_goal_groups(db_path: Path | str = DEFAULT_DB) -> list[dict[str, Any]]:
conn = get_connection(db_path)
init_db(conn)
rows = conn.execute(
"""
SELECT g.id, g.name, g.created_at,
COUNT(go.id) AS total,
SUM(CASE WHEN go.completed_at IS NOT NULL THEN 1 ELSE 0 END) AS completed
FROM goal_groups g
LEFT JOIN goals go ON go.group_id = g.id
GROUP BY g.id
ORDER BY g.created_at ASC, g.id ASC
"""
).fetchall()
conn.close()
result = []
for r in rows:
total = r["total"] or 0
completed = r["completed"] or 0
result.append({
"id": r["id"],
"name": r["name"],
"created_at": r["created_at"],
"total": total,
"completed": completed,
"open": total - completed,
})
return result
def create_goal_group(name: str, db_path: Path | str = DEFAULT_DB) -> dict[str, Any]:
name = name.strip()
if not name:
raise ValueError("Group name is required")
conn = get_connection(db_path)
init_db(conn)
cur = conn.execute(
"INSERT INTO goal_groups (name, created_at) VALUES (?, ?)",
(name, _utc_now_iso()),
)
group_id = cur.lastrowid
conn.commit()
conn.close()
return {"id": group_id, "name": name, "total": 0, "completed": 0, "open": 0}
def delete_goal_group(group_id: int, db_path: Path | str = DEFAULT_DB) -> bool:
conn = get_connection(db_path)
init_db(conn)
row = conn.execute("SELECT id FROM goal_groups WHERE id = ?", (group_id,)).fetchone()
if not row:
conn.close()
return False
conn.execute("UPDATE goals SET group_id = NULL WHERE group_id = ?", (group_id,))
conn.execute("DELETE FROM goal_groups WHERE id = ?", (group_id,))
conn.commit()
conn.close()
return True
def _fetch_goals_with_qty(conn: sqlite3.Connection, snapshot_id: int | None) -> list[dict[str, Any]]:
rows = conn.execute(
"""
SELECT go.id, go.group_id, gg.name AS group_name, go.item_key, go.item_name,
go.category, go.target_qty, go.created_at, go.completed_at
FROM goals go
LEFT JOIN goal_groups gg ON gg.id = go.group_id
ORDER BY go.created_at ASC, go.id ASC
"""
).fetchall()
goals = []
for row in rows:
current_qty = _inventory_qty_for_snapshot(conn, snapshot_id, row["item_key"]) if snapshot_id else 0
goals.append(_goal_row_to_dict(row, current_qty))
return goals
def list_goals_structured(db_path: Path | str = DEFAULT_DB) -> dict[str, Any]:
conn = get_connection(db_path)
init_db(conn)
snapshot_id = _latest_snapshot_id(conn)
all_goals = _fetch_goals_with_qty(conn, snapshot_id)
groups_map: dict[int, dict[str, Any]] = {}
ungrouped: list[dict[str, Any]] = []
for goal in all_goals:
if goal["group_id"] is None:
ungrouped.append(goal)
continue
gid = goal["group_id"]
if gid not in groups_map:
groups_map[gid] = {
"id": gid,
"name": goal["group_name"],
"total": 0,
"completed": 0,
"goals": [],
}
groups_map[gid]["goals"].append(goal)
groups_map[gid]["total"] += 1
if goal["completed_at"]:
groups_map[gid]["completed"] += 1
conn.close()
return {
"groups": list(groups_map.values()),
"ungrouped": ungrouped,
}
def _resolve_item_from_latest(
conn: sqlite3.Connection, item_key: str
) -> dict[str, str] | None:
snapshot_id = _latest_snapshot_id(conn)
if not snapshot_id:
return None
row = conn.execute(
"SELECT item_key, qty, category FROM inventory_snapshots WHERE snapshot_id = ? AND item_key = ?",
(snapshot_id, item_key),
).fetchone()
if not row:
return None
latest = get_latest_snapshot(conn=conn)
item_name = item_key.replace("_", " ").title()
if latest:
for item in latest.get("inventory", []):
if item["key"] == item_key:
item_name = item["name"]
break
return {"item_key": item_key, "item_name": item_name, "category": row["category"], "qty": row["qty"]}
def create_goal(
item_key: str,
target_qty: int,
group_id: int | None = None,
db_path: Path | str = DEFAULT_DB,
) -> dict[str, Any]:
if target_qty <= 0:
raise ValueError("Target quantity must be positive")
conn = get_connection(db_path)
init_db(conn)
existing = conn.execute("SELECT id FROM goals WHERE item_key = ?", (item_key,)).fetchone()
if existing:
conn.close()
raise ValueError("A goal for this item already exists")
if group_id is not None:
group = conn.execute("SELECT id FROM goal_groups WHERE id = ?", (group_id,)).fetchone()
if not group:
conn.close()
raise ValueError("Goal group not found")
item = _resolve_item_from_latest(conn, item_key)
if not item:
conn.close()
raise ValueError("Item not found in current inventory")
now = _utc_now_iso()
completed_at = now if item["qty"] >= target_qty else None
cur = conn.execute(
"""
INSERT INTO goals (group_id, item_key, item_name, category, target_qty, created_at, completed_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(group_id, item_key, item["item_name"], item["category"], target_qty, now, completed_at),
)
goal_id = cur.lastrowid
conn.commit()
row = conn.execute(
"""
SELECT go.id, go.group_id, gg.name AS group_name, go.item_key, go.item_name,
go.category, go.target_qty, go.created_at, go.completed_at
FROM goals go
LEFT JOIN goal_groups gg ON gg.id = go.group_id
WHERE go.id = ?
""",
(goal_id,),
).fetchone()
conn.close()
return _goal_row_to_dict(row, item["qty"])
def delete_goal(goal_id: int, db_path: Path | str = DEFAULT_DB) -> bool:
conn = get_connection(db_path)
init_db(conn)
cur = conn.execute("DELETE FROM goals WHERE id = ?", (goal_id,))
conn.commit()
deleted = cur.rowcount > 0
conn.close()
return deleted
def check_goals_after_import(
snapshot_id: int,
conn: sqlite3.Connection | None = None,
db_path: Path | str = DEFAULT_DB,
) -> dict[str, Any]:
own_conn = conn is None
if own_conn:
conn = get_connection(db_path)
init_db(conn)
open_goals = conn.execute(
"""
SELECT go.id, go.group_id, gg.name AS group_name, go.item_key, go.item_name,
go.target_qty
FROM goals go
LEFT JOIN goal_groups gg ON gg.id = go.group_id
WHERE go.completed_at IS NULL
"""
).fetchall()
now = _utc_now_iso()
goals_completed: list[dict[str, Any]] = []
groups_touched: set[int] = set()
for goal in open_goals:
current_qty = _inventory_qty_for_snapshot(conn, snapshot_id, goal["item_key"])
if current_qty >= goal["target_qty"]:
conn.execute(
"UPDATE goals SET completed_at = ? WHERE id = ?",
(now, goal["id"]),
)
entry = {
"id": goal["id"],
"item_key": goal["item_key"],
"item_name": goal["item_name"],
"target_qty": goal["target_qty"],
"current_qty": current_qty,
"group_id": goal["group_id"],
"group_name": goal["group_name"],
}
goals_completed.append(entry)
if goal["group_id"] is not None:
groups_touched.add(goal["group_id"])
groups_completed: list[dict[str, Any]] = []
for gid in groups_touched:
stats = conn.execute(
"""
SELECT COUNT(*) AS total,
SUM(CASE WHEN completed_at IS NOT NULL THEN 1 ELSE 0 END) AS completed
FROM goals WHERE group_id = ?
""",
(gid,),
).fetchone()
if stats and stats["total"] and stats["total"] == stats["completed"]:
g = conn.execute("SELECT id, name FROM goal_groups WHERE id = ?", (gid,)).fetchone()
if g:
groups_completed.append({"id": g["id"], "name": g["name"]})
conn.commit()
if own_conn:
conn.close()
return {
"goals_completed": goals_completed,
"groups_completed": groups_completed,
}