Source code for spaemis.inventory

"""
Loading emissions inventories
"""
from __future__ import annotations

import functools
import glob
import logging
import os
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, TypeVar

import geopandas  # type: ignore
import pandas as pd
import xarray as xr
from attrs import define, field
from typing_extensions import Self

from spaemis.constants import RAW_DATA_DIR, TEST_DATA_DIR
from spaemis.utils import clip_region, load_australia_boundary

logger = logging.getLogger(__name__)


T = TypeVar("T", xr.Dataset, xr.DataArray)

if TYPE_CHECKING:
    import attr

    ValidatorType = Callable[[Any, attr.Attribute[T], Any], Any]


[docs]def has_dimensions( dimensions: str | list[str], ) -> ValidatorType[T]: """ Check if a xarray Dataset or DataArray has the expected dimensions Parameters ---------- dimensions Dimensions to checks Raises ------ ValueError If any of the requested dimensions are not present in the data array """ dims: list[str] = [dimensions] if isinstance(dimensions, str) else dimensions def _check(instance: Any, attribute: attr.Attribute[T], value: Any) -> None: for exp_dim in dims: if exp_dim not in value.dims: raise ValueError(f"Missing dimension: {exp_dim}") return _check
[docs]@define class EmissionsInventory: """ An emissions inventory is a set of bottom up """ data: xr.Dataset = field( validator=[ has_dimensions(["lat", "lon", "sector"]), ] ) border_mask: geopandas.GeoDataFrame year: int
[docs] @classmethod def load_from_directory(cls, data_directory: str, year: int, **kwargs: Any) -> Self: """ Load an emissions inventory from a directory """ raise NotImplementedError()
[docs]class Grid: """ Configuration for a regular lat/lon mesh """ nx: int ny: int x0: float y0: float dx: float dy: float @property def lats(self) -> list[float]: """ Latitudes of the grid """ return [self.y0 + self.dy * i for i in range(self.ny)] @property def lons(self) -> list[float]: """ Longitudes of the grid """ return [self.x0 + self.dx * i for i in range(self.nx)]
[docs]class VictoriaGrid(Grid): """ Information about the grid used in Victoria """ nx: int = 903 ny: int = 592 x0: float = 140.6291 y0: float = -39.5402 dx: float = 0.01059988 dy: float = 0.01059988
[docs]class AustraliaGrid(Grid): """ Information about the grid used in Australia lat/lons represent the center of the cell """ nx: int = 450 ny: int = 400 x0: float = 110.05 y0: float = -10.05 dx: float = 0.1 dy: float = -0.1
[docs]@define class VictoriaEPAInventory(EmissionsInventory): """ Victorian EPA data CSV files of 1D datapoints. Each grid is slightly different so some post-processing is required to get all the variables and sectors on to the same grid. """
[docs] @classmethod def load_from_directory( cls, data_directory: str, year: int, file_suffix: str = "_tif_to_csv3.csv", grid: Grid | None = None, **kwargs: Any, ) -> Self: """ Load Victorian EPA inventory data Parameters ---------- data_directory Folder containing CSV input files grid Object containing information about the target grid Returns ------- Loaded data """ fnames = sorted(glob.glob(os.path.join(data_directory, "*.csv"))) if not fnames: raise ValueError("No inventory files found for Victoria") if grid is None: grid = VictoriaGrid() # Explicitly cast to grid _grid: Grid = grid def read_file(fname: str) -> xr.Dataset: dataframe = pd.read_csv(fname).set_index(["lat", "lon"]) dataset = xr.Dataset.from_dataframe(dataframe) dataset["lat_new"] = _grid.lats dataset["lon_new"] = _grid.lons del dataset["lat"] del dataset["lon"] return dataset.rename({"lat_new": "lat", "lon_new": "lon"}).assign_coords( {"sector": os.path.basename(fname).replace(file_suffix, "")} ) # Data should be nan outside of the region of interest, but zero if no emissions # present merged_data = xr.concat([read_file(f) for f in fnames], dim="sector").fillna(0) vic_border = load_australia_boundary() vic_border = vic_border[vic_border.shapeName == "Victoria"] return cls( data=clip_region(merged_data, vic_border), border_mask=vic_border, year=year )
[docs]@define class AustraliaInventory(EmissionsInventory): """ Australian data CSV files of 1D datapoints. Each grid is slightly different so some post-processing is required to get all the variables and sectors on to the same grid. """
[docs] @classmethod def load_from_directory( cls, data_directory: str, year: int, file_suffix: str = ".nc", **kwargs: Any, ) -> Self: """ Load Australian EDGAR data Parameters ---------- data_directory Folder containing CSV input files grid Object containing information about the target grid Returns ------- Loaded data """ fnames = sorted(glob.glob(os.path.join(data_directory, "*.nc"))) if not fnames: raise ValueError("No inventory files found for Australia") australia_boundary = load_australia_boundary() # Data should be nan outside of the region of interest, but zero if no emissions # present merged_data = xr.merge([xr.load_dataset(f) for f in fnames], join="outer") merged_data = merged_data.where(merged_data > 0).fillna(0) clipped_data = clip_region(merged_data, australia_boundary) return cls( data=clipped_data, border_mask=australia_boundary, year=year, )
[docs]@define class TestInventory(EmissionsInventory): """ Test inventory using decimated Vic inventory data """
[docs] @classmethod def load_from_directory(cls, data_directory: str, year: int, **kwargs: Any) -> Self: """ Load test inventory data For testing we use a decimated version of the vic inventory generated using ``scripts/downsample_inventory.py`` Parameters ---------- data_directory Folder containing CSV input files grid Object containing information about the target grid Returns ------- Loaded data """ vic_border = load_australia_boundary() vic_border = vic_border[vic_border.shapeName == "Victoria"] data = xr.load_dataset( os.path.join( TEST_DATA_DIR, "inventory", "decimated", "inventory_decimated.nc", ) ).drop_vars("spatial_ref") return cls(data, border_mask=vic_border, year=2016)
@functools.lru_cache(5) def load_inventory( inventory: str, year: int, data_directory: str | None = None ) -> EmissionsInventory: """ Load an emissions inventory Parameters ---------- inventory Inventory name year Year of inventory to load data_directory If provided, override the default directory to load data from If not provided the data directory will be constructed from the `SPAEMIS_INVENTORY_DIRECTORY` environment variable, the inventory name and inventory year. Returns ------- EmissionsInventory with loaded data Raises ------ KeyError Could not determine the appropriate inventory to load """ inventory_root_directory = os.environ.get( "SPAEMIS_INVENTORY_DIRECTORY", os.path.join(RAW_DATA_DIR, "inventories") ) data_directory = data_directory or os.path.join( inventory_root_directory, inventory, str(year) ) mapping = { ("test", 2016): TestInventory, ("victoria", 2016): VictoriaEPAInventory, ("australia", 2016): AustraliaInventory, ("australia", 2018): AustraliaInventory, } try: loader = mapping[(inventory, year)] except KeyError as exc: raise ValueError(f"No inventory matching {inventory}|{year}") from exc return loader.load_from_directory(data_directory=data_directory, year=year)
[docs]def write_inventory_csvs(ds: xr.Dataset, output_dir: str) -> None: """ Serialize a Dataset to CSV files with the same format as the input inventory data Each sector is written into a different file. The CSV file contains rows of data for each datapoint on a lat, lon grid. Each variable will be written as a separate column. The ordering of the datapoints are lat-major (i.e. iterate over latitude and then longitude) Parameters ---------- ds Data to output This dataset should be similar to the format from :attr:`EmissionsInventory.data`. in that each data variable contains a sector, lat and lon dimension. output_dir Output directory for the """ sectors = ds["sector"].values for sector in sectors: sector_data = ds.sel(sector=sector) sector_data = sector_data.reset_coords( [k for k in sector_data.coords.keys() if k not in ["lat", "lon"]], drop=True ) output_fname = f"{sector}_projected.csv" logger.info(f"Writing output file: {os.path.join(output_dir, output_fname)}") df = sector_data.fillna(0).to_dataframe(["lon", "lat"]).reset_index() # Round to be similar to the input inventory files df["lat"] = df["lat"].round(4) df["lon"] = df["lon"].round(4) df.to_csv(os.path.join(output_dir, output_fname), index=False)