""" Economic Simulation Engine This module implements the main simulation engine that orchestrates multiple economic agents running Markov chains to demonstrate wealth inequality dynamics when capital return rate (r) exceeds economic growth rate (g). """ import numpy as np import uuid from typing import List, Dict, Tuple, Optional from dataclasses import dataclass import time import json from .markov_chain import EconomicAgent @dataclass class SimulationParameters: """Configuration parameters for economic simulation.""" r_rate: float # Capital return rate g_rate: float # Economic growth rate initial_capital: float = 1000.0 initial_consumption: float = 1000.0 num_agents: int = 100 iterations: int = 1000 def __post_init__(self): """Validate parameters after initialization.""" if self.r_rate < 0 or self.r_rate > 1: raise ValueError("Capital rate must be between 0 and 1") if self.g_rate < 0 or self.g_rate > 1: raise ValueError("Growth rate must be between 0 and 1") if self.num_agents < 1 or self.num_agents > 10000: raise ValueError("Number of agents must be between 1 and 10000") if self.iterations < 1 or self.iterations > 100000: raise ValueError("Iterations must be between 1 and 100000") @dataclass class SimulationSnapshot: """Snapshot of simulation state at a specific iteration.""" iteration: int timestamp: float wealth_distribution: List[float] consumption_distribution: List[float] total_wealth: float gini_coefficient: float wealth_concentration_top10: float capital_share: float average_wealth: float median_wealth: float class EconomicSimulation: """ Main simulation engine for demonstrating Piketty's inequality principle. Manages multiple economic agents and tracks wealth distribution over time to show how r > g leads to increasing inequality. """ def __init__(self, parameters: SimulationParameters): """ Initialize the economic simulation. Args: parameters: Configuration parameters for the simulation """ self.parameters = parameters self.simulation_id = str(uuid.uuid4()) self.agents: List[EconomicAgent] = [] self.snapshots: List[SimulationSnapshot] = [] self.current_iteration = 0 self.is_running = False self.start_time = None self._initialize_agents() def _initialize_agents(self): """Create economic agents with specified parameters.""" self.agents = [] for i in range(self.parameters.num_agents): agent_id = f"agent_{i:04d}" # Add some randomness to initial conditions initial_capital = self.parameters.initial_capital * (0.8 + 0.4 * np.random.random()) initial_consumption = self.parameters.initial_consumption * (0.8 + 0.4 * np.random.random()) agent = EconomicAgent( agent_id=agent_id, capital_rate=self.parameters.r_rate, growth_rate=self.parameters.g_rate, initial_capital=initial_capital, initial_consumption=initial_consumption ) self.agents.append(agent) def step(self) -> SimulationSnapshot: """ Perform one simulation step for all agents. Returns: Snapshot of the current simulation state """ # Step all agents for agent in self.agents: agent.step() # Create snapshot snapshot = self._create_snapshot() self.snapshots.append(snapshot) self.current_iteration += 1 return snapshot def run_simulation(self, iterations: Optional[int] = None) -> List[SimulationSnapshot]: """ Run the complete simulation for specified iterations. Args: iterations: Number of iterations to run (uses parameter default if None) Returns: List of snapshots for each iteration """ if iterations is None: iterations = self.parameters.iterations self.is_running = True self.start_time = time.time() try: for _ in range(iterations): if not self.is_running: break self.step() finally: self.is_running = False return self.snapshots.copy() def _create_snapshot(self) -> SimulationSnapshot: """Create a snapshot of current simulation state.""" # Collect wealth data wealth_data = [] consumption_data = [] total_wealth_data = [] for agent in self.agents: if agent.wealth_history and agent.consumption_history: wealth = agent.wealth_history[-1] consumption = agent.consumption_history[-1] else: wealth = agent.initial_capital consumption = agent.initial_consumption wealth_data.append(wealth) consumption_data.append(consumption) total_wealth_data.append(wealth + consumption) # Calculate metrics total_wealth = sum(total_wealth_data) gini = self._calculate_gini_coefficient(total_wealth_data) wealth_conc = self._calculate_wealth_concentration(total_wealth_data, 0.1) capital_share = sum(wealth_data) / total_wealth if total_wealth > 0 else 0 return SimulationSnapshot( iteration=self.current_iteration, timestamp=time.time(), wealth_distribution=wealth_data, consumption_distribution=consumption_data, total_wealth=total_wealth, gini_coefficient=gini, wealth_concentration_top10=wealth_conc, capital_share=capital_share, average_wealth=float(np.mean(total_wealth_data)), median_wealth=float(np.median(total_wealth_data)) ) def _calculate_gini_coefficient(self, wealth_data: List[float]) -> float: """ Calculate the Gini coefficient for wealth inequality. Args: wealth_data: List of wealth values Returns: Gini coefficient between 0 (perfect equality) and 1 (perfect inequality) """ if not wealth_data or len(wealth_data) < 2: return 0.0 # Sort wealth data sorted_wealth = sorted(wealth_data) n = len(sorted_wealth) # Calculate Gini coefficient cumsum = np.cumsum(sorted_wealth) total_wealth = cumsum[-1] if total_wealth == 0: return 0.0 # Gini coefficient formula gini = (2 * sum((i + 1) * wealth for i, wealth in enumerate(sorted_wealth))) / (n * total_wealth) - (n + 1) / n return max(0.0, min(1.0, gini)) def _calculate_wealth_concentration(self, wealth_data: List[float], top_percentage: float) -> float: """ Calculate the share of total wealth held by the top percentage of agents. Args: wealth_data: List of wealth values top_percentage: Percentage of top agents (e.g., 0.1 for top 10%) Returns: Share of total wealth held by top agents """ if not wealth_data: return 0.0 sorted_wealth = sorted(wealth_data, reverse=True) total_wealth = sum(sorted_wealth) if total_wealth == 0: return 0.0 top_count = max(1, int(len(sorted_wealth) * top_percentage)) top_wealth = sum(sorted_wealth[:top_count]) return top_wealth / total_wealth def get_latest_snapshot(self) -> Optional[SimulationSnapshot]: """Get the most recent simulation snapshot.""" return self.snapshots[-1] if self.snapshots else None def get_snapshot_at_iteration(self, iteration: int) -> Optional[SimulationSnapshot]: """Get snapshot at specific iteration.""" for snapshot in self.snapshots: if snapshot.iteration == iteration: return snapshot return None def get_wealth_evolution(self) -> Tuple[List[int], List[float], List[float]]: """ Get wealth evolution data for plotting. Returns: Tuple of (iterations, total_wealth_over_time, gini_over_time) """ if not self.snapshots: return [], [], [] iterations = [s.iteration for s in self.snapshots] total_wealth = [s.total_wealth for s in self.snapshots] gini_coefficients = [s.gini_coefficient for s in self.snapshots] return iterations, total_wealth, gini_coefficients def get_inequality_evolution(self) -> Tuple[List[int], List[float], List[float], List[float]]: """ Get inequality evolution data for plotting. Returns: Tuple of (iterations, gini_over_time, top10_share_over_time, capital_share_over_time) """ if not self.snapshots: return [], [], [], [] iterations = [s.iteration for s in self.snapshots] gini_coefficients = [s.gini_coefficient for s in self.snapshots] top10_shares = [s.wealth_concentration_top10 for s in self.snapshots] capital_shares = [s.capital_share for s in self.snapshots] return iterations, gini_coefficients, top10_shares, capital_shares def get_agent_wealth_distribution(self) -> Dict[str, List[float]]: """ Get current wealth distribution across all agents. Returns: Dictionary with agent IDs and their current wealth values """ distribution = {} for agent in self.agents: if agent.wealth_history: distribution[agent.agent_id] = agent.wealth_history[-1] else: distribution[agent.agent_id] = agent.initial_capital return distribution def get_wealth_histogram(self, num_bins: int = 10) -> Tuple[List[str], List[int]]: """ Get wealth distribution as histogram data for charting. Args: num_bins: Number of bins for the histogram Returns: Tuple of (bin_labels, bin_counts) for histogram chart """ if not self.agents: return [], [] # Get current total wealth for all agents using the correct method wealth_values = [] for agent in self.agents: total_wealth = agent.get_total_wealth() wealth_values.append(total_wealth) if not wealth_values: return [], [] # Debug logging print(f"DEBUG: Wealth values - count: {len(wealth_values)}, min: {min(wealth_values)}, max: {max(wealth_values)}") # Calculate histogram bins min_wealth = min(wealth_values) max_wealth = max(wealth_values) if min_wealth == max_wealth: # All agents have same wealth result = [f"${min_wealth:.0f}"], [len(wealth_values)] print(f"DEBUG: All agents have same wealth - labels: {result[0]}, counts: {result[1]}") return result # Create bins bin_width = (max_wealth - min_wealth) / num_bins bins = [min_wealth + i * bin_width for i in range(num_bins + 1)] # Count agents in each bin bin_counts = [0] * num_bins bin_labels = [] for i in range(num_bins): # Create label for this bin bin_start = bins[i] bin_end = bins[i + 1] if i == num_bins - 1: # Last bin includes upper bound bin_labels.append(f"${bin_start:.0f} - ${bin_end:.0f}") else: bin_labels.append(f"${bin_start:.0f} - ${bin_end:.0f}") # Count agents in this bin for wealth in wealth_values: if i == num_bins - 1: # Last bin includes upper bound if bin_start <= wealth <= bin_end: bin_counts[i] += 1 else: if bin_start <= wealth < bin_end: bin_counts[i] += 1 print(f"DEBUG: Histogram result - labels: {bin_labels}, counts: {bin_counts}") return bin_labels, bin_counts def update_parameters(self, new_parameters: SimulationParameters): """ Update simulation parameters and restart with new settings. Args: new_parameters: New simulation configuration """ self.parameters = new_parameters self.current_iteration = 0 self.snapshots.clear() self._initialize_agents() def stop_simulation(self): """Stop the running simulation.""" self.is_running = False def reset_simulation(self): """Reset simulation to initial state.""" self.current_iteration = 0 self.snapshots.clear() self.is_running = False self._initialize_agents() def export_data(self, format_type: str = 'json') -> str: """ Export simulation data in specified format. Args: format_type: Export format ('json' or 'csv') Returns: Formatted data string """ if format_type.lower() == 'json': return self._export_json() elif format_type.lower() == 'csv': return self._export_csv() else: raise ValueError("Format must be 'json' or 'csv'") def _export_json(self) -> str: """Export simulation data as JSON.""" data = { 'simulation_id': self.simulation_id, 'parameters': { 'r_rate': self.parameters.r_rate, 'g_rate': self.parameters.g_rate, 'initial_capital': self.parameters.initial_capital, 'initial_consumption': self.parameters.initial_consumption, 'num_agents': self.parameters.num_agents, 'iterations': self.parameters.iterations }, 'snapshots': [] } for snapshot in self.snapshots: snapshot_data = { 'iteration': snapshot.iteration, 'timestamp': snapshot.timestamp, 'total_wealth': snapshot.total_wealth, 'gini_coefficient': snapshot.gini_coefficient, 'wealth_concentration_top10': snapshot.wealth_concentration_top10, 'capital_share': snapshot.capital_share, 'average_wealth': snapshot.average_wealth, 'median_wealth': snapshot.median_wealth } data['snapshots'].append(snapshot_data) return json.dumps(data, indent=2) def _export_csv(self) -> str: """Export simulation data as CSV.""" if not self.snapshots: return "iteration,total_wealth,gini_coefficient,wealth_concentration_top10,capital_share,average_wealth,median_wealth\n" lines = ["iteration,total_wealth,gini_coefficient,wealth_concentration_top10,capital_share,average_wealth,median_wealth"] for snapshot in self.snapshots: line = f"{snapshot.iteration},{snapshot.total_wealth},{snapshot.gini_coefficient}," \ f"{snapshot.wealth_concentration_top10},{snapshot.capital_share}," \ f"{snapshot.average_wealth},{snapshot.median_wealth}" lines.append(line) return "\n".join(lines) class SimulationManager: """ Manages multiple simulation instances for concurrent access. """ def __init__(self): """Initialize the simulation manager.""" self.simulations: Dict[str, EconomicSimulation] = {} self.active_simulations: Dict[str, EconomicSimulation] = {} def create_simulation(self, parameters: SimulationParameters) -> str: """ Create a new simulation instance. Args: parameters: Simulation configuration Returns: Unique simulation ID """ simulation = EconomicSimulation(parameters) simulation_id = simulation.simulation_id self.simulations[simulation_id] = simulation return simulation_id def get_simulation(self, simulation_id: str) -> Optional[EconomicSimulation]: """ Get simulation instance by ID. Args: simulation_id: Unique simulation identifier Returns: Simulation instance or None if not found """ return self.simulations.get(simulation_id) def start_simulation(self, simulation_id: str, iterations: Optional[int] = None) -> bool: """ Start running a simulation. Args: simulation_id: Unique simulation identifier iterations: Number of iterations to run Returns: True if simulation started successfully """ simulation = self.get_simulation(simulation_id) if not simulation: return False self.active_simulations[simulation_id] = simulation # In a real application, this would run in a separate thread simulation.run_simulation(iterations) return True def stop_simulation(self, simulation_id: str) -> bool: """ Stop a running simulation. Args: simulation_id: Unique simulation identifier Returns: True if simulation stopped successfully """ simulation = self.get_simulation(simulation_id) if simulation: simulation.stop_simulation() self.active_simulations.pop(simulation_id, None) return True return False def delete_simulation(self, simulation_id: str) -> bool: """ Delete a simulation instance. Args: simulation_id: Unique simulation identifier Returns: True if simulation deleted successfully """ if simulation_id in self.simulations: self.stop_simulation(simulation_id) del self.simulations[simulation_id] return True return False def list_simulations(self) -> List[Dict]: """ List all simulation instances with their status. Returns: List of simulation information dictionaries """ simulations_info = [] for sim_id, simulation in self.simulations.items(): info = { 'simulation_id': sim_id, 'is_running': simulation.is_running, 'current_iteration': simulation.current_iteration, 'total_iterations': simulation.parameters.iterations, 'r_rate': simulation.parameters.r_rate, 'g_rate': simulation.parameters.g_rate, 'num_agents': simulation.parameters.num_agents } simulations_info.append(info) return simulations_info