"""
Proxy scaler
A proxy scaler uses a proxy (a 2d pattern) to disaggregate an emissions timeseries over a target area.
The proxy must cover the area of interest of the emissions timeseries
"""
import logging
from typing import Any
import scmdata
import xarray as xr
from attrs import define
from spaemis.config import ScalerMethod, TimeseriesMethod
from spaemis.input_data import _apply_filters
from spaemis.inventory import EmissionsInventory
from spaemis.unit_registry import convert_to_target_unit, unit_registry
from spaemis.utils import clip_region
from .base import BaseScaler
from .proxy import get_proxy
[docs]def get_timeseries_point(
timeseries: dict[str, scmdata.ScmRun],
source: str,
filters: list[dict[str, str]],
target_year: int,
) -> scmdata.ScmRun:
"""
Extract a single value according to some filters
Parameters
----------
timeseries
Collection of loaded timeseries
source
Timeseries name
filters
Filter to apply to the selected timeseries
These filters if applied must result in a single remaining timeseries otherwise
a ``ValueError`` will be raised.
target_year
Year to extract
Must be contained in the targeted timeseries
Raises
------
ValueError
If the selected data cannot be retrieved
Returns
-------
ScmRun with a containing a single timeseries and year.
This object will retain the metadata of the filtered timeseries
"""
try:
ts = timeseries[source]
except KeyError as exc:
raise ValueError(f"Source dataset is not loaded: {source}") from exc
# This linearly interpolates the timeseries
ts = _apply_filters(ts, filters).resample("AS")
if len(ts) == 0:
raise ValueError(f"No data matching {filters} was found in {source} input data")
if len(ts) > 1:
raise ValueError(
f"More than one match was found for {filters} in {source} input data"
)
if target_year not in ts["year"].values:
raise ValueError(f"No timeseries data for year={target_year} available")
ts = ts.filter(year=target_year)
if ts.shape != (1, 1):
raise AssertionError("Something went wrong when filtering timeseries")
return ts
def _check_unit(unit: str) -> None:
pint_unit = unit_registry(unit)
msg = "Expected unit of the form [X] * [mass] / [time]"
if (
len(pint_unit.dimensionality) != 3 # noqa
and pint_unit.dimensionality["[mass]"] != 1
and pint_unit.dimensionality["[time]"] != -1
and "[length]" not in pint_unit.dimensionality
):
raise ValueError(msg)
[docs]def apply_amount(amount: float, unit: str, proxy: xr.DataArray) -> xr.DataArray:
"""
Scale a known amount of emissions across a proxy
Parameters
----------
amount
Amount of stuff to allocate across the proxy
The stuff is usually annual emissions of a given species and currently
unit
Unit of the stuff
Must be able to be converted to a Pint unit with dimensions [X] * [mass] / [time]
where X can be any dimension (typically CO2 or other species in the case of emissions)
proxy
Proxy array
Can include nans and missing data
Returns
-------
DataArray with the same dimensions of ``proxy`` where the sum matches ``amount``
"""
# Verify units are [X] * [mass] / [time]
_check_unit(unit)
logging.debug(f"applying {amount} {unit} using a proxy")
# Adjust to kg X/yr
scale_factor = convert_to_target_unit(unit, target_unit="kg")
amount = amount * scale_factor.m
# Calculate density map
proxy_density = proxy / proxy.sum()
scaled = amount * proxy_density
scaled.attrs["units"] = str(scale_factor.u) + " / cell"
return scaled
[docs]@define
class TimeseriesScaler(BaseScaler):
"""
Scale a spatial pattern using a timeseries
"""
proxy: str
proxy_region: str
source_timeseries: str
source_filters: list[dict[str, Any]]
def __call__(
self,
*,
data: xr.DataArray,
inventory: EmissionsInventory,
timeseries: dict[str, scmdata.ScmRun],
target_year: int,
**kwargs: Any,
) -> xr.DataArray:
"""
Apply scaling
Parameters
----------
data
Timeseries to scale
inventory
timeseries
Timeseries data used by the proxy
kwargs
Returns
-------
Scaled data
"""
ts = get_timeseries_point(
timeseries,
self.source_timeseries,
self.source_filters,
target_year=target_year,
)
proxy_region = get_proxy(self.proxy_region, inventory=inventory)
proxy_region_clipped = clip_region(proxy_region, inventory.border_mask)
region_share = proxy_region_clipped.sum() / proxy_region.sum()
proxy = clip_region(
get_proxy(self.proxy, inventory=inventory), inventory.border_mask
)
proxy_interp = proxy.interp(lat=data.lat, lon=data.lon)
unit = ts.get_unique_meta("unit", True)
amount = ts.values.squeeze() * region_share
return apply_amount(amount, unit, proxy_interp)
[docs] @classmethod
def create_from_config(cls, method: ScalerMethod) -> "TimeseriesScaler":
"""
Create Scaler from configuration
"""
if not isinstance(method, TimeseriesMethod):
raise TypeError("Incompatible configuration")
return TimeseriesScaler(
proxy=method.proxy,
proxy_region=method.proxy_region or method.proxy,
source_timeseries=method.source_timeseries,
source_filters=method.source_filters,
)