- Fix circular import issue by moving simulation_manager to app/__init__.py - Enhance get_wealth_evolution to include inequality metrics data - Add get_inequality_evolution method for complete chart data - Update API to return top10_shares and capital_shares in evolution data - Modify onSimulationComplete to fetch and populate charts with complete data - Fix simulation threading to properly mark completion state - Add test script to verify chart data generation The charts now properly display simulation results by fetching complete evolution data when simulation completes, resolving the empty diagrams issue.
499 lines
17 KiB
Python
499 lines
17 KiB
Python
"""
|
|
Economic Simulation Engine
|
|
|
|
This module implements the main simulation engine that orchestrates multiple
|
|
economic agents running Markov chains to demonstrate wealth inequality dynamics
|
|
when capital return rate (r) exceeds economic growth rate (g).
|
|
"""
|
|
|
|
import numpy as np
|
|
import uuid
|
|
from typing import List, Dict, Tuple, Optional
|
|
from dataclasses import dataclass
|
|
import time
|
|
import json
|
|
|
|
from .markov_chain import EconomicAgent
|
|
|
|
|
|
@dataclass
|
|
class SimulationParameters:
|
|
"""Configuration parameters for economic simulation."""
|
|
r_rate: float # Capital return rate
|
|
g_rate: float # Economic growth rate
|
|
initial_capital: float = 1000.0
|
|
initial_consumption: float = 1000.0
|
|
num_agents: int = 100
|
|
iterations: int = 1000
|
|
|
|
def __post_init__(self):
|
|
"""Validate parameters after initialization."""
|
|
if self.r_rate < 0 or self.r_rate > 1:
|
|
raise ValueError("Capital rate must be between 0 and 1")
|
|
if self.g_rate < 0 or self.g_rate > 1:
|
|
raise ValueError("Growth rate must be between 0 and 1")
|
|
if self.num_agents < 1 or self.num_agents > 10000:
|
|
raise ValueError("Number of agents must be between 1 and 10000")
|
|
if self.iterations < 1 or self.iterations > 100000:
|
|
raise ValueError("Iterations must be between 1 and 100000")
|
|
|
|
|
|
@dataclass
|
|
class SimulationSnapshot:
|
|
"""Snapshot of simulation state at a specific iteration."""
|
|
iteration: int
|
|
timestamp: float
|
|
wealth_distribution: List[float]
|
|
consumption_distribution: List[float]
|
|
total_wealth: float
|
|
gini_coefficient: float
|
|
wealth_concentration_top10: float
|
|
capital_share: float
|
|
average_wealth: float
|
|
median_wealth: float
|
|
|
|
|
|
class EconomicSimulation:
|
|
"""
|
|
Main simulation engine for demonstrating Piketty's inequality principle.
|
|
|
|
Manages multiple economic agents and tracks wealth distribution over time
|
|
to show how r > g leads to increasing inequality.
|
|
"""
|
|
|
|
def __init__(self, parameters: SimulationParameters):
|
|
"""
|
|
Initialize the economic simulation.
|
|
|
|
Args:
|
|
parameters: Configuration parameters for the simulation
|
|
"""
|
|
self.parameters = parameters
|
|
self.simulation_id = str(uuid.uuid4())
|
|
self.agents: List[EconomicAgent] = []
|
|
self.snapshots: List[SimulationSnapshot] = []
|
|
self.current_iteration = 0
|
|
self.is_running = False
|
|
self.start_time = None
|
|
|
|
self._initialize_agents()
|
|
|
|
def _initialize_agents(self):
|
|
"""Create economic agents with specified parameters."""
|
|
self.agents = []
|
|
|
|
for i in range(self.parameters.num_agents):
|
|
agent_id = f"agent_{i:04d}"
|
|
|
|
# Add some randomness to initial conditions
|
|
initial_capital = self.parameters.initial_capital * (0.8 + 0.4 * np.random.random())
|
|
initial_consumption = self.parameters.initial_consumption * (0.8 + 0.4 * np.random.random())
|
|
|
|
agent = EconomicAgent(
|
|
agent_id=agent_id,
|
|
capital_rate=self.parameters.r_rate,
|
|
growth_rate=self.parameters.g_rate,
|
|
initial_capital=initial_capital,
|
|
initial_consumption=initial_consumption
|
|
)
|
|
|
|
self.agents.append(agent)
|
|
|
|
def step(self) -> SimulationSnapshot:
|
|
"""
|
|
Perform one simulation step for all agents.
|
|
|
|
Returns:
|
|
Snapshot of the current simulation state
|
|
"""
|
|
# Step all agents
|
|
for agent in self.agents:
|
|
agent.step()
|
|
|
|
# Create snapshot
|
|
snapshot = self._create_snapshot()
|
|
self.snapshots.append(snapshot)
|
|
self.current_iteration += 1
|
|
|
|
return snapshot
|
|
|
|
def run_simulation(self, iterations: Optional[int] = None) -> List[SimulationSnapshot]:
|
|
"""
|
|
Run the complete simulation for specified iterations.
|
|
|
|
Args:
|
|
iterations: Number of iterations to run (uses parameter default if None)
|
|
|
|
Returns:
|
|
List of snapshots for each iteration
|
|
"""
|
|
if iterations is None:
|
|
iterations = self.parameters.iterations
|
|
|
|
self.is_running = True
|
|
self.start_time = time.time()
|
|
|
|
try:
|
|
for _ in range(iterations):
|
|
if not self.is_running:
|
|
break
|
|
self.step()
|
|
finally:
|
|
self.is_running = False
|
|
|
|
return self.snapshots.copy()
|
|
|
|
def _create_snapshot(self) -> SimulationSnapshot:
|
|
"""Create a snapshot of current simulation state."""
|
|
# Collect wealth data
|
|
wealth_data = []
|
|
consumption_data = []
|
|
total_wealth_data = []
|
|
|
|
for agent in self.agents:
|
|
if agent.wealth_history and agent.consumption_history:
|
|
wealth = agent.wealth_history[-1]
|
|
consumption = agent.consumption_history[-1]
|
|
else:
|
|
wealth = agent.initial_capital
|
|
consumption = agent.initial_consumption
|
|
|
|
wealth_data.append(wealth)
|
|
consumption_data.append(consumption)
|
|
total_wealth_data.append(wealth + consumption)
|
|
|
|
# Calculate metrics
|
|
total_wealth = sum(total_wealth_data)
|
|
gini = self._calculate_gini_coefficient(total_wealth_data)
|
|
wealth_conc = self._calculate_wealth_concentration(total_wealth_data, 0.1)
|
|
capital_share = sum(wealth_data) / total_wealth if total_wealth > 0 else 0
|
|
|
|
return SimulationSnapshot(
|
|
iteration=self.current_iteration,
|
|
timestamp=time.time(),
|
|
wealth_distribution=wealth_data,
|
|
consumption_distribution=consumption_data,
|
|
total_wealth=total_wealth,
|
|
gini_coefficient=gini,
|
|
wealth_concentration_top10=wealth_conc,
|
|
capital_share=capital_share,
|
|
average_wealth=np.mean(total_wealth_data),
|
|
median_wealth=np.median(total_wealth_data)
|
|
)
|
|
|
|
def _calculate_gini_coefficient(self, wealth_data: List[float]) -> float:
|
|
"""
|
|
Calculate the Gini coefficient for wealth inequality.
|
|
|
|
Args:
|
|
wealth_data: List of wealth values
|
|
|
|
Returns:
|
|
Gini coefficient between 0 (perfect equality) and 1 (perfect inequality)
|
|
"""
|
|
if not wealth_data or len(wealth_data) < 2:
|
|
return 0.0
|
|
|
|
# Sort wealth data
|
|
sorted_wealth = sorted(wealth_data)
|
|
n = len(sorted_wealth)
|
|
|
|
# Calculate Gini coefficient
|
|
cumsum = np.cumsum(sorted_wealth)
|
|
total_wealth = cumsum[-1]
|
|
|
|
if total_wealth == 0:
|
|
return 0.0
|
|
|
|
# Gini coefficient formula
|
|
gini = (2 * sum((i + 1) * wealth for i, wealth in enumerate(sorted_wealth))) / (n * total_wealth) - (n + 1) / n
|
|
|
|
return max(0.0, min(1.0, gini))
|
|
|
|
def _calculate_wealth_concentration(self, wealth_data: List[float], top_percentage: float) -> float:
|
|
"""
|
|
Calculate the share of total wealth held by the top percentage of agents.
|
|
|
|
Args:
|
|
wealth_data: List of wealth values
|
|
top_percentage: Percentage of top agents (e.g., 0.1 for top 10%)
|
|
|
|
Returns:
|
|
Share of total wealth held by top agents
|
|
"""
|
|
if not wealth_data:
|
|
return 0.0
|
|
|
|
sorted_wealth = sorted(wealth_data, reverse=True)
|
|
total_wealth = sum(sorted_wealth)
|
|
|
|
if total_wealth == 0:
|
|
return 0.0
|
|
|
|
top_count = max(1, int(len(sorted_wealth) * top_percentage))
|
|
top_wealth = sum(sorted_wealth[:top_count])
|
|
|
|
return top_wealth / total_wealth
|
|
|
|
def get_latest_snapshot(self) -> Optional[SimulationSnapshot]:
|
|
"""Get the most recent simulation snapshot."""
|
|
return self.snapshots[-1] if self.snapshots else None
|
|
|
|
def get_snapshot_at_iteration(self, iteration: int) -> Optional[SimulationSnapshot]:
|
|
"""Get snapshot at specific iteration."""
|
|
for snapshot in self.snapshots:
|
|
if snapshot.iteration == iteration:
|
|
return snapshot
|
|
return None
|
|
|
|
def get_wealth_evolution(self) -> Tuple[List[int], List[float], List[float]]:
|
|
"""
|
|
Get wealth evolution data for plotting.
|
|
|
|
Returns:
|
|
Tuple of (iterations, total_wealth_over_time, gini_over_time)
|
|
"""
|
|
if not self.snapshots:
|
|
return [], [], []
|
|
|
|
iterations = [s.iteration for s in self.snapshots]
|
|
total_wealth = [s.total_wealth for s in self.snapshots]
|
|
gini_coefficients = [s.gini_coefficient for s in self.snapshots]
|
|
|
|
return iterations, total_wealth, gini_coefficients
|
|
|
|
def get_inequality_evolution(self) -> Tuple[List[int], List[float], List[float], List[float]]:
|
|
"""
|
|
Get inequality evolution data for plotting.
|
|
|
|
Returns:
|
|
Tuple of (iterations, gini_over_time, top10_share_over_time, capital_share_over_time)
|
|
"""
|
|
if not self.snapshots:
|
|
return [], [], [], []
|
|
|
|
iterations = [s.iteration for s in self.snapshots]
|
|
gini_coefficients = [s.gini_coefficient for s in self.snapshots]
|
|
top10_shares = [s.wealth_concentration_top10 for s in self.snapshots]
|
|
capital_shares = [s.capital_share for s in self.snapshots]
|
|
|
|
return iterations, gini_coefficients, top10_shares, capital_shares
|
|
|
|
def get_agent_wealth_distribution(self) -> Dict[str, List[float]]:
|
|
"""
|
|
Get current wealth distribution across all agents.
|
|
|
|
Returns:
|
|
Dictionary with agent IDs and their current wealth values
|
|
"""
|
|
distribution = {}
|
|
|
|
for agent in self.agents:
|
|
if agent.wealth_history:
|
|
distribution[agent.agent_id] = agent.wealth_history[-1]
|
|
else:
|
|
distribution[agent.agent_id] = agent.initial_capital
|
|
|
|
return distribution
|
|
|
|
def update_parameters(self, new_parameters: SimulationParameters):
|
|
"""
|
|
Update simulation parameters and restart with new settings.
|
|
|
|
Args:
|
|
new_parameters: New simulation configuration
|
|
"""
|
|
self.parameters = new_parameters
|
|
self.current_iteration = 0
|
|
self.snapshots.clear()
|
|
self._initialize_agents()
|
|
|
|
def stop_simulation(self):
|
|
"""Stop the running simulation."""
|
|
self.is_running = False
|
|
|
|
def reset_simulation(self):
|
|
"""Reset simulation to initial state."""
|
|
self.current_iteration = 0
|
|
self.snapshots.clear()
|
|
self.is_running = False
|
|
self._initialize_agents()
|
|
|
|
def export_data(self, format_type: str = 'json') -> str:
|
|
"""
|
|
Export simulation data in specified format.
|
|
|
|
Args:
|
|
format_type: Export format ('json' or 'csv')
|
|
|
|
Returns:
|
|
Formatted data string
|
|
"""
|
|
if format_type.lower() == 'json':
|
|
return self._export_json()
|
|
elif format_type.lower() == 'csv':
|
|
return self._export_csv()
|
|
else:
|
|
raise ValueError("Format must be 'json' or 'csv'")
|
|
|
|
def _export_json(self) -> str:
|
|
"""Export simulation data as JSON."""
|
|
data = {
|
|
'simulation_id': self.simulation_id,
|
|
'parameters': {
|
|
'r_rate': self.parameters.r_rate,
|
|
'g_rate': self.parameters.g_rate,
|
|
'initial_capital': self.parameters.initial_capital,
|
|
'initial_consumption': self.parameters.initial_consumption,
|
|
'num_agents': self.parameters.num_agents,
|
|
'iterations': self.parameters.iterations
|
|
},
|
|
'snapshots': []
|
|
}
|
|
|
|
for snapshot in self.snapshots:
|
|
snapshot_data = {
|
|
'iteration': snapshot.iteration,
|
|
'timestamp': snapshot.timestamp,
|
|
'total_wealth': snapshot.total_wealth,
|
|
'gini_coefficient': snapshot.gini_coefficient,
|
|
'wealth_concentration_top10': snapshot.wealth_concentration_top10,
|
|
'capital_share': snapshot.capital_share,
|
|
'average_wealth': snapshot.average_wealth,
|
|
'median_wealth': snapshot.median_wealth
|
|
}
|
|
data['snapshots'].append(snapshot_data)
|
|
|
|
return json.dumps(data, indent=2)
|
|
|
|
def _export_csv(self) -> str:
|
|
"""Export simulation data as CSV."""
|
|
if not self.snapshots:
|
|
return "iteration,total_wealth,gini_coefficient,wealth_concentration_top10,capital_share,average_wealth,median_wealth\n"
|
|
|
|
lines = ["iteration,total_wealth,gini_coefficient,wealth_concentration_top10,capital_share,average_wealth,median_wealth"]
|
|
|
|
for snapshot in self.snapshots:
|
|
line = f"{snapshot.iteration},{snapshot.total_wealth},{snapshot.gini_coefficient}," \
|
|
f"{snapshot.wealth_concentration_top10},{snapshot.capital_share}," \
|
|
f"{snapshot.average_wealth},{snapshot.median_wealth}"
|
|
lines.append(line)
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
class SimulationManager:
|
|
"""
|
|
Manages multiple simulation instances for concurrent access.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""Initialize the simulation manager."""
|
|
self.simulations: Dict[str, EconomicSimulation] = {}
|
|
self.active_simulations: Dict[str, EconomicSimulation] = {}
|
|
|
|
def create_simulation(self, parameters: SimulationParameters) -> str:
|
|
"""
|
|
Create a new simulation instance.
|
|
|
|
Args:
|
|
parameters: Simulation configuration
|
|
|
|
Returns:
|
|
Unique simulation ID
|
|
"""
|
|
simulation = EconomicSimulation(parameters)
|
|
simulation_id = simulation.simulation_id
|
|
|
|
self.simulations[simulation_id] = simulation
|
|
|
|
return simulation_id
|
|
|
|
def get_simulation(self, simulation_id: str) -> Optional[EconomicSimulation]:
|
|
"""
|
|
Get simulation instance by ID.
|
|
|
|
Args:
|
|
simulation_id: Unique simulation identifier
|
|
|
|
Returns:
|
|
Simulation instance or None if not found
|
|
"""
|
|
return self.simulations.get(simulation_id)
|
|
|
|
def start_simulation(self, simulation_id: str, iterations: Optional[int] = None) -> bool:
|
|
"""
|
|
Start running a simulation.
|
|
|
|
Args:
|
|
simulation_id: Unique simulation identifier
|
|
iterations: Number of iterations to run
|
|
|
|
Returns:
|
|
True if simulation started successfully
|
|
"""
|
|
simulation = self.get_simulation(simulation_id)
|
|
if not simulation:
|
|
return False
|
|
|
|
self.active_simulations[simulation_id] = simulation
|
|
# In a real application, this would run in a separate thread
|
|
simulation.run_simulation(iterations)
|
|
|
|
return True
|
|
|
|
def stop_simulation(self, simulation_id: str) -> bool:
|
|
"""
|
|
Stop a running simulation.
|
|
|
|
Args:
|
|
simulation_id: Unique simulation identifier
|
|
|
|
Returns:
|
|
True if simulation stopped successfully
|
|
"""
|
|
simulation = self.get_simulation(simulation_id)
|
|
if simulation:
|
|
simulation.stop_simulation()
|
|
self.active_simulations.pop(simulation_id, None)
|
|
return True
|
|
return False
|
|
|
|
def delete_simulation(self, simulation_id: str) -> bool:
|
|
"""
|
|
Delete a simulation instance.
|
|
|
|
Args:
|
|
simulation_id: Unique simulation identifier
|
|
|
|
Returns:
|
|
True if simulation deleted successfully
|
|
"""
|
|
if simulation_id in self.simulations:
|
|
self.stop_simulation(simulation_id)
|
|
del self.simulations[simulation_id]
|
|
return True
|
|
return False
|
|
|
|
def list_simulations(self) -> List[Dict]:
|
|
"""
|
|
List all simulation instances with their status.
|
|
|
|
Returns:
|
|
List of simulation information dictionaries
|
|
"""
|
|
simulations_info = []
|
|
|
|
for sim_id, simulation in self.simulations.items():
|
|
info = {
|
|
'simulation_id': sim_id,
|
|
'is_running': simulation.is_running,
|
|
'current_iteration': simulation.current_iteration,
|
|
'total_iterations': simulation.parameters.iterations,
|
|
'r_rate': simulation.parameters.r_rate,
|
|
'g_rate': simulation.parameters.g_rate,
|
|
'num_agents': simulation.parameters.num_agents
|
|
}
|
|
simulations_info.append(info)
|
|
|
|
return simulations_info |