Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize locking in katalogus.py, reuse available data #3752

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 57 additions & 52 deletions mula/scheduler/connectors/services/katalogus.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,90 +44,95 @@ def __init__(self, host: str, source: str, timeout: int, pool_connections: int,

def flush_caches(self) -> None:
self.flush_plugin_cache()
self.flush_normalizer_cache()
self.flush_boefje_cache()
self.flush_boefje_cache(self.plugin_cache)
self.flush_normalizer_cache(self.plugin_cache)

def flush_plugin_cache(self) -> None:
def flush_plugin_cache(self):
self.logger.debug("Flushing the katalogus plugin cache for organisations")

plugin_cache: dict = {}
orgs = self.get_organisations()
for org in orgs:
plugin_cache.setdefault(org.id, {})

plugins = self.get_plugins_by_organisation(org.id)
plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}

with self.plugin_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.plugin_cache.expiration_enabled = False
self.plugin_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.plugin_cache.setdefault(org.id, {})

plugins = self.get_plugins_by_organisation(org.id)
self.plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}

self.plugin_cache.cache = plugin_cache
self.plugin_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus plugin cache for organisations")

def flush_boefje_cache(self) -> None:
def flush_boefje_cache(self, plugins=None) -> None:
"""boefje.consumes -> plugin type boefje"""
self.logger.debug("Flushing the katalogus boefje type cache for organisations")

with self.boefje_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.boefje_cache.expiration_enabled = False
self.boefje_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.boefje_cache[org.id] = {}
boefje_cache: dict = {}
orgs = self.get_organisations()
for org in orgs:
boefje_cache.setdefault(org.id, {})

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "boefje":
continue
org_plugins = plugins[org.id].values() if plugins else self.get_plugins_by_organisation(org.id)
for plugin in org_plugins:
if plugin.type != "boefje":
continue

if plugin.enabled is False:
continue
if plugin.enabled is False:
continue

if not plugin.consumes:
continue
if not plugin.consumes:
continue

# NOTE: backwards compatibility, when it is a boefje the
# consumes field is a string field.
if isinstance(plugin.consumes, str):
self.boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
continue
# NOTE: backwards compatibility, when it is a boefje the
# consumes field is a string field.
if isinstance(plugin.consumes, str):
boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
continue

for type_ in plugin.consumes:
self.boefje_cache[org.id].setdefault(type_, []).append(plugin)
for type_ in plugin.consumes:
boefje_cache[org.id].setdefault(type_, []).append(plugin)

with self.boefje_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.boefje_cache.expiration_enabled = False
self.boefje_cache.reset()
self.boefje_cache.cache = boefje_cache
self.boefje_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus boefje type cache for organisations")

def flush_normalizer_cache(self) -> None:
def flush_normalizer_cache(self, plugins=None) -> None:
"""normalizer.consumes -> plugin type normalizer"""
self.logger.debug("Flushing the katalogus normalizer type cache for organisations")

with self.normalizer_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.normalizer_cache.expiration_enabled = False
self.normalizer_cache.reset()
normalizer_cache: dict = {}
orgs = self.get_organisations()
for org in orgs:
normalizer_cache.setdefault(org.id, {})

orgs = self.get_organisations()
for org in orgs:
self.normalizer_cache[org.id] = {}
org_plugins = plugins[org.id].values() if plugins else self.get_plugins_by_organisation(org.id)
for plugin in org_plugins:
if plugin.type != "normalizer":
continue

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "normalizer":
continue
if plugin.enabled is False:
continue

if plugin.enabled is False:
continue
if not plugin.consumes:
continue

if not plugin.consumes:
continue

for type_ in plugin.consumes:
self.normalizer_cache[org.id].setdefault(type_, []).append(plugin)
for type_ in plugin.consumes:
normalizer_cache[org.id].setdefault(type_, []).append(plugin)

with self.normalizer_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.normalizer_cache.expiration_enabled = False
self.normalizer_cache.reset()
self.normalizer_cache.cache = normalizer_cache
self.normalizer_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus normalizer type cache for organisations")
Expand Down