Add relative/skill goals, import diffs, and history tooling.
Extends the goals system and viewer UX so players can track item and skill targets with groups, ETAs, global search, snapshot management, and DB export. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -12,6 +12,7 @@ from typing import Any
|
||||
from parser import SaveParseError, normalize_save, load_save
|
||||
|
||||
DEFAULT_DB = Path(__file__).parent / "data" / "history.db"
|
||||
SKILL_GOAL_PREFIX = "__skill__:"
|
||||
|
||||
|
||||
def _utc_now_iso() -> str:
|
||||
@@ -65,17 +66,32 @@ def init_db(conn: sqlite3.Connection) -> None:
|
||||
CREATE TABLE IF NOT EXISTS goals (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
group_id INTEGER REFERENCES goal_groups(id) ON DELETE SET NULL,
|
||||
goal_type TEXT NOT NULL DEFAULT 'item',
|
||||
mode TEXT NOT NULL DEFAULT 'absolute',
|
||||
item_key TEXT NOT NULL UNIQUE,
|
||||
item_name TEXT NOT NULL,
|
||||
category TEXT NOT NULL,
|
||||
target_qty INTEGER NOT NULL,
|
||||
baseline_qty INTEGER,
|
||||
created_at TEXT NOT NULL,
|
||||
completed_at TEXT
|
||||
);
|
||||
""")
|
||||
_migrate_schema(conn)
|
||||
conn.commit()
|
||||
|
||||
|
||||
def _migrate_schema(conn: sqlite3.Connection) -> None:
|
||||
"""Add columns introduced after initial goals release."""
|
||||
cols = {row[1] for row in conn.execute("PRAGMA table_info(goals)")}
|
||||
if "goal_type" not in cols:
|
||||
conn.execute("ALTER TABLE goals ADD COLUMN goal_type TEXT NOT NULL DEFAULT 'item'")
|
||||
if "mode" not in cols:
|
||||
conn.execute("ALTER TABLE goals ADD COLUMN mode TEXT NOT NULL DEFAULT 'absolute'")
|
||||
if "baseline_qty" not in cols:
|
||||
conn.execute("ALTER TABLE goals ADD COLUMN baseline_qty INTEGER")
|
||||
|
||||
|
||||
def file_hash(path: Path) -> str:
|
||||
h = hashlib.sha256()
|
||||
with path.open("rb") as f:
|
||||
@@ -159,6 +175,7 @@ def import_save(
|
||||
conn.commit()
|
||||
|
||||
goals_result = check_goals_after_import(snapshot_id, conn=conn, db_path=db_path)
|
||||
import_changes = summarize_import_changes(snapshot_id, conn=conn)
|
||||
|
||||
if own_conn:
|
||||
conn.close()
|
||||
@@ -168,6 +185,7 @@ def import_save(
|
||||
"snapshot_id": snapshot_id,
|
||||
"import_report": import_report,
|
||||
"import_summary": meta.get("import_summary"),
|
||||
"import_changes": import_changes,
|
||||
**goals_result,
|
||||
}
|
||||
|
||||
@@ -377,19 +395,122 @@ def _inventory_qty_for_snapshot(conn: sqlite3.Connection, snapshot_id: int, item
|
||||
return row["qty"] if row else 0
|
||||
|
||||
|
||||
def _goal_row_to_dict(row: sqlite3.Row, current_qty: int) -> dict[str, Any]:
|
||||
def _skill_goal_storage_key(skill_key: str) -> str:
|
||||
return f"{SKILL_GOAL_PREFIX}{skill_key}"
|
||||
|
||||
|
||||
def _skill_key_from_storage(storage_key: str) -> str | None:
|
||||
if storage_key.startswith(SKILL_GOAL_PREFIX):
|
||||
return storage_key[len(SKILL_GOAL_PREFIX):]
|
||||
return None
|
||||
|
||||
|
||||
def _skill_level_for_snapshot(conn: sqlite3.Connection, snapshot_id: int, skill_key: str) -> int:
|
||||
row = conn.execute(
|
||||
"SELECT level FROM skill_snapshots WHERE snapshot_id = ? AND skill_key = ?",
|
||||
(snapshot_id, skill_key),
|
||||
).fetchone()
|
||||
return row["level"] if row else 0
|
||||
|
||||
|
||||
def _goal_is_complete(
|
||||
goal_type: str,
|
||||
mode: str,
|
||||
target: int,
|
||||
baseline: int | None,
|
||||
*,
|
||||
current_item_qty: int = 0,
|
||||
current_skill_level: int = 0,
|
||||
) -> bool:
|
||||
base = baseline or 0
|
||||
if goal_type == "skill":
|
||||
current = current_skill_level
|
||||
else:
|
||||
current = current_item_qty
|
||||
if mode == "relative":
|
||||
return max(0, current - base) >= target
|
||||
return current >= target
|
||||
|
||||
|
||||
def _goal_progress_values(
|
||||
row: sqlite3.Row,
|
||||
*,
|
||||
current_item_qty: int = 0,
|
||||
current_skill_level: int = 0,
|
||||
) -> tuple[int, int, int, bool]:
|
||||
goal_type = row["goal_type"] if row["goal_type"] else "item"
|
||||
mode = row["mode"] if row["mode"] else "absolute"
|
||||
target = row["target_qty"]
|
||||
progress_pct = min(100, int(current_qty / target * 100)) if target > 0 else 0
|
||||
baseline = row["baseline_qty"] if row["baseline_qty"] is not None else 0
|
||||
|
||||
if goal_type == "skill":
|
||||
current = current_skill_level
|
||||
if mode == "relative":
|
||||
progress_val = max(0, current - baseline)
|
||||
completed = progress_val >= target
|
||||
display_current = progress_val
|
||||
display_target = target
|
||||
else:
|
||||
progress_val = current
|
||||
completed = current >= target
|
||||
display_current = current
|
||||
display_target = target
|
||||
else:
|
||||
current = current_item_qty
|
||||
if mode == "relative":
|
||||
progress_val = max(0, current - baseline)
|
||||
completed = progress_val >= target
|
||||
display_current = progress_val
|
||||
display_target = target
|
||||
else:
|
||||
progress_val = current
|
||||
completed = current >= target
|
||||
display_current = current
|
||||
display_target = target
|
||||
|
||||
progress_pct = min(100, int(progress_val / target * 100)) if target > 0 else (100 if completed else 0)
|
||||
return display_current, display_target, progress_pct, completed
|
||||
|
||||
|
||||
def _goal_row_to_dict(
|
||||
row: sqlite3.Row,
|
||||
*,
|
||||
current_item_qty: int = 0,
|
||||
current_skill_level: int = 0,
|
||||
rate_per_snapshot: float | None = None,
|
||||
) -> dict[str, Any]:
|
||||
display_current, display_target, progress_pct, completed = _goal_progress_values(
|
||||
row,
|
||||
current_item_qty=current_item_qty,
|
||||
current_skill_level=current_skill_level,
|
||||
)
|
||||
goal_type = row["goal_type"] if row["goal_type"] else "item"
|
||||
mode = row["mode"] if row["mode"] else "absolute"
|
||||
skill_key = _skill_key_from_storage(row["item_key"]) if goal_type == "skill" else None
|
||||
missing = max(0, display_target - display_current) if not row["completed_at"] else 0
|
||||
eta_snapshots = None
|
||||
if missing > 0 and rate_per_snapshot and rate_per_snapshot > 0:
|
||||
eta_snapshots = max(1, int((missing + rate_per_snapshot - 1) // rate_per_snapshot))
|
||||
|
||||
return {
|
||||
"id": row["id"],
|
||||
"group_id": row["group_id"],
|
||||
"group_name": row["group_name"],
|
||||
"item_key": row["item_key"],
|
||||
"goal_type": goal_type,
|
||||
"mode": mode,
|
||||
"item_key": row["item_key"] if goal_type == "item" else skill_key,
|
||||
"skill_key": skill_key,
|
||||
"item_name": row["item_name"],
|
||||
"category": row["category"],
|
||||
"target_qty": target,
|
||||
"current_qty": current_qty,
|
||||
"target_qty": row["target_qty"],
|
||||
"baseline_qty": row["baseline_qty"],
|
||||
"current_qty": display_current,
|
||||
"target_display": display_target,
|
||||
"raw_current_qty": current_item_qty,
|
||||
"raw_skill_level": current_skill_level,
|
||||
"missing_qty": missing,
|
||||
"progress_pct": progress_pct,
|
||||
"eta_snapshots": eta_snapshots,
|
||||
"completed_at": row["completed_at"],
|
||||
"created_at": row["created_at"],
|
||||
}
|
||||
@@ -455,20 +576,81 @@ def delete_goal_group(group_id: int, db_path: Path | str = DEFAULT_DB) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def rename_goal_group(group_id: int, name: str, db_path: Path | str = DEFAULT_DB) -> bool:
|
||||
name = name.strip()
|
||||
if not name:
|
||||
raise ValueError("Group name is required")
|
||||
conn = get_connection(db_path)
|
||||
init_db(conn)
|
||||
cur = conn.execute(
|
||||
"UPDATE goal_groups SET name = ? WHERE id = ?",
|
||||
(name, group_id),
|
||||
)
|
||||
conn.commit()
|
||||
updated = cur.rowcount > 0
|
||||
conn.close()
|
||||
return updated
|
||||
|
||||
|
||||
def _goal_item_rates(conn: sqlite3.Connection) -> dict[str, float]:
|
||||
"""Average qty gain per snapshot for item goals (last segments)."""
|
||||
snap_rows = conn.execute(
|
||||
"SELECT id FROM snapshots ORDER BY exported_at ASC, id ASC"
|
||||
).fetchall()
|
||||
if len(snap_rows) < 2:
|
||||
return {}
|
||||
snap_index = {row["id"]: idx for idx, row in enumerate(snap_rows)}
|
||||
inv_rows = conn.execute(
|
||||
"SELECT snapshot_id, item_key, qty FROM inventory_snapshots"
|
||||
).fetchall()
|
||||
series: dict[str, list[int]] = {}
|
||||
n = len(snap_rows)
|
||||
for row in inv_rows:
|
||||
key = row["item_key"]
|
||||
if key not in series:
|
||||
series[key] = [0] * n
|
||||
idx = snap_index.get(row["snapshot_id"])
|
||||
if idx is not None:
|
||||
series[key][idx] = row["qty"]
|
||||
rates: dict[str, float] = {}
|
||||
for key, values in series.items():
|
||||
deltas = [values[i] - values[i - 1] for i in range(1, len(values))]
|
||||
positive = [d for d in deltas if d > 0]
|
||||
if positive:
|
||||
rates[key] = sum(positive) / len(positive)
|
||||
return rates
|
||||
|
||||
|
||||
def _fetch_goals_with_qty(conn: sqlite3.Connection, snapshot_id: int | None) -> list[dict[str, Any]]:
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT go.id, go.group_id, gg.name AS group_name, go.item_key, go.item_name,
|
||||
go.category, go.target_qty, go.created_at, go.completed_at
|
||||
SELECT go.id, go.group_id, gg.name AS group_name, go.goal_type, go.mode,
|
||||
go.item_key, go.item_name, go.category, go.target_qty, go.baseline_qty,
|
||||
go.created_at, go.completed_at
|
||||
FROM goals go
|
||||
LEFT JOIN goal_groups gg ON gg.id = go.group_id
|
||||
ORDER BY go.created_at ASC, go.id ASC
|
||||
"""
|
||||
).fetchall()
|
||||
rates = _goal_item_rates(conn) if snapshot_id else {}
|
||||
goals = []
|
||||
for row in rows:
|
||||
current_qty = _inventory_qty_for_snapshot(conn, snapshot_id, row["item_key"]) if snapshot_id else 0
|
||||
goals.append(_goal_row_to_dict(row, current_qty))
|
||||
goal_type = row["goal_type"] if row["goal_type"] else "item"
|
||||
rate = None
|
||||
if goal_type == "item":
|
||||
current_item_qty = _inventory_qty_for_snapshot(conn, snapshot_id, row["item_key"]) if snapshot_id else 0
|
||||
current_skill_level = 0
|
||||
rate = rates.get(row["item_key"])
|
||||
else:
|
||||
skill_key = _skill_key_from_storage(row["item_key"]) or ""
|
||||
current_item_qty = 0
|
||||
current_skill_level = _skill_level_for_snapshot(conn, snapshot_id, skill_key) if snapshot_id else 0
|
||||
goals.append(_goal_row_to_dict(
|
||||
row,
|
||||
current_item_qty=current_item_qty,
|
||||
current_skill_level=current_skill_level,
|
||||
rate_per_snapshot=rate,
|
||||
))
|
||||
return goals
|
||||
|
||||
|
||||
@@ -520,6 +702,16 @@ def list_goals_structured(db_path: Path | str = DEFAULT_DB) -> dict[str, Any]:
|
||||
}
|
||||
|
||||
|
||||
def goals_overview(db_path: Path | str = DEFAULT_DB) -> dict[str, int]:
|
||||
structured = list_goals_structured(db_path)
|
||||
all_goals = structured["ungrouped"] + [
|
||||
g for group in structured["groups"] for g in group.get("goals", [])
|
||||
]
|
||||
open_count = sum(1 for g in all_goals if not g["completed_at"])
|
||||
done_count = sum(1 for g in all_goals if g["completed_at"])
|
||||
return {"open": open_count, "completed": done_count, "total": len(all_goals)}
|
||||
|
||||
|
||||
def _resolve_item_from_latest(
|
||||
conn: sqlite3.Connection, item_key: str
|
||||
) -> dict[str, str] | None:
|
||||
@@ -546,10 +738,13 @@ def create_goal(
|
||||
item_key: str,
|
||||
target_qty: int,
|
||||
group_id: int | None = None,
|
||||
mode: str = "absolute",
|
||||
db_path: Path | str = DEFAULT_DB,
|
||||
) -> dict[str, Any]:
|
||||
if target_qty <= 0:
|
||||
raise ValueError("Target quantity must be positive")
|
||||
if mode not in ("absolute", "relative"):
|
||||
raise ValueError("Invalid goal mode")
|
||||
|
||||
conn = get_connection(db_path)
|
||||
init_db(conn)
|
||||
@@ -570,22 +765,28 @@ def create_goal(
|
||||
conn.close()
|
||||
raise ValueError("Item not found in current inventory")
|
||||
|
||||
baseline = item["qty"] if mode == "relative" else None
|
||||
now = _utc_now_iso()
|
||||
completed_at = now if item["qty"] >= target_qty else None
|
||||
completed = _goal_is_complete(
|
||||
"item", mode, target_qty, baseline, current_item_qty=item["qty"]
|
||||
)
|
||||
completed_at = now if completed else None
|
||||
cur = conn.execute(
|
||||
"""
|
||||
INSERT INTO goals (group_id, item_key, item_name, category, target_qty, created_at, completed_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
INSERT INTO goals
|
||||
(group_id, goal_type, mode, item_key, item_name, category, target_qty, baseline_qty, created_at, completed_at)
|
||||
VALUES (?, 'item', ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(group_id, item_key, item["item_name"], item["category"], target_qty, now, completed_at),
|
||||
(group_id, mode, item_key, item["item_name"], item["category"], target_qty, baseline, now, completed_at),
|
||||
)
|
||||
goal_id = cur.lastrowid
|
||||
conn.commit()
|
||||
|
||||
row = conn.execute(
|
||||
"""
|
||||
SELECT go.id, go.group_id, gg.name AS group_name, go.item_key, go.item_name,
|
||||
go.category, go.target_qty, go.created_at, go.completed_at
|
||||
SELECT go.id, go.group_id, gg.name AS group_name, go.goal_type, go.mode,
|
||||
go.item_key, go.item_name, go.category, go.target_qty, go.baseline_qty,
|
||||
go.created_at, go.completed_at
|
||||
FROM goals go
|
||||
LEFT JOIN goal_groups gg ON gg.id = go.group_id
|
||||
WHERE go.id = ?
|
||||
@@ -593,7 +794,80 @@ def create_goal(
|
||||
(goal_id,),
|
||||
).fetchone()
|
||||
conn.close()
|
||||
return _goal_row_to_dict(row, item["qty"])
|
||||
return _goal_row_to_dict(row, current_item_qty=item["qty"])
|
||||
|
||||
|
||||
def create_skill_goal(
|
||||
skill_key: str,
|
||||
target_level: int,
|
||||
group_id: int | None = None,
|
||||
mode: str = "absolute",
|
||||
db_path: Path | str = DEFAULT_DB,
|
||||
) -> dict[str, Any]:
|
||||
if target_level <= 0:
|
||||
raise ValueError("Target level must be positive")
|
||||
if mode not in ("absolute", "relative"):
|
||||
raise ValueError("Invalid goal mode")
|
||||
|
||||
conn = get_connection(db_path)
|
||||
init_db(conn)
|
||||
storage_key = _skill_goal_storage_key(skill_key)
|
||||
|
||||
existing = conn.execute("SELECT id FROM goals WHERE item_key = ?", (storage_key,)).fetchone()
|
||||
if existing:
|
||||
conn.close()
|
||||
raise ValueError("A goal for this skill already exists")
|
||||
|
||||
if group_id is not None:
|
||||
group = conn.execute("SELECT id FROM goal_groups WHERE id = ?", (group_id,)).fetchone()
|
||||
if not group:
|
||||
conn.close()
|
||||
raise ValueError("Goal group not found")
|
||||
|
||||
snapshot_id = _latest_snapshot_id(conn)
|
||||
if not snapshot_id:
|
||||
conn.close()
|
||||
raise ValueError("No snapshots imported yet")
|
||||
|
||||
current_level = _skill_level_for_snapshot(conn, snapshot_id, skill_key)
|
||||
latest = get_latest_snapshot(conn=conn)
|
||||
skill_name = skill_key.replace("_", " ").title()
|
||||
if latest:
|
||||
for sk in latest.get("skills", []):
|
||||
if sk["key"] == skill_key:
|
||||
skill_name = sk["name"]
|
||||
break
|
||||
|
||||
baseline = current_level if mode == "relative" else None
|
||||
now = _utc_now_iso()
|
||||
completed = _goal_is_complete(
|
||||
"skill", mode, target_level, baseline, current_skill_level=current_level
|
||||
)
|
||||
completed_at = now if completed else None
|
||||
cur = conn.execute(
|
||||
"""
|
||||
INSERT INTO goals
|
||||
(group_id, goal_type, mode, item_key, item_name, category, target_qty, baseline_qty, created_at, completed_at)
|
||||
VALUES (?, 'skill', ?, ?, ?, 'Skill', ?, ?, ?, ?)
|
||||
""",
|
||||
(group_id, mode, storage_key, skill_name, target_level, baseline, now, completed_at),
|
||||
)
|
||||
goal_id = cur.lastrowid
|
||||
conn.commit()
|
||||
|
||||
row = conn.execute(
|
||||
"""
|
||||
SELECT go.id, go.group_id, gg.name AS group_name, go.goal_type, go.mode,
|
||||
go.item_key, go.item_name, go.category, go.target_qty, go.baseline_qty,
|
||||
go.created_at, go.completed_at
|
||||
FROM goals go
|
||||
LEFT JOIN goal_groups gg ON gg.id = go.group_id
|
||||
WHERE go.id = ?
|
||||
""",
|
||||
(goal_id,),
|
||||
).fetchone()
|
||||
conn.close()
|
||||
return _goal_row_to_dict(row, current_skill_level=current_level)
|
||||
|
||||
|
||||
def delete_goal(goal_id: int, db_path: Path | str = DEFAULT_DB) -> bool:
|
||||
@@ -618,8 +892,8 @@ def check_goals_after_import(
|
||||
|
||||
open_goals = conn.execute(
|
||||
"""
|
||||
SELECT go.id, go.group_id, gg.name AS group_name, go.item_key, go.item_name,
|
||||
go.target_qty
|
||||
SELECT go.id, go.group_id, gg.name AS group_name, go.goal_type, go.mode,
|
||||
go.item_key, go.item_name, go.target_qty, go.baseline_qty
|
||||
FROM goals go
|
||||
LEFT JOIN goal_groups gg ON gg.id = go.group_id
|
||||
WHERE go.completed_at IS NULL
|
||||
@@ -631,24 +905,42 @@ def check_goals_after_import(
|
||||
groups_touched: set[int] = set()
|
||||
|
||||
for goal in open_goals:
|
||||
current_qty = _inventory_qty_for_snapshot(conn, snapshot_id, goal["item_key"])
|
||||
if current_qty >= goal["target_qty"]:
|
||||
conn.execute(
|
||||
"UPDATE goals SET completed_at = ? WHERE id = ?",
|
||||
(now, goal["id"]),
|
||||
)
|
||||
entry = {
|
||||
"id": goal["id"],
|
||||
"item_key": goal["item_key"],
|
||||
"item_name": goal["item_name"],
|
||||
"target_qty": goal["target_qty"],
|
||||
"current_qty": current_qty,
|
||||
"group_id": goal["group_id"],
|
||||
"group_name": goal["group_name"],
|
||||
}
|
||||
goals_completed.append(entry)
|
||||
if goal["group_id"] is not None:
|
||||
groups_touched.add(goal["group_id"])
|
||||
goal_type = goal["goal_type"] if goal["goal_type"] else "item"
|
||||
mode = goal["mode"] if goal["mode"] else "absolute"
|
||||
if goal_type == "skill":
|
||||
skill_key = _skill_key_from_storage(goal["item_key"]) or ""
|
||||
current_item_qty = 0
|
||||
current_skill_level = _skill_level_for_snapshot(conn, snapshot_id, skill_key)
|
||||
current_display = current_skill_level
|
||||
else:
|
||||
current_item_qty = _inventory_qty_for_snapshot(conn, snapshot_id, goal["item_key"])
|
||||
current_skill_level = 0
|
||||
current_display = current_item_qty
|
||||
|
||||
if not _goal_is_complete(
|
||||
goal_type, mode, goal["target_qty"], goal["baseline_qty"],
|
||||
current_item_qty=current_item_qty,
|
||||
current_skill_level=current_skill_level,
|
||||
):
|
||||
continue
|
||||
|
||||
conn.execute(
|
||||
"UPDATE goals SET completed_at = ? WHERE id = ?",
|
||||
(now, goal["id"]),
|
||||
)
|
||||
entry = {
|
||||
"id": goal["id"],
|
||||
"goal_type": goal_type,
|
||||
"item_key": goal["item_key"],
|
||||
"item_name": goal["item_name"],
|
||||
"target_qty": goal["target_qty"],
|
||||
"current_qty": current_display,
|
||||
"group_id": goal["group_id"],
|
||||
"group_name": goal["group_name"],
|
||||
}
|
||||
goals_completed.append(entry)
|
||||
if goal["group_id"] is not None:
|
||||
groups_touched.add(goal["group_id"])
|
||||
|
||||
groups_completed: list[dict[str, Any]] = []
|
||||
for gid in groups_touched:
|
||||
@@ -673,3 +965,121 @@ def check_goals_after_import(
|
||||
"goals_completed": goals_completed,
|
||||
"groups_completed": groups_completed,
|
||||
}
|
||||
|
||||
|
||||
def summarize_import_changes(
|
||||
snapshot_id: int,
|
||||
conn: sqlite3.Connection | None = None,
|
||||
db_path: Path | str = DEFAULT_DB,
|
||||
) -> dict[str, Any]:
|
||||
own_conn = conn is None
|
||||
if own_conn:
|
||||
conn = get_connection(db_path)
|
||||
init_db(conn)
|
||||
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT id FROM snapshots
|
||||
WHERE id < ?
|
||||
ORDER BY exported_at DESC, id DESC
|
||||
LIMIT 1
|
||||
""",
|
||||
(snapshot_id,),
|
||||
).fetchone()
|
||||
|
||||
if not rows:
|
||||
if own_conn:
|
||||
conn.close()
|
||||
return {"has_previous": False}
|
||||
|
||||
older_id = rows["id"]
|
||||
try:
|
||||
diff = diff_snapshots(older_id, snapshot_id, conn=conn, db_path=db_path)
|
||||
except ValueError:
|
||||
if own_conn:
|
||||
conn.close()
|
||||
return {"has_previous": False}
|
||||
|
||||
older_data = get_snapshot(older_id, conn=conn, db_path=db_path) or {}
|
||||
newer_data = get_snapshot(snapshot_id, conn=conn, db_path=db_path) or {}
|
||||
|
||||
quests_completed = 0
|
||||
old_story = {q.get("name"): q.get("completed") for q in (older_data.get("quests") or {}).get("story", [])}
|
||||
for q in (newer_data.get("quests") or {}).get("story", []):
|
||||
name = q.get("name")
|
||||
if name and q.get("completed") and not old_story.get(name):
|
||||
quests_completed += 1
|
||||
|
||||
slayer_delta = 0
|
||||
old_slayer = (older_data.get("combat") or {}).get("slayer_task") or {}
|
||||
new_slayer = (newer_data.get("combat") or {}).get("slayer_task") or {}
|
||||
if old_slayer.get("display_name") == new_slayer.get("display_name"):
|
||||
slayer_delta = (new_slayer.get("kills_completed") or 0) - (old_slayer.get("kills_completed") or 0)
|
||||
|
||||
dungeon_delta = 0
|
||||
old_runs = (older_data.get("combat") or {}).get("dungeon_runs") or {}
|
||||
new_runs = (newer_data.get("combat") or {}).get("dungeon_runs") or {}
|
||||
for key, val in new_runs.items():
|
||||
dungeon_delta += max(0, val - old_runs.get(key, 0))
|
||||
|
||||
if own_conn:
|
||||
conn.close()
|
||||
|
||||
return {
|
||||
"has_previous": True,
|
||||
"older_id": older_id,
|
||||
"newer_id": snapshot_id,
|
||||
"coins_delta": diff["summary"]["coins_delta"],
|
||||
"total_level_delta": diff["summary"]["total_level_delta"],
|
||||
"inventory_changes": len(diff["inventory_changes"]),
|
||||
"skill_changes": len(diff["skill_changes"]),
|
||||
"quests_completed": quests_completed,
|
||||
"slayer_kills_delta": max(0, slayer_delta),
|
||||
"dungeon_runs_delta": dungeon_delta,
|
||||
"top_inventory": diff["inventory_changes"][:5],
|
||||
"top_skills": diff["skill_changes"][:5],
|
||||
}
|
||||
|
||||
|
||||
def skill_timeline(db_path: Path | str = DEFAULT_DB) -> dict[str, Any]:
|
||||
"""Per-skill level series aligned to snapshots (oldest → newest)."""
|
||||
conn = get_connection(db_path)
|
||||
init_db(conn)
|
||||
snap_rows = conn.execute(
|
||||
"""
|
||||
SELECT id, exported_at FROM snapshots
|
||||
ORDER BY exported_at ASC, id ASC
|
||||
"""
|
||||
).fetchall()
|
||||
snapshots = [dict(r) for r in snap_rows]
|
||||
if not snapshots:
|
||||
conn.close()
|
||||
return {"snapshots": [], "series": {}}
|
||||
|
||||
snap_index = {row["id"]: idx for idx, row in enumerate(snapshots)}
|
||||
skill_rows = conn.execute(
|
||||
"SELECT snapshot_id, skill_key, level FROM skill_snapshots"
|
||||
).fetchall()
|
||||
conn.close()
|
||||
|
||||
series: dict[str, list[int]] = {}
|
||||
n = len(snapshots)
|
||||
for row in skill_rows:
|
||||
key = row["skill_key"]
|
||||
if key not in series:
|
||||
series[key] = [0] * n
|
||||
idx = snap_index.get(row["snapshot_id"])
|
||||
if idx is not None:
|
||||
series[key][idx] = row["level"]
|
||||
|
||||
return {"snapshots": snapshots, "series": series}
|
||||
|
||||
|
||||
def delete_snapshot(snapshot_id: int, db_path: Path | str = DEFAULT_DB) -> bool:
|
||||
conn = get_connection(db_path)
|
||||
init_db(conn)
|
||||
cur = conn.execute("DELETE FROM snapshots WHERE id = ?", (snapshot_id,))
|
||||
conn.commit()
|
||||
deleted = cur.rowcount > 0
|
||||
conn.close()
|
||||
return deleted
|
||||
|
||||
Reference in New Issue
Block a user