Skip to content

Commit

Permalink
Build state tracker (#2074)
Browse files Browse the repository at this point in the history
Define a SQLAlchemy DB class "Dataset" which holds the persistent state for a
Pbench dataset, including, most importantly, the dataset's username (which is
not otherwise recorded until we index) and the dataset's current state, so that
we can track the progress of a dataset through the Pbench server pipeline.

We also support Metadata associated with each Dataset, describing additional
information about datasets beyond the "state". For example, the backup
component will mark a dataset as "ARCHIVED" and pbench-reindex marks the
selected datasets as "REINDEX".

A new dataset starts in UPLOADING state and will progress through the steps as
we perform operations on it. We're defining both "in progress" -ing steps such
as INDEXING as well as completion steps (which are ready for the next
operation) such as INDEXED. There are also two "terminal" states, EXPIRED and
QUARANTINED, from which a dataset cannot exit.
  • Loading branch information
dbutenhof authored Mar 11, 2021
1 parent 6f41da4 commit ee28a87
Show file tree
Hide file tree
Showing 116 changed files with 1,801 additions and 34 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ build
dist
*~
????-*.patch
.env
.npmrc
.yarnrc
*.egg-info
Expand Down
1 change: 1 addition & 0 deletions jenkins/development.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ RUN \
rsync \
screen \
sos \
sqlite \
tar \
xz \
&& \
Expand Down
19 changes: 17 additions & 2 deletions lib/pbench/cli/server/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import sys

from configparser import NoSectionError, NoOptionError
from sqlalchemy_utils import database_exists, create_database

from pbench.common.exceptions import BadConfig, ConfigFileNotSpecified
from pbench.server.api import create_app, get_server_config
from pbench.server.database.database import Database
from pbench.common.logger import get_pbench_logger


def app():
Expand All @@ -24,12 +27,24 @@ def main():
except (ConfigFileNotSpecified, BadConfig) as e:
print(e)
sys.exit(1)
logger = get_pbench_logger(__name__, server_config)
try:
host = str(server_config.get("pbench-server", "bind_host"))
port = str(server_config.get("pbench-server", "bind_port"))
db = str(server_config.get("Postgres", "db_uri"))
workers = str(server_config.get("pbench-server", "workers"))
except (NoOptionError, NoSectionError) as e:
print(f"{__name__}: ERROR: {e.__traceback__}")

# Multiple gunicorn workers will attempt to connect to the DB; rather
# than attempt to synchronize them, detect a missing DB (from the
# postgres URI) and create it here. It's safer to do this here,
# where we're single-threaded.
if not database_exists(db):
logger.info("Postgres DB {} doesn't exist", db)
create_database(db)
logger.info("Created DB {}", db)
Database.init_db(server_config, logger)
except (NoOptionError, NoSectionError):
logger.exception(f"{__name__}: ERROR")
sys.exit(1)

subprocess.run(
Expand Down
1 change: 0 additions & 1 deletion lib/pbench/server/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from pbench.server.database.database import Database
from pbench.server.api.resources.query_apis.query_month_indices import QueryMonthIndices
from pbench.server.api.auth import Auth

from pbench.server.api.resources.users_api import (
RegisterUser,
Login,
Expand Down
Empty file.
23 changes: 23 additions & 0 deletions lib/pbench/server/api/resources/upload_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
import tempfile
import hashlib
from pathlib import Path
from http import HTTPStatus

from flask_restful import Resource, abort
from flask import request, jsonify
from werkzeug.utils import secure_filename
from pbench.server.utils import filesize_bytes
from pbench.server.database.models.tracker import Dataset, States


ALLOWED_EXTENSIONS = {"xz"}

Expand Down Expand Up @@ -109,6 +113,20 @@ def put(self, controller):
md5_full_path = Path(path, f"{filename}.md5")
bytes_received = 0

# TODO: Need real user from PUT!

# Create a tracking dataset object; it'll begin in UPLOADING state
try:
dataset = Dataset(controller=controller, path=tar_full_path, md5=md5sum)
dataset.add()
except Exception:
self.logger.exception("unable to create dataset for {}", filename)
abort(
HTTPStatus.INTERNAL_SERVER_ERROR, message="INTERNAL ERROR",
)

self.logger.info("Uploading file {} to {}", filename, dataset)

with tempfile.NamedTemporaryFile(mode="wb", dir=path) as ofp:
chunk_size = 4096
self.logger.debug("Writing chunks")
Expand Down Expand Up @@ -181,6 +199,11 @@ def put(self, controller):
)
raise

try:
dataset.advance(States.UPLOADED)
except Exception:
self.logger.exception("Unable to finalize {}", dataset)
abort(HTTPStatus.INTERNAL_SERVER_ERROR, message="INTERNAL ERROR")
response = jsonify(dict(message="File successfully uploaded"))
response.status_code = 201
return response
Expand Down
12 changes: 6 additions & 6 deletions lib/pbench/server/api/resources/users_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ def post(self):

first_name = user_data.get("first_name")
if not first_name:
self.logger.warning("Missing firstName field")
abort(400, message="Missing firstName field")
self.logger.warning("Missing first_name field")
abort(400, message="Missing first_name field")

last_name = user_data.get("last_name")
if not last_name:
self.logger.warning("Missing lastName field")
abort(400, message="Missing lastName field")
self.logger.warning("Missing last_name field")
abort(400, message="Missing last_name field")

try:
user = User(
Expand Down Expand Up @@ -325,8 +325,8 @@ def get(self, username):
"message": "Success"/"failure message",
"data": {
"username": <username>,
"firstName": <firstName>,
"lastName": <lastName>,
"first_name": <firstName>,
"last_name": <lastName>,
"registered_on": registered_on,
}
}
Expand Down
24 changes: 9 additions & 15 deletions lib/pbench/server/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
class Database:
# Create declarative base model that our model can inherit from
Base = declarative_base()
# Initialize the db scoped session
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False))

@staticmethod
def get_engine_uri(config, logger):
Expand All @@ -21,24 +19,20 @@ def get_engine_uri(config, logger):
)
return None

# return f"postgresql+{psql_driver}://{psql_username}:{psql_password}@{psql_host}:{psql_port}/{psql_db}"

@staticmethod
def init_engine(server_config, logger):
try:
return create_engine(Database.get_engine_uri(server_config, logger))
except Exception:
logger.exception("Exception while creating a sqlalchemy engine")
return None

@staticmethod
def init_db(server_config, logger):
# Attach the logger to the base class for models to find
if not hasattr(Database.Base, "logger"):
Database.Base.logger = logger

# WARNING:
# Make sure all the models are imported before this function gets called
# so that they will be registered properly on the metadata. Otherwise
# metadata will not have any tables and create_all functionality will do nothing

Database.Base.query = Database.db_session.query_property()

engine = create_engine(Database.get_engine_uri(server_config, logger))
Database.Base.metadata.create_all(bind=engine)
Database.db_session.configure(bind=engine)
Database.db_session = scoped_session(
sessionmaker(bind=engine, autocommit=False, autoflush=False)
)
Database.Base.query = Database.db_session.query_property()
Loading

0 comments on commit ee28a87

Please sign in to comment.