Source code for spaemis.input_data

"""
Searching and loading of a local input4MIPs data archive
"""
from __future__ import annotations

import logging
import os
from functools import lru_cache
from glob import glob
from os import PathLike
from typing import Any

import pandas as pd
import scmdata
import xarray as xr

from spaemis.config import InputTimeseries

logger = logging.getLogger(__name__)

# The sector map used for the input4MIPs emissions data
# NOTE(review): presumably the position of each name matches the integer sector
# index stored in the input files -- confirm before reordering or inserting entries
SECTOR_MAP = [
    "Agriculture",
    "Energy Sector",
    "Industrial Sector",
    "Transportation Sector",
    "Residential, Commercial, Other",
    "Solvents production and application",
    "Waste",
    "International Shipping",
    # CO2 also includes this additional sector, but we aren't dealing with that here
    # "Negative CO2 Emissions",
]


class InputEmissionsDatabase:
    """
    Database of Input4MIPs emissions data

    ``available_data`` holds one row per discovered netCDF file and ``paths``
    lists the directories that contributed at least one new file.
    """

    # Columns of ``available_data``; also the keys produced when parsing a filename
    _COLUMNS = ("variable_id", "institute_id", "source_id", "filename")
    # Number of "_"-separated tokens expected in a data file's basename
    _N_FILENAME_TOKENS = 7

    def __init__(self, paths: str | list[str] | None = None):
        """
        Create the database and optionally register some directories

        Parameters
        ----------
        paths
            A single directory or a list of directories to search.
            Nothing is registered if this is ``None`` or empty.
        """
        self.available_data = pd.DataFrame(columns=list(self._COLUMNS))
        self.paths: list[str] = []

        if paths:
            if isinstance(paths, str):
                paths = [paths]
            for path in paths:
                self.register_path(path)

    def register_path(self, path: str) -> None:
        """
        Load data from a given path

        Any data present in the given folder will be added to the set of
        available files. Files whose filename is already known are skipped.

        Parameters
        ----------
        path
            Path that contains input4MIPs data
        """
        extra_options = self._find_options(path)
        if extra_options.empty:
            logger.info(f"Did not find any files in {path}")
            return

        already_existing = extra_options.filename.isin(self.available_data.filename)
        extra_options = extra_options.loc[~already_existing]

        logger.info(f"Found {len(extra_options)} new entries")
        if len(extra_options):
            self.available_data = pd.concat(
                [self.available_data, extra_options]
            ).sort_values(["variable_id", "institute_id", "source_id"])
            self.paths.append(path)
            # Results cached by ``load`` were built from the previous file list
            # and may now be incomplete, so drop them.
            InputEmissionsDatabase.load.cache_clear()

    @classmethod
    def _parse_filename(cls, dataset_fname: str) -> dict[str, str] | None:
        """
        Extract identifiers from a data filename

        Returns ``None`` when the basename does not split into the expected
        number of "_"-separated tokens.
        """
        toks = os.path.basename(dataset_fname).split("_")
        if len(toks) != cls._N_FILENAME_TOKENS:
            return None

        return {
            "variable_id": toks[0],
            "institute_id": toks[3],
            "source_id": toks[4],
            "filename": dataset_fname,
        }

    def _find_options(self, root_dir: str | PathLike[str]) -> pd.DataFrame:
        """
        Recursively search ``root_dir`` for ``*.nc`` files with a parsable name

        The returned frame always has the ``_COLUMNS`` columns, even when no
        file matches.
        """
        files = glob(os.path.join(root_dir, "**", "*.nc"), recursive=True)

        parsed = (self._parse_filename(fname) for fname in files)
        file_info = [item for item in parsed if item is not None]

        return pd.DataFrame(file_info, columns=list(self._COLUMNS))

    # NOTE: the cache is shared by all instances and keyed on ``self``, so it
    # keeps instances alive while their results are cached. It is cleared
    # whenever ``register_path`` adds new files.
    @lru_cache(maxsize=15)
    def load(self, variable_id: str, source_id: str) -> xr.Dataset:
        """
        Load the input4MIPs data according to the variable and source name

        Parameters
        ----------
        variable_id
            Variable identifier
        source_id
            Source identifier

        Returns
        -------
            All of the available data for the given variable and source
            identifiers, concatenated along and sorted by ``time``

        Raises
        ------
        ValueError
            If no data has been registered or nothing matches the identifiers
        """
        if self.available_data.empty:
            raise ValueError(
                "No input data has been found. "
                "Set the 'SPAEMIS_INPUT_PATHS' environment variable",
            )

        matches = (self.available_data.source_id == source_id) & (
            self.available_data.variable_id == variable_id
        )
        subset = self.available_data[matches]

        if subset.empty:
            raise ValueError(
                f"Could not find any matching data for source_id={source_id} variable_id={variable_id}"
            )

        files_to_load = sorted(subset.filename.values)
        logger.info(f"Loading data from {len(files_to_load)} files: {files_to_load}")

        data = [xr.open_dataset(fname) for fname in files_to_load]

        return xr.concat(data, dim="time").sortby("time")
def initialize_database(options: list[str] | None = None) -> InputEmissionsDatabase:
    """
    Initialise the global database of input emissions

    Uses the `SPAEMIS_INPUT_PATHS` environment to provide a set of paths to search
    for input emissions when no ``options`` are given. This environment can contain
    a comma-separated list of paths if multiple paths are used.

    Parameters
    ----------
    options
        Directories to search. If ``None`` or empty, the environment variable
        is used instead.

    Returns
    -------
        Emissions database initialised with a list of input directories
    """
    if not options:
        options_from_env = os.environ.get("SPAEMIS_INPUT_PATHS", "")
        candidates = [s.strip() for s in options_from_env.split(",")]
        # Drop blank entries (unset variable, trailing comma, ",,"): an empty
        # path would make the database search the current working directory.
        options = [s for s in candidates if s] or None

    return InputEmissionsDatabase(options)
def _apply_filters(ts: scmdata.ScmRun, filters: list[dict[str, Any]]) -> scmdata.ScmRun:
    """
    Apply a sequence of filters to a timeseries, one after the other

    Each entry of ``filters`` is passed as keyword arguments to ``filter`` on
    the result of the previous step. With no filters, ``ts`` is returned as is.
    """
    filtered = ts
    for criteria in filters:
        filtered = filtered.filter(**criteria)
    return filtered
def load_timeseries(
    options: list[InputTimeseries], root_dir: str | None = None
) -> dict[str, scmdata.ScmRun]:
    """
    Read a collection of input timeseries from disk

    Each timeseries is read from its configured path and then narrowed down
    using the filters given in its configuration.

    Parameters
    ----------
    options
        Configuration of each timeseries to load
    root_dir
        Directory that the timeseries paths are joined onto

        The current directory (``"."``) is used if nothing is provided

    Returns
    -------
        Loaded timeseries keyed by the name of each ``InputTimeseries``

    Raises
    ------
    ValueError
        If two entries in ``options`` share the same name
    """
    base_dir = root_dir or "."
    loaded: dict[str, scmdata.ScmRun] = {}

    for entry in options:
        name = entry.name
        if name in loaded:
            raise ValueError(f"Duplicate input timeseries found: {name}")

        raw = scmdata.ScmRun(os.path.join(base_dir, entry.path))
        loaded[name] = _apply_filters(raw, entry.filters)

    return loaded
# Module-level database instance, created when this module is imported.
# No paths are passed, so ``initialize_database`` falls back to the
# ``SPAEMIS_INPUT_PATHS`` environment variable.
database = initialize_database()