(PR #30) Implement AiiDA 2.x monitoring feature
This PR migrates `aiida-aurora` from parallel monitoring calcjobs (provided by the `aiida-calcmonitor` plugin) to the calcjob monitoring feature now built into AiiDA 2.x.
edan-bainglass authored Aug 30, 2023
2 parents 2ef4462 + ce8ff4c commit 9cd28d2
Showing 3 changed files with 281 additions and 0 deletions.
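The new monitor hooks into the `CalcJob.monitors` input namespace available in AiiDA 2.x. Below is a minimal usage sketch (not part of this commit) of how it could be attached when submitting the `aurora.cycler` calcjob; the monitor label and the settings values shown are illustrative assumptions.

# Hypothetical usage sketch; inputs other than `monitors` are elided.
from aiida import orm
from aiida.engine import submit
from aiida.plugins import CalculationFactory

BatteryCyclerExperiment = CalculationFactory("aurora.cycler")

builder = BatteryCyclerExperiment.get_builder()
# ... set code, battery sample, and cycling protocol inputs as usual ...

builder.monitors = {
    "capacity": orm.Dict({
        "entry_point": "aurora.monitors.capacity_threshold",
        "kwargs": {
            "settings": {
                "check_type": "discharge_capacity",
                "threshold": 0.8,
                "consecutive_cycles": 2,
            },
            "filename": "snapshot.json",
        },
    }),
}

submit(builder)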
90 changes: 90 additions & 0 deletions aiida_aurora/monitors.py
@@ -0,0 +1,90 @@
from json import load
from tempfile import NamedTemporaryFile
from typing import Optional

from aiida.orm import CalcJobNode
from aiida.transports import Transport

from .utils.analyzers import CapacityAnalyzer


def monitor_capacity_threshold(
node: CalcJobNode,
transport: Transport,
settings: dict,
filename="snapshot.json",
) -> Optional[str]:
"""Retrieve and inspect snapshot to determine if capacity has
fallen below threshold for several consecutive cycles.
Parameters
----------
`node` : `CalcJobNode`
The calculation node.
`transport` : `Transport`
The associated transport instance.
`settings` : `dict`
The monitor settings.
`filename` : `str`
The polled source file, `"snapshot.json"` by default.
Returns
-------
`Optional[str]`
If condition is met, an exit message, `None` otherwise.
Raises
------
`TypeError`
If source file is not in expected dictionary format (JSON).
`ValueError`
If source file is empty.
`FileNotFoundError`
If the file does not exist in the working directory.
`OSError`
If another error occurred while reading the file.
`Exception`
If something else prevented analysis.
"""

analyzer = CapacityAnalyzer(**settings)
analyzer.set_logger(node.logger)

try:

with transport:

remote_path = f"{node.get_remote_workdir()}/{filename}"

if not transport.isfile(remote_path):
node.logger.info(f"'{filename}' not yet produced; continue")
return None

try:

with NamedTemporaryFile("w+") as temp_file:
transport.getfile(remote_path, temp_file.name)
snapshot = load(temp_file)

if not isinstance(snapshot, dict):
raise TypeError

if not snapshot:
raise ValueError

return analyzer.analyze(snapshot)

except TypeError:
node.logger.error(f"'{filename}' not in dictionary format")
except ValueError:
node.logger.error(f"'{filename}' is empty")
except FileNotFoundError:
node.logger.error(f"error fetching '{filename}'")
except OSError as err:
node.logger.error(str(err))

return None

except Exception as err:
node.logger.error(str(err))
return None
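Since the monitor unpacks `settings` directly into `CapacityAnalyzer` (added below in `aiida_aurora/utils/analyzers.py`), the dictionary's keys mirror the analyzer's constructor arguments. A sketch of the expected shape, using the analyzer's defaults:

# Illustrative settings dict; keys correspond to CapacityAnalyzer's
# constructor arguments, and the values shown are its defaults.
settings = {
    "check_type": "discharge_capacity",  # or "charge_capacity"
    "threshold": 0.8,                    # fraction of first-cycle capacity
    "consecutive_cycles": 2,             # below-threshold cycles required to stop
}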
188 changes: 188 additions & 0 deletions aiida_aurora/utils/analyzers.py
@@ -0,0 +1,188 @@
from itertools import groupby
from logging import LoggerAdapter
from typing import Optional

from aiida.common.log import AIIDA_LOGGER, LOG_LEVEL_REPORT

from .parsers import get_data_from_raw


class Analyzer:
"""Base class for all analyzers.
Attributes
==========
`logger` : `Union[AiidaLoggerType, LoggerAdapter]`
The associated logger.
"""

logger = AIIDA_LOGGER.getChild("monitor")

def set_logger(self, logger: LoggerAdapter) -> None:
"""Set the analyzer logger.
Parameters
----------
`logger` : `LoggerAdapter`
The logger of the analyzed calculation node.
"""
self.logger = logger

def analyze(self, snapshot: dict) -> Optional[str]:
"""Analyze the experiment snapshot against a condition.
Condition is defined in subclass analyzers.
Parameters
----------
`snapshot` : `dict`
The loaded snapshot dictionary.
Returns
-------
`Optional[str]`
A string if a defined condition has been met,
`None` otherwise.
"""
raise NotImplementedError


class CapacityAnalyzer(Analyzer):
"""A battery capacity analyzer.
Attributes
==========
`check_type` : `str`
The half-cycle to analyze (charge/discharge),
`"discharge_capacity"` by default.
`threshold` : `float`
The capacity threshold in percent, `0.8` by default.
`consecutive_cycles` : `int`
The number of required below-threshold consecutive cycles,
`2` by default
"""

def __init__(
self,
check_type="discharge_capacity",
threshold=0.8,
consecutive_cycles=2,
) -> None:
"""`CapacityAnalyzer` constructor.
Parameters
----------
`check_type` : `str`
The half-cycle to analyze,
`"discharge_capacity"` by default.
`threshold` : `float`
The capacity threshold in percent, `0.8` by default.
`consecutive_cycles` : `int`
The number of required consecutive cycles,
`2` by default.
Raises
------
`TypeError`
If `check_type` is not supported.
"""

if check_type not in {"discharge_capacity", "charge_capacity"}:
raise TypeError(f"{check_type=} not supported")

self.threshold = threshold
self.consecutive = consecutive_cycles
self.is_discharge = check_type == "discharge_capacity"

def analyze(self, snapshot: dict) -> Optional[str]:
"""Analyze the snapshot.
Check if capacity has fallen below threshold for required
consecutive cycles.
Parameters
----------
`snapshot` : `dict`
The loaded snapshot dictionary.
Returns
-------
`Optional[str]`
If condition is met, an exit message, `None` otherwise.
"""
self.capacities = self._get_capacities(snapshot)
self.cycles = len(self.capacities)
return None if self.cycles < 1 else self._check_capacity()

###########
# PRIVATE #
###########

def _get_capacities(self, snapshot: dict):
"""Post-process the snapshot to extract capacities.
Parameters
----------
`snapshot` : `dict`
The loaded snapshot dictionary.
Returns
-------
`numpy.ndarray` or `list`
A `numpy` array of capacities (in mAh), or an empty list
if the snapshot could not be processed.
"""
try:
data = get_data_from_raw(snapshot)
capacities = data['Qd'] if self.is_discharge else data['Qc']
return capacities / 3.6 # As -> mAh
except KeyError as err:
self.logger.error(f"missing '{str(err)}' in snapshot")
return []

def _check_capacity(self) -> Optional[str]:
"""Check if capacity has fallen below threshold for required
consecutive cycles.
Returns
-------
`Optional[str]`
If condition is met, an exit message, `None` otherwise.
"""

n = self.cycles
Qs = self.capacities[0]
Q = self.capacities[-1]
Qt = self.threshold * Qs

message = f"cycle #{n} : {Q = :.2f} mAh ({Q / Qs * 100:.1f}%)"

if Q < Qt:
message += f" : {(Qt - Q) / Qt * 100:.1f}% below threshold"

self.logger.log(LOG_LEVEL_REPORT, message)

below_threshold = self._count_cycles_below_threshold()
if below_threshold >= self.consecutive:
return f"Capacity below threshold ({Qt:.2f} mAh) " \
f"for {below_threshold} cycles!"

return None

def _count_cycles_below_threshold(self) -> int:
"""Count the number of consecutive cycles below threshold.
Returns
-------
`int`
The number of consecutive cycles below threshold.
"""
Qt = self.threshold * self.capacities[0]
return next(
(
len(list(group)) # cycle-count of first below-threshold group
for below, group in groupby(self.capacities < Qt)
if below
),
0,
)
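For clarity, here is a standalone illustration (not part of the commit) of the consecutive-cycle counting performed by `_count_cycles_below_threshold`, using hypothetical capacity values:

# Hypothetical capacities in mAh; the first cycle sets the reference.
from itertools import groupby

import numpy as np

capacities = np.array([10.0, 9.6, 7.5, 7.9, 7.2, 7.0])
Qt = 0.8 * capacities[0]  # threshold = 8.0 mAh

# Group consecutive cycles by whether they fall below the threshold and
# take the length of the first below-threshold run (0 if there is none).
below_run = next(
    (len(list(group)) for below, group in groupby(capacities < Qt) if below),
    0,
)
print(below_run)  # 4 -> cycles 3 through 6 are all below 8.0 mAh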
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -66,6 +66,9 @@ docs = [
"aurora.fake" = "aiida_aurora.calculations.fake:BatteryFakeExperiment"
"aurora.cycler" = "aiida_aurora.calculations.cycler:BatteryCyclerExperiment"

[project.entry-points."aiida.calculations.monitors"]
"aurora.monitors.capacity_threshold" = "aiida_aurora.monitors:monitor_capacity_threshold"

[project.entry-points."aiida.parsers"]
"aurora" = "aiida_aurora.parsers:TomatoParser"
