Skip to content

Commit

Permalink
Merge pull request #47 from C2SM-RCM/coli
Browse files Browse the repository at this point in the history
Country speciation
  • Loading branch information
lionel42 authored Dec 13, 2023
2 parents c50949a + 1a80b11 commit de49c09
Show file tree
Hide file tree
Showing 16 changed files with 658 additions and 50 deletions.
3 changes: 1 addition & 2 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ Categories Manipulations
Speciation
----------

.. autofunction:: emiproc.speciation.speciate_inventory
.. autofunction:: emiproc.speciation.speciate

.. autofunction:: emiproc.speciation.speciate_nox


Utilities
Expand Down
23 changes: 15 additions & 8 deletions emiproc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Emission processing package."""
from __future__ import annotations

import logging
from pathlib import Path

# directory where the data files are stored
FILES_DIR = Path(__file__).parent.parent / "files"
TESTS_DIR = FILES_DIR / "test"

logger = logging.getLogger("emiproc")

Expand All @@ -15,14 +17,19 @@
logger.setLevel(PROCESS)


def deprecated(func):
def deprecated(msg: str | None = None):
"""Decorator to mark functions as deprecated."""

def wrapper(*args, **kwargs):
logger.warning(
"Call to deprecated function {}.".format(func.__name__),
stacklevel=2,
)
return func(*args, **kwargs)
def deprecated_decorator(func, msg=msg):
def wrapper(*args, msg=msg, **kwargs):
msg_default = "Call to deprecated function {}.".format(func.__name__)
if msg is None:
msg = msg_default
else:
msg = msg_default + " " + msg
logger.warning(msg, stacklevel=2)
return func(*args, **kwargs)

return wrapper
return wrapper

return deprecated_decorator
10 changes: 9 additions & 1 deletion emiproc/exports/icon.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ def export_icon_oem(
time_profiles: dict[str, list[TemporalProfile]] = {}
vertical_profiles: dict[str, VerticalProfile] = {}

# Check that the inventory has the same amount of cells
# as the icon grid
if len(inv.gdf) != ds_out["cell"].size:
raise ValueError(
f"The inventory has {len(inv.gdf)} cells, but the icon grid has"
f" {ds_out['cell'].size} cells."
)

for categorie, sub in inv._gdf_columns:
if substances is not None and sub not in substances:
continue
Expand Down Expand Up @@ -392,7 +400,7 @@ def make_icon_time_profiles(
raise NotImplementedError(f"{profiles_type} is not implemented.")

for ds in dict_ds.values():
# ds["country"] = [i for i in range(len(time_zones))]
ds["country"] = [i for i in range(len(time_zones))]
ds.assign_coords({"timezone_of_country": (("country",), time_zones)})

if out_dir is not None:
Expand Down
284 changes: 282 additions & 2 deletions emiproc/speciation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,293 @@
Other functions are hardcoded for several substances.
"""
from __future__ import annotations
from typing import TYPE_CHECKING

from os import PathLike
from typing import TYPE_CHECKING, Any

import pandas as pd
import xarray as xr
import numpy as np

from emiproc import deprecated
from emiproc.utilities import get_country_mask

if TYPE_CHECKING:
from emiproc.inventories import Inventory, CatSub, Category
from emiproc.inventories import Category, CatSub, Inventory


def read_speciation_table(path: PathLike, drop_zeros: bool = False) -> xr.DataArray:
"""Read a speciation table from a file.
Format of the file:
```
# Any comment line starting with # is ignored
# The first line is the header, it contains optional information
# to specify speciation factors for each category.
category,country,substance0,substance1,substance2, ...
cat0,c0,0.5,0.2,0.1, ...
cat1,c0,0.3,0.5,0.0, ...
cat0,c1,0.2,0.3,0.5, ...
...
```
The table can contain optional dimensions:
- country : must follow the ISO3 naming convention
In case cells belong to no country and have emissions, you will
need to set a default value for the speciation ratios.
This can be done by adding a row with the country set to `-99`.
- category
- type : Whether applies to 'gridded' or 'shapped'
- year
Ratios are the fraction of the weight of the speciated substance.
As emissions are given in mass/unit of time, the sum of the ratios
for a given speciiation must be 1.
:arg path: The path of the speciation table.
:arg drop_zeros: Whether to drop the speciation ratios that sum to 0.
:returns: The speciation ratios.
The speciation ratios are the weight fraction conversion from the
speciated substance.
"""
df = pd.read_csv(path, comment="#")

