Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize locking in katalogus.py, reuse available data #3752

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 59 additions & 53 deletions mula/scheduler/connectors/services/katalogus.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,91 +43,97 @@ def __init__(self, host: str, source: str, timeout: int, pool_connections: int,
self.flush_caches()

def flush_caches(self) -> None:
self.flush_plugin_cache()
self.flush_normalizer_cache()
self.flush_boefje_cache()

def flush_plugin_cache(self) -> None:
plugins = self.flush_plugin_cache()
self.flush_boefje_cache(plugins)
self.flush_normalizer_cache(plugins)
def flush_plugin_cache(self) -> dict[str, dict]:
self.logger.debug("Flushing the katalogus plugin cache for organisations")

orgs = self.get_organisations()
plugin_cache = {}
boefjes_cache = {}
underdarknl marked this conversation as resolved.
Show resolved Hide resolved
for org in orgs:
plugin_cache.setdefault(org.id, {})
plugins = self.get_plugins_by_organisation(org.id)
plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}

with self.plugin_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.plugin_cache.expiration_enabled = False
self.plugin_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.plugin_cache.setdefault(org.id, {})

plugins = self.get_plugins_by_organisation(org.id)
self.plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}

self.plugin_cache = plugin_cache
self.plugin_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus plugin cache for organisations")
return plugins
underdarknl marked this conversation as resolved.
Show resolved Hide resolved

def flush_boefje_cache(self) -> None:
def flush_boefje_cache(self, plugins=None) -> None:
"""boefje.consumes -> plugin type boefje"""
self.logger.debug("Flushing the katalogus boefje type cache for organisations")

with self.boefje_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.boefje_cache.expiration_enabled = False
self.boefje_cache.reset()
orgs = self.get_organisations()
boefjes_cache = {}

orgs = self.get_organisations()
for org in orgs:
self.boefje_cache[org.id] = {}
for org in orgs:
boefje_cache[org.id] = {}

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "boefje":
continue
for plugin in plugins or self.get_plugins_by_organisation(org.id):
if plugin.type != "boefje":
continue

if plugin.enabled is False:
continue
if plugin.enabled is False:
continue

if not plugin.consumes:
continue
if not plugin.consumes:
continue

# NOTE: backwards compatibility, when it is a boefje the
# consumes field is a string field.
if isinstance(plugin.consumes, str):
self.boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
continue
# NOTE: backwards compatibility, when it is a boefje the
# consumes field is a string field.
if isinstance(plugin.consumes, str):
boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
continue

for type_ in plugin.consumes:
self.boefje_cache[org.id].setdefault(type_, []).append(plugin)
for type_ in plugin.consumes:
boefje_cache[org.id].setdefault(type_, []).append(plugin)

with self.boefje_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.boefje_cache.expiration_enabled = False
self.boefje_cache.reset()
self.boefje_cache = boefje_cache
self.boefje_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus boefje type cache for organisations")

def flush_normalizer_cache(self) -> None:
def flush_normalizer_cache(self, plugins=None) -> None:
"""normalizer.consumes -> plugin type normalizer"""
self.logger.debug("Flushing the katalogus normalizer type cache for organisations")

with self.normalizer_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.normalizer_cache.expiration_enabled = False
self.normalizer_cache.reset()
orgs = self.get_organisations()
normalizer_cache = {}

orgs = self.get_organisations()
for org in orgs:
self.normalizer_cache[org.id] = {}
for org in orgs:
self.normalizer_cache[org.id] = {}

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "normalizer":
continue
for plugin in plugins or self.get_plugins_by_organisation(org.id):
if plugin.type != "normalizer":
continue

if plugin.enabled is False:
continue
if plugin.enabled is False:
continue

if not plugin.consumes:
continue
if not plugin.consumes:
continue

for type_ in plugin.consumes:
self.normalizer_cache[org.id].setdefault(type_, []).append(plugin)
for type_ in plugin.consumes:
normalizer_cache[org.id].setdefault(type_, []).append(plugin)

with self.normalizer_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.normalizer_cache.expiration_enabled = False
self.normalizer_cache.reset()
self.normalizer_cache = normalizer_cache
self.normalizer_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus normalizer type cache for organisations")
Expand Down
Loading