Files
capitalism-eats-the-world/app/models/economic_model.py
elpatron 3a08de3a4a Fix wealth distribution chart functionality
- Add get_wealth_histogram() method to EconomicModel for histogram data
- Add new API endpoint /simulation/<id>/distribution for chart data
- Extend main data API with include_distribution parameter
- Update real-time progress updates to include distribution data
- Fix frontend updateCharts() to handle wealth distribution chart
- Add distribution data processing in simulation.js
- Update test_charts.py to verify histogram functionality

Resolves issue where wealth distribution chart was not updating during simulations.
2025-08-24 20:10:58 +02:00

560 lines
19 KiB
Python

"""
Economic Simulation Engine
This module implements the main simulation engine that orchestrates multiple
economic agents running Markov chains to demonstrate wealth inequality dynamics
when capital return rate (r) exceeds economic growth rate (g).
"""
import numpy as np
import uuid
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import time
import json
from .markov_chain import EconomicAgent
@dataclass
class SimulationParameters:
"""Configuration parameters for economic simulation."""
r_rate: float # Capital return rate
g_rate: float # Economic growth rate
initial_capital: float = 1000.0
initial_consumption: float = 1000.0
num_agents: int = 100
iterations: int = 1000
def __post_init__(self):
"""Validate parameters after initialization."""
if self.r_rate < 0 or self.r_rate > 1:
raise ValueError("Capital rate must be between 0 and 1")
if self.g_rate < 0 or self.g_rate > 1:
raise ValueError("Growth rate must be between 0 and 1")
if self.num_agents < 1 or self.num_agents > 10000:
raise ValueError("Number of agents must be between 1 and 10000")
if self.iterations < 1 or self.iterations > 100000:
raise ValueError("Iterations must be between 1 and 100000")
@dataclass
class SimulationSnapshot:
"""Snapshot of simulation state at a specific iteration."""
iteration: int
timestamp: float
wealth_distribution: List[float]
consumption_distribution: List[float]
total_wealth: float
gini_coefficient: float
wealth_concentration_top10: float
capital_share: float
average_wealth: float
median_wealth: float
class EconomicSimulation:
"""
Main simulation engine for demonstrating Piketty's inequality principle.
Manages multiple economic agents and tracks wealth distribution over time
to show how r > g leads to increasing inequality.
"""
def __init__(self, parameters: SimulationParameters):
"""
Initialize the economic simulation.
Args:
parameters: Configuration parameters for the simulation
"""
self.parameters = parameters
self.simulation_id = str(uuid.uuid4())
self.agents: List[EconomicAgent] = []
self.snapshots: List[SimulationSnapshot] = []
self.current_iteration = 0
self.is_running = False
self.start_time = None
self._initialize_agents()
def _initialize_agents(self):
"""Create economic agents with specified parameters."""
self.agents = []
for i in range(self.parameters.num_agents):
agent_id = f"agent_{i:04d}"
# Add some randomness to initial conditions
initial_capital = self.parameters.initial_capital * (0.8 + 0.4 * np.random.random())
initial_consumption = self.parameters.initial_consumption * (0.8 + 0.4 * np.random.random())
agent = EconomicAgent(
agent_id=agent_id,
capital_rate=self.parameters.r_rate,
growth_rate=self.parameters.g_rate,
initial_capital=initial_capital,
initial_consumption=initial_consumption
)
self.agents.append(agent)
def step(self) -> SimulationSnapshot:
"""
Perform one simulation step for all agents.
Returns:
Snapshot of the current simulation state
"""
# Step all agents
for agent in self.agents:
agent.step()
# Create snapshot
snapshot = self._create_snapshot()
self.snapshots.append(snapshot)
self.current_iteration += 1
return snapshot
def run_simulation(self, iterations: Optional[int] = None) -> List[SimulationSnapshot]:
"""
Run the complete simulation for specified iterations.
Args:
iterations: Number of iterations to run (uses parameter default if None)
Returns:
List of snapshots for each iteration
"""
if iterations is None:
iterations = self.parameters.iterations
self.is_running = True
self.start_time = time.time()
try:
for _ in range(iterations):
if not self.is_running:
break
self.step()
finally:
self.is_running = False
return self.snapshots.copy()
def _create_snapshot(self) -> SimulationSnapshot:
"""Create a snapshot of current simulation state."""
# Collect wealth data
wealth_data = []
consumption_data = []
total_wealth_data = []
for agent in self.agents:
if agent.wealth_history and agent.consumption_history:
wealth = agent.wealth_history[-1]
consumption = agent.consumption_history[-1]
else:
wealth = agent.initial_capital
consumption = agent.initial_consumption
wealth_data.append(wealth)
consumption_data.append(consumption)
total_wealth_data.append(wealth + consumption)
# Calculate metrics
total_wealth = sum(total_wealth_data)
gini = self._calculate_gini_coefficient(total_wealth_data)
wealth_conc = self._calculate_wealth_concentration(total_wealth_data, 0.1)
capital_share = sum(wealth_data) / total_wealth if total_wealth > 0 else 0
return SimulationSnapshot(
iteration=self.current_iteration,
timestamp=time.time(),
wealth_distribution=wealth_data,
consumption_distribution=consumption_data,
total_wealth=total_wealth,
gini_coefficient=gini,
wealth_concentration_top10=wealth_conc,
capital_share=capital_share,
average_wealth=float(np.mean(total_wealth_data)),
median_wealth=float(np.median(total_wealth_data))
)
def _calculate_gini_coefficient(self, wealth_data: List[float]) -> float:
"""
Calculate the Gini coefficient for wealth inequality.
Args:
wealth_data: List of wealth values
Returns:
Gini coefficient between 0 (perfect equality) and 1 (perfect inequality)
"""
if not wealth_data or len(wealth_data) < 2:
return 0.0
# Sort wealth data
sorted_wealth = sorted(wealth_data)
n = len(sorted_wealth)
# Calculate Gini coefficient
cumsum = np.cumsum(sorted_wealth)
total_wealth = cumsum[-1]
if total_wealth == 0:
return 0.0
# Gini coefficient formula
gini = (2 * sum((i + 1) * wealth for i, wealth in enumerate(sorted_wealth))) / (n * total_wealth) - (n + 1) / n
return max(0.0, min(1.0, gini))
def _calculate_wealth_concentration(self, wealth_data: List[float], top_percentage: float) -> float:
"""
Calculate the share of total wealth held by the top percentage of agents.
Args:
wealth_data: List of wealth values
top_percentage: Percentage of top agents (e.g., 0.1 for top 10%)
Returns:
Share of total wealth held by top agents
"""
if not wealth_data:
return 0.0
sorted_wealth = sorted(wealth_data, reverse=True)
total_wealth = sum(sorted_wealth)
if total_wealth == 0:
return 0.0
top_count = max(1, int(len(sorted_wealth) * top_percentage))
top_wealth = sum(sorted_wealth[:top_count])
return top_wealth / total_wealth
def get_latest_snapshot(self) -> Optional[SimulationSnapshot]:
"""Get the most recent simulation snapshot."""
return self.snapshots[-1] if self.snapshots else None
def get_snapshot_at_iteration(self, iteration: int) -> Optional[SimulationSnapshot]:
"""Get snapshot at specific iteration."""
for snapshot in self.snapshots:
if snapshot.iteration == iteration:
return snapshot
return None
def get_wealth_evolution(self) -> Tuple[List[int], List[float], List[float]]:
"""
Get wealth evolution data for plotting.
Returns:
Tuple of (iterations, total_wealth_over_time, gini_over_time)
"""
if not self.snapshots:
return [], [], []
iterations = [s.iteration for s in self.snapshots]
total_wealth = [s.total_wealth for s in self.snapshots]
gini_coefficients = [s.gini_coefficient for s in self.snapshots]
return iterations, total_wealth, gini_coefficients
def get_inequality_evolution(self) -> Tuple[List[int], List[float], List[float], List[float]]:
"""
Get inequality evolution data for plotting.
Returns:
Tuple of (iterations, gini_over_time, top10_share_over_time, capital_share_over_time)
"""
if not self.snapshots:
return [], [], [], []
iterations = [s.iteration for s in self.snapshots]
gini_coefficients = [s.gini_coefficient for s in self.snapshots]
top10_shares = [s.wealth_concentration_top10 for s in self.snapshots]
capital_shares = [s.capital_share for s in self.snapshots]
return iterations, gini_coefficients, top10_shares, capital_shares
def get_agent_wealth_distribution(self) -> Dict[str, List[float]]:
"""
Get current wealth distribution across all agents.
Returns:
Dictionary with agent IDs and their current wealth values
"""
distribution = {}
for agent in self.agents:
if agent.wealth_history:
distribution[agent.agent_id] = agent.wealth_history[-1]
else:
distribution[agent.agent_id] = agent.initial_capital
return distribution
def get_wealth_histogram(self, num_bins: int = 10) -> Tuple[List[str], List[int]]:
"""
Get wealth distribution as histogram data for charting.
Args:
num_bins: Number of bins for the histogram
Returns:
Tuple of (bin_labels, bin_counts) for histogram chart
"""
if not self.agents:
return [], []
# Get current total wealth for all agents (capital + consumption)
wealth_values = []
for agent in self.agents:
if agent.wealth_history and agent.consumption_history:
total_wealth = agent.wealth_history[-1] + agent.consumption_history[-1]
else:
total_wealth = agent.initial_capital + agent.initial_consumption
wealth_values.append(total_wealth)
if not wealth_values:
return [], []
# Calculate histogram bins
min_wealth = min(wealth_values)
max_wealth = max(wealth_values)
if min_wealth == max_wealth:
# All agents have same wealth
return [f"${min_wealth:.0f}"], [len(wealth_values)]
# Create bins
bin_width = (max_wealth - min_wealth) / num_bins
bins = [min_wealth + i * bin_width for i in range(num_bins + 1)]
# Count agents in each bin
bin_counts = [0] * num_bins
bin_labels = []
for i in range(num_bins):
# Create label for this bin
bin_start = bins[i]
bin_end = bins[i + 1]
if i == num_bins - 1: # Last bin includes upper bound
bin_labels.append(f"${bin_start:.0f} - ${bin_end:.0f}")
else:
bin_labels.append(f"${bin_start:.0f} - ${bin_end:.0f}")
# Count agents in this bin
for wealth in wealth_values:
if i == num_bins - 1: # Last bin includes upper bound
if bin_start <= wealth <= bin_end:
bin_counts[i] += 1
else:
if bin_start <= wealth < bin_end:
bin_counts[i] += 1
return bin_labels, bin_counts
def update_parameters(self, new_parameters: SimulationParameters):
"""
Update simulation parameters and restart with new settings.
Args:
new_parameters: New simulation configuration
"""
self.parameters = new_parameters
self.current_iteration = 0
self.snapshots.clear()
self._initialize_agents()
def stop_simulation(self):
"""Stop the running simulation."""
self.is_running = False
def reset_simulation(self):
"""Reset simulation to initial state."""
self.current_iteration = 0
self.snapshots.clear()
self.is_running = False
self._initialize_agents()
def export_data(self, format_type: str = 'json') -> str:
"""
Export simulation data in specified format.
Args:
format_type: Export format ('json' or 'csv')
Returns:
Formatted data string
"""
if format_type.lower() == 'json':
return self._export_json()
elif format_type.lower() == 'csv':
return self._export_csv()
else:
raise ValueError("Format must be 'json' or 'csv'")
def _export_json(self) -> str:
"""Export simulation data as JSON."""
data = {
'simulation_id': self.simulation_id,
'parameters': {
'r_rate': self.parameters.r_rate,
'g_rate': self.parameters.g_rate,
'initial_capital': self.parameters.initial_capital,
'initial_consumption': self.parameters.initial_consumption,
'num_agents': self.parameters.num_agents,
'iterations': self.parameters.iterations
},
'snapshots': []
}
for snapshot in self.snapshots:
snapshot_data = {
'iteration': snapshot.iteration,
'timestamp': snapshot.timestamp,
'total_wealth': snapshot.total_wealth,
'gini_coefficient': snapshot.gini_coefficient,
'wealth_concentration_top10': snapshot.wealth_concentration_top10,
'capital_share': snapshot.capital_share,
'average_wealth': snapshot.average_wealth,
'median_wealth': snapshot.median_wealth
}
data['snapshots'].append(snapshot_data)
return json.dumps(data, indent=2)
def _export_csv(self) -> str:
"""Export simulation data as CSV."""
if not self.snapshots:
return "iteration,total_wealth,gini_coefficient,wealth_concentration_top10,capital_share,average_wealth,median_wealth\n"
lines = ["iteration,total_wealth,gini_coefficient,wealth_concentration_top10,capital_share,average_wealth,median_wealth"]
for snapshot in self.snapshots:
line = f"{snapshot.iteration},{snapshot.total_wealth},{snapshot.gini_coefficient}," \
f"{snapshot.wealth_concentration_top10},{snapshot.capital_share}," \
f"{snapshot.average_wealth},{snapshot.median_wealth}"
lines.append(line)
return "\n".join(lines)
class SimulationManager:
"""
Manages multiple simulation instances for concurrent access.
"""
def __init__(self):
"""Initialize the simulation manager."""
self.simulations: Dict[str, EconomicSimulation] = {}
self.active_simulations: Dict[str, EconomicSimulation] = {}
def create_simulation(self, parameters: SimulationParameters) -> str:
"""
Create a new simulation instance.
Args:
parameters: Simulation configuration
Returns:
Unique simulation ID
"""
simulation = EconomicSimulation(parameters)
simulation_id = simulation.simulation_id
self.simulations[simulation_id] = simulation
return simulation_id
def get_simulation(self, simulation_id: str) -> Optional[EconomicSimulation]:
"""
Get simulation instance by ID.
Args:
simulation_id: Unique simulation identifier
Returns:
Simulation instance or None if not found
"""
return self.simulations.get(simulation_id)
def start_simulation(self, simulation_id: str, iterations: Optional[int] = None) -> bool:
"""
Start running a simulation.
Args:
simulation_id: Unique simulation identifier
iterations: Number of iterations to run
Returns:
True if simulation started successfully
"""
simulation = self.get_simulation(simulation_id)
if not simulation:
return False
self.active_simulations[simulation_id] = simulation
# In a real application, this would run in a separate thread
simulation.run_simulation(iterations)
return True
def stop_simulation(self, simulation_id: str) -> bool:
"""
Stop a running simulation.
Args:
simulation_id: Unique simulation identifier
Returns:
True if simulation stopped successfully
"""
simulation = self.get_simulation(simulation_id)
if simulation:
simulation.stop_simulation()
self.active_simulations.pop(simulation_id, None)
return True
return False
def delete_simulation(self, simulation_id: str) -> bool:
"""
Delete a simulation instance.
Args:
simulation_id: Unique simulation identifier
Returns:
True if simulation deleted successfully
"""
if simulation_id in self.simulations:
self.stop_simulation(simulation_id)
del self.simulations[simulation_id]
return True
return False
def list_simulations(self) -> List[Dict]:
"""
List all simulation instances with their status.
Returns:
List of simulation information dictionaries
"""
simulations_info = []
for sim_id, simulation in self.simulations.items():
info = {
'simulation_id': sim_id,
'is_running': simulation.is_running,
'current_iteration': simulation.current_iteration,
'total_iterations': simulation.parameters.iterations,
'r_rate': simulation.parameters.r_rate,
'g_rate': simulation.parameters.g_rate,
'num_agents': simulation.parameters.num_agents
}
simulations_info.append(info)
return simulations_info