Skip to content

Commit

Permalink
feat: use topic record name strategy (#172)
Browse files Browse the repository at this point in the history
Update serializers to allow multiple event types per topic. See https://docs.openedx.org/projects/event-bus-kafka/en/latest/decisions/0011_multiple_event_types_per_topic.html for the full explanation of why.
  • Loading branch information
Rebecca Graber authored May 17, 2023
1 parent e466e21 commit 1c0b149
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Change Log
Unreleased
**********

[5.1.0] - 2023-05-17
********************
Changed
=======
* Reconfigured serializers to use topic_record_name_strategy, allowing multiple event types per topic

[5.0.0] - 2023-05-17
********************
Changed
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '5.0.0'
__version__ = '5.1.0'
4 changes: 4 additions & 0 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
try:
import confluent_kafka
from confluent_kafka import Producer
from confluent_kafka.schema_registry import topic_record_subject_name_strategy
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext
except ImportError: # pragma: no cover
Expand Down Expand Up @@ -165,11 +166,14 @@ def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument
schema_str=extract_key_schema(signal_serializer, event_key_field),
schema_registry_client=client,
to_dict=inner_to_dict,
conf={'subject.name.strategy': topic_record_subject_name_strategy}
)

value_serializer = AvroSerializer(
schema_str=signal_serializer.schema_string(),
schema_registry_client=client,
to_dict=inner_to_dict,
conf={'subject.name.strategy': topic_record_subject_name_strategy}
)

return key_serializer, value_serializer
Expand Down
5 changes: 5 additions & 0 deletions edx_event_bus_kafka/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
try:
from confluent_kafka.schema_registry import topic_record_subject_name_strategy
from confluent_kafka.schema_registry.avro import AvroSerializer
except ImportError: # pragma: no cover
pass
Expand Down Expand Up @@ -79,6 +80,10 @@ def test_serializers_configured(self):
# We can't actually call them because they want to talk to the schema server.
assert isinstance(key_ser, AvroSerializer)
assert isinstance(value_ser, AvroSerializer)
assert key_ser._subject_name_func ==\
topic_record_subject_name_strategy # pylint: disable=protected-access,comparison-with-callable
assert value_ser._subject_name_func ==\
topic_record_subject_name_strategy # pylint: disable=protected-access,comparison-with-callable

def test_serializers_unconfigured(self):
with pytest.raises(Exception, match="missing library or settings"):
Expand Down

0 comments on commit 1c0b149

Please sign in to comment.