# Reserved columns
columns_types = {
"category": str,
"country": str,
"type": str,
"year": int,
}

new_substances = [col for col in df.columns if col not in columns_types.keys()]

# Get the ratios data
ratios = df[new_substances].to_numpy()

da = xr.DataArray(
ratios,
dims=["speciation", "substance"],
coords={
"speciation": df.index,
"substance": new_substances,
}
| {
# Add the optional dimensions that describe the speciation
k: ("speciation", df[k].to_numpy().astype(t))
for k, t in columns_types.items()
if k in df.columns
},
)

if drop_zeros:
mask_zero = da.sum("substance") == 0
da = da[~mask_zero]

# Check that all the speciation ratios sum to 1
mask_close = np.isclose(da.sum("substance"), 1.0)
if not mask_close.all():
raise ValueError(
"The speciation ratios must sum to 1, but the following rows don't:"
f" {da[~mask_close]}"
)

return da


def speciate(
inv: Inventory,
substance: str,
speciation_ratios: xr.DataArray,
drop: bool = True,
country_mask_kwargs: dict[str, Any] = {},
) -> Inventory:
"""Speciate a substance in an inventory.
.. note:: Profiles (vertical and temporal) are not speciated.
The profiles of the speciated substance are simply applied.
If you have specific profiles for the speciated substance,
you need to set them after the speciation operation.
:arg inv: The inventory to speciate.
:arg substance: The substance to speciate.
:arg speciation_ratios: The speciation ratios. See :py:func:`read_speciation_table`.
:arg drop: Whether to drop the speciated substance.
:arg country_mask_kwargs: If the speciation ratios depend on the country,
this function is used to pass optional arguments to
:py:func:`emiproc.utilities.get_country_mask`.
"""
new_inv = inv.copy()

if "year" in speciation_ratios.coords:
if inv.year is None:
raise ValueError(
f"The inventory {inv} does not have a year, but the speciation ratios"
f" {speciation_ratios} do."
)
# Get the speciation ratios for the year of the inventory
mask_year = speciation_ratios["year"] == inv.year
speciation_ratios = speciation_ratios.loc[mask_year]

if "country" in speciation_ratios.coords:
countries_fractions: xr.DataArray = get_country_mask(
new_inv.grid, return_fractions=True, **country_mask_kwargs
)

for cat, sub in inv._gdf_columns:
if sub != substance:
continue
# Check that the speciation ratios are defined for this category
if "category" in speciation_ratios.coords:
da_ratios = speciation_ratios.loc[speciation_ratios["category"] == cat]
else:
da_ratios = speciation_ratios

if "type" in speciation_ratios.coords:
da_ratios = da_ratios.loc[da_ratios["type"] == "gridded"]

if da_ratios["speciation"].size == 0:
raise ValueError(
f"The speciation ratios for {cat} and {substance} is not defined."
)

if "country" in speciation_ratios.coords:
# Calculate the ratio at each cell
# Make the dim on country instead of speciation
da_ratios_country = da_ratios.set_index(speciation="country").rename(
{"speciation": "country"}
)
da_ratios_cells = countries_fractions.dot(
da_ratios_country, dims=["country"]
)
# First check that where the sum is 0, the total emissions are 0
mask_zero_ratios = da_ratios_cells.sum("substance") == 0
mask_zero_emissions = inv.gdf[(cat, substance)] == 0
mask_problem = mask_zero_ratios & ~mask_zero_emissions
missing_value = (
None
if "-99" not in da_ratios_country.coords["country"].values
else da_ratios_country.sel(country="-99")
)
if mask_problem.any():
if missing_value is None:
raise ValueError(
f"The speciation ratios for {cat} and {substance} is not"
" defined for the following cells that have emission values :"
f" {da_ratios_cells[mask_problem]['cell']}\n"
"You can add a row with country set to -99 to define the"
" default speciation ratios for those homeless cells."
)
# Set the missing value
da_ratios_cells = da_ratios_cells.where(
~mask_problem, other=missing_value
)
# Put ones where the sum is 0 to avoid later division by 0
da_ratios_cells = da_ratios_cells.where(~mask_zero_ratios, other=1.0)
# Correct cells to ensure that the sum is 1
da_ratios_cells = da_ratios_cells / da_ratios_cells.sum("substance")

