From 515b7977423afec71ba47f6a3a06edc8d8750bd2 Mon Sep 17 00:00:00 2001 From: Wega Date: Thu, 10 Apr 2025 13:47:07 +0200 Subject: [PATCH 1/4] Initial commit, vectorized operations in gdp and regional_differntiation --- .../tests/tools/costs/test_gdp_parity.py | 108 +++++++++ .../test_regional_differentiation_parity.py | 154 +++++++++++++ message_ix_models/tools/costs/gdp.py | 138 ++++++++++- .../tools/costs/regional_differentiation.py | 215 +++++++++++++++++- 4 files changed, 612 insertions(+), 3 deletions(-) create mode 100644 message_ix_models/tests/tools/costs/test_gdp_parity.py create mode 100644 message_ix_models/tests/tools/costs/test_regional_differentiation_parity.py diff --git a/message_ix_models/tests/tools/costs/test_gdp_parity.py b/message_ix_models/tests/tools/costs/test_gdp_parity.py new file mode 100644 index 0000000000..ea45e77261 --- /dev/null +++ b/message_ix_models/tests/tools/costs/test_gdp_parity.py @@ -0,0 +1,108 @@ +import time + +import pandas as pd +import pandas.testing as pdt +import pytest +from sdmx.model.common import Code + +from message_ix_models.model.structure import get_codes +from message_ix_models.tools.costs import Config +from message_ix_models.tools.costs.gdp import ( + adjust_cost_ratios_with_gdp, + adjust_cost_ratios_with_gdp_vectorized, + process_raw_ssp_data, +) +from message_ix_models.tools.costs.gdp import ( + process_raw_ssp_data as process_raw_ssp_data_legacy, +) +from message_ix_models.tools.costs.regional_differentiation import ( + apply_regional_differentiation, +) + + +def assert_equal_result(legacy, refactored): + if isinstance(legacy, dict) and isinstance(refactored, dict): + # Ensure the dictionaries have the same keys + assert set(legacy.keys()) == set(refactored.keys()), ( + "Dictionary keys do not match" + ) + # Recursively compare each value in the dictionary + for key in legacy: + assert_equal_result(legacy[key], refactored[key]) + elif isinstance(legacy, pd.DataFrame) and isinstance(refactored, pd.DataFrame): + legacy = legacy.sort_index(axis=1) + refactored = refactored.sort_index(axis=1) + pdt.assert_frame_equal(legacy, refactored) + elif isinstance(legacy, pd.Series) and isinstance(refactored, pd.Series): + legacy = legacy.sort_index() + refactored = refactored.sort_index() + pdt.assert_series_equal(legacy, refactored) + else: + raise ValueError( + f"Type mismatch: legacy type {type(legacy)} vs refactored type {type(refactored)}" + ) + + +#@pytest.mark.skip(reason="Skipping test_process_raw_ssp_data") +@pytest.mark.parametrize("node", ("R11", "R12")) +def test_process_raw_ssp_data(test_context, node) -> None: + # Set the "regions" value on the context + test_context.model.regions = node + config = Config(node=node) + + # Retrieve list of node IDs + nodes = get_codes(f"node/{node}") + # Convert to string + regions = set(map(str, nodes[nodes.index(Code(id="World"))].child)) + + # Function runs + # - context is ignored by process_raw_ssp_data + # - node is ignored by process_raw_ssp_data1 + n_iter = 1 + start_time = time.time() + for _ in range(n_iter): + result = process_raw_ssp_data(context=test_context, config=config) + end_time = time.time() + with open("time_taken_gdp.txt", "a") as f: + f.write(f"Time taken for process_raw_ssp_data: {(end_time - start_time) / n_iter} seconds\n") + + start_time = time.time() + for _ in range(n_iter): + result_legacy = process_raw_ssp_data_legacy(context=test_context, config=config) + end_time = time.time() + with open("time_taken_gdp.txt", "a") as f: + f.write(f"Time taken for 
process_raw_ssp_data_legacy: {(end_time - start_time) / n_iter} seconds\n") + + # Assert that the results are equal + assert_equal_result(result, result_legacy) + +@pytest.mark.skip(reason="Skipping test_adjust_cost_ratios_with_gdp") +@pytest.mark.parametrize("module", ("energy", "materials", "cooling")) +def test_adjust_cost_ratios_with_gdp(test_context, module) -> None: + # Set parameters + test_context.model.regions = "R12" + + # Mostly defaults + config = Config(module=module, node="R12", scenario="SSP2") + + # Get regional differentiation + region_diff = apply_regional_differentiation(config) + n_iter = 10 + # Get adjusted cost ratios based on GDP per capita + start_time = time.time() + for _ in range(n_iter): + result = adjust_cost_ratios_with_gdp(region_diff, config) + end_time = time.time() + with open("time_taken_gdp.txt", "a") as f: + f.write(f"Time taken for adjust_cost_ratios_with_gdp: {(end_time - start_time) / n_iter} seconds\n") + + # Get adjusted cost ratios based on GDP per capita using vectorized approach + start_time = time.time() + for _ in range(n_iter): + result_vectorized = adjust_cost_ratios_with_gdp_vectorized(region_diff, config) + end_time = time.time() + with open("time_taken_gdp.txt", "a") as f: + f.write(f"Time taken for adjust_cost_ratios_with_gdp_vectorized: {(end_time - start_time) / n_iter} seconds\n") + + # Assert that the results are equal + assert_equal_result(result, result_vectorized) diff --git a/message_ix_models/tests/tools/costs/test_regional_differentiation_parity.py b/message_ix_models/tests/tools/costs/test_regional_differentiation_parity.py new file mode 100644 index 0000000000..75e9bcb731 --- /dev/null +++ b/message_ix_models/tests/tools/costs/test_regional_differentiation_parity.py @@ -0,0 +1,154 @@ +import time +import cProfile +import pstats + +import numpy as np +import pandas as pd +import pandas.testing as pdt +import pytest + +from message_ix_models.tools.costs import Config +from message_ix_models.tools.costs.regional_differentiation import ( + adjust_technology_mapping, + apply_regional_differentiation, + get_intratec_data, + get_raw_technology_mapping, + get_weo_data, + get_weo_data_fast, +) + + +def assert_equal_result(legacy, refactored): + if isinstance(legacy, dict) and isinstance(refactored, dict): + # Ensure the dictionaries have the same keys + assert set(legacy.keys()) == set(refactored.keys()), ( + "Dictionary keys do not match" + ) + # Recursively compare each value in the dictionary + for key in legacy: + assert_equal_result(legacy[key], refactored[key]) + elif isinstance(legacy, pd.DataFrame) and isinstance(refactored, pd.DataFrame): + legacy = legacy.sort_index(axis=1) + refactored = refactored.sort_index(axis=1) + pdt.assert_frame_equal(legacy, refactored) + elif isinstance(legacy, pd.Series) and isinstance(refactored, pd.Series): + legacy = legacy.sort_index() + refactored = refactored.sort_index() + pdt.assert_series_equal(legacy, refactored) + else: + raise ValueError( + f"Type mismatch: legacy type {type(legacy)} vs refactored type {type(refactored)}" + ) + +@pytest.mark.skip(reason="Skipping test_get_weo_data") +def test_get_weo_data() -> None: + + start_time = time.time() + result_legacy = get_weo_data() + end_time = time.time() + with open("weo_data_time.txt", "a") as f: + f.write(f"Time taken for legacy get_weo_data: {end_time - start_time} seconds\n") + start_time = time.time() + result_fast = get_weo_data_fast() + end_time = time.time() + with open("weo_data_time.txt", "a") as f: + f.write(f"Time taken for 
fast get_weo_data: {end_time - start_time} seconds\n") + + assert_equal_result(result_legacy, result_fast) + +@pytest.mark.skip(reason="Skipping test_get_intratec_data") +def test_get_intratec_data() -> None: + res = get_intratec_data() + + # Check that the regions of R12 are present + assert all( + [ + "R11_NAM", + "R11_LAM", + "R11_WEU", + "R11_EEU", + "R11_FSU", + "R11_AFR", + "R11_MEA", + "R11_SAS", + "R11_CPA", + "R11_PAS", + "R11_PAO", + ] + == res.node.unique() + ) + + +@pytest.mark.parametrize( + "module, t_exp, rds_exp", + ( + ("energy", {"coal_ppl", "gas_ppl", "gas_cc", "solar_res1"}, {"weo"}), + ("materials", {"biomass_NH3", "meth_h2", "furnace_foil_steel"}, {"energy"}), + ( + "cooling", + {"coal_ppl__cl_fresh", "gas_cc__air", "nuc_lc__ot_fresh"}, + {"energy"}, + ), + ), +) +@pytest.mark.skip(reason="Skipping test_get_raw_technology_mapping") +def test_get_raw_technology_mapping(module, t_exp, rds_exp) -> None: + # Function runs without error + result = get_raw_technology_mapping(module) + + # Expected technologies are present + assert t_exp <= set(result.message_technology.unique()) + + # Expected values for regional differentiation sources + assert rds_exp <= set(result.reg_diff_source.unique()) + + +@pytest.mark.parametrize("module", ("energy", "materials", "cooling")) +@pytest.mark.skip(reason="Skipping test_adjust_technology_mapping") +def test_adjust_technology_mapping(module) -> None: + energy_raw = get_raw_technology_mapping("energy") + + # Function runs without error + result = adjust_technology_mapping(module) + + # For module="energy", adjustment has no effect; output data are the same + if module == "energy": + assert energy_raw.equals(result) + + # The "energy" regional differentiation source is not present in the result data + assert "energy" not in result.reg_diff_source.unique() + + # The "weo" regional differentiation source is present in the result data + assert "weo" in result.reg_diff_source.unique() + + +@pytest.mark.parametrize( + "module, t_exp", + ( + ("energy", {"coal_ppl", "gas_ppl", "gas_cc", "solar_res1"}), + ("materials", {"biomass_NH3", "meth_h2", "furnace_foil_steel"}), + ("cooling", {"coal_ppl__cl_fresh", "gas_cc__air", "nuc_lc__ot_fresh"}), + ), +) + +#@pytest.mark.skip(reason="Skipping test_apply_regional_differentiation") +def test_apply_regional_differentiation(module, t_exp) -> None: + """Regional differentiation is applied correctly for each `module`.""" + config = Config(module=module) + + n_iter = 5 + # Function runs without error + start_time = time.time() + for _ in range(n_iter): + result = apply_regional_differentiation(config, vectorized=True) + end_time = time.time() + + with open("apply_regional_differentiation_time.txt", "a") as f: + f.write(f"Time taken for vectorized apply_regional_differentiation: {(end_time - start_time) / n_iter} seconds\n") + start_time = time.time() + for _ in range(n_iter): + result_legacy = apply_regional_differentiation(config, vectorized=False) + end_time = time.time() + with open("apply_regional_differentiation_time.txt", "a") as f: + f.write(f"Time taken for legacy apply_regional_differentiation: {(end_time - start_time) / n_iter} seconds\n") + assert_equal_result(result_legacy, result) diff --git a/message_ix_models/tools/costs/gdp.py b/message_ix_models/tools/costs/gdp.py index 21c8c5dc2f..3d4b599333 100644 --- a/message_ix_models/tools/costs/gdp.py +++ b/message_ix_models/tools/costs/gdp.py @@ -123,7 +123,7 @@ def merge(*dfs: pd.DataFrame) -> pd.DataFrame: return result -def 
adjust_cost_ratios_with_gdp(region_diff_df, config: Config): +def adjust_cost_ratios_with_gdp_legacy(region_diff_df, config: Config): """Calculate adjusted region-differentiated cost ratios. This function takes in a data frame with region-differentiated cost ratios and @@ -247,3 +247,139 @@ def _constrain_cost_ratio(df: pd.DataFrame, base_year): ] ] ) + +def adjust_cost_ratios_with_gdp(region_diff_df, config: Config): + """Calculate adjusted region-differentiated cost ratios. + + This function takes in a data frame with region-differentiated cost ratios and + calculates adjusted region-differentiated cost ratios using GDP per capita data. + + Parameters + ---------- + region_diff_df : pandas.DataFrame + Output of :func:`apply_regional_differentiation`. + config : .Config + The function responds to, or passes on to other functions, the fields: + :attr:`~.Config.base_year`, + :attr:`~.Config.node`, + :attr:`~.Config.ref_region`, + :attr:`~.Config.scenario`, and + :attr:`~.Config.scenario_version`. + + Returns + ------- + pandas.DataFrame + DataFrame with columns: + - scenario_version: scenario version + - scenario: SSP scenario + - message_technology: message technology + - region: R11, R12, or R20 region + - year + - gdp_ratio_reg_to_reference: ratio of GDP per capita in respective region to + GDP per capita in reference region. + - reg_cost_ratio_adj: adjusted region-differentiated cost ratio + """ + # Import helper functions (they remain in use) + from .projections import _maybe_query_scenario, _maybe_query_scenario_version + + # Set region context for GDP extraction + context = Context.get_instance(-1) + context.model.regions = config.node + + # Retrieve and prepare GDP data (dropping totals, converting dtypes, filtering by y0, + # reassigning scenario_version values, and applying any scenario filters) + df_gdp = ( + process_raw_ssp_data(context, config) + .query("year >= @config.y0") + .drop(columns=["total_gdp", "total_population"]) + .assign( + scenario_version=lambda x: np.where( + x.scenario_version.str.contains("2013"), + "Previous (2013)", + "Review (2023)", + ) + ) + .astype({"year": int}) + .pipe(_maybe_query_scenario, config) + .pipe(_maybe_query_scenario_version, config) + ) + + # Ensure base_year exists in GDP data; otherwise choose the earliest year and warn. + base_year = config.base_year + if base_year not in df_gdp.year.unique(): + new_base_year = df_gdp.year.min() + log.warning(f"Use year={new_base_year} GDP data as proxy for {base_year}") + base_year = new_base_year + + # --- Step 1: Calculate slope and intercept using the base-year data --- + # 1a. Subset GDP data to the base year and drop the year column. + df_base = df_gdp.query("year == @base_year").drop("year", axis=1) + + # 1b. Merge the base-year GDP data with region_diff_df to get the base "reg_cost_ratio". + df_intermediate = df_base.merge(region_diff_df, on=["region"]) + + # 1c. Calculate the slope and intercept from the base-year values. + df_intermediate["slope"] = (df_intermediate["reg_cost_ratio"] - 1) / ( + df_intermediate["gdp_ratio_reg_to_reference"] - 1 + ) + df_intermediate["intercept"] = 1 - df_intermediate["slope"] + # Drop the GDP ratio from the base data as it will be re-added from the full data. 
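+    # (Illustrative check of the slope/intercept above, using made-up
+    # numbers: a region whose base-year reg_cost_ratio is 0.7 at a
+    # gdp_ratio_reg_to_reference of 0.4 gets slope = (0.7 - 1) / (0.4 - 1)
+    # = 0.5 and intercept = 1 - 0.5 = 0.5, so the adjusted ratio equals
+    # 0.7 in the base year and reaches exactly 1.0 when the GDP ratio
+    # reaches 1.)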
+ df_intermediate = df_intermediate.drop(columns=["gdp_ratio_reg_to_reference"]) + + # --- Step 2: Merge full GDP data and compute adjusted cost ratios --- + # Merge the intermediate (base-year derived) data with the full set of GDP data; + # this adds yearly "gdp_ratio_reg_to_reference" values to each record. + df_merged = df_intermediate.merge( + df_gdp, on=["scenario_version", "scenario", "region"], how="right" + ) + # Compute the adjusted cost ratio for all rows. + df_merged["reg_cost_ratio_adj"] = df_merged["slope"] * df_merged[ + "gdp_ratio_reg_to_reference" + ] + df_merged["intercept"] + # Fill any NaNs (e.g. for the reference region) with 1.0. + df_merged["reg_cost_ratio_adj"] = df_merged["reg_cost_ratio_adj"].fillna(1.0) + + # --- Step 3: Vectorize the constrain logic that was in _constrain_cost_ratio --- + # Instead of iterating per group, we extract the base-year values for each group. + base_values = ( + df_merged.query("year == @base_year") + .loc[:, [ + "scenario_version", + "scenario", + "region", + "message_technology", + "gdp_ratio_reg_to_reference", + "reg_cost_ratio_adj", + ]] + .rename( + columns={ + "gdp_ratio_reg_to_reference": "base_gdp_ratio", + "reg_cost_ratio_adj": "base_reg_cost", + } + ) + ) + # Merge these base-year values back onto the main data. + df_merged = df_merged.merge( + base_values, + on=["scenario_version", "scenario", "region", "message_technology"], + how="left", + ) + # For groups where the base-year GDP ratio is less than 1 and the base-year cost ratio is greater than 1, + # clip the reg_cost_ratio_adj to the base-year cost ratio. + condition = (df_merged["base_gdp_ratio"] < 1) & (df_merged["base_reg_cost"] > 1) + df_merged.loc[condition, "reg_cost_ratio_adj"] = df_merged.loc[ + condition, "reg_cost_ratio_adj" + ].clip(upper=df_merged.loc[condition, "base_reg_cost"]) + + # --- Step 4: Select and return the final desired columns --- + return df_merged[ + [ + "scenario_version", + "scenario", + "message_technology", + "region", + "year", + "gdp_ratio_reg_to_reference", + "reg_cost_ratio_adj", + ] + ] diff --git a/message_ix_models/tools/costs/regional_differentiation.py b/message_ix_models/tools/costs/regional_differentiation.py index 8957475d96..a4b4ce200b 100644 --- a/message_ix_models/tools/costs/regional_differentiation.py +++ b/message_ix_models/tools/costs/regional_differentiation.py @@ -1,3 +1,4 @@ + import logging from collections.abc import Mapping from functools import lru_cache @@ -30,6 +31,112 @@ def get_weo_region_map(regions: str) -> Mapping[str, str]: # Map from the child's (node's) ID to the value of the "iea-weo-region" annotation return {n.id: str(n.get_annotation(id="iea-weo-region").text) for n in nodes} +def get_weo_data_fast() -> pd.DataFrame: + """Read in raw WEO investment/capital costs and O&M costs data. 
+ + Returns + ------- + pandas.DataFrame + DataFrame with columns: + + - cost_type: investment or fixed O&M cost + - weo_technology: WEO technology name + - weo_region: WEO region + - year: year + - value: cost value + """ + + # Dict of all technologies, their Excel sheet name, and the starting row + DICT_TECH_ROWS = { + "bioenergy_ccus": ["Renewables", 99], + "bioenergy_cofiring": ["Renewables", 79], + "bioenergy_large": ["Renewables", 69], + "bioenergy_medium_chp": ["Renewables", 89], + "ccgt": ["Gas", 9], + "ccgt_ccs": ["Fossil fuels equipped with CCUS", 29], + "ccgt_chp": ["Gas", 29], + "csp": ["Renewables", 109], + "fuel_cell": ["Gas", 39], + "gas_turbine": ["Gas", 19], + "geothermal": ["Renewables", 119], + "hydropower_large": ["Renewables", 49], + "hydropower_small": ["Renewables", 59], + "igcc": ["Coal", 39], + "igcc_ccs": ["Fossil fuels equipped with CCUS", 19], + "marine": ["Renewables", 129], + "nuclear": ["Nuclear", 9], + "pulverized_coal_ccs": ["Fossil fuels equipped with CCUS", 9], + "solarpv_buildings": ["Renewables", 19], + "solarpv_large": ["Renewables", 9], + "steam_coal_subcritical": ["Coal", 9], + "steam_coal_supercritical": ["Coal", 19], + "steam_coal_ultrasupercritical": ["Coal", 29], + "wind_offshore": ["Renewables", 39], + "wind_onshore": ["Renewables", 29], + } + + # Dict of cost types to read and columns specification + DICT_COST_COLS = {"inv_cost": "A,B:D", "fix_cost": "A,F:H"} + + # Set the file path for the raw IEA WEO cost data Excel file + file_path = package_data_path( + "iea", "WEO_2023_PG_Assumptions_STEPSandNZE_Scenario.xlsx" + ) + + # Retrieve conversion factor from 2022 USD to 2005 USD + conversion_factor = registry("1.0 USD_2022").to("USD_2005").magnitude + + dfs_cost = [] + # Open the Excel file once for all reads + with pd.ExcelFile(file_path) as xls: + # Loop over each technology and cost type combination + for tech_key, cost_key in product(DICT_TECH_ROWS, DICT_COST_COLS): + sheet_name, skiprow_val = DICT_TECH_ROWS[tech_key] + cols_range = DICT_COST_COLS[cost_key] + # Read data with na_values so "n.a." becomes NaN + df = ( + pd.read_excel( + xls, + sheet_name=sheet_name, + header=None, + skiprows=skiprow_val, + nrows=9, + usecols=cols_range, + na_values=["n.a."], + ) + .set_axis(["weo_region", "2022", "2030", "2050"], axis=1) + .melt(id_vars=["weo_region"], var_name="year", value_name="value") + .assign( + weo_technology=tech_key, + cost_type=cost_key, + units="usd_per_kw", + ) + .reindex( + [ + "cost_type", + "weo_technology", + "weo_region", + "year", + "units", + "value", + ], + axis=1, + ) + .assign(value=lambda x: x.value * conversion_factor) + ) + dfs_cost.append(df) + + # By setting ignore_index=True we create a clean, continuous RangeIndex + all_cost_df = pd.concat(dfs_cost, ignore_index=True) + + # Replace missing values with the median cost per technology and cost type. + all_cost_df["value"] = all_cost_df.groupby( + ["weo_technology", "cost_type"] + )["value"].transform(lambda x: x.fillna(x.median())) + + return all_cost_df + + def get_weo_data() -> pd.DataFrame: """Read in raw WEO investment/capital costs and O&M costs data. @@ -562,6 +669,107 @@ def get_weo_regional_differentiation(config: "Config") -> pd.DataFrame: return df_cost_ratios +def get_weo_regional_differentiation_vectorized(config: "Config") -> pd.DataFrame: + """Apply WEO regional differentiation. + + 1. Retrieve WEO data using :func:`.get_weo_data`. + 2. Map data to MESSAGEix-GLOBIOM regions according to the :attr:`.Config.node`. + 3. 
Calculate cost ratios for each region relative to the + :attr:`~.Config.ref_region`. + + Parameters + ---------- + config : .Config + The function responds to the fields: + :attr:`~.Config.base_year`, + :attr:`~.Config.node`, and + :attr:`~.Config.ref_region`. + + Returns + ------- + pandas.DataFrame + DataFrame with columns: + + - message_technology: MESSAGEix technology name + - region: MESSAGEix region + - weo_ref_region_cost: WEO cost in reference region + - reg_cost_ratio: regional cost ratio relative to reference region + - weo_fix_ratio: fixed O&M cost to investment cost ratio + """ + + # Retrieve the full set of WEO data and focus on investment (and later fixed O&M) costs + df_weo = get_weo_data_fast() + + # Even if config.base_year is greater than 2022, use 2022 WEO values + sel_year = "2022" + log.info("…using year " + sel_year + " data from WEO") + + # --- FIX: Use a merge instead of dict inversion --- + # Get mapping: keys are MESSAGEix region IDs (message_node), and values are WEO region names. + mapping = get_weo_region_map(config.node) + # Create a DataFrame from mapping so that duplicate WEO regions for different nodes are preserved. + map_df = pd.DataFrame(list(mapping.items()), columns=["region", "weo_region"]) + # Filter the WEO data for the selected year + df_weo_sel = df_weo[df_weo["year"] == sel_year].copy() + # Merge the mapping DataFrame with the filtered WEO data on the "weo_region" column. + df_sel_weo = map_df.merge(df_weo_sel, on="weo_region", how="inner") + # Rename the cost column to be consistent with later code and reindex columns in the legacy order. + df_sel_weo = df_sel_weo.rename(columns={"value": "weo_cost"}) + df_sel_weo = df_sel_weo.reindex( + ["cost_type", "weo_technology", "weo_region", "region", "year", "weo_cost"], + axis=1, + ) + + # Verify that the specified reference region is contained in the data (use uppercase). + assert config.ref_region is not None + ref_region = config.ref_region.upper() + if ref_region not in df_sel_weo["region"].unique(): + raise ValueError( + f"Reference region {ref_region} not found in WEO data. " + "Please specify a different reference region. " + f"Available regions are: {df_sel_weo['region'].unique()}" + ) + + # === Calculate regional investment cost ratio relative to reference region === + # Get rows with investment costs for the reference region. + mask_inv = df_sel_weo["cost_type"] == "inv_cost" + mask_ref = mask_inv & (df_sel_weo["region"] == ref_region) + df_ref = ( + df_sel_weo.loc[mask_ref, ["weo_technology", "year", "weo_cost"]] + .rename(columns={"weo_cost": "weo_ref_region_cost"}) + ) + # Get all rows with investment cost irrespective of the region. + inv_df = df_sel_weo.loc[mask_inv, ["weo_technology", "year", "weo_cost", "region"]] + # Merge to get the reference cost for each technology and year. + df_reg_ratios = inv_df.merge(df_ref, on=["weo_technology", "year"]) + df_reg_ratios["reg_cost_ratio"] = df_reg_ratios["weo_cost"] / df_reg_ratios["weo_ref_region_cost"] + df_reg_ratios = df_reg_ratios[["weo_technology", "region", "weo_ref_region_cost", "reg_cost_ratio"]] + + # === Calculate fixed O&M cost ratio relative to investment cost === + # Extract investment costs for the selected year. + df_inv = ( + df_sel_weo.query("cost_type == 'inv_cost' and year == @sel_year") + .rename(columns={"weo_cost": "inv_cost"}) + .drop(columns=["year", "cost_type"]) + ) + # Extract fixed O&M costs for the selected year. 
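+    # (The merge further below pairs inv_cost and fix_cost rows per
+    # technology and region, so that weo_fix_ratio = fix_cost / inv_cost,
+    # i.e. annual fixed O&M cost expressed as a share of investment cost.)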
+ df_fix = ( + df_sel_weo.query("cost_type == 'fix_cost' and year == @sel_year") + .rename(columns={"weo_cost": "fix_cost"}) + .drop(columns=["year", "cost_type"]) + ) + # Merge investment and fixed O&M costs based on technology and region. + df_fom_inv = ( + df_inv.merge(df_fix, on=["weo_technology", "weo_region", "region"]) + .assign(weo_fix_ratio=lambda x: x.fix_cost / x.inv_cost) + .drop(columns=["inv_cost", "fix_cost", "weo_region"]) + ) + + # === Combine the two ratios together === + df_cost_ratios = df_reg_ratios.merge(df_fom_inv, on=["weo_technology", "region"]) + + return df_cost_ratios + def get_intratec_regional_differentiation(node: str, ref_region: str) -> pd.DataFrame: """Apply Intratec regional differentiation. @@ -638,7 +846,7 @@ def get_intratec_regional_differentiation(node: str, ref_region: str) -> pd.Data return df_reg_ratios -def apply_regional_differentiation(config: "Config") -> pd.DataFrame: +def apply_regional_differentiation(config: "Config", vectorized: bool = True) -> pd.DataFrame: """Apply regional differentiation depending on mapping source. 1. Retrieve an adjusted technology mapping from :func:`.adjust_technology_mapping`. @@ -675,7 +883,10 @@ def apply_regional_differentiation(config: "Config") -> pd.DataFrame: """ df_map = adjust_technology_mapping(config.module) assert config.ref_region is not None - df_weo = get_weo_regional_differentiation(config) + if vectorized: + df_weo = get_weo_regional_differentiation_vectorized(config) + else: + df_weo = get_weo_regional_differentiation(config) df_intratec = get_intratec_regional_differentiation(config.node, config.ref_region) # Get mapping of technologies From 24de69dc77e700a15e4aa8651ca1d6a0f663387b Mon Sep 17 00:00:00 2001 From: Wega Date: Thu, 10 Apr 2025 13:47:16 +0200 Subject: [PATCH 2/4] Revert "Simplify path handling in SSP{Original,Update}" This reverts commit f886d2d9a4f96b3d9ad844ac216b531c25a2c47b. --- message_ix_models/project/ssp/data.py | 60 +++++++++++++++++---------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/message_ix_models/project/ssp/data.py b/message_ix_models/project/ssp/data.py index 9975612feb..88a2264b66 100644 --- a/message_ix_models/project/ssp/data.py +++ b/message_ix_models/project/ssp/data.py @@ -1,8 +1,10 @@ import logging +from platformdirs import user_cache_path + from message_ix_models.tools.exo_data import ExoDataSource, register_source from message_ix_models.tools.iamc import iamc_like_data_for_query -from message_ix_models.util import path_fallback +from message_ix_models.util import package_data_path, private_data_path __all__ = [ "SSPOriginal", @@ -11,18 +13,6 @@ log = logging.getLogger(__name__) -#: :py:`where` argument to :func:`path_fallback`, used by both :class:`.SSPOriginal` and -#: :class:`.SSPUpdate`. In order: -#: -#: 1. Currently data is stored in message-static-data, cloned and linked from within the -#: user's 'local' data directory. -#: 2. Previously some files were stored directly within message_ix_models (available in -#: an editable install from a clone of the git repository, 'package') or in -#: :mod:`message_data` ('private'). These settings are only provided for backward -#: compatibility. -#: 3. If the above are not available, use the fuzzed/random test data ('test'). 
-WHERE = "local package private test" - @register_source class SSPOriginal(ExoDataSource): @@ -103,17 +93,25 @@ def __init__(self, source, source_kw): self.raise_on_extra_kw(source_kw) - # Identify input data path - self.path = path_fallback("ssp", self.filename, where=WHERE) - if "test" in self.path.parts: - log.warning(f"Read random data from {self.path}") - # Assemble a query string extra = "d" if ssp_id == "4" and model == "IIASA-WiC POP" else "" self.query = ( f"SCENARIO == 'SSP{ssp_id}{extra}_v9_{date}' and VARIABLE == '{measure}'" + (f" and MODEL == '{model}'" if model else "") ) + # log.debug(query) + + # Iterate over possible locations for the data file + dirs = [private_data_path("ssp"), package_data_path("test", "ssp")] + for path in [d.joinpath(self.filename) for d in dirs]: + if not path.exists(): + log.info(f"Not found: {path}") + continue + if "test" in path.parts: + log.warning(f"Reading random data from {path}") + break + + self.path = path def __call__(self): # Use prepared path, query, and replacements @@ -187,6 +185,12 @@ def __init__(self, source, source_kw): scenarios = [] if release in ("3.1", "3.0.1", "3.0"): + # Directories in which to locate `self.filename`: + # - User's local cache (retrieved with "mix-models fetch" or equivalent). + # - Stored directly within message_ix_models (editable install from a clone + # of the git repository). + dirs = [user_cache_path("message-ix-models"), package_data_path("ssp")] + scenarios.append(f"SSP{ssp_id}") if measure == "GDP|PPP": @@ -199,6 +203,9 @@ def __init__(self, source, source_kw): Scenario={"Historical Reference": scenarios[0]}, ) elif release == "preview": + # Look first in message_data, then in message_ix_models test data + dirs = [private_data_path("ssp"), package_data_path("test", "ssp")] + models.extend([model] if model is not None else []) scenarios.append(f"SSP{ssp_id} - Review Phase 1") else: @@ -208,15 +215,22 @@ def __init__(self, source, source_kw): ) raise ValueError(release) - # Identify input data path - self.path = path_fallback("ssp", self.filename[release], where=WHERE) - if "test" in self.path.parts: - log.warning(f"Read random data from {self.path}") - # Assemble and store a query string self.query = f"Scenario in {scenarios!r} and Variable == '{measure}'" + ( f"and Model in {models!r}" if models else "" ) + # log.info(f"{self.query = }") + + # Iterate over possible locations for the data file + for path in [d.joinpath(self.filename[release]) for d in dirs]: + if not path.exists(): + log.info(f"Not found: {path}") + continue + if "test" in path.parts: + log.warning(f"Reading random data from {path}") + break + + self.path = path def __call__(self): # Use prepared path, query, and replacements From 4629ddf9b8e8b50a804cf8892608aa82fbde1015 Mon Sep 17 00:00:00 2001 From: Wega Date: Thu, 10 Apr 2025 17:57:42 +0200 Subject: [PATCH 3/4] Vectorized test_adjust_cost_ratios_with_gdp and get_weo_data_fast added and tested --- .../tests/tools/costs/test_gdp_parity.py | 65 ++------ .../test_regional_differentiation_parity.py | 117 ++------------ message_ix_models/tools/costs/gdp.py | 33 ++-- .../tools/costs/regional_differentiation.py | 144 ++++-------------- 4 files changed, 77 insertions(+), 282 deletions(-) diff --git a/message_ix_models/tests/tools/costs/test_gdp_parity.py b/message_ix_models/tests/tools/costs/test_gdp_parity.py index ea45e77261..3a40533f5a 100644 --- a/message_ix_models/tests/tools/costs/test_gdp_parity.py +++ b/message_ix_models/tests/tools/costs/test_gdp_parity.py @@ -3,17 +3,11 @@ import 
pandas as pd import pandas.testing as pdt import pytest -from sdmx.model.common import Code -from message_ix_models.model.structure import get_codes from message_ix_models.tools.costs import Config from message_ix_models.tools.costs.gdp import ( adjust_cost_ratios_with_gdp, - adjust_cost_ratios_with_gdp_vectorized, - process_raw_ssp_data, -) -from message_ix_models.tools.costs.gdp import ( - process_raw_ssp_data as process_raw_ssp_data_legacy, + adjust_cost_ratios_with_gdp_legacy, ) from message_ix_models.tools.costs.regional_differentiation import ( apply_regional_differentiation, @@ -39,44 +33,11 @@ def assert_equal_result(legacy, refactored): pdt.assert_series_equal(legacy, refactored) else: raise ValueError( - f"Type mismatch: legacy type {type(legacy)} vs refactored type {type(refactored)}" + f"Type mismatch: legacy type {type(legacy)} vs " + f"refactored type {type(refactored)}" ) - -#@pytest.mark.skip(reason="Skipping test_process_raw_ssp_data") -@pytest.mark.parametrize("node", ("R11", "R12")) -def test_process_raw_ssp_data(test_context, node) -> None: - # Set the "regions" value on the context - test_context.model.regions = node - config = Config(node=node) - - # Retrieve list of node IDs - nodes = get_codes(f"node/{node}") - # Convert to string - regions = set(map(str, nodes[nodes.index(Code(id="World"))].child)) - - # Function runs - # - context is ignored by process_raw_ssp_data - # - node is ignored by process_raw_ssp_data1 - n_iter = 1 - start_time = time.time() - for _ in range(n_iter): - result = process_raw_ssp_data(context=test_context, config=config) - end_time = time.time() - with open("time_taken_gdp.txt", "a") as f: - f.write(f"Time taken for process_raw_ssp_data: {(end_time - start_time) / n_iter} seconds\n") - - start_time = time.time() - for _ in range(n_iter): - result_legacy = process_raw_ssp_data_legacy(context=test_context, config=config) - end_time = time.time() - with open("time_taken_gdp.txt", "a") as f: - f.write(f"Time taken for process_raw_ssp_data_legacy: {(end_time - start_time) / n_iter} seconds\n") - - # Assert that the results are equal - assert_equal_result(result, result_legacy) - -@pytest.mark.skip(reason="Skipping test_adjust_cost_ratios_with_gdp") +#@pytest.mark.skip(reason="Skipping test_adjust_cost_ratios_with_gdp") @pytest.mark.parametrize("module", ("energy", "materials", "cooling")) def test_adjust_cost_ratios_with_gdp(test_context, module) -> None: # Set parameters @@ -87,22 +48,28 @@ def test_adjust_cost_ratios_with_gdp(test_context, module) -> None: # Get regional differentiation region_diff = apply_regional_differentiation(config) - n_iter = 10 + n_iter = 5 # Get adjusted cost ratios based on GDP per capita start_time = time.time() for _ in range(n_iter): - result = adjust_cost_ratios_with_gdp(region_diff, config) + result_legacy = adjust_cost_ratios_with_gdp_legacy(region_diff, config) end_time = time.time() with open("time_taken_gdp.txt", "a") as f: - f.write(f"Time taken for adjust_cost_ratios_with_gdp: {(end_time - start_time) / n_iter} seconds\n") + f.write( + f"Time taken for adjust_cost_ratios_with_gdp: " + f"{(end_time - start_time) / n_iter} seconds\n" + ) # Get adjusted cost ratios based on GDP per capita using vectorized approach start_time = time.time() for _ in range(n_iter): - result_vectorized = adjust_cost_ratios_with_gdp_vectorized(region_diff, config) + result_vectorized = adjust_cost_ratios_with_gdp(region_diff, config) end_time = time.time() with open("time_taken_gdp.txt", "a") as f: - f.write(f"Time taken for 
adjust_cost_ratios_with_gdp_vectorized: {(end_time - start_time) / n_iter} seconds\n") + f.write( + f"Time taken for adjust_cost_ratios_with_gdp_vectorized:" + f"{(end_time - start_time) / n_iter} seconds\n" + ) # Assert that the results are equal - assert_equal_result(result, result_vectorized) + assert_equal_result(result_legacy, result_vectorized) diff --git a/message_ix_models/tests/tools/costs/test_regional_differentiation_parity.py b/message_ix_models/tests/tools/costs/test_regional_differentiation_parity.py index 75e9bcb731..07e6ba64e8 100644 --- a/message_ix_models/tests/tools/costs/test_regional_differentiation_parity.py +++ b/message_ix_models/tests/tools/costs/test_regional_differentiation_parity.py @@ -1,6 +1,4 @@ import time -import cProfile -import pstats import numpy as np import pandas as pd @@ -9,10 +7,7 @@ from message_ix_models.tools.costs import Config from message_ix_models.tools.costs.regional_differentiation import ( - adjust_technology_mapping, apply_regional_differentiation, - get_intratec_data, - get_raw_technology_mapping, get_weo_data, get_weo_data_fast, ) @@ -40,115 +35,23 @@ def assert_equal_result(legacy, refactored): f"Type mismatch: legacy type {type(legacy)} vs refactored type {type(refactored)}" ) -@pytest.mark.skip(reason="Skipping test_get_weo_data") +#@pytest.mark.skip(reason="Skipping test_get_weo_data") def test_get_weo_data() -> None: - + n_iter = 5 start_time = time.time() - result_legacy = get_weo_data() + for _ in range(n_iter): + result_legacy = get_weo_data() end_time = time.time() with open("weo_data_time.txt", "a") as f: - f.write(f"Time taken for legacy get_weo_data: {end_time - start_time} seconds\n") + f.write(f"Time taken for legacy get_weo_data:" + f"{(end_time - start_time) / n_iter} seconds\n") start_time = time.time() - result_fast = get_weo_data_fast() + for _ in range(n_iter): + result_fast = get_weo_data_fast() end_time = time.time() with open("weo_data_time.txt", "a") as f: - f.write(f"Time taken for fast get_weo_data: {end_time - start_time} seconds\n") + f.write(f"Time taken for fast get_weo_data:" + f"{(end_time - start_time) / n_iter} seconds\n") assert_equal_result(result_legacy, result_fast) -@pytest.mark.skip(reason="Skipping test_get_intratec_data") -def test_get_intratec_data() -> None: - res = get_intratec_data() - - # Check that the regions of R12 are present - assert all( - [ - "R11_NAM", - "R11_LAM", - "R11_WEU", - "R11_EEU", - "R11_FSU", - "R11_AFR", - "R11_MEA", - "R11_SAS", - "R11_CPA", - "R11_PAS", - "R11_PAO", - ] - == res.node.unique() - ) - - -@pytest.mark.parametrize( - "module, t_exp, rds_exp", - ( - ("energy", {"coal_ppl", "gas_ppl", "gas_cc", "solar_res1"}, {"weo"}), - ("materials", {"biomass_NH3", "meth_h2", "furnace_foil_steel"}, {"energy"}), - ( - "cooling", - {"coal_ppl__cl_fresh", "gas_cc__air", "nuc_lc__ot_fresh"}, - {"energy"}, - ), - ), -) -@pytest.mark.skip(reason="Skipping test_get_raw_technology_mapping") -def test_get_raw_technology_mapping(module, t_exp, rds_exp) -> None: - # Function runs without error - result = get_raw_technology_mapping(module) - - # Expected technologies are present - assert t_exp <= set(result.message_technology.unique()) - - # Expected values for regional differentiation sources - assert rds_exp <= set(result.reg_diff_source.unique()) - - -@pytest.mark.parametrize("module", ("energy", "materials", "cooling")) -@pytest.mark.skip(reason="Skipping test_adjust_technology_mapping") -def test_adjust_technology_mapping(module) -> None: - energy_raw = 
get_raw_technology_mapping("energy") - - # Function runs without error - result = adjust_technology_mapping(module) - - # For module="energy", adjustment has no effect; output data are the same - if module == "energy": - assert energy_raw.equals(result) - - # The "energy" regional differentiation source is not present in the result data - assert "energy" not in result.reg_diff_source.unique() - - # The "weo" regional differentiation source is present in the result data - assert "weo" in result.reg_diff_source.unique() - - -@pytest.mark.parametrize( - "module, t_exp", - ( - ("energy", {"coal_ppl", "gas_ppl", "gas_cc", "solar_res1"}), - ("materials", {"biomass_NH3", "meth_h2", "furnace_foil_steel"}), - ("cooling", {"coal_ppl__cl_fresh", "gas_cc__air", "nuc_lc__ot_fresh"}), - ), -) - -#@pytest.mark.skip(reason="Skipping test_apply_regional_differentiation") -def test_apply_regional_differentiation(module, t_exp) -> None: - """Regional differentiation is applied correctly for each `module`.""" - config = Config(module=module) - - n_iter = 5 - # Function runs without error - start_time = time.time() - for _ in range(n_iter): - result = apply_regional_differentiation(config, vectorized=True) - end_time = time.time() - - with open("apply_regional_differentiation_time.txt", "a") as f: - f.write(f"Time taken for vectorized apply_regional_differentiation: {(end_time - start_time) / n_iter} seconds\n") - start_time = time.time() - for _ in range(n_iter): - result_legacy = apply_regional_differentiation(config, vectorized=False) - end_time = time.time() - with open("apply_regional_differentiation_time.txt", "a") as f: - f.write(f"Time taken for legacy apply_regional_differentiation: {(end_time - start_time) / n_iter} seconds\n") - assert_equal_result(result_legacy, result) diff --git a/message_ix_models/tools/costs/gdp.py b/message_ix_models/tools/costs/gdp.py index 3d4b599333..ec7d7df9e2 100644 --- a/message_ix_models/tools/costs/gdp.py +++ b/message_ix_models/tools/costs/gdp.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd from genno import KeySeq +from scipy.stats import linregress from message_ix_models import Context @@ -248,6 +249,7 @@ def _constrain_cost_ratio(df: pd.DataFrame, base_year): ] ) + def adjust_cost_ratios_with_gdp(region_diff_df, config: Config): """Calculate adjusted region-differentiated cost ratios. @@ -278,6 +280,11 @@ def adjust_cost_ratios_with_gdp(region_diff_df, config: Config): - gdp_ratio_reg_to_reference: ratio of GDP per capita in respective region to GDP per capita in reference region. - reg_cost_ratio_adj: adjusted region-differentiated cost ratio + + Differences from the legacy function: + - Uses vectorized DataFrame operations to compute slope and intercept from base-year data, reducing reliance on iterative group processing. + - Merges base-year GDP values directly to compute and constrain the adjusted cost ratios. + - Eliminates the need for an explicit group-wise constraint function by applying clipping conditions directly on the merged data. """ # Import helper functions (they remain in use) from .projections import _maybe_query_scenario, _maybe_query_scenario_version @@ -333,9 +340,10 @@ def adjust_cost_ratios_with_gdp(region_diff_df, config: Config): df_gdp, on=["scenario_version", "scenario", "region"], how="right" ) # Compute the adjusted cost ratio for all rows. 
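+    # That is, reg_cost_ratio_adj = slope * gdp_ratio_reg_to_reference
+    # + intercept, evaluated for every scenario/region/technology/year
+    # row in one vectorized expression.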
- df_merged["reg_cost_ratio_adj"] = df_merged["slope"] * df_merged[ - "gdp_ratio_reg_to_reference" - ] + df_merged["intercept"] + df_merged["reg_cost_ratio_adj"] = ( + df_merged["slope"] * df_merged["gdp_ratio_reg_to_reference"] + + df_merged["intercept"] + ) # Fill any NaNs (e.g. for the reference region) with 1.0. df_merged["reg_cost_ratio_adj"] = df_merged["reg_cost_ratio_adj"].fillna(1.0) @@ -343,14 +351,17 @@ def adjust_cost_ratios_with_gdp(region_diff_df, config: Config): # Instead of iterating per group, we extract the base-year values for each group. base_values = ( df_merged.query("year == @base_year") - .loc[:, [ - "scenario_version", - "scenario", - "region", - "message_technology", - "gdp_ratio_reg_to_reference", - "reg_cost_ratio_adj", - ]] + .loc[ + :, + [ + "scenario_version", + "scenario", + "region", + "message_technology", + "gdp_ratio_reg_to_reference", + "reg_cost_ratio_adj", + ], + ] .rename( columns={ "gdp_ratio_reg_to_reference": "base_gdp_ratio", diff --git a/message_ix_models/tools/costs/regional_differentiation.py b/message_ix_models/tools/costs/regional_differentiation.py index a4b4ce200b..bf2d8db769 100644 --- a/message_ix_models/tools/costs/regional_differentiation.py +++ b/message_ix_models/tools/costs/regional_differentiation.py @@ -1,4 +1,3 @@ - import logging from collections.abc import Mapping from functools import lru_cache @@ -31,6 +30,7 @@ def get_weo_region_map(regions: str) -> Mapping[str, str]: # Map from the child's (node's) ID to the value of the "iea-weo-region" annotation return {n.id: str(n.get_annotation(id="iea-weo-region").text) for n in nodes} + def get_weo_data_fast() -> pd.DataFrame: """Read in raw WEO investment/capital costs and O&M costs data. @@ -38,12 +38,17 @@ def get_weo_data_fast() -> pd.DataFrame: ------- pandas.DataFrame DataFrame with columns: - + - cost_type: investment or fixed O&M cost - weo_technology: WEO technology name - weo_region: WEO region - year: year - value: cost value + + Changes from get_weo_data: + This function opens the Excel file once using pd.ExcelFile, + reusing the file handle to read data from multiple sheets. + Reducing IO overhead from repeated file access. """ # Dict of all technologies, their Excel sheet name, and the starting row @@ -130,14 +135,13 @@ def get_weo_data_fast() -> pd.DataFrame: all_cost_df = pd.concat(dfs_cost, ignore_index=True) # Replace missing values with the median cost per technology and cost type. - all_cost_df["value"] = all_cost_df.groupby( - ["weo_technology", "cost_type"] - )["value"].transform(lambda x: x.fillna(x.median())) + all_cost_df["value"] = all_cost_df.groupby(["weo_technology", "cost_type"])[ + "value" + ].transform(lambda x: x.fillna(x.median())) return all_cost_df - def get_weo_data() -> pd.DataFrame: """Read in raw WEO investment/capital costs and O&M costs data. @@ -552,7 +556,9 @@ def adjust_technology_mapping( return module_all -def get_weo_regional_differentiation(config: "Config") -> pd.DataFrame: +def get_weo_regional_differentiation( + config: "Config", flag_fast: bool = True, +) -> pd.DataFrame: """Apply WEO regional differentiation. 1. Retrieve WEO data using :func:`.get_weo_data`. @@ -567,6 +573,8 @@ def get_weo_regional_differentiation(config: "Config") -> pd.DataFrame: :attr:`~.Config.base_year`, :attr:`~.Config.node`, and :attr:`~.Config.ref_region`. + flag_fast : bool + If True calls :func:`.get_weo_data_fast`, otherwise calls :func:`.get_weo_data`. 
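+        Default is :data:`True`.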
Returns ------- @@ -580,7 +588,10 @@ def get_weo_regional_differentiation(config: "Config") -> pd.DataFrame: """ # Grab WEO data and keep only investment costs - df_weo = get_weo_data() + if flag_fast: + df_weo = get_weo_data_fast() # Using the faster version. + else: + df_weo = get_weo_data() # Even if config.base_year is greater than 2022, use 2022 WEO values sel_year = str(2022) @@ -669,107 +680,6 @@ def get_weo_regional_differentiation(config: "Config") -> pd.DataFrame: return df_cost_ratios -def get_weo_regional_differentiation_vectorized(config: "Config") -> pd.DataFrame: - """Apply WEO regional differentiation. - - 1. Retrieve WEO data using :func:`.get_weo_data`. - 2. Map data to MESSAGEix-GLOBIOM regions according to the :attr:`.Config.node`. - 3. Calculate cost ratios for each region relative to the - :attr:`~.Config.ref_region`. - - Parameters - ---------- - config : .Config - The function responds to the fields: - :attr:`~.Config.base_year`, - :attr:`~.Config.node`, and - :attr:`~.Config.ref_region`. - - Returns - ------- - pandas.DataFrame - DataFrame with columns: - - - message_technology: MESSAGEix technology name - - region: MESSAGEix region - - weo_ref_region_cost: WEO cost in reference region - - reg_cost_ratio: regional cost ratio relative to reference region - - weo_fix_ratio: fixed O&M cost to investment cost ratio - """ - - # Retrieve the full set of WEO data and focus on investment (and later fixed O&M) costs - df_weo = get_weo_data_fast() - - # Even if config.base_year is greater than 2022, use 2022 WEO values - sel_year = "2022" - log.info("…using year " + sel_year + " data from WEO") - - # --- FIX: Use a merge instead of dict inversion --- - # Get mapping: keys are MESSAGEix region IDs (message_node), and values are WEO region names. - mapping = get_weo_region_map(config.node) - # Create a DataFrame from mapping so that duplicate WEO regions for different nodes are preserved. - map_df = pd.DataFrame(list(mapping.items()), columns=["region", "weo_region"]) - # Filter the WEO data for the selected year - df_weo_sel = df_weo[df_weo["year"] == sel_year].copy() - # Merge the mapping DataFrame with the filtered WEO data on the "weo_region" column. - df_sel_weo = map_df.merge(df_weo_sel, on="weo_region", how="inner") - # Rename the cost column to be consistent with later code and reindex columns in the legacy order. - df_sel_weo = df_sel_weo.rename(columns={"value": "weo_cost"}) - df_sel_weo = df_sel_weo.reindex( - ["cost_type", "weo_technology", "weo_region", "region", "year", "weo_cost"], - axis=1, - ) - - # Verify that the specified reference region is contained in the data (use uppercase). - assert config.ref_region is not None - ref_region = config.ref_region.upper() - if ref_region not in df_sel_weo["region"].unique(): - raise ValueError( - f"Reference region {ref_region} not found in WEO data. " - "Please specify a different reference region. " - f"Available regions are: {df_sel_weo['region'].unique()}" - ) - - # === Calculate regional investment cost ratio relative to reference region === - # Get rows with investment costs for the reference region. - mask_inv = df_sel_weo["cost_type"] == "inv_cost" - mask_ref = mask_inv & (df_sel_weo["region"] == ref_region) - df_ref = ( - df_sel_weo.loc[mask_ref, ["weo_technology", "year", "weo_cost"]] - .rename(columns={"weo_cost": "weo_ref_region_cost"}) - ) - # Get all rows with investment cost irrespective of the region. 
- inv_df = df_sel_weo.loc[mask_inv, ["weo_technology", "year", "weo_cost", "region"]] - # Merge to get the reference cost for each technology and year. - df_reg_ratios = inv_df.merge(df_ref, on=["weo_technology", "year"]) - df_reg_ratios["reg_cost_ratio"] = df_reg_ratios["weo_cost"] / df_reg_ratios["weo_ref_region_cost"] - df_reg_ratios = df_reg_ratios[["weo_technology", "region", "weo_ref_region_cost", "reg_cost_ratio"]] - - # === Calculate fixed O&M cost ratio relative to investment cost === - # Extract investment costs for the selected year. - df_inv = ( - df_sel_weo.query("cost_type == 'inv_cost' and year == @sel_year") - .rename(columns={"weo_cost": "inv_cost"}) - .drop(columns=["year", "cost_type"]) - ) - # Extract fixed O&M costs for the selected year. - df_fix = ( - df_sel_weo.query("cost_type == 'fix_cost' and year == @sel_year") - .rename(columns={"weo_cost": "fix_cost"}) - .drop(columns=["year", "cost_type"]) - ) - # Merge investment and fixed O&M costs based on technology and region. - df_fom_inv = ( - df_inv.merge(df_fix, on=["weo_technology", "weo_region", "region"]) - .assign(weo_fix_ratio=lambda x: x.fix_cost / x.inv_cost) - .drop(columns=["inv_cost", "fix_cost", "weo_region"]) - ) - - # === Combine the two ratios together === - df_cost_ratios = df_reg_ratios.merge(df_fom_inv, on=["weo_technology", "region"]) - - return df_cost_ratios - def get_intratec_regional_differentiation(node: str, ref_region: str) -> pd.DataFrame: """Apply Intratec regional differentiation. @@ -846,7 +756,10 @@ def get_intratec_regional_differentiation(node: str, ref_region: str) -> pd.Data return df_reg_ratios -def apply_regional_differentiation(config: "Config", vectorized: bool = True) -> pd.DataFrame: +def apply_regional_differentiation( + config: "Config", + flag_fast: bool = True, +) -> pd.DataFrame: """Apply regional differentiation depending on mapping source. 1. Retrieve an adjusted technology mapping from :func:`.adjust_technology_mapping`. @@ -865,7 +778,10 @@ def apply_regional_differentiation(config: "Config", vectorized: bool = True) -> :attr:`~.Config.module`, :attr:`~.Config.node`, and :attr:`~.Config.ref_region`. - + flag_fast : bool + If True is passed to :func:`.get_weo_regional_differentiation`, + then :func:`.get_weo_data_fast` is called, + otherwise :func:`.get_weo_data` is called. Returns ------- pandas.DataFrame @@ -883,10 +799,8 @@ def apply_regional_differentiation(config: "Config", vectorized: bool = True) -> """ df_map = adjust_technology_mapping(config.module) assert config.ref_region is not None - if vectorized: - df_weo = get_weo_regional_differentiation_vectorized(config) - else: - df_weo = get_weo_regional_differentiation(config) + + df_weo = get_weo_regional_differentiation(config, flag_fast) df_intratec = get_intratec_regional_differentiation(config.node, config.ref_region) # Get mapping of technologies From 79376c7e83d1b6f04fb86a1e486da881bd14be9c Mon Sep 17 00:00:00 2001 From: Wega Date: Thu, 10 Apr 2025 18:01:08 +0200 Subject: [PATCH 4/4] Reapply "Simplify path handling in SSP{Original,Update}" This reverts commit 24de69dc77e700a15e4aa8651ca1d6a0f663387b. 
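
For example, both data source classes now resolve their input file in a
single call instead of iterating over candidate directories:

    self.path = path_fallback("ssp", self.filename, where=WHERE)

and a warning is logged when the fallback resolves to the fuzzed test data.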
--- message_ix_models/project/ssp/data.py | 60 ++++++++++----------------- 1 file changed, 23 insertions(+), 37 deletions(-) diff --git a/message_ix_models/project/ssp/data.py b/message_ix_models/project/ssp/data.py index 88a2264b66..9975612feb 100644 --- a/message_ix_models/project/ssp/data.py +++ b/message_ix_models/project/ssp/data.py @@ -1,10 +1,8 @@ import logging -from platformdirs import user_cache_path - from message_ix_models.tools.exo_data import ExoDataSource, register_source from message_ix_models.tools.iamc import iamc_like_data_for_query -from message_ix_models.util import package_data_path, private_data_path +from message_ix_models.util import path_fallback __all__ = [ "SSPOriginal", @@ -13,6 +11,18 @@ log = logging.getLogger(__name__) +#: :py:`where` argument to :func:`path_fallback`, used by both :class:`.SSPOriginal` and +#: :class:`.SSPUpdate`. In order: +#: +#: 1. Currently data is stored in message-static-data, cloned and linked from within the +#: user's 'local' data directory. +#: 2. Previously some files were stored directly within message_ix_models (available in +#: an editable install from a clone of the git repository, 'package') or in +#: :mod:`message_data` ('private'). These settings are only provided for backward +#: compatibility. +#: 3. If the above are not available, use the fuzzed/random test data ('test'). +WHERE = "local package private test" + @register_source class SSPOriginal(ExoDataSource): @@ -93,25 +103,17 @@ def __init__(self, source, source_kw): self.raise_on_extra_kw(source_kw) + # Identify input data path + self.path = path_fallback("ssp", self.filename, where=WHERE) + if "test" in self.path.parts: + log.warning(f"Read random data from {self.path}") + # Assemble a query string extra = "d" if ssp_id == "4" and model == "IIASA-WiC POP" else "" self.query = ( f"SCENARIO == 'SSP{ssp_id}{extra}_v9_{date}' and VARIABLE == '{measure}'" + (f" and MODEL == '{model}'" if model else "") ) - # log.debug(query) - - # Iterate over possible locations for the data file - dirs = [private_data_path("ssp"), package_data_path("test", "ssp")] - for path in [d.joinpath(self.filename) for d in dirs]: - if not path.exists(): - log.info(f"Not found: {path}") - continue - if "test" in path.parts: - log.warning(f"Reading random data from {path}") - break - - self.path = path def __call__(self): # Use prepared path, query, and replacements @@ -185,12 +187,6 @@ def __init__(self, source, source_kw): scenarios = [] if release in ("3.1", "3.0.1", "3.0"): - # Directories in which to locate `self.filename`: - # - User's local cache (retrieved with "mix-models fetch" or equivalent). - # - Stored directly within message_ix_models (editable install from a clone - # of the git repository). 
- dirs = [user_cache_path("message-ix-models"), package_data_path("ssp")] - scenarios.append(f"SSP{ssp_id}") if measure == "GDP|PPP": @@ -203,9 +199,6 @@ def __init__(self, source, source_kw): Scenario={"Historical Reference": scenarios[0]}, ) elif release == "preview": - # Look first in message_data, then in message_ix_models test data - dirs = [private_data_path("ssp"), package_data_path("test", "ssp")] - models.extend([model] if model is not None else []) scenarios.append(f"SSP{ssp_id} - Review Phase 1") else: @@ -215,22 +208,15 @@ def __init__(self, source, source_kw): ) raise ValueError(release) + # Identify input data path + self.path = path_fallback("ssp", self.filename[release], where=WHERE) + if "test" in self.path.parts: + log.warning(f"Read random data from {self.path}") + # Assemble and store a query string self.query = f"Scenario in {scenarios!r} and Variable == '{measure}'" + ( f"and Model in {models!r}" if models else "" ) - # log.info(f"{self.query = }") - - # Iterate over possible locations for the data file - for path in [d.joinpath(self.filename[release]) for d in dirs]: - if not path.exists(): - log.info(f"Not found: {path}") - continue - if "test" in path.parts: - log.warning(f"Reading random data from {path}") - break - - self.path = path def __call__(self): # Use prepared path, query, and replacements