Files
capitalism-eats-the-world/app/routes/api.py

383 lines
13 KiB
Python

"""
API Routes for Simulation Control
Provides REST API endpoints for controlling simulations, retrieving data,
and managing simulation parameters with real-time updates via SocketIO.
"""
from flask import Blueprint, request, jsonify, current_app
from flask_socketio import emit, join_room, leave_room
import threading
import time
from typing import Optional
from app import socketio
from app.models import SimulationManager, SimulationParameters
from app.routes.main import simulation_manager
api_bp = Blueprint('api', __name__)
@api_bp.route('/simulation', methods=['POST'])
def create_simulation():
"""
Create a new simulation with specified parameters.
Request JSON:
{
"r_rate": 0.05,
"g_rate": 0.03,
"initial_capital": 1000,
"initial_consumption": 1000,
"num_agents": 100,
"iterations": 1000
}
Returns:
JSON response with simulation ID and status
"""
try:
data = request.get_json()
if not data:
return jsonify({'error': 'No JSON data provided'}), 400
# Validate and create parameters
params = SimulationParameters(
r_rate=float(data.get('r_rate', 0.05)),
g_rate=float(data.get('g_rate', 0.03)),
initial_capital=float(data.get('initial_capital', 1000)),
initial_consumption=float(data.get('initial_consumption', 1000)),
num_agents=int(data.get('num_agents', 100)),
iterations=int(data.get('iterations', 1000))
)
# Create simulation
simulation_id = simulation_manager.create_simulation(params)
return jsonify({
'simulation_id': simulation_id,
'status': 'created',
'parameters': {
'r_rate': params.r_rate,
'g_rate': params.g_rate,
'initial_capital': params.initial_capital,
'initial_consumption': params.initial_consumption,
'num_agents': params.num_agents,
'iterations': params.iterations
}
}), 201
except ValueError as e:
return jsonify({'error': f'Invalid parameter: {str(e)}'}), 400
except Exception as e:
current_app.logger.error(f"Error creating simulation: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
@api_bp.route('/simulation/<simulation_id>/start', methods=['POST'])
def start_simulation(simulation_id: str):
"""
Start running a simulation.
Args:
simulation_id: Unique simulation identifier
Returns:
JSON response indicating success or failure
"""
try:
simulation = simulation_manager.get_simulation(simulation_id)
if not simulation:
return jsonify({'error': 'Simulation not found'}), 404
if simulation.is_running:
return jsonify({'error': 'Simulation already running'}), 400
# Start simulation in background thread
def run_simulation_background():
try:
# Run simulation with progress updates
total_iterations = simulation.parameters.iterations
for i in range(total_iterations):
if not simulation.is_running:
break
snapshot = simulation.step()
# Emit progress update every 10 iterations or at milestones
if i % 10 == 0 or i == total_iterations - 1:
progress_data = {
'simulation_id': simulation_id,
'iteration': snapshot.iteration,
'total_iterations': total_iterations,
'progress_percentage': (snapshot.iteration / total_iterations) * 100,
'total_wealth': snapshot.total_wealth,
'gini_coefficient': snapshot.gini_coefficient,
'capital_share': snapshot.capital_share,
'wealth_concentration_top10': snapshot.wealth_concentration_top10
}
socketio.emit('simulation_progress', progress_data,
room=f'simulation_{simulation_id}')
# Small delay to allow real-time visualization
time.sleep(0.01)
# Emit completion
socketio.emit('simulation_complete', {
'simulation_id': simulation_id,
'total_snapshots': len(simulation.snapshots)
}, room=f'simulation_{simulation_id}')
except Exception as e:
current_app.logger.error(f"Error in simulation thread: {str(e)}")
socketio.emit('simulation_error', {
'simulation_id': simulation_id,
'error': str(e)
}, room=f'simulation_{simulation_id}')
# Start background thread
thread = threading.Thread(target=run_simulation_background)
thread.daemon = True
thread.start()
return jsonify({
'simulation_id': simulation_id,
'status': 'started',
'message': 'Simulation started successfully'
})
except Exception as e:
current_app.logger.error(f"Error starting simulation: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
@api_bp.route('/simulation/<simulation_id>/stop', methods=['POST'])
def stop_simulation(simulation_id: str):
"""
Stop a running simulation.
Args:
simulation_id: Unique simulation identifier
Returns:
JSON response indicating success or failure
"""
try:
success = simulation_manager.stop_simulation(simulation_id)
if not success:
return jsonify({'error': 'Simulation not found or not running'}), 404
# Emit stop event
socketio.emit('simulation_stopped', {
'simulation_id': simulation_id
}, room=f'simulation_{simulation_id}')
return jsonify({
'simulation_id': simulation_id,
'status': 'stopped',
'message': 'Simulation stopped successfully'
})
except Exception as e:
current_app.logger.error(f"Error stopping simulation: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
@api_bp.route('/simulation/<simulation_id>/data', methods=['GET'])
def get_simulation_data(simulation_id: str):
"""
Get current simulation data and snapshots.
Args:
simulation_id: Unique simulation identifier
Returns:
JSON response with simulation data
"""
try:
simulation = simulation_manager.get_simulation(simulation_id)
if not simulation:
return jsonify({'error': 'Simulation not found'}), 404
# Get specific iteration if requested
iteration = request.args.get('iteration', type=int)
if iteration is not None:
snapshot = simulation.get_snapshot_at_iteration(iteration)
if not snapshot:
return jsonify({'error': f'No data for iteration {iteration}'}), 404
return jsonify({
'simulation_id': simulation_id,
'snapshot': {
'iteration': snapshot.iteration,
'total_wealth': snapshot.total_wealth,
'gini_coefficient': snapshot.gini_coefficient,
'wealth_concentration_top10': snapshot.wealth_concentration_top10,
'capital_share': snapshot.capital_share,
'average_wealth': snapshot.average_wealth,
'median_wealth': snapshot.median_wealth
}
})
# Get latest snapshot and evolution data
latest_snapshot = simulation.get_latest_snapshot()
iterations, total_wealth, gini_coefficients = simulation.get_wealth_evolution()
response_data = {
'simulation_id': simulation_id,
'is_running': simulation.is_running,
'current_iteration': simulation.current_iteration,
'total_snapshots': len(simulation.snapshots),
'parameters': {
'r_rate': simulation.parameters.r_rate,
'g_rate': simulation.parameters.g_rate,
'num_agents': simulation.parameters.num_agents,
'iterations': simulation.parameters.iterations
}
}
if latest_snapshot:
response_data['latest_snapshot'] = {
'iteration': latest_snapshot.iteration,
'total_wealth': latest_snapshot.total_wealth,
'gini_coefficient': latest_snapshot.gini_coefficient,
'wealth_concentration_top10': latest_snapshot.wealth_concentration_top10,
'capital_share': latest_snapshot.capital_share,
'average_wealth': latest_snapshot.average_wealth,
'median_wealth': latest_snapshot.median_wealth
}
# Include evolution data if requested
if request.args.get('include_evolution', '').lower() == 'true':
response_data['evolution'] = {
'iterations': iterations,
'total_wealth': total_wealth,
'gini_coefficients': gini_coefficients
}
return jsonify(response_data)
except Exception as e:
current_app.logger.error(f"Error getting simulation data: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
@api_bp.route('/simulation/<simulation_id>/export/<format_type>', methods=['GET'])
def export_simulation_data(simulation_id: str, format_type: str):
"""
Export simulation data in specified format.
Args:
simulation_id: Unique simulation identifier
format_type: Export format ('json' or 'csv')
Returns:
Data in requested format
"""
try:
simulation = simulation_manager.get_simulation(simulation_id)
if not simulation:
return jsonify({'error': 'Simulation not found'}), 404
if format_type.lower() not in ['json', 'csv']:
return jsonify({'error': 'Format must be json or csv'}), 400
exported_data = simulation.export_data(format_type)
if format_type.lower() == 'json':
return jsonify({'data': exported_data})
else: # CSV
return exported_data, 200, {
'Content-Type': 'text/csv',
'Content-Disposition': f'attachment; filename=simulation_{simulation_id[:8]}.csv'
}
except Exception as e:
current_app.logger.error(f"Error exporting simulation data: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
@api_bp.route('/simulation/<simulation_id>', methods=['DELETE'])
def delete_simulation(simulation_id: str):
"""
Delete a simulation.
Args:
simulation_id: Unique simulation identifier
Returns:
JSON response indicating success or failure
"""
try:
success = simulation_manager.delete_simulation(simulation_id)
if not success:
return jsonify({'error': 'Simulation not found'}), 404
return jsonify({
'simulation_id': simulation_id,
'status': 'deleted',
'message': 'Simulation deleted successfully'
})
except Exception as e:
current_app.logger.error(f"Error deleting simulation: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
@api_bp.route('/simulations', methods=['GET'])
def list_simulations():
"""
List all simulations with their status.
Returns:
JSON response with list of simulation information
"""
try:
simulations_info = simulation_manager.list_simulations()
return jsonify({
'simulations': simulations_info,
'total_count': len(simulations_info)
})
except Exception as e:
current_app.logger.error(f"Error listing simulations: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
# SocketIO event handlers
@socketio.on('join_simulation')
def on_join_simulation(data):
"""Handle client joining simulation room for real-time updates."""
simulation_id = data.get('simulation_id')
if simulation_id:
join_room(f'simulation_{simulation_id}')
emit('joined_simulation', {'simulation_id': simulation_id})
@socketio.on('leave_simulation')
def on_leave_simulation(data):
"""Handle client leaving simulation room."""
simulation_id = data.get('simulation_id')
if simulation_id:
leave_room(f'simulation_{simulation_id}')
emit('left_simulation', {'simulation_id': simulation_id})
@socketio.on('connect')
def on_connect():
"""Handle client connection."""
emit('connected', {'status': 'connected'})
@socketio.on('disconnect')
def on_disconnect():
"""Handle client disconnection."""
print('Client disconnected')