Source code for spaemis.scaling.point_source

"""
Point source scaler

Split a timeseries of emissions across a number of points

"""
import logging
import os
from typing import Any

import numpy as np
import pandas as pd
import scmdata
import xarray as xr
from attrs import define

from spaemis.config import PointSourceMethod, ScalerMethod
from spaemis.constants import RAW_DATA_DIR
from spaemis.inventory import EmissionsInventory
from spaemis.utils import clip_region

from .base import BaseScaler
from .timeseries import apply_amount, get_timeseries_point

logger = logging.getLogger(__name__)


[docs]@define class Point: """ Single spatial point """ lat: float lon: float
[docs]@define class PointSourceScaler(BaseScaler): """ Split emissions across some point sources """ point_sources: list[Point] source_timeseries: str source_filters: list[dict[str, Any]] def __call__( self, *, data: xr.DataArray, inventory: EmissionsInventory, target_year: int, timeseries: dict[str, scmdata.ScmRun], **kwargs: Any, ) -> xr.DataArray: """ Apply scaling Parameters ---------- data Timeseries to scale inventory timeseries Timeseries data used by the proxy kwargs Returns ------- Scaled data """ ts = get_timeseries_point( timeseries, self.source_timeseries, self.source_filters, target_year=target_year, ) scaled = data.copy() scaled = clip_region(scaled, inventory.border_mask) scaled[:, :] = 0 num_points = len(self.point_sources) num_valid_points = 0 d_lat = scaled.lat[1] - scaled.lat[0] for source in self.point_sources: try: field_location = scaled.sel( lat=source.lat, lon=source.lon, method="nearest", tolerance=np.abs(d_lat.values), ) scaled.loc[field_location.lat, field_location.lon] += 1 num_valid_points += 1 except KeyError: # Value not in domain pass if num_points == 0: raise ValueError("No point sources are available") if scaled.sum().values.squeeze() != num_valid_points: raise AssertionError(f"Something went wrong with proxy field: {scaled}") portion_in_domain = num_valid_points / float(num_points) logger.info(f"{num_valid_points} / {num_points} points sources are in domain.") amount = ts.values.squeeze() * portion_in_domain unit = ts.get_unique_meta("unit", True) return apply_amount(amount, unit, scaled)
[docs] @classmethod def create_from_config(cls, method: ScalerMethod) -> "PointSourceScaler": """ Create a scaler from configuration Point source information will be loaded from a directory specified by the environment variable `SPAEMIS_POINT_SOURCE_DIRECTORY`. This defaults to `data/raw/configuration/point_sources` if no environment variable is specified. """ if not isinstance(method, PointSourceMethod): raise TypeError("Incompatible configuration") point_source_directory = os.environ.get( "SPAEMIS_POINT_SOURCE_DIRECTORY", os.path.join(RAW_DATA_DIR, "configuration", "point_sources"), ) point_info = pd.read_csv( os.path.join(point_source_directory, method.point_sources) ) points = [ Point(lat=item["lat"], lon=item["lon"]) for item in point_info.to_dict("records") ] return PointSourceScaler( point_sources=points, source_timeseries=method.source_timeseries, source_filters=method.source_filters, )