da_ratios = da_ratios_cells

else:
# No country, just speciate every cell the same way
if da_ratios["speciation"].size > 1:
raise ValueError(
f"The speciation ratios for {cat} and {substance} is not unique:"
f" {da_ratios=}."
)
# Speciate the gdf
for new_sub in da_ratios["substance"].values:
# Simply get the single value
ratios = da_ratios.sel(substance=new_sub).values
# Should not happen, but just in case
if (cat, new_sub) in inv._gdf_columns:
raise KeyError(f"{cat}/{new_sub} already in the gdf of {inv}")
new_inv.gdf[(cat, new_sub)] = inv.gdf[(cat, substance)] * ratios
if drop:
# Drop the speciated substance
new_inv.gdf.drop(columns=[(cat, substance)], inplace=True)

# Now for the gdfs
for cat in inv.gdfs.keys():
if substance not in inv.gdfs[cat].columns:
continue
# Check that the speciation ratios are defined for this category
if "category" in speciation_ratios.coords:
da_ratios = speciation_ratios.loc[speciation_ratios["category"] == cat]
else:
da_ratios = speciation_ratios

if "type" in speciation_ratios.coords:
da_ratios = da_ratios.loc[da_ratios["type"] == "shapped"]
if "country" in speciation_ratios.coords:
country_mask = get_country_mask(
inv.gdfs[cat].geometry, **country_mask_kwargs
)
mask_no_country = country_mask == "-99"
if any(mask_no_country) and "-99" not in da_ratios.coords["country"]:
raise ValueError(
f"The speciation ratios for {cat} and {substance} is not defined"
f" for the following shapes {inv.gdfs[cat].loc[mask_no_country]}"
"You can add a country with iso3='-99' to define the"
" default speciation ratios for those homeless shapes."
)
da_ratios_country = da_ratios.set_index(speciation="country").rename(
{"speciation": "country"}
)
da_ratios = da_ratios_country.loc[country_mask]
else:
if da_ratios["speciation"].size == 0:
raise ValueError(
f"The speciation ratios for {cat} and {substance} is not defined"
" for shapped emissions."
)
if da_ratios["speciation"].size > 1:
raise ValueError(
f"The speciation ratios for {cat} and {substance} is not unique"
f" {da_ratios=}."
)

# Speciate the gdf
for new_sub in da_ratios["substance"].values:
speciation_ratio = da_ratios.sel(substance=new_sub).values
new_inv.gdfs[cat][new_sub] = inv.gdfs[cat][substance] * speciation_ratio
if drop:
new_inv.gdfs[cat].drop(columns=[substance], inplace=True)

# profiles
# this is easy because we can simply duplicate the indexes in substance
for indexes_name in ["t_profiles_indexes", "v_profiles_indexes"]:
if not hasattr(inv, indexes_name):
continue
indexes: xr.DataArray = getattr(inv, indexes_name)
if indexes is None or "substance" not in getattr(inv, indexes_name).dims:
continue

speciated_indexes = (
indexes.sel(substance=substance)
.drop("substance")
.expand_dims(dim={"substance": da_ratios["substance"].values})
)
new_indexes = xr.concat([indexes, speciated_indexes], dim="substance")
if drop:
# Remove the substance from the substance coordinate
new_indexes = new_indexes.drop_sel(substance=[substance])

setattr(new_inv, indexes_name, new_indexes)

new_inv.history.append(f"Speciated {substance} to {da_ratios['substance'].values}.")

return new_inv


@deprecated(msg="Use speciate instead.")
def speciate_inventory(
inv: Inventory,
speciation_dict: dict[CatSub, dict[CatSub, float]],
Expand Down
Loading

0 comments on commit de49c09

Please sign in to comment.