diff --git a/pioreactor/actions/leader/backup_database.py b/pioreactor/actions/leader/backup_database.py index a91c62da..d35e917d 100644 --- a/pioreactor/actions/leader/backup_database.py +++ b/pioreactor/actions/leader/backup_database.py @@ -7,7 +7,7 @@ from pioreactor.config import config from pioreactor.exc import RsyncError from pioreactor.logging import create_logger -from pioreactor.pubsub import subscribe +from pioreactor.utils import local_intermittent_storage from pioreactor.utils import local_persistant_storage from pioreactor.utils import managed_lifecycle from pioreactor.utils.networking import resolve_to_address @@ -18,15 +18,8 @@ def count_writes_occurring(unit: str) -> int: - msg_or_none = subscribe( - f"pioreactor/{unit}/{UNIVERSAL_EXPERIMENT}/mqtt_to_db_streaming/inserts_in_last_60s", - timeout=2, - ) - if msg_or_none is not None: - count = int(msg_or_none.payload.decode()) - else: - count = 0 - return count + with local_intermittent_storage("mqtt_to_db_streaming") as c: + return c.get("local_intermittent_cache", 0) def backup_database(output_file: str, force: bool = False, backup_to_workers: int = 0) -> None: diff --git a/pioreactor/background_jobs/leader/mqtt_to_db_streaming.py b/pioreactor/background_jobs/leader/mqtt_to_db_streaming.py index 95df7fb3..bb276209 100644 --- a/pioreactor/background_jobs/leader/mqtt_to_db_streaming.py +++ b/pioreactor/background_jobs/leader/mqtt_to_db_streaming.py @@ -18,6 +18,7 @@ from pioreactor.config import config from pioreactor.hardware import PWM_TO_PIN from pioreactor.pubsub import QOS +from pioreactor.utils import local_intermittent_storage from pioreactor.utils.sqlite_worker import Sqlite3Worker from pioreactor.utils.timing import current_utc_datetime from pioreactor.utils.timing import RepeatedTimer @@ -59,11 +60,6 @@ class TopicToCallback(Struct): class MqttToDBStreamer(LongRunningBackgroundJob): job_name = "mqtt_to_db_streaming" - published_settings = { - "inserts_in_last_60s": {"datatype": "integer", "settable": False}, - } - - inserts_in_last_60s = 0 _inserts_in_last_60s = 0 def __init__( @@ -92,7 +88,9 @@ def __init__( self.initialize_callbacks(topics_and_callbacks) def publish_stats(self) -> None: - self.inserts_in_last_60s = self._inserts_in_last_60s + with local_intermittent_storage(self.job_name) as c: + c["inserts_in_last_60s"] = self._inserts_in_last_60s + self._inserts_in_last_60s = 0 def on_disconnected(self) -> None: