diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 5e69ed7..fb73dfe 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -16,6 +16,20 @@ Unreleased
 
 *
 
+[0.5.0] - 2022-08-31
+********************
+
+Changed
+=======
+
+* **Breaking changes** in the producer module, refactored to expose a better API:
+
+  * Rather than ``send_to_event_bus(...)``, relying code should now call ``get_producer().send(...)``.
+  * The ``sync`` kwarg is gone; to flush and sync messages before shutdown, call ``get_producer().prepare_for_shutdown()`` instead.
+
+* Clarify that config module is for internal use only.
+* Implementation changes: Only a single Producer is created, and is used for all signals.
+
 [0.4.4] - 2022-08-26
 ********************
diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py
index fea9846..4f23f3a 100644
--- a/edx_event_bus_kafka/__init__.py
+++ b/edx_event_bus_kafka/__init__.py
@@ -2,4 +2,4 @@
 Kafka implementation for Open edX event bus.
 """
 
-__version__ = '0.4.4'
+__version__ = '0.5.0'
diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py
index aa1654d..886231d 100644
--- a/edx_event_bus_kafka/config.py
+++ b/edx_event_bus_kafka/config.py
@@ -1,11 +1,16 @@
 """
 Configuration loading and validation.
+
+This module is for internal use only.
 """
 
 import warnings
+from functools import lru_cache
 from typing import Optional
 
 from django.conf import settings
+from django.dispatch import receiver
+from django.test.signals import setting_changed
 
 # See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
 try:
@@ -16,10 +21,15 @@
 
 
 # return type (Optional[SchemaRegistryClient]) removed from signature to avoid error on import
-def create_schema_registry_client():
+@lru_cache  # will just be one cache entry, in practice
+def get_schema_registry_client():
     """
     Create a schema registry client from common settings.
 
+    This is cached on the assumption of a performance benefit (avoid reloading settings and
+    reconstructing client) but it may also be that the client keeps around long-lived
+    connections that we could benefit from.
+
     Returns
         None if confluent_kafka library is not available or the settings are invalid.
         SchemaRegistryClient if it is.
@@ -69,3 +79,9 @@ def load_common_settings() -> Optional[dict]:
         })
 
     return base_settings
+
+
+@receiver(setting_changed)
+def _reset_state(sender, **kwargs):  # pylint: disable=unused-argument
+    """Reset caches when settings change during unit tests."""
+    get_schema_registry_client.cache_clear()
diff --git a/edx_event_bus_kafka/consumer/event_consumer.py b/edx_event_bus_kafka/consumer/event_consumer.py
index 0a4c285..695f21d 100644
--- a/edx_event_bus_kafka/consumer/event_consumer.py
+++ b/edx_event_bus_kafka/consumer/event_consumer.py
@@ -13,7 +13,7 @@
 from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
 from openedx_events.tooling import OpenEdxPublicSignal
 
-from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings
+from edx_event_bus_kafka.config import get_schema_registry_client, load_common_settings
 
 logger = logging.getLogger(__name__)
 
@@ -68,7 +68,7 @@ def _create_consumer(self):
             DeserializingConsumer if it is.
         """
-        schema_registry_client = create_schema_registry_client()
+        schema_registry_client = get_schema_registry_client()
 
         # TODO (EventBus):
         # 1. Reevaluate if all consumers should listen for the earliest unprocessed offset (auto.offset.reset)
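For reference, the producer API change described in the CHANGELOG above looks roughly like this from a caller's point of view (a sketch only: the topic name and the surrounding function are hypothetical, not part of this changeset):

    from edx_event_bus_kafka.publishing.event_producer import get_producer
    from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED


    def publish_login_event(event_data: dict) -> None:
        """Hypothetical caller, migrated from send_to_event_bus(..., sync=True)."""
        producer = get_producer()
        if producer is None:  # confluent-kafka or settings missing; nothing to do
            return
        producer.send(
            signal=SESSION_LOGIN_COMPLETED,
            topic='user-logins',  # hypothetical topic name
            event_key_field='user.id',
            event_data=event_data,
        )
        # Replaces the old sync=True flag: flush before the process exits,
        # e.g. at the end of a management command.
        producer.prepare_for_shutdown()

The produce_event management command below makes exactly this switch.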
diff --git a/edx_event_bus_kafka/management/commands/produce_event.py b/edx_event_bus_kafka/management/commands/produce_event.py
index 162cf2c..92fa058 100644
--- a/edx_event_bus_kafka/management/commands/produce_event.py
+++ b/edx_event_bus_kafka/management/commands/produce_event.py
@@ -9,7 +9,7 @@
 from django.utils.module_loading import import_string
 from openedx_events.tooling import OpenEdxPublicSignal
 
-from edx_event_bus_kafka.publishing.event_producer import send_to_event_bus
+from edx_event_bus_kafka.publishing.event_producer import get_producer
 
 logger = logging.getLogger(__name__)
 
@@ -53,12 +53,13 @@ def add_arguments(self, parser):
 
     def handle(self, *args, **options):
         try:
-            send_to_event_bus(
+            producer = get_producer()
+            producer.send(
                 signal=import_string(options['signal'][0]),
                 topic=options['topic'][0],
                 event_key_field=options['key_field'][0],
                 event_data=json.loads(options['data'][0]),
-                sync=True,  # otherwise command may exit before delivery is complete
             )
+            producer.prepare_for_shutdown()  # otherwise command may exit before delivery is complete
         except Exception:  # pylint: disable=broad-except
             logger.exception("Error producing Kafka event")
diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py
index 03785f3..4503d10 100644
--- a/edx_event_bus_kafka/publishing/event_producer.py
+++ b/edx_event_bus_kafka/publishing/event_producer.py
@@ -1,28 +1,29 @@
 """
 Produce Kafka events from signals.
 
-Main function is ``send_to_event_bus``.
+Main function is ``get_producer()``.
 """
 
 import json
 import logging
 from functools import lru_cache
-from typing import Any, List
+from typing import Any, List, Optional
 
 from django.dispatch import receiver
 from django.test.signals import setting_changed
 from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
 from openedx_events.tooling import OpenEdxPublicSignal
 
-from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings
+from edx_event_bus_kafka.config import get_schema_registry_client, load_common_settings
 
 logger = logging.getLogger(__name__)
 
 # See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
 try:
     import confluent_kafka
-    from confluent_kafka import SerializingProducer
+    from confluent_kafka import Producer
     from confluent_kafka.schema_registry.avro import AvroSerializer
+    from confluent_kafka.serialization import MessageField, SerializationContext
 except ImportError:  # pragma: no cover
     confluent_kafka = None
@@ -113,63 +114,25 @@ def extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field:
 
 
 @lru_cache
-def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
+def get_serializers(signal: OpenEdxPublicSignal, event_key_field: str):
     """
-    Get the serializer for a signal.
+    Get the key and value serializers for a signal and a key field path.
 
     This is cached in order to save work re-transforming classes into Avro schemas.
-    """
-    return AvroSignalSerializer(signal)
-
-
-# Note: This caching is required, since otherwise the Producer will
-# fall out of scope and be garbage-collected, destroying the
-# outbound-message queue and threads. The use of this cache allows the
-# producers to be long-lived.
-#
-# We are also likely to need to iterate through this cache at server
-# shutdown in order to flush each of the producers, which means the
-# cache needs to never evict. See https://github.com/openedx/event-bus-kafka/issues/11
-# for more details.
-#
-# (Why not change the code to use a single Producer rather than multiple
-# SerializerProducer? Because the code actually turns out to be significantly
-# uglier that way due to the number of separate values that need to be passed
-# around in bundles. There aren't clear "cut-off" points. Additionally, it
-# makes unit testing harder/uglier since now the mocks need to either deal with
-# serialized bytes or mock out the serializers. Getting this down to a single
-# Producer doesn't really seem worth the trouble.)
-
-# return type (Optional[SerializingProducer]) removed from signature to avoid error on import
-
-@lru_cache(maxsize=None)  # Never evict an entry -- it's a small set and we need to keep all of them.
-def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str):
-    """
-    Create the producer for a signal and a key field path.
-
-    If essential settings are missing or invalid, warn and return None.
 
     Arguments:
-        signal: The OpenEdxPublicSignal to make a producer for
-        event_key_field: Path to the event data field to use as the event key (period-delimited
-            string naming the dictionary keys to descend)
+        signal: The OpenEdxPublicSignal to make a serializer for.
+        event_key_field: Path to descend in the signal schema to find the subschema for the key
+            (period-delimited string naming the field names to descend).
+
     Returns:
-        None if confluent_kafka is not defined or the settings are invalid.
-        SerializingProducer if it is.
+        2-tuple of serializers (confluent_kafka AvroSerializer), for event key and value
     """
-    if not confluent_kafka:  # pragma: no cover
-        logger.warning('Library confluent-kafka not available. Cannot create event producer.')
-        return None
+    client = get_schema_registry_client()
+    if client is None:
+        raise Exception('Cannot create Kafka serializers -- missing library or settings')
 
-    schema_registry_client = create_schema_registry_client()
-    if schema_registry_client is None:
-        return None
-
-    producer_settings = load_common_settings()
-    if producer_settings is None:
-        return None
-
-    signal_serializer = get_serializer(signal)
+    signal_serializer = AvroSignalSerializer(signal)
 
     def inner_to_dict(event_data, ctx=None):  # pylint: disable=unused-argument
         """Tells Avro how to turn objects into dictionaries."""
@@ -178,21 +141,95 @@ def inner_to_dict(event_data, ctx=None):  # pylint: disable=unused-argument
     # Serializers for key and value components of Kafka event
     key_serializer = AvroSerializer(
         schema_str=extract_key_schema(signal_serializer, event_key_field),
-        schema_registry_client=schema_registry_client,
+        schema_registry_client=client,
         to_dict=inner_to_dict,
     )
     value_serializer = AvroSerializer(
         schema_str=signal_serializer.schema_string(),
-        schema_registry_client=schema_registry_client,
+        schema_registry_client=client,
         to_dict=inner_to_dict,
     )
 
-    producer_settings.update({
-        'key.serializer': key_serializer,
-        'value.serializer': value_serializer,
-    })
+    return key_serializer, value_serializer
+
+
+class EventProducerKafka():
+    """
+    API singleton for event production to Kafka.
+
+    This is just a wrapper around a confluent_kafka Producer that knows how to
+    serialize a signal to event wire format.
+
+    Only one instance (of Producer or this wrapper) should be created,
+    since it is stateful and needs lifecycle management.
+ """ + + def __init__(self, producer): + self.producer = producer + + def send( + self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict, + ) -> None: + """ + Send a signal event to the event bus under the specified topic. + + Arguments: + signal: The original OpenEdxPublicSignal the event was sent to + topic: The event bus topic for the event + event_key_field: Path to the event data field to use as the event key (period-delimited + string naming the dictionary keys to descend) + event_data: The event data (kwargs) sent to the signal + """ + event_key = extract_event_key(event_data, event_key_field) + headers = {EVENT_TYPE_HEADER_KEY: signal.event_type} + + key_serializer, value_serializer = get_serializers(signal, event_key_field) + key_bytes = key_serializer(event_key, SerializationContext(topic, MessageField.KEY, headers)) + value_bytes = value_serializer(event_data, SerializationContext(topic, MessageField.VALUE, headers)) + + self.producer.produce( + topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=on_event_deliver, + ) + + # Opportunistically ensure any pending callbacks from recent event-sends are triggered. + # + # This assumes events come regularly, or that we're not concerned about + # high latency between delivery and callback. If those assumptions are + # false, we should switch to calling poll(1.0) or similar in a loop on + # a separate thread. Or do both. + # + # Issue: https://github.com/openedx/event-bus-kafka/issues/31 + self.producer.poll(0) + + def prepare_for_shutdown(self): + """ + Prepare producer for a clean shutdown. - return SerializingProducer(producer_settings) + Flush pending outbound events, wait for acknowledgement, and process callbacks. + """ + self.producer.flush(-1) + + +# Note: This caching is required, since otherwise the Producer will +# fall out of scope and be garbage-collected, destroying the +# outbound-message queue and threads. The use of this cache allows the +# producer to be long-lived. +@lru_cache # will just be one cache entry, in practice +def get_producer() -> Optional[EventProducerKafka]: + """ + Create or retrieve Producer API singleton. + + If confluent-kafka library or essential settings are missing, warn and return None. + """ + if not confluent_kafka: # pragma: no cover + logger.warning('Library confluent-kafka not available. Cannot create event producer.') + return None + + producer_settings = load_common_settings() + if producer_settings is None: + return None + + return EventProducerKafka(Producer(producer_settings)) def on_event_deliver(err, evt): @@ -214,51 +251,8 @@ def on_event_deliver(err, evt): f"partition={evt.partition()}") -def send_to_event_bus( - signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict, - sync: bool = False, -) -> None: - """ - Send a signal event to the event bus under the specified topic. - - If the Kafka settings are missing or invalid, return with a warning. 
-
-    Arguments:
-        signal: The original OpenEdxPublicSignal the event was sent to
-        topic: The event bus topic for the event
-        event_key_field: Path to the event data field to use as the event key (period-delimited
-            string naming the dictionary keys to descend)
-        event_data: The event data (kwargs) sent to the signal
-        sync: Whether to wait indefinitely for event to be received by the message bus (probably
-            only want to use this for testing)
-    """
-    producer = get_producer_for_signal(signal, event_key_field)
-    if producer is None:  # Note: SerializingProducer has False truthiness when len() == 0
-        return
-
-    event_key = extract_event_key(event_data, event_key_field)
-    producer.produce(topic, key=event_key, value=event_data,
-                     on_delivery=on_event_deliver,
-                     headers={EVENT_TYPE_HEADER_KEY: signal.event_type})
-
-    if sync:
-        # Wait for all buffered events to send, then wait for all of
-        # them to be acknowledged, and trigger all callbacks.
-        producer.flush(-1)
-    else:
-        # Opportunistically ensure any pending callbacks from recent events are triggered.
-        #
-        # This assumes events come regularly, or that we're not concerned about
-        # high latency between delivery and callback. If those assumptions are
-        # false, we should switch to calling poll(1.0) or similar in a loop on
-        # a separate thread.
-        #
-        # Docs: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079
-        producer.poll(0)
-
-
 @receiver(setting_changed)
 def _reset_caches(sender, **kwargs):  # pylint: disable=unused-argument
-    """Reset caches during testing when settings change."""
-    get_serializer.cache_clear()
-    get_producer_for_signal.cache_clear()
+    """Reset caches when settings change during unit tests."""
+    get_serializers.cache_clear()
+    get_producer.cache_clear()
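With a single long-lived producer instead of one SerializingProducer per signal, a host service needs one flush point at shutdown rather than per-call sync=True. A minimal sketch of one way to arrange that (the atexit hook and the registration point are assumptions for illustration; this changeset does not prescribe a shutdown mechanism):

    import atexit

    from edx_event_bus_kafka.publishing.event_producer import get_producer


    def _flush_event_bus_producer() -> None:
        """Flush any buffered events and wait for delivery callbacks before the process exits."""
        producer = get_producer()
        if producer is not None:
            producer.prepare_for_shutdown()


    # Registered once at startup, e.g. from the host service's AppConfig.ready() (hypothetical).
    atexit.register(_flush_event_bus_producer)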
diff --git a/edx_event_bus_kafka/publishing/test_event_producer.py b/edx_event_bus_kafka/publishing/test_event_producer.py
index 81521bc..3ebb1e6 100644
--- a/edx_event_bus_kafka/publishing/test_event_producer.py
+++ b/edx_event_bus_kafka/publishing/test_event_producer.py
@@ -16,7 +16,7 @@
 
 # See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
 try:
-    from confluent_kafka import SerializingProducer
+    from confluent_kafka.schema_registry.avro import AvroSerializer
 except ImportError:  # pragma: no cover
     pass
 
@@ -24,8 +24,10 @@
 class TestEventProducer(TestCase):
     """Test producer."""
 
-    def test_extract_event_key(self):
-        event_data = {
+    def setUp(self):
+        super().setUp()
+        self.signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
+        self.event_data = {
             'user': UserData(
                 id=123,
                 is_active=True,
@@ -37,17 +39,17 @@
             )
         }
 
-        assert ep.extract_event_key(event_data, 'user.pii.username') == 'foobob'
+    def test_extract_event_key(self):
+        assert ep.extract_event_key(self.event_data, 'user.pii.username') == 'foobob'
         with pytest.raises(Exception,
                            match="Could not extract key from event; lookup in xxx failed at 'xxx' in dictionary"):
-            ep.extract_event_key(event_data, 'xxx')
+            ep.extract_event_key(self.event_data, 'xxx')
         with pytest.raises(Exception,
                            match="Could not extract key from event; lookup in user.xxx failed at 'xxx' in object"):
-            ep.extract_event_key(event_data, 'user.xxx')
+            ep.extract_event_key(self.event_data, 'user.xxx')
 
     def test_descend_avro_schema(self):
-        signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
-        schema = AvroSignalSerializer(signal).schema
+        schema = AvroSignalSerializer(self.signal).schema
 
         assert ep.descend_avro_schema(schema, ['user', 'pii', 'username']) == {"name": "username", "type": "string"}
@@ -57,22 +59,30 @@
         assert isinstance(excinfo.value.__cause__, IndexError)
 
     def test_extract_key_schema(self):
-        signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
-        schema = ep.extract_key_schema(AvroSignalSerializer(signal), 'user.pii.username')
+        schema = ep.extract_key_schema(AvroSignalSerializer(self.signal), 'user.pii.username')
         assert schema == '{"name": "username", "type": "string"}'
 
-    def test_get_producer_for_signal_unconfigured(self):
+    def test_serializers_configured(self):
+        with override_settings(EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345'):
+            key_ser, value_ser = ep.get_serializers(self.signal, 'user.id')
+            # We can't actually call them because they want to talk to the schema server.
+            assert isinstance(key_ser, AvroSerializer)
+            assert isinstance(value_ser, AvroSerializer)
+
+    def test_serializers_unconfigured(self):
+        with pytest.raises(Exception, match="missing library or settings"):
+            ep.get_serializers(self.signal, 'user.id')
+
+    def test_get_producer_unconfigured(self):
         """With missing essential settings, just warn and return None."""
-        signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
         with warnings.catch_warnings(record=True) as caught_warnings:
             warnings.simplefilter('always')
-            assert ep.get_producer_for_signal(signal, 'user.id') is None
+            assert ep.get_producer() is None
         assert len(caught_warnings) == 1
         assert str(caught_warnings[0].message).startswith("Cannot configure event-bus-kafka: Missing setting ")
 
-    def test_get_producer_for_signal_configured(self):
+    def test_get_producer_configured(self):
         """Creation succeeds when all settings are present."""
-        signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
         with override_settings(
             EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345',
             EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY='some_key',
@@ -82,7 +92,7 @@
             EVENT_BUS_KAFKA_API_KEY='some_other_key',
             EVENT_BUS_KAFKA_API_SECRET='some_other_secret',
         ):
-            assert isinstance(ep.get_producer_for_signal(signal, 'user.id'), SerializingProducer)
+            assert isinstance(ep.get_producer(), ep.EventProducerKafka)
 
     @patch('edx_event_bus_kafka.publishing.event_producer.logger')
     def test_on_event_deliver(self, mock_logger):
@@ -99,26 +109,31 @@
             'Event delivered to topic some_topic; key=some_key; partition=some_partition'
         )
 
-    def test_send_to_event_bus(self):
-        signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
-        event_data = {
-            'user': UserData(
-                id=123,
-                is_active=True,
-                pii=UserPersonalData(
-                    username='foobob',
-                    email='bob@foo.example',
-                    name="Bob Foo",
+    # Mock out the serializers for this one so we don't have to deal
+    # with expected Avro bytes -- and they can't call their schema server.
+    @patch(
+        'edx_event_bus_kafka.publishing.event_producer.get_serializers', autospec=True,
+        return_value=(
+            lambda _key, _ctx: b'key-bytes-here',
+            lambda _value, _ctx: b'value-bytes-here',
+        )
+    )
+    def test_send_to_event_bus(self, mock_get_serializers):
+        with override_settings(
+            EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345',
+            EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321',
+        ):
+            producer_api = ep.get_producer()
+            with patch.object(producer_api, 'producer', autospec=True) as mock_producer:
+                producer_api.send(
+                    signal=self.signal, topic='user_stuff',
+                    event_key_field='user.id', event_data=self.event_data
                 )
-            )
-        }
 
-        mock_producer = MagicMock()
-        with patch('edx_event_bus_kafka.publishing.event_producer.get_producer_for_signal', return_value=mock_producer):
-            ep.send_to_event_bus(signal, 'user_stuff', 'user.id', event_data)
+        mock_get_serializers.assert_called_once_with(self.signal, 'user.id')
         mock_producer.produce.assert_called_once_with(
-            'user_stuff', key=123, value=event_data,
+            'user_stuff', key=b'key-bytes-here', value=b'value-bytes-here',
             on_delivery=ep.on_event_deliver,
             headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'},
         )
diff --git a/edx_event_bus_kafka/tests/test_config.py b/edx_event_bus_kafka/tests/test_config.py
index 80e684a..c81b804 100644
--- a/edx_event_bus_kafka/tests/test_config.py
+++ b/edx_event_bus_kafka/tests/test_config.py
@@ -17,11 +17,11 @@ class TestSchemaRegistryClient(TestCase):
 
     def test_unconfigured(self):
-        assert config.create_schema_registry_client() is None
+        assert config.get_schema_registry_client() is None
 
     def test_configured(self):
         with override_settings(EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345'):
-            assert isinstance(config.create_schema_registry_client(), SchemaRegistryClient)
+            assert isinstance(config.get_schema_registry_client(), SchemaRegistryClient)
 
 
 class TestCommonSettings(TestCase):
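Both config.py and event_producer.py now pair an @lru_cache singleton with a setting_changed receiver, which is what lets the override_settings-based tests above see freshly built objects. The pattern in isolation looks like this (a standalone sketch for illustration; the function name is made up and the code is not from this repository):

    from functools import lru_cache

    from django.conf import settings
    from django.dispatch import receiver
    from django.test.signals import setting_changed


    @lru_cache
    def get_expensive_client():
        """Build a settings-derived object once and reuse it for the life of the process."""
        return {'url': getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL', None)}


    @receiver(setting_changed)
    def _reset_state(sender, **kwargs):  # pylint: disable=unused-argument
        """override_settings() sends setting_changed; clear the cache so tests see fresh settings."""
        get_expensive_client.cache_clear()

Django's override_settings fires setting_changed on both entry and exit, so each test that overrides Kafka settings starts and ends with a clean cache.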