Files
Idle-Fantasy-Save-Viewer/app.py
T
elpatron f2c564e69a Translate README to English and fix DATA_DIR resolution.
Read get_data_dir from the environment on each request so CLI and server always use the same viewer database path.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-19 16:16:24 +02:00

239 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Idle Fantasy Save Viewer Flask server with per-viewer secret URLs."""
from __future__ import annotations
import argparse
import os
import sys
import webbrowser
from pathlib import Path
from flask import Blueprint, Flask, abort, jsonify, render_template, request
from werkzeug.utils import secure_filename
from db import (
DEFAULT_DB,
diff_snapshots,
get_latest_snapshot,
get_snapshot,
import_save,
init_db,
list_snapshots,
get_connection,
timeline,
)
from viewers import (
LOCAL_VIEWER_ID,
create_viewer,
ensure_local_viewer,
is_valid_viewer_id,
viewer_db_path,
)
app = Flask(__name__)
DATA_DIR = Path(os.environ.get("DATA_DIR", Path(__file__).parent / "data"))
DB_PATH = DEFAULT_DB
def get_data_dir() -> Path:
return Path(os.environ.get("DATA_DIR", Path(__file__).parent / "data"))
def _viewer_url(viewer_id: str) -> str:
base = request.host_url.rstrip("/")
return f"{base}/v/{viewer_id}/"
def _resolve_viewer_db(viewer_id: str) -> Path:
if not is_valid_viewer_id(viewer_id):
abort(404)
db_path = viewer_db_path(viewer_id, get_data_dir())
if not db_path.exists():
abort(404)
return db_path
viewer_bp = Blueprint("viewer", __name__, url_prefix="/v/<viewer_id>")
@viewer_bp.route("/")
def viewer_index(viewer_id: str):
_resolve_viewer_db(viewer_id)
return render_template("index.html", viewer_id=viewer_id)
@viewer_bp.route("/api/snapshot/latest")
def api_latest(viewer_id: str):
db_path = _resolve_viewer_db(viewer_id)
data = get_latest_snapshot(db_path=db_path)
if not data:
return jsonify({"error": "No snapshots imported yet"}), 404
return jsonify(data)
@viewer_bp.route("/api/snapshot/<int:snapshot_id>")
def api_snapshot(viewer_id: str, snapshot_id: int):
db_path = _resolve_viewer_db(viewer_id)
data = get_snapshot(snapshot_id, db_path=db_path)
if not data:
return jsonify({"error": "Snapshot not found"}), 404
return jsonify(data)
@viewer_bp.route("/api/snapshots")
def api_snapshots(viewer_id: str):
db_path = _resolve_viewer_db(viewer_id)
return jsonify(list_snapshots(db_path=db_path))
@viewer_bp.route("/api/snapshots/<int:older_id>/diff/<int:newer_id>")
def api_diff(viewer_id: str, older_id: int, newer_id: int):
db_path = _resolve_viewer_db(viewer_id)
try:
return jsonify(diff_snapshots(older_id, newer_id, db_path=db_path))
except ValueError as e:
return jsonify({"error": str(e)}), 404
@viewer_bp.route("/api/timeline")
def api_timeline(viewer_id: str):
db_path = _resolve_viewer_db(viewer_id)
return jsonify(timeline(db_path=db_path))
@viewer_bp.route("/api/import", methods=["POST"])
def api_import(viewer_id: str):
db_path = _resolve_viewer_db(viewer_id)
if "file" in request.files:
f = request.files["file"]
if not f.filename:
return jsonify({"error": "No file selected"}), 400
upload_dir = get_data_dir() / "uploads"
upload_dir.mkdir(parents=True, exist_ok=True)
safe_name = secure_filename(f.filename) or "upload.json"
tmp = upload_dir / f"_upload_{viewer_id}_{safe_name}"
f.save(tmp)
try:
result = import_save(tmp, db_path=db_path)
finally:
tmp.unlink(missing_ok=True)
if result.get("error"):
return jsonify(result), 422
return jsonify(result)
body = request.get_json(silent=True) or {}
path = body.get("path")
if not path:
return jsonify({"error": "Provide file upload or JSON body with path"}), 400
path = Path(path)
if not path.exists():
return jsonify({"error": f"File not found: {path}"}), 404
result = import_save(path, db_path=db_path)
if result.get("error"):
return jsonify(result), 422
return jsonify(result)
app.register_blueprint(viewer_bp)
@app.route("/")
def landing():
return render_template("landing.html")
@app.post("/api/viewers")
def api_create_viewer():
viewer_id = create_viewer(get_data_dir())
return jsonify({
"viewer_id": viewer_id,
"url": _viewer_url(viewer_id),
}), 201
def _print_import_report(result: dict) -> None:
report = result.get("import_report") or []
if not report:
return
for item in report:
level = item.get("level", "info").upper()
print(f" [{level}] {item.get('message')}", file=sys.stderr)
def main() -> int:
parser = argparse.ArgumentParser(description="Idle Fantasy Save Viewer")
parser.add_argument("save_file", nargs="?", help="Save JSON to import on start")
parser.add_argument("--import", dest="import_file", metavar="FILE", help="Import save without starting server")
parser.add_argument("--port", type=int, default=5000)
parser.add_argument("--host", default="127.0.0.1", help="Bind host (use 0.0.0.0 in Docker)")
parser.add_argument("--no-browser", action="store_true")
parser.add_argument("--db", type=Path, help="SQLite path (legacy single-file mode)")
parser.add_argument("--viewer", default=LOCAL_VIEWER_ID, help="Viewer id for CLI (default: local)")
args = parser.parse_args()
global DATA_DIR, DB_PATH
DATA_DIR = Path(os.environ.get("DATA_DIR", Path(__file__).parent / "data"))
if args.db:
DB_PATH = args.db
db_path = DB_PATH
viewer_id = None
else:
viewer_id = ensure_local_viewer(DATA_DIR) if args.viewer == LOCAL_VIEWER_ID else args.viewer
if not is_valid_viewer_id(viewer_id):
print(f"Error: invalid viewer id: {viewer_id}", file=sys.stderr)
return 1
db_path = viewer_db_path(viewer_id, DATA_DIR)
if not db_path.exists():
conn = get_connection(db_path)
init_db(conn)
conn.close()
conn = get_connection(db_path)
init_db(conn)
conn.close()
import_path = args.import_file or args.save_file
if import_path:
path = Path(import_path)
if not path.exists():
print(f"Error: file not found: {path}", file=sys.stderr)
return 1
result = import_save(path, db_path=db_path)
if result.get("error"):
print(f"Import failed: {result['error']}", file=sys.stderr)
_print_import_report(result)
return 1
if result.get("imported"):
print(f"Imported snapshot #{result['snapshot_id']} from {path.name}")
summary = result.get("import_summary") or {}
if summary.get("warnings") or summary.get("infos"):
print(
f"Notes: {summary.get('warnings', 0)} warning(s), "
f"{summary.get('infos', 0)} info(s)",
file=sys.stderr,
)
_print_import_report(result)
else:
print(f"Skipped duplicate: {path.name} (snapshot #{result['snapshot_id']})")
if args.import_file and not args.save_file:
return 0
if viewer_id:
url = f"http://{args.host}:{args.port}/v/{viewer_id}/"
else:
url = f"http://{args.host}:{args.port}/"
print(f"Starting server at {url}")
if not args.no_browser and args.host in ("127.0.0.1", "localhost"):
webbrowser.open(url)
app.run(host=args.host, port=args.port, debug=False)
return 0
if __name__ == "__main__":
sys.exit(main())