diff --git a/message_ix_models/project/ssp/transport.py b/message_ix_models/project/ssp/transport.py index cd092138d1..ee93e5efe0 100644 --- a/message_ix_models/project/ssp/transport.py +++ b/message_ix_models/project/ssp/transport.py @@ -4,8 +4,12 @@ from typing import TYPE_CHECKING, Hashable import genno +import pandas as pd import xarray as xr +from genno import KeySeq +from message_ix_models import Context +from message_ix_models.model.structure import get_codelist from message_ix_models.tools.iamc import iamc_like_data_for_query from message_ix_models.util import minimum_version @@ -115,9 +119,10 @@ def finalize( Parameters ---------- q_all : - All data. + All data. Quantity with dimensions (n, UNIT, VARIABLE). q_update : Revised data to overwrite corresponding values in `q_all`. + Quantity with dimensions (n, UNIT, VARIABLE, e, t). """ def _expand(qty): @@ -127,19 +132,22 @@ def _expand(qty): s_all = q_all.pipe(_expand).to_series() - s_all.update( + s_update = ( q_update.pipe(_expand) .to_frame() .reset_index() .assign( Variable=lambda df: ( "Emissions|" + df["e"] + "|Energy|Demand|Transportation|" + df["t"] - ).str.replace("|_T", "") + ).str.replace("|_T", ""), ) .drop(["e", "t"], axis=1) .set_index(s_all.index.names)[0] + .rename("value") ) + s_all.update(s_update) + ( s_all.unstack("y") .reorder_levels(["Model", "Scenario", "Region", "Variable", "Unit"]) @@ -176,6 +184,7 @@ def main(path_in: "pathlib.Path", path_out: "pathlib.Path", method: str) -> None k_input, iamc_like_data_for_query, path=path_in, + non_iso_3166="keep", query="Model != ''", unique="MODEL SCENARIO", ) @@ -187,17 +196,24 @@ def main(path_in: "pathlib.Path", path_out: "pathlib.Path", method: str) -> None c.add("path out", path_out) # Call a function to prepare the remaining calculations + # This returns a key like prepare_func = { "A": prepare_method_A, "B": prepare_method_B, }[method] - prepare_func(c, k_input) + + k = prepare_func(c, k_input) + + # - Collapse to IAMC "VARIABLE" dimension name + # - Recombine with other data + # - Write back to the file + c.add("target", finalize, k_input, k, "model name", "scenario name", "path out") # Execute c.get("target") -def prepare_method_A(c: "genno.Computer", k_input: "genno.Key") -> None: +def prepare_method_A(c: "genno.Computer", k_input: "genno.Key") -> "genno.Key": """Prepare calculations using method 'A'. 1. Select data with variable names matching :data:`EXPR`. @@ -253,25 +269,131 @@ def prepare_method_A(c: "genno.Computer", k_input: "genno.Key") -> None: c.add(k[4], "mul", k[3] / t, "broadcast:t:AIR emissions") # Add to the input data - c.add(k[5], "add", k[1], k[4]) + return c.add(k[5], "add", k[1], k[4]) - # - Collapse to IAMC "VARIABLE" dimension name - # - Recombine with other data - # - Write back to the file + +def prepare_method_B(c, k_input: "genno.Key") -> None: + """Prepare calculations using method 'B'.""" + from types import SimpleNamespace + + from message_ix_models.model.transport import build + from message_ix_models.model.transport import files as exo + from message_ix_models.tools.exo_data import prepare_computer + + # Fetch a Context instance + # NB It is assumed this is aligned with the contents of the input data file + context = Context.get_instance() + + # TODO Check if this is redundant, i.e. already in build.get_computer + cl_emission = get_codelist("emission") + + # Add the same structure information, notably , used in + # the build and report workflow steps for MESSAGEix-Transport + build.get_computer(context, c) + + ### Prepare data from IEA EWEB: the share of aviation in transport consumption of + ### each 'c[ommodity]' + + # Fetch data from IEA EWEB + flows = ["AVBUNK", "DOMESAIR", "TOTTRANS"] + kw = dict(provider="IEA", edition="2024", flow=flows, transform="B", regions="R12") + keys = prepare_computer(context, c, "IEA_EWEB", kw, strict=False) + + # Shorthand + k = SimpleNamespace( + iea=genno.KeySeq(keys[0]), + cnt=KeySeq("FOO:c-n-t"), + ) + k.fnp = k.iea / "y" + k.cn = k.cnt / "t" + + # Select data for 2019 only + c.add(k.fnp[0], "select", k.iea.base, indexers=dict(y=2019), drop=True) + + # Only use the aggregation on the 'product' dimension, not on 'flow' c.add( - "target", - finalize, - k_input, - k[5], - "model name", - "scenario name", - "path out", + "groups:p:iea to transport", + lambda d: {"product": d["product"]}, + "groups::iea to transport", ) + # Aggregate IEA 'product' dimension for alignment to MESSAGE 'c[ommodity]' + c.add(k.fnp[1], "aggregate", k.fnp[0], "groups:p:iea to transport", keep=False) + # Rename dimensions + c.add(k.cnt[0], "rename_dims", k.fnp[1], name_dict=dict(flow="t", product="c")) -def prepare_method_B(c, k_input: "genno.Key"): - """Prepare calculations using method 'B'.""" - raise NotImplementedError + # Reverse sign of AVBUNK + q_sign = genno.Quantity([-1.0, 1.0, 1.0], coords={"t": flows}) + c.add(k.cnt[1], "mul", k.cnt[0], q_sign) + + # Compute ratio of ('AVBUNK' + 'DOMESAIR') to 'TOTTRANS' + # TODO Confirm that this or another numerator is appropriate + c.add(k.cnt[2], "select", k.cnt[1], indexers=dict(t=["AVBUNK", "DOMESAIR"])) + c.add(k.cn[0], "sum", k.cnt[2], dimensions=["t"]) + c.add(k.cn[1], "select", k.cnt[1], indexers=dict(t="TOTTRANS"), drop=True) + c.add(k.cn[2], "div", k.cn[0], k.cn[1]) + + ### Prepare data from the input data file: total transport consumption of light oil + k.input = genno.KeySeq("input", ("n", "y", "UNIT", "e")) + + # Filter on "VARIABLE" + expr = r"^Final Energy\|Transportation\|(?PLiquids\|Oil)$" + c.add(k.input[0] / "e", select_re, k_input, indexers={"VARIABLE": expr}) + + # Extract the "e" dimensions from "VARIABLE" + c.add(k.input[1], extract_dims, k.input[0] / "e", dim_expr={"VARIABLE": expr}) + + # Convert "UNIT" dim labels to Quantity.units + c.add(k.input[2] / "UNIT", "unique_units_from_dim", k.input[1], dim="UNIT") + + # Relabel: + # - c[ommodity]: 'Liquids|Oil' (IAMC 'variable' component) to 'lightoil' + # - n[ode]: 'AFR' to 'R12_AFR' etc. + labels = dict( + c={"Liquids|Oil": "lightoil"}, + n={n.id.partition("_")[2]: n.id for n in get_codelist("node/R12")}, + ) + c.add(k.input[3] / "UNIT", "relabel", k.input[2] / "UNIT", labels=labels) + + ### Compute estimate of emissions + # Product of aviation share and FE of total transport → FE of aviation + prev = c.add("aviation fe", "mul", k.input[3] / "UNIT", k.cn[2]) + + # Convert exo.emi_intensity to Mt / GWa + c.add( + exo.emi_intensity + "conv", "convert_units", exo.emi_intensity, units="Mt / GWa" + ) + + # Product of FE of aviation and emission intensity → emissions of aviation + prev = c.add("aviation emi::0", "mul", prev, exo.emi_intensity + "conv") + + # Convert units to megatonne / year + prev = c.add("aviation emi::1", "convert_units", prev, units="Mt / year") + + # In one step + # - Expand dimensions with "UNIT" containing labels to be used. + # - Adjust values for species (N2O) that are reported in kt rather than Mt. + data = [] + for e in cl_emission: + try: + label = str(e.get_annotation(id="report").text) + except KeyError: + label = e.id + try: + unit = str(e.get_annotation(id="units").text) + except KeyError: + unit = "Mt" + data.append(["AIR", e.id, f"{unit} {label}/yr", 1.0 if unit == "Mt" else 1e3]) + + dims = "t e UNIT value".split() + q = genno.Quantity(pd.DataFrame(data, columns=dims).set_index(dims[:-1])[dims[-1]]) + prev = c.add("aviation emi::2", "mul", prev, q) + + # Change labels + # - Restore e.g. "AFR" given "R12_AFR" + labels = dict(n={v: k for k, v in labels["n"].items()}, t={"AIR": "Aviation"}) + k.result = c.add("aviation emi::3", "relabel", prev / "c", labels=labels) + return k.result def select_re(qty: "AnyQuantity", indexers: dict) -> "AnyQuantity":