Merge pull request #301 from informatics-lab/fix-memory-issue
Memory management
andrewgryan committed Mar 27, 2020
2 parents a14dfe8 + b9a3c46 commit 51eb7f8
Showing 16 changed files with 314 additions and 220 deletions.
2 changes: 1 addition & 1 deletion forest/__init__.py
@@ -23,7 +23,7 @@
.. automodule:: forest.presets
"""
-__version__ = '0.13.2'
+__version__ = '0.13.3'

from .config import *
from . import (
15 changes: 15 additions & 0 deletions forest/colors.py
@@ -283,6 +283,21 @@ def palettes(store, action):
yield set_colorbar({**defaults(), **settings})


def middleware():
    previous = None
    seen = False
    def call(store, action):
        nonlocal previous, seen
        if not seen:
            seen = True
            previous = action
            yield action
        elif previous != action:
            previous = action
            yield action
    return call


def is_fixed(state):
"""Helper to discover if fixed limits have been selected"""
return state.get("colorbar", {}).get("fixed", False)
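The middleware generator added above suppresses consecutive duplicate actions: the first action always passes through, and after that an action is only forwarded when it differs from the previous one. A minimal sketch of how it behaves (the dispatch loop and "SET_PALETTE" action kind below are illustrative, not FOREST's actual Store):

def dispatch(mw, store, actions):
    # Only the actions the middleware yields reach downstream subscribers.
    for action in actions:
        for forwarded in mw(store, action):
            print("forwarded:", forwarded)

call = middleware()  # the generator-based middleware defined above
dispatch(call, store=None, actions=[
    {"kind": "SET_PALETTE", "payload": "Viridis"},
    {"kind": "SET_PALETTE", "payload": "Viridis"},  # duplicate: dropped
    {"kind": "SET_PALETTE", "payload": "Inferno"},  # changed: forwarded
])

Only the first and third actions are forwarded, so identical updates no longer trigger redundant work downstream.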
130 changes: 10 additions & 120 deletions forest/data.py
@@ -1,41 +1,23 @@
import os
import datetime as dt
import re
try:
import cartopy
except ImportError:
# ReadTheDocs unable to pip install cartopy
pass
import glob
import json
import pandas as pd
import numpy as np
import netCDF4
try:
import cf_units
except ImportError:
# ReadTheDocs unable to pip install cf-units
pass
from forest import (
geo,
disk)
from forest import geo
import bokeh.models
from collections import OrderedDict
from functools import partial
import scipy.ndimage
try:
import shapely.geometry
except ImportError:
# ReadTheDocs unable to pip install shapely
pass
from forest.util import (
initial_time,
coarsify)
from forest.exceptions import SearchFail
import forest.util


# Application data shared across documents
IMAGES = OrderedDict()
VECTORS = OrderedDict()
COASTLINES = {
"xs": [],
@@ -126,6 +108,7 @@ def xy(g):
yield xy(geometry)


# TODO: Delete this decorator in a future PR
def cache(name):
store = globals()[name]
def decorator(f):
@@ -137,6 +120,7 @@ def wrapped(*args):
return decorator


# TODO: Delete this class in a future PR
class ActiveViewer(object):
def __init__(self):
self.active = False
@@ -167,6 +151,7 @@ def on_state(self, state):
self.pending_state = state


# TODO: Delete this class in a future PR
class WindBarbs(ActiveViewer):
def __init__(self, paths):
self.paths = paths
@@ -236,8 +221,8 @@ def load_data(path, itime, ipressure):
np.zeros(len(lats), dtype="d"),
lats)
x, y = np.meshgrid(gx, gy)
-    u = convert_units(u, 'm s-1', 'knots')
-    v = convert_units(v, 'm s-1', 'knots')
+    u = forest.util.convert_units(u, 'm s-1', 'knots')
+    v = forest.util.convert_units(v, 'm s-1', 'knots')
return {
"x": x.flatten(),
"y": y.flatten(),
@@ -246,13 +231,14 @@
}


# TODO: Delete this class in a future PR
class Finder(object):
def __init__(self, paths):
self.paths = paths
self.table = {
-    initial_time(p): p for p in paths}
+    forest.util.initial_time(p): p for p in paths}
self.initial_times = np.array(
-    [initial_time(p) for p in paths],
+    [forest.util.initial_time(p) for p in paths],
dtype='datetime64[s]')
with netCDF4.Dataset(self.paths[0]) as dataset:
if "pressure" in dataset.variables:
@@ -295,99 +281,3 @@ def find_path(self, initial_time):
np.abs(
self.initial_times - initial_time))
return self.paths[i]


def pts_hash(pts):
if isinstance(pts, np.ndarray):
return pts.tostring()
else:
return pts


def load_image(path, variable, itime, ipressure):
return load_image_pts(path, variable, (itime,), (itime, ipressure))


def load_image_pts(path, variable, pts_3d, pts_4d):
key = (path, variable, pts_hash(pts_3d), pts_hash(pts_4d))
if key in IMAGES:
return IMAGES[key]
else:
try:
lons, lats, values, units = _load_netcdf4(path, variable, pts_3d, pts_4d)
except:
lons, lats, values, units = _load_cube(path, variable, pts_3d, pts_4d)

# Units
if variable in ["precipitation_flux", "stratiform_rainfall_rate"]:
if units == "mm h-1":
values = values
else:
values = convert_units(values, units, "kg m-2 hour-1")
elif units == "K":
values = convert_units(values, "K", "Celsius")

# Coarsify images
threshold = 200 * 200 # Chosen since TMA WRF is 199 x 199
if values.size > threshold:
fraction = 0.25
else:
fraction = 1.
lons, lats, values = coarsify(
lons, lats, values, fraction)

# Roll input data into [-180, 180] range
if np.any(lons > 180.0):
shift_by = np.sum(lons > 180.0)
lons[lons > 180.0] -= 360.
lons = np.roll(lons, shift_by)
values = np.roll(values, shift_by, axis=1)

image = geo.stretch_image(lons, lats, values)
IMAGES[key] = image
return image


def _load_cube(path, variable, pts_3d, pts_4d):
import iris
cube = iris.load_cube(path, iris.Constraint(variable))
units = cube.units
lons = cube.coord('longitude').points
if lons.ndim == 2:
lons = lons[0, :]
lats = cube.coord('latitude').points
if lons.ndim == 2:
lats = lats[:, 0]
if cube.data.ndim == 4:
values = cube.data[pts_4d]
else:
values = cube.data[pts_3d]
return lons, lats, values, units


def _load_netcdf4(path, variable, pts_3d, pts_4d):
with netCDF4.Dataset(path) as dataset:
try:
var = dataset.variables[variable]
except KeyError as e:
if variable == "precipitation_flux":
var = dataset.variables["stratiform_rainfall_rate"]
else:
raise e
for d in var.dimensions:
if "longitude" in d:
lons = dataset.variables[d][:]
if "latitude" in d:
lats = dataset.variables[d][:]
if len(var.dimensions) == 4:
values = var[pts_4d]
else:
values = var[pts_3d]
units = var.units
return lons, lats, values, units


def convert_units(values, old_unit, new_unit):
if isinstance(values, list):
values = np.asarray(values)
return cf_units.Unit(old_unit).convert(values, new_unit)
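The bulk of the deletions above remove load_image/load_image_pts and the convert_units helper from forest.data (callers now use forest.util.convert_units), along with their reliance on the module-level IMAGES dictionary. That dictionary was an unbounded cache shared across documents: every distinct (path, variable, slice) key kept a stretched image alive forever, which is the memory growth this PR targets. A sketch of the difference between the removed pattern and a bounded cache (names here are illustrative):

from functools import lru_cache

import numpy as np

CACHE = {}  # the removed pattern: one entry per unique key, never evicted

def load_unbounded(key):
    if key not in CACHE:
        CACHE[key] = np.zeros((1000, 1000))  # stand-in for a stretched image
    return CACHE[key]

@lru_cache(maxsize=100)
def load_bounded(key):
    # once 100 keys are cached, least-recently-used entries are evicted
    return np.zeros((1000, 1000))

The replacement code in forest/drivers/unified_model.py below takes the lru_cache route.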
2 changes: 1 addition & 1 deletion forest/db/control.py
@@ -163,7 +163,7 @@ def initial_state(navigator, pattern=None):

@export
def reducer(state, action):
-    state = copy.copy(state)
+    state = copy.deepcopy(state)
kind = action["kind"]
if kind == SET_VALUE:
payload = action["payload"]
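The reducer's switch from copy.copy to copy.deepcopy matters because application state is a nested dict: a shallow copy shares the inner dicts, so mutating the "new" state silently rewrites the old one and defeats any old-versus-new comparison. Plain Python illustration:

import copy

old = {"colorbar": {"low": 0, "high": 1}}

shallow = copy.copy(old)
shallow["colorbar"]["high"] = 5
print(old["colorbar"]["high"])  # 5: the inner dict is shared

deep = copy.deepcopy(old)
deep["colorbar"]["high"] = 9
print(old["colorbar"]["high"])  # still 5: the inner dict was copied too

With deepcopy, change detection against the previous state compares genuinely independent snapshots.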
121 changes: 89 additions & 32 deletions forest/drivers/unified_model.py
@@ -1,17 +1,20 @@
import xarray
from functools import lru_cache
import os
import glob
import re
import fnmatch
import datetime as dt
import numpy as np
import netCDF4
import forest.util
from forest import (
db,
disk,
geo,
view)
from forest.data import load_image_pts
from forest.exceptions import SearchFail, PressuresNotFound
from forest.drivers import gridded_forecast
try:
import iris
except ImportError:
@@ -99,45 +102,36 @@ def __init__(self, name, pattern, locator):
def image(self, state):
if not self.valid(state):
return gridded_forecast.empty_image()
data = self._input_output(
self.pattern,
state.variable,
state.initial_time,
state.valid_time,
state.pressure)
data.update(gridded_forecast.coordinates(state.valid_time,
state.initial_time,
state.pressures,
state.pressure))
return data

@lru_cache(maxsize=100)
def _input_output(self, pattern, variable, initial_time, valid_time,
pressure):
"""I/O needed to load an image and its metadata"""
try:
path, pts = self.locator.locate(
self.pattern,
state.variable,
state.initial_time,
state.valid_time,
state.pressure)
pattern,
variable,
initial_time,
valid_time,
pressure)
except SearchFail:
return gridded_forecast.empty_image()

units = self.read_units(path, state.variable)
data = load_image_pts(
path,
state.variable,
pts,
pts)
if (len(state.pressures) > 0) and (state.pressure is not None):
level = "{} hPa".format(int(state.pressure))
else:
level = "Surface"
data.update(gridded_forecast.coordinates(state.valid_time,
state.initial_time,
state.pressures,
state.pressure))
data = self.load_image(path, variable, pts)
data["name"] = [self.name]
data["units"] = [units]
return data
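Moving the I/O into _input_output and decorating it with @lru_cache(maxsize=100) replaces the unbounded IMAGES cache deleted from forest/data.py: repeated requests for the same (pattern, variable, initial_time, valid_time, pressure) combination are served from a cache that evicts its least-recently-used entries. The state fields are passed as individual arguments because lru_cache keys on hashable arguments; note also that on a method self is part of the key, so each instance holds at most maxsize entries for its lifetime. A toy illustration of the eviction behaviour (file and variable names are illustrative):

from functools import lru_cache

@lru_cache(maxsize=2)
def load(path, variable):
    print("reading", path)  # I/O happens only on a cache miss
    return {"image": []}

load("a.nc", "air_temperature")  # miss: reads the file
load("a.nc", "air_temperature")  # hit: no I/O
load("b.nc", "air_temperature")  # miss
load("c.nc", "air_temperature")  # miss: the "a.nc" entry is evicted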

@staticmethod
def read_units(filename,parameter):
dataset = netCDF4.Dataset(filename)
veep = dataset.variables[parameter]
# read the units and assign a blank value if there aren't any:
units = getattr(veep, 'units', '')
dataset.close()
return units


def valid(self, state):
if state.variable is None:
return False
@@ -159,6 +153,69 @@ def has_pressure(self, pressures, pressure, tolerance=0.01):
pressures = np.array(pressures)
return any(np.abs(pressures - pressure) < tolerance)

@classmethod
def load_image(cls, path, variable, pts):
"""Load bokeh image glyph data from file using slices"""
try:
lons, lats, values, units = cls._load_xarray(path, variable, pts)
except:
lons, lats, values, units = cls._load_cube(path, variable, pts)

# Units
if variable in ["precipitation_flux", "stratiform_rainfall_rate"]:
if units == "mm h-1":
values = values
else:
values = forest.util.convert_units(values, units, "kg m-2 hour-1")
units = "kg m-2 hour-1"
elif units == "K":
values = forest.util.convert_units(values, "K", "Celsius")
units = "C"

# Coarsify images
threshold = 200 * 200 # Chosen since TMA WRF is 199 x 199
if values.size > threshold:
fraction = 0.25
else:
fraction = 1.
lons, lats, values = forest.util.coarsify(
lons, lats, values, fraction)

# Roll input data into [-180, 180] range
if np.any(lons > 180.0):
shift_by = np.sum(lons > 180.0)
lons[lons > 180.0] -= 360.
lons = np.roll(lons, shift_by)
values = np.roll(values, shift_by, axis=1)

data = geo.stretch_image(lons, lats, values)
data["units"] = [units]
return data

@staticmethod
def _load_xarray(path, variable, pts):
with xarray.open_dataset(path, engine="h5netcdf") as nc:
data_array = nc[variable][pts]
lons = np.ma.masked_invalid(data_array.longitude)
lats = np.ma.masked_invalid(data_array.latitude)
values = np.ma.masked_invalid(data_array)
units = getattr(data_array, 'units', '')
return lons, lats, values, units
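_load_xarray reads the requested slice inside a with block, so the underlying h5netcdf file handle is released as soon as the values have been masked, rather than lingering until garbage collection (contrast read_units above, which closes its netCDF4.Dataset by hand). The same pattern in isolation, with illustrative names:

import numpy as np
import xarray

def read_slice(path, variable, pts):
    with xarray.open_dataset(path, engine="h5netcdf") as nc:
        # materialise the slice while the file is still open
        values = np.ma.masked_invalid(nc[variable][pts])
    return values  # the file handle is already closed here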

@staticmethod
def _load_cube(path, variable, pts):
# TODO: Is this method still needed?
cube = iris.load_cube(path, iris.Constraint(variable))
units = cube.units
lons = cube.coord('longitude').points
if lons.ndim == 2:
lons = lons[0, :]
lats = cube.coord('latitude').points
if lons.ndim == 2:
lats = lats[:, 0]
values = cube.data[pts]
return lons, lats, values, units


class Locator(object):
def __init__(self, paths):
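The longitude handling copied into load_image above normalises grids published on 0-360 longitudes into the [-180, 180] range before handing them to geo.stretch_image, rolling the data so each column stays attached to its longitude. A worked example on a 4-column grid:

import numpy as np

lons = np.array([0., 90., 180., 270.])      # 270 lies east of the dateline
values = np.arange(8.).reshape(2, 4)        # one column per longitude
shift_by = np.sum(lons > 180.0)             # 1 column needs to move
lons[lons > 180.0] -= 360.                  # 270 -> -90
lons = np.roll(lons, shift_by)              # [-90, 0, 90, 180]
values = np.roll(values, shift_by, axis=1)  # its data column moves with it
print(lons)    # [-90.   0.  90. 180.]
print(values)  # the column that was last is now first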
(Diff for the remaining 11 changed files not shown.)

