Skip to content

Commit

Permalink
fix: send schema to fully-qualified topic when using topic prefixes (#50
Browse files Browse the repository at this point in the history
)

* fix: send schemas to fully-prefixed topic
  • Loading branch information
Rebecca Graber authored Oct 3, 2022
1 parent d28062a commit 8041046
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 7 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ Unreleased

*

[1.0.0] - 2022-10-03
********************

Changed
=======

* Fixed bug in schema registry that was sending schemas to the wrong topic
* Bump version to 1.x to acknowledge that this is in use in production

[0.7.0] - 2022-09-08
********************

Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, get_producer

__version__ = '0.7.0'
__version__ = '1.0.0'
10 changes: 5 additions & 5 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,19 @@ def send(
Arguments:
signal: The original OpenEdxPublicSignal the event was sent to
topic: The event bus topic for the event
topic: The base (un-prefixed) event bus topic for the event
event_key_field: Path to the event data field to use as the event key (period-delimited
string naming the dictionary keys to descend)
event_data: The event data (kwargs) sent to the signal
"""
full_topic = get_full_topic(topic)

event_key = extract_event_key(event_data, event_key_field)
headers = {EVENT_TYPE_HEADER_KEY: signal.event_type}

key_serializer, value_serializer = get_serializers(signal, event_key_field)
key_bytes = key_serializer(event_key, SerializationContext(topic, MessageField.KEY, headers))
value_bytes = value_serializer(event_data, SerializationContext(topic, MessageField.VALUE, headers))

full_topic = get_full_topic(topic)
key_bytes = key_serializer(event_key, SerializationContext(full_topic, MessageField.KEY, headers))
value_bytes = value_serializer(event_data, SerializationContext(full_topic, MessageField.VALUE, headers))

self.producer.produce(
full_topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=on_event_deliver,
Expand Down
27 changes: 26 additions & 1 deletion edx_event_bus_kafka/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time
import warnings
from unittest import TestCase
from unittest.mock import Mock, patch
from unittest.mock import Mock, call, patch

import openedx_events.learning.signals
import pytest
Expand Down Expand Up @@ -192,3 +192,28 @@ def increment_call_count(*args):
time.sleep(1.0)
assert call_count >= 3 # some small value; would actually be about 20
print(producer_api) # Use the value here to ensure it isn't GC'd early

@override_settings(EVENT_BUS_TOPIC_PREFIX='stage')
@override_settings(EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345')
@override_settings(EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321')
@patch('edx_event_bus_kafka.internal.producer.SerializationContext')
def test_serialize_and_produce_to_same_topic(self, mock_context):
producer_api = ep.get_producer()
with patch('edx_event_bus_kafka.internal.producer.AvroSerializer',
return_value=lambda _x, _y: b'bytes-here'):
with patch.object(producer_api, 'producer', autospec=True) as mock_producer:
producer_api.send(
signal=self.signal, topic='user-stuff',
event_key_field='user.id', event_data=self.event_data
)

mock_context.assert_has_calls([
call('stage-user-stuff', 'key', {'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'}),
call('stage-user-stuff', 'value', {'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'}),
])
assert mock_context.call_count == 2
mock_producer.produce.assert_called_once_with(
'stage-user-stuff', key=b'bytes-here', value=b'bytes-here',
on_delivery=ep.on_event_deliver,
headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'},
)

0 comments on commit 8041046

Please sign in to comment.