Skip to content

Commit

Permalink
fix: Never evict producers from cache (#28)
Browse files Browse the repository at this point in the history
This should close out #16
by just embracing the use of multiple producers.

Also:

- Improve cache-clearing during tests by connecting to the setting_changed
  signal -- this allows knowledge of what's cached to be local to the
  module where the caching happens, rather than relying on each test module
  to know about all of the modules it depends on indirectly.
- Remove outdated comment about caching, and clarify other caching
  comments.
  • Loading branch information
timmc-edx authored Aug 24, 2022
1 parent cfb9710 commit b2b0405
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 13 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ Unreleased

*

[0.4.3] - 2022-08-24
********************

Fixed
=====

* Never evict producers from cache. There wasn't a real risk of this, but now we can rely on them being long-lived. Addresses remainder of `<https://github.com/openedx/event-bus-kafka/issues/16>`__.

[0.4.2] - 2022-08-24
********************

Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Kafka implementation for Open edX event bus.
"""

__version__ = '0.4.2'
__version__ = '0.4.3'
31 changes: 24 additions & 7 deletions edx_event_bus_kafka/publishing/event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from functools import lru_cache
from typing import Any, List

from django.dispatch import receiver
from django.test.signals import setting_changed
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.tooling import OpenEdxPublicSignal

Expand Down Expand Up @@ -115,7 +117,7 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
"""
Get the serializer for a signal.
This is just defined to allow caching of serializers.
This is cached in order to save work re-transforming classes into Avro schemas.
"""
return AvroSignalSerializer(signal)

Expand All @@ -124,10 +126,23 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
# fall out of scope and be garbage-collected, destroying the
# outbound-message queue and threads. The use of this cache allows the
# producers to be long-lived.
#
# We are also likely to need to iterate through this cache at server
# shutdown in order to flush each of the producers, which means the
# cache needs to never evict. See https://github.com/openedx/event-bus-kafka/issues/11
# for more details.
#
# (Why not change the code to use a single Producer rather than multiple
# SerializerProducer? Because the code actually turns out to be significantly
# uglier that way due to the number of separate values that need to be passed
# around in bundles. There aren't clear "cut-off" points. Additionally, it
# makes unit testing harder/uglier since now the mocks need to either deal with
# serialized bytes or mock out the serializers. Getting this down to a single
# Producer doesn't really seem worth the trouble.)

# return type (Optional[SerializingProducer]) removed from signature to avoid error on import

@lru_cache
@lru_cache(maxsize=None) # Never evict an entry -- it's a small set and we need to keep all of them.
def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str):
"""
Create the producer for a signal and a key field path.
Expand All @@ -141,11 +156,6 @@ def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str):
Returns:
None if confluent_kafka is not defined or the settings are invalid.
SerializingProducer if it is.
Performance note:
This could be cached, but requires care such that it allows changes to settings via
remote-config (and in particular does not result in mixed cache/uncached configuration).
This complexity is being deferred until this becomes a performance issue.
"""
if not confluent_kafka: # pragma: no cover
logger.warning('Library confluent-kafka not available. Cannot create event producer.')
Expand Down Expand Up @@ -245,3 +255,10 @@ def send_to_event_bus(
#
# Docs: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079
producer.poll(0)


@receiver(setting_changed)
def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument
"""Reset caches during testing when settings change."""
get_serializer.cache_clear()
get_producer_for_signal.cache_clear()
5 changes: 0 additions & 5 deletions edx_event_bus_kafka/publishing/test_event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@
class TestEventProducer(TestCase):
"""Test producer."""

def setUp(self):
super().setUp()
ep.get_producer_for_signal.cache_clear()
ep.get_serializer.cache_clear()

def test_extract_event_key(self):
event_data = {
'user': UserData(
Expand Down

0 comments on commit b2b0405

Please sign in to comment.