diff --git a/policyengine_core/model_api.py b/policyengine_core/model_api.py index 68f1f2dd..aa4876d5 100644 --- a/policyengine_core/model_api.py +++ b/policyengine_core/model_api.py @@ -20,7 +20,7 @@ ) from policyengine_core.periods import DAY, ETERNITY, MONTH, YEAR, period from policyengine_core.populations import ADD, DIVIDE -from policyengine_core.reforms import Reform +from policyengine_core.reforms import Reform, StructuralReform from policyengine_core.simulations import ( calculate_output_add, calculate_output_divide, diff --git a/policyengine_core/reforms/__init__.py b/policyengine_core/reforms/__init__.py index a7ce20a8..ff3ff344 100644 --- a/policyengine_core/reforms/__init__.py +++ b/policyengine_core/reforms/__init__.py @@ -1 +1,2 @@ from .reform import Reform, set_parameter +from .structural_reform import StructuralReform diff --git a/policyengine_core/reforms/structural_reform.py b/policyengine_core/reforms/structural_reform.py new file mode 100644 index 00000000..6f62f020 --- /dev/null +++ b/policyengine_core/reforms/structural_reform.py @@ -0,0 +1,453 @@ +from typing import Annotated, Callable, Literal, Any +from datetime import datetime +from dataclasses import dataclass +from policyengine_core.variables import Variable +from policyengine_core.parameters import ( + Parameter, + ParameterNode, + ParameterAtInstant, +) +from policyengine_core.periods import config +from policyengine_core.taxbenefitsystems import TaxBenefitSystem +from policyengine_core.errors import ( + VariableNotFoundError, + VariableNameConflictError, +) + + +@dataclass +class TransformationLogItem: + """ + A log item for a transformation applied to a variable. + """ + + variable_name: str + variable: Variable | None # None is present when using neutralize + transformation: Literal["neutralize", "add", "update"] + + +class StructuralReform: + + DEFAULT_START_INSTANT = "0000-01-01" + transformation_log: list[TransformationLogItem] = [] + + tax_benefit_system: TaxBenefitSystem | None = None + start_instant: Annotated[str, "YYYY-MM-DD"] | None = None + end_instant: Annotated[str, "YYYY-MM-DD"] | None = None + trigger_parameter_path: str = "" + + def __init__(self, trigger_parameter_path: str): + """ + Initialize a structural reform. + + Args: + trigger_parameter: Path to the parameter that triggers the structural reform; + this parameter must be Boolean + """ + self.reform_id = hash( + f"{self.__class__.__module__}_{self.__class__.__qualname__}" + ) + self.trigger_parameter_path = trigger_parameter_path + + def activate( + self, + tax_benefit_system: TaxBenefitSystem, + ): + """ + Activate the structural reform. + + Args: + tax_benefit_system: The tax benefit system to which the structural reform will be applied + """ + if tax_benefit_system is None: + raise ValueError("Tax benefit system must be provided.") + + if not isinstance(tax_benefit_system, TaxBenefitSystem): + raise TypeError( + "Tax benefit system must be an instance of the TaxBenefitSystem class." + ) + + self.tax_benefit_system = tax_benefit_system + + # Fetch the trigger parameter + trigger_parameter: Parameter = self._fetch_parameter( + self.trigger_parameter_path + ) + + # Parse date out of trigger parameter and set + start_instant: Annotated[str, "YYYY-MM-DD"] | None + end_instant: Annotated[str, "YYYY-MM-DD"] | None + start_instant, end_instant = self._parse_activation_period( + trigger_parameter + ) + + self.start_instant = start_instant + self.end_instant = end_instant + + if self.start_instant is None: + return + + self._activate_transformation_log() + + def neutralize_variable(self, name: str): + """ + When structural reform is activated, neutralize a variable + by setting its formula to return the default value from the + StructuralReform's start_instant date to its end_instant date. + + Args: + name: The name of the variable + """ + self.transformation_log.append( + TransformationLogItem( + variable_name=name, + transformation="neutralize", + ) + ) + + def add_variable(self, variable: Variable): + """ + When structural reform is activated, add a variable + to the StructuralReform. + + Args: + variable: The variable to be added + """ + self.transformation_log.append( + TransformationLogItem( + variable_name=variable.__name__, + variable=variable, + transformation="add", + ) + ) + + def update_variable(self, variable: Variable): + """ + When structural reform is activated, update a + variable in the tax benefit system; if the variable + does not yet exist, it will be added. + + Args: + variable: The variable to be updated + """ + self.transformation_log.append( + TransformationLogItem( + variable_name=variable.__name__, + variable=variable, + transformation="update", + ) + ) + + def _activate_transformation_log(self): + """ + Activate the transformation log by applying the transformations to the tax benefit system. + """ + for log_item in self.transformation_log: + if log_item.transformation == "neutralize": + self._neutralize_variable(log_item.variable_name) + elif log_item.transformation == "add": + self._add_variable(log_item.variable) + elif log_item.transformation == "update": + self._update_variable(log_item.variable) + + def _neutralize_variable(self, name: str) -> Variable: + """ + Neutralize a variable by setting its formula to return the default value + from the StructuralReform's start_instant date onward. + + Args: + name: The name of the variable + + Raises: + VariableNotFoundError: If the variable is not found in the tax benefit system + """ + # Fetch variable + fetched_variable: Variable | None = self._fetch_variable(name) + + if fetched_variable is None: + raise VariableNotFoundError( + f"Unable to neutralize {name}; variable not found.", + self.tax_benefit_system, + ) + + # Add formula to variable that returns all defaults + neutralized_formula = self._neutralized_formula(fetched_variable) + self._add_formula( + fetched_variable, + neutralized_formula, + self.start_instant, + self.end_instant, + ) + + return fetched_variable + + def _add_variable(self, variable: Variable) -> Variable: + """ + Only partially implemented; Add a variable to the StructuralReform. + + Args: + variable: The variable to be added + + Raises: + VariableNameConflictError: If a variable with the same name already exists in the tax benefit system + """ + if not issubclass(variable, Variable): + raise TypeError( + "Variable must be an instance of the Variable class." + ) + + # Attempt to fetch variable + fetched_variable: Variable | None = self._fetch_variable( + variable.__name__ + ) + + if fetched_variable is not None: + raise VariableNameConflictError( + f"Unable to add {variable.__name__}; variable with the same name already exists." + ) + + # Insert variable into the tax-benefit system; this will apply default formula over + # entire period, which we will modify below + added_variable = self.tax_benefit_system.add_variable(variable) + + # First, neutralize over entire period + neutralized_formula = self._neutralized_formula(variable) + self._add_formula( + added_variable, + neutralized_formula, + self.DEFAULT_START_INSTANT, + ) + + # Then, re-add formula in order to format correctly + self._add_formula( + added_variable, + variable.formula, + self.start_instant, + self.end_instant, + ) + + return added_variable + + def _update_variable(self, variable: Variable) -> Variable: + """ + Update a variable in the tax benefit system; if the variable does not + yet exist, it will be added. + + Args: + variable: The variable to be updated + """ + + # Ensure variable is a Variable + if not issubclass(variable, Variable): + raise TypeError( + "Variable must be an instance of the Variable class." + ) + + # Fetch variable + fetched_variable: Variable | None = self._fetch_variable( + variable.__name__ + ) + + # If variable doesn't exist, run self._add_variable + if fetched_variable is None: + return self._add_variable(variable) + + # Otherwise, add new formula to existing variable + self._add_formula( + fetched_variable, + variable.formula, + self.start_instant, + self.end_instant, + ) + + return fetched_variable + + def _fetch_variable(self, name: str) -> Variable | None: + """ + Fetch the variable by reference from the tax benefit system. + + Args: + name: The name of the variable + """ + return self.tax_benefit_system.get_variable(name) + + def _fetch_parameter(self, parameter_path: str) -> Parameter: + """ + Given a dot-notated string, fetch a parameter by + reference from the tax benefit system. + + Args: + parameter_path: The dot-notated path to the parameter + + Raises: + AttributeError: If the parameter cannot be found + """ + root: ParameterNode | Parameter = self.tax_benefit_system.parameters + current: ParameterNode | Parameter = root + full_path: str = "" + + for index, key in enumerate(parameter_path.split(".")): + full_path += f"{key}" if index == 0 else f".{key}" + try: + current = getattr(current, key) + except AttributeError: + raise AttributeError( + f"Unable to find parameter at path '{full_path}'" + ) from None + + if not isinstance(current, Parameter): + raise AttributeError( + f"Parameter at path '{full_path}' is not a Parameter, but a {type(current)}" + ) + + return current + + # Method to modify metadata based on new items? + + # Method to add formula based on date + def _add_formula( + self, + variable: Variable, + formula: Callable, + start_instant: Annotated[str, "YYYY-MM-DD"], + end_instant: Annotated[str, "YYYY-MM-DD"] | None = None, + ) -> Variable: + """ + Mutatively add an evolved formula, beginning at start_instant and ending at end_instant, + to a variable, and return said variable. + + For more on evolved formulas, consult + https://openfisca.org/doc/coding-the-legislation/40_legislation_evolutions.html + + Args: + variable: The variable to which the formula will be added + formula: The formula to be added + start_instant: The date on which the formula will take effect + end_instant: The date on which the formula ends, exclusive (i.e., + the formula will be applied up to but not including this date); if None, + the formula will be applied indefinitely + + Returns: + The variable with the evolved formula + """ + # Prior to manipulation, get formula at end_instant + 1 day + next_formula: Callable | None = None + if end_instant is not None: + next_formula = variable.get_formula(end_instant) + + # Insert formula into variable's formulas at start_instant + variable.formulas.update({start_instant: formula}) + + # Remove all formulas between start_instant and end_instant (or into perpetuity + # if end_instant is None) + for date in variable.formulas.keys(): + if date > start_instant and ( + end_instant is None or date <= end_instant + ): + variable.formulas.pop(date) + + # If end_instant, add back in formula at end_instant + if end_instant is not None: + variable.formulas[end_instant] = next_formula + + return variable + + def _neutralized_formula(self, variable: Variable) -> Callable: + """ + Return a formula that itself returns the default value of a variable. + + Args: + variable: The variable to be neutralized + + Returns: + The neutralized formula + """ + return lambda: variable.default_value + + # Validate start instant + def _validate_instant(self, instant: Any) -> bool: + """ + Validate an instant. + + Args: + instant: The instant to be validated + """ + if not isinstance(instant, str): + raise TypeError( + "Instant must be a string in the format 'YYYY-MM-DD'." + ) + + if not config.INSTANT_PATTERN.match(instant): + raise ValueError( + "'{}' is not a valid instant. Instants are described using the 'YYYY-MM-DD' format, for instance '2015-06-15'.".format( + instant + ) + ) + + return True + + def _parse_activation_period(self, trigger_parameter: Parameter) -> tuple[ + Annotated[str, "YYYY-MM-DD"] | None, + Annotated[str, "YYYY-MM-DD"] | None, + ]: + """ + Given a trigger parameter, parse the reform start and end dates and return them. + + Returns: + A tuple containing the start and end dates of the reform, + or None if the reform is not triggered + """ + + # Crash if trigger param isn't Boolean; this shouldn't be used as a trigger + if (trigger_parameter.metadata is None) or ( + trigger_parameter.metadata["unit"] != "bool" + ): + raise ValueError("Trigger parameter must be a Boolean.") + + # Build custom representation of trigger parameter instants and values + values_dict: dict[Annotated[str, "YYYY-MM-DD"], int | float] = ( + self._generate_param_values_dict(trigger_parameter.values_list) + ) + + if list(values_dict.values()).count(True) > 1: + raise ValueError("Trigger parameter must only be activated once.") + + if list(values_dict.values()).count(True) == 0: + return (None, None) + + # Now that True only occurs once, find it + start_instant_index: int = list(values_dict.values()).index(True) + start_instant: Annotated[str, "YYYY-MM-DD"] = list(values_dict.keys())[ + start_instant_index + ] + self._validate_instant(start_instant) + + # If it's the last item, the reform occurs into perpetuity, else + # the reform ends at the next instant + if start_instant_index == len(values_dict) - 1: + return (start_instant, None) + + end_instant: Annotated[str, "YYYY-MM-DD"] = list(values_dict.keys())[ + start_instant_index + 1 + ] + self._validate_instant(end_instant) + return (start_instant, end_instant) + + def _generate_param_values_dict( + self, values_list: list[ParameterAtInstant] + ) -> dict[Annotated[str, "YYYY-MM-DD"], int | float]: + """ + Given a list of ParameterAtInstant objects, generate a dictionary of the form {instant: value}. + + Args: + values_list: The list of ParameterAtInstant objects + """ + unsorted_dict = { + value.instant_str: value.value for value in values_list + } + sorted_dict = dict( + sorted(unsorted_dict.items(), key=lambda item: item[0]) + ) + return sorted_dict + + # Default outputs method of some sort? diff --git a/policyengine_core/taxbenefitsystems/tax_benefit_system.py b/policyengine_core/taxbenefitsystems/tax_benefit_system.py index 0e689729..5be3faeb 100644 --- a/policyengine_core/taxbenefitsystems/tax_benefit_system.py +++ b/policyengine_core/taxbenefitsystems/tax_benefit_system.py @@ -50,6 +50,9 @@ log = logging.getLogger(__name__) +if TYPE_CHECKING: + from policyengine_core.reforms import StructuralReform + class TaxBenefitSystem: """ @@ -90,6 +93,8 @@ class TaxBenefitSystem: """Short list of basic inputs to get medium accuracy.""" modelled_policies: str = None """A YAML filepath containing metadata describing the modelled policies.""" + possible_structural_reforms: List[StructuralReform] = None + """List of possible structural reforms that can be applied to the tax and benefit system.""" def __init__(self, entities: Sequence[Entity] = None, reform=None) -> None: if entities is None: diff --git a/tests/core/test_reforms.py b/tests/core/reforms/test_reforms.py similarity index 100% rename from tests/core/test_reforms.py rename to tests/core/reforms/test_reforms.py diff --git a/tests/core/reforms/test_structural_reforms.py b/tests/core/reforms/test_structural_reforms.py new file mode 100644 index 00000000..7392297e --- /dev/null +++ b/tests/core/reforms/test_structural_reforms.py @@ -0,0 +1,256 @@ +import warnings + +import pytest +from datetime import datetime +from policyengine_core.errors import ( + VariableNotFoundError, + VariableNameConflictError, +) +from policyengine_core.model_api import * +from policyengine_core.country_template.entities import Person +from policyengine_core.variables import Variable +from policyengine_core.reforms import StructuralReform +from policyengine_core.country_template import CountryTaxBenefitSystem + + +@pytest.fixture(scope="module") +def prefilled_tax_benefit_system(): + + class test_variable_to_add(Variable): + value_type = str + default_value = "Returning default value [pre-loaded]" + entity = Person + label = "Dworkin" + definition_period = YEAR + + def formula(): + return "Returning value from formula [pre-loaded]" + + tax_benefit_system = CountryTaxBenefitSystem() + tax_benefit_system.add_variable(test_variable_to_add) + return tax_benefit_system + + +class test_variable_to_add(Variable): + value_type = str + default_value = "Returning default value" + entity = Person + label = "Maxwell" + definition_period = YEAR + + def formula(): + return "Returning value from formula" + + +class TestGivenEmptyTaxBenefitSystem: + # Given an empty tax-benefit system... + def test_structural_reform_init(self, isolated_tax_benefit_system): + + # When a new structural reform is created with default settings... + test_reform = StructuralReform(isolated_tax_benefit_system) + + # Then the reform is created successfully for the current year + assert test_reform.tax_benefit_system == isolated_tax_benefit_system + assert test_reform.start_instant == str(datetime.now().year) + "-01-01" + assert test_reform.end_instant == None + + def test_structural_reform_init_with_dates( + self, isolated_tax_benefit_system + ): + + # When a new structural reform is created with specific dates... + reform = StructuralReform( + isolated_tax_benefit_system, "2020-01-01", "2021-01-01" + ) + + # Then the reform is created successfully for the specified dates + assert reform.tax_benefit_system == isolated_tax_benefit_system + assert reform.start_instant == "2020-01-01" + assert reform.end_instant == "2021-01-01" + + def test_structural_reform_init_with_invalid_date_type( + self, isolated_tax_benefit_system + ): + + # When a new structural reform is created with incorrectly typed dates... + + # Then the reform raises a TypeError + + with pytest.raises(TypeError): + StructuralReform(isolated_tax_benefit_system, "2020-01-01", 15) + + def test_structural_reform_init_with_invalid_date_format( + self, isolated_tax_benefit_system + ): + + # When a new structural reform is created with incorrectly formatted dates... + + # Then the reform raises a ValueError + + with pytest.raises(ValueError): + StructuralReform( + isolated_tax_benefit_system, "2020-01-01", "2020-13-01" + ) + + def test_add_variable_no_end_dates( + self, + isolated_tax_benefit_system, + ): + # Given an empty tax-benefit system with an endless structural reform... + test_reform = StructuralReform( + isolated_tax_benefit_system, + "2025-01-01", + ) + + # When a new variable is added... + test_reform.add_variable(test_variable_to_add) + + # Then add_variable(test_var) adds new variable with proper formulas + assert ( + "test_variable_to_add" + in test_reform.tax_benefit_system.variables.keys() + ) + + added_test_variable = test_reform.tax_benefit_system.get_variable( + "test_variable_to_add" + ) + assert added_test_variable.value_type == str + assert added_test_variable.label == "Maxwell" + assert ( + added_test_variable.get_formula("2025-01-01")() + == "Returning value from formula" + ) + assert ( + added_test_variable.get_formula("2021-01-01")() + == "Returning default value" + ) + + def test_update_variable_no_end_dates( + self, + isolated_tax_benefit_system, + ): + # Given an empty tax-benefit system with an endless structural reform... + test_reform = StructuralReform( + isolated_tax_benefit_system, + "2025-01-01", + ) + + # When update_variable is called on a variable that does not exist... + test_reform.update_variable(test_variable_to_add) + + # Then update_variable(test_var) adds new variable with proper formulas + assert ( + "test_variable_to_add" + in test_reform.tax_benefit_system.variables.keys() + ) + + added_test_variable = test_reform.tax_benefit_system.get_variable( + "test_variable_to_add" + ) + assert added_test_variable.value_type == str + assert added_test_variable.label == "Maxwell" + assert ( + added_test_variable.get_formula("2025-01-01")() + == "Returning value from formula" + ) + assert ( + added_test_variable.get_formula("2021-01-01")() + == "Returning default value" + ) + + def test_neutralize_variable_no_end_dates( + self, + isolated_tax_benefit_system, + ): + # Given an empty tax-benefit system with an endless structural reform... + test_reform = StructuralReform( + isolated_tax_benefit_system, + "2025-01-01", + ) + + # When neutralize_variable is called on a variable that does not exist... + + # Then neutralize_variable(test_var) raises error + with pytest.raises(VariableNotFoundError): + test_reform.neutralize_variable("test_variable_to_add") + + +class TestGivenPreFilledTaxBenefitSystem: + + # Given a tax-benefit system with variable test_variable_to_add... + + def test_add_variable_no_end_dates( + self, + prefilled_tax_benefit_system, + ): + # Given a tax-benefit system with an endless structural reform... + test_reform = StructuralReform( + prefilled_tax_benefit_system, + "2025-01-01", + ) + + # When a new variable is added... + + # Then add_variable(test_var) raises error + with pytest.raises(VariableNameConflictError): + test_reform.add_variable(test_variable_to_add) + + def test_update_variable_no_end_dates( + self, + prefilled_tax_benefit_system, + ): + # Given a tax-benefit system with an endless structural reform... + test_reform = StructuralReform( + prefilled_tax_benefit_system, + "2025-01-01", + ) + + # When update_variable is called on a variable that already exists... + test_reform.update_variable(test_variable_to_add) + + # Then update_variable(test_var) updates variable with proper formulas + added_test_variable = test_reform.tax_benefit_system.get_variable( + "test_variable_to_add" + ) + assert added_test_variable.value_type == str + assert added_test_variable.label == "Maxwell" + assert ( + added_test_variable.get_formula("2025-01-01")() + == "Returning value from formula" + ) + assert ( + added_test_variable.get_formula("2021-01-01")() + == "Returning default value" + ) + + def test_neutralize_variable_no_end_dates( + self, + prefilled_tax_benefit_system, + ): + # Given a tax-benefit system with an endless structural reform... + test_reform = StructuralReform( + prefilled_tax_benefit_system, + "2025-01-01", + ) + + # When neutralize_variable is called on a variable that already exists... + test_reform.neutralize_variable("test_variable_to_add") + + # Then neutralize_variable(test_var) neutralizes variable + added_test_variable = test_reform.tax_benefit_system.get_variable( + "test_variable_to_add" + ) + assert added_test_variable.value_type == str + assert added_test_variable.label == "Maxwell" + assert ( + added_test_variable.get_formula("2025-01-01")() + == "Returning default value" + ) + assert ( + added_test_variable.get_formula("2021-01-01")() + == "Returning default value" + ) + + +# Given a TBS with a complex structural reform... +# The reform successfully adds a variable, updates a variable, then neutralizes a variable