Skip to content

Commit

Permalink
Merge pull request #157 from jklynch/redis-md-fix
Browse files Browse the repository at this point in the history
Improvements to RunEngineRedisDict
  • Loading branch information
danielballan authored Jul 19, 2022
2 parents f05f135 + bda4e77 commit aef010b
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 59 deletions.
137 changes: 100 additions & 37 deletions nslsii/md_dict.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import re
from collections import ChainMap, UserDict
from pprint import pformat
from uuid import uuid4

import msgpack
Expand All @@ -14,6 +14,24 @@


class RunEngineRedisDict(UserDict):
"""
A class for storing RunEngine metadata added to RE.md on a Redis server.
This class has two strong ideas about the metadata it manages:
1. Some key-values are considered "global" or "facility-wide". These
are in use at all NSLS-II bluesky beamlines and include
proposal_id, data_session, cycle, SAF, and scan_id. The "global"
key-values are stored as Redis key-values. Redis does not support
numeric types, so the RunEngineRedisDict also keeps track of the
types of the "global" key-values. The intention is that this
metadata is accessible by any Redis client.
2. Non-global, or beamline-specific, metadata is stored in Redis as
a msgpack-ed blob. This means data type conversion between Redis
and Python is handled by msgpack, including numpy arrays.
The drawback is that these "local" metadata key-values are not
directly readable or writable by Redis clients.
"""

PACKED_RUNENGINE_METADATA_KEY = "runengine-metadata-blob"

def __init__(
Expand All @@ -23,6 +41,7 @@ def __init__(
db=0,
re_md_channel_name="runengine-metadata",
global_keys=None,
global_values_types=None,
):
# send no initial data to UserDict.__init__
# since we will replace UserDict.data entirely
Expand All @@ -34,12 +53,13 @@ def __init__(
self._re_md_channel_name = re_md_channel_name
self._uuid = str(uuid4())

redis_dict_log.info(f"connecting to Redis at %s:%s", self._host, self._port)
redis_dict_log.info("connecting to Redis at %s:%s", self._host, self._port)
# global metadata will be stored as Redis key-value pairs
# tell the global Redis client to do bytes-to-str conversion
# tell the global metadata Redis client to do bytes-to-str conversion
self._redis_global_client = redis.Redis(
host=host, port=port, db=db, decode_responses=True
)
# ping() will raise redis.exceptions.ConnectionError on failure
self._redis_global_client.ping()

# local metadata will be msgpack-ed, so decoding
Expand All @@ -48,7 +68,6 @@ def __init__(
self._redis_local_client = redis.Redis(
host=host, port=port, db=db, decode_responses=False
)
# ping() will raise redis.exceptions.ConnectionError on failure
self._redis_local_client.ping()

if global_keys is None:
Expand All @@ -61,33 +80,45 @@ def __init__(
"SAF",
"scan_id",
)
else:
self._global_keys = global_keys

if global_values_types is None:
# remember numeric types for global metadata
# global metadata keys not specified here will default to str
self._global_values_types = {"scan_id": int}
else:
self._global_values_types = global_values_types

# is local metadata already in redis?
packed_local_md = self._redis_local_client.get(
self.PACKED_RUNENGINE_METADATA_KEY
)
if packed_local_md is None:
redis_dict_log.info(f"no local metadata found in Redis")
redis_dict_log.info("no local metadata found in Redis")
self._local_md = dict()
self._set_local_metadata_on_server()
else:
redis_dict_log.info(f"unpacking local metadata from Redis")
redis_dict_log.info("unpacking local metadata from Redis")
self._local_md = self._get_local_metadata_from_server()
redis_dict_log.debug(f"unpacked local metadata:\n%s", self._local_md)
redis_dict_log.debug("unpacked local metadata:\n%s", self._local_md)

# what if the global keys do not exist?
# could get all Redis keys and exclude the local md blob key ?
self._global_md = dict()
for global_key in self._global_keys:
global_value = self._redis_global_client.get(global_key)
if global_value is None:
redis_dict_log.info(f"no value yet for global key {global_key}")
self._redis_global_client.set(name=global_key, value=global_key)
self._global_md[global_key] = global_value
redis_dict_log.info(f"global metadata:\n{pformat(self._global_md)}")
# if a global key does not exist on the Redis server
# then it will not exist in the RunEngineRedisDict
redis_dict_log.info("no value yet for global key %s", global_key)
else:
if global_key in self._global_values_types:
global_value = self._global_values_types[global_key](global_value)
self._global_md[global_key] = global_value
redis_dict_log.info("global metadata: %s", self._global_md)

# keep in mind _local_md is the first map in this ChainMap
# for when _local_md has to be replaced
# when self._local_md has to be replaced with the metadata
# blob in Redis we must be careful to replace the first
# dict in the ChainMap's list of mappings
self.data = ChainMap(self._local_md, self._global_md)

# Redis documentation says do not issue commands from
Expand All @@ -99,18 +130,34 @@ def __init__(
ignore_subscribe_messages=True
)

# register _update_on_message to handle Redis messages
# register self._handle_update_message to handle Redis messages
# this is how the RunEngineRedisDict knows a key-value
# has been modified on the Redis server, and therefore
# self._local_md must be updated from the server
self._redis_pubsub.subscribe(
**{self._re_md_channel_name: self._update_on_message}
**{self._re_md_channel_name: self._handle_update_message}
)
# start a thread to pass messages to _handle_update_message
self._update_on_message_thread = self._redis_pubsub.run_in_thread(
sleep_time=0.01, daemon=True
)

def __setitem__(self, key, value):
if key in self._global_md:
if key in self._global_keys:
# can't rely on self._global_md for this check because
# if global metadata is not in Redis it is not added to self._global_md
redis_dict_log.debug("setting global metadata %s:%s", key, value)
# global metadata may be constrained to be of a certain type
# check that value does not violate the type expected for key
expected_value_type = self._global_values_types.get(key, str)
if isinstance(value, expected_value_type):
# everything is good
pass
else:
raise ValueError(
f"expected value for key '{key}' to have type '{expected_value_type}'"
f"but '{value}' has type '{type(value)}'"
)
# update the global key-value pair explicitly in self._global_md
# because it can not be updated through the self.data ChainMap
# since self._global_md is not the first dictionary in that ChainMap
Expand All @@ -129,7 +176,7 @@ def __setitem__(self, key, value):

# tell subscribers a key-value has changed
redis_dict_log.debug("publishing update %s:%s", key, value)
self._publish_metadata_update(key)
self._publish_metadata_update_message(key)

def __delitem__(self, key):
if key in self._global_keys:
Expand All @@ -139,9 +186,9 @@ def __delitem__(self, key):
self._set_local_metadata_on_server()

# tell everyone a (local) key-value has been changed
self._publish_metadata_update(key)
self._publish_metadata_update_message(key)

def _publish_metadata_update(self, key):
def _publish_metadata_update_message(self, key):
"""
Publish a message that includes the updated key and
the identifying UUID for this RunEngineRedisDict.
Expand All @@ -162,38 +209,54 @@ def _set_local_metadata_on_server(self):
self.PACKED_RUNENGINE_METADATA_KEY, self._pack(self._local_md)
)

@staticmethod
def _parse_message_data(message):
_message_data_pattern = re.compile(r"^(?P<key>.+):(?P<uuid>.+)$")

@classmethod
def _parse_message_data(klass, message):
"""
The message parameter looks like this
b"abd:39f1f7fa-aeef-4d83-a802-c1c7f5ff5cb8"
message["data"] should look like this
b"abc:39f1f7fa-aeef-4d83-a802-c1c7f5ff5cb8"
Splitting the message on ":" gives the updated key
and the UUID of the RunEngineRedisDict that made
the update.
("abc" in this example) and the UUID of the RunEngineRedisDict
that made the update. The UUID is used to determine if
the update message came from this RunEngineRedisDict, in
which case it is not necessary to update the local metadata
from the Redis server.
"""
message_key, publisher_uuid = message["data"].rsplit(b":", maxsplit=1)
return message_key.decode(), publisher_uuid.decode()
decoded_message_data = message["data"].decode()
message_data_match = klass._message_data_pattern.match(decoded_message_data)

def _update_on_message(self, message):
if message_data_match is None:
raise ValueError(
f"message[data]=`{decoded_message_data}` could not be parsed"
)
return message_data_match.group("key"), message_data_match.group("uuid")

def _handle_update_message(self, message):
redis_dict_log.debug("_update_on_message: %s", message)
updated_key, publisher_uuid = self._parse_message_data(message)
if publisher_uuid == self._uuid:
# this RunEngineRedisDict is the source of this update message,
# so there is no need to go to the Redis server for the new metadata
redis_dict_log.debug("update published by me!")
pass
elif updated_key in self._global_keys:
redis_dict_log.debug("updated key belongs to global metadata")
# we can assume the updated_key is not a new key
# get the key from the Redis database
self._global_md[updated_key] = self._redis_global_client.get(
name=updated_key
)
# because the updated_key belongs to "global" metadata
# we can assume it is not a new or deleted key, so just
# get the key's value from the Redis database and convert
# its type if necessary (eg, from string to int)
updated_value = self._redis_global_client.get(name=updated_key)
if updated_key in self._global_values_types:
updated_value = self._global_values_types[updated_key](updated_value)
self._global_md[updated_key] = updated_value
else:
redis_dict_log.debug("updated key belongs to local metadata")
# the updated key belongs to local metadata
# it may be a newly added or deleted key, so
# we have to update the entire local metadata dictionary
self._local_md = self._get_local_metadata_from_server()
# update the ChainMap
# update the ChainMap - "local" metadata is always the
# first element in ChainMap.maps
self.data.maps[0] = self._local_md

@staticmethod
Expand Down
38 changes: 29 additions & 9 deletions nslsii/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
from contextlib import contextmanager
from contextlib import contextmanager # noqa

import redis

import pytest

from bluesky.tests.conftest import RE # noqa
from bluesky_kafka import BlueskyConsumer
from bluesky_kafka.tests.conftest import (
from bluesky_kafka import BlueskyConsumer # noqa
from bluesky_kafka.tests.conftest import ( # noqa
pytest_addoption,
kafka_bootstrap_servers,
broker_authorization_config,
consume_documents_from_kafka_until_first_stop_document,
temporary_topics,
) # noqa
)
from ophyd.tests.conftest import hw # noqa

from nslsii.md_dict import RunEngineRedisDict
Expand All @@ -23,13 +23,33 @@ def redis_dict_factory():
"""
Return a "fixture as a factory" that will build identical RunEngineRedisDicts.
Before the factory is returned, the Redis server will be cleared.
The factory builds only RunEngineRedisDict instances for a Redis server running
on localhost:6379, db=0.
If "host", "port", or "db" are specified as kwargs to the factory function
an exception will be raised.
"""
redis_client = redis.Redis(host="localhost", port=6379, db=0)
redis_client.flushdb()
redis_server_kwargs = {
"host": "localhost",
"port": 6379,
"db": 0,
}

def _factory(re_md_channel_name):
return RunEngineRedisDict(host="localhost", port=6379, db=0, re_md_channel_name=re_md_channel_name)
redis_client = redis.Redis(**redis_server_kwargs)
redis_client.flushdb()

return _factory
def _factory(**kwargs):
disallowed_kwargs_preset = set(redis_server_kwargs.keys()).intersection(
kwargs.keys()
)
if len(disallowed_kwargs_preset) > 0:
raise KeyError(
f"{disallowed_kwargs_preset} given, but 'host', 'port', and 'db' may not be specified"
)
else:
kwargs.update(redis_server_kwargs)

return RunEngineRedisDict(**kwargs)

return _factory
Loading

0 comments on commit aef010b

Please sign in to comment.