Skip to content

Commit

Permalink
Merge pull request #422 from MetOffice/fix-unified-model-oserror-table
Browse files Browse the repository at this point in the history
Fix unified model oserror table
  • Loading branch information
andrewgryan committed Jul 3, 2020
2 parents 8af680a + ecd5adf commit a53b012
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 7 deletions.
2 changes: 1 addition & 1 deletion forest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
.. automodule:: forest.services
"""
__version__ = '0.20.6'
__version__ = '0.20.7'

from .config import *
from . import (
Expand Down
59 changes: 59 additions & 0 deletions forest/db/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
S3 object health status
"""
import sqlite3


class HealthDB:
"""Maintain meta-data related to S3 objects"""
def __init__(self, connection):
self.connection = connection
self.cursor = self.connection.cursor()
self.cursor.execute("""
CREATE TABLE
IF NOT EXISTS health (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
errno INTEGER,
strerror TEXT,
time TEXT,
UNIQUE(name))
""")

@classmethod
def connect(cls, path_or_memory):
"""Connect to sqlite3 database"""
return cls(sqlite3.connect(path_or_memory))

def checked_files(self, pattern):
"""Files that are in the database
:returns files: either successfully processed or marked as OSError
"""
return sorted(set(self.files(pattern)) |
set(self.error_files(pattern)))

def files(self, pattern):
query = "SELECT name FROM file WHERE name GLOB :pattern;"
params = {"pattern": pattern}
return [path for path, in self.cursor.execute(query, params)]

def error_files(self, pattern):
query = "SELECT name FROM health WHERE name GLOB :pattern;"
params = {"pattern": pattern}
return [path for path, in self.cursor.execute(query, params)]

def insert_error(self, path, error, check_time):
"""Insert OSError into table"""
query = """
INSERT OR IGNORE
INTO health (name, errno, strerror, time)
VALUES (:path, :errno, :strerror, :time);
"""
params = {
"path": path,
"errno": error.errno,
"strerror": error.strerror,
"time": check_time.isoformat()
}
self.cursor.execute(query, params)
12 changes: 6 additions & 6 deletions forest/drivers/unified_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import netCDF4
import sqlite3
import forest.db
import forest.db.health
import forest.util
import forest.map_view
from forest import (
Expand Down Expand Up @@ -45,12 +46,9 @@ def __call__(self):

# Find names in database
connection = sqlite3.connect(self.database_path)
cursor = connection.cursor()
query = "SELECT name FROM file WHERE name GLOB :pattern;"
sql_names = []
for row in cursor.execute(query, {"pattern": self.pattern}):
path, = row
sql_names.append(os.path.basename(path))
health_db = forest.db.health.HealthDB(connection)
sql_names = [os.path.basename(path)
for path in health_db.checked_files(self.pattern)]
connection.close()

# Find extra files
Expand All @@ -61,12 +59,14 @@ def __call__(self):
if len(extra_paths) > 0:
print("connecting to: {}".format(self.database_path))
with forest.db.Database.connect(self.database_path) as database:
health_db = forest.db.health.HealthDB(database.connection)
for path in extra_paths:
print("inserting: '{}'".format(path))
try:
database.insert_netcdf(path)
except OSError as e:
# S3 Glacier objects inaccessible via goofys
health_db.insert_error(path, e, dt.datetime.now())
print(e)
print(f"skip file: {path}")
continue
Expand Down
25 changes: 25 additions & 0 deletions test/test_db_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import sqlite3
import datetime as dt
import forest.db
import forest.db.health


def test_db_health_check():
"""Database tables to monitor S3 object availability"""
database = forest.db.Database.connect(":memory:")
database.insert_file_name("file.nc")
pattern = "*.nc"
health_db = forest.db.health.HealthDB(database.connection)
assert health_db.checked_files(pattern) == ["file.nc"]


def test_db_health_check_mark_oserror():
"""Database tables to monitor S3 object availability"""
database = forest.db.Database.connect(":memory:")
database.insert_file_name("file-0.nc")
health_db = forest.db.health.HealthDB(database.connection)
health_db.insert_error("file-1.nc",
OSError("Error message"),
dt.datetime(2020, 1, 1))
pattern = "*.nc"
assert health_db.checked_files(pattern) == ["file-0.nc", "file-1.nc"]

0 comments on commit a53b012

Please sign in to comment.