-
Notifications
You must be signed in to change notification settings - Fork 1
/
avro_client.py
32 lines (26 loc) · 1.28 KB
/
avro_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from confluent_kafka.serialization import *
from avro_deserializer import Deserializer
from avro_schema_registry_client import RegistryClient
from error_handler import ErrorHandler
class AvroClient:
    """Deserialize Avro-encoded Kafka message values via a schema registry.

    Typical use: construct the client, call ``load_deserializer`` once for the
    topic being consumed, then pass each consumed message to
    ``convert_avro_msg``.
    """

    def __init__(self, environment):
        """Create the schema-registry client for ``environment``.

        Args:
            environment: Passed unchanged to ``RegistryClient``.

        Raises:
            ErrorHandler: If the registry client cannot be created.
        """
        try:
            self.registry_client = RegistryClient(environment).registry_client
        except Exception as e:
            raise ErrorHandler(f"Unable to initialize AvroClient(): {e}") from e
        # MessageField is an enum-like holder of string constants, not something
        # to instantiate. This client deserializes message *values*, so the
        # serialization context must be tagged with MessageField.VALUE.
        self.msg_field = MessageField.VALUE
        # Both are populated by load_deserializer().
        self.deserializer = None
        self.serial_context = None

    def load_deserializer(self, topic_name):
        """Build the Avro deserializer and serialization context for a topic.

        Args:
            topic_name: Name of the Kafka topic whose messages will be decoded.

        Raises:
            ErrorHandler: If the deserializer or the context cannot be created.
        """
        try:
            deserializer = Deserializer(self.registry_client).create_avro_deserializer(topic_name)
            serial_context = SerializationContext(topic_name, self.msg_field)
        except Exception as e:
            raise ErrorHandler(
                f"Unable to load deserializer for topic: {topic_name}. "
                f"Please check avro schema provided in avro schemas directory: {e}"
            ) from e
        # Assign only after both objects were built, so a failure never leaves a
        # new deserializer paired with a stale context.
        self.deserializer = deserializer
        self.serial_context = serial_context

    def convert_avro_msg(self, msg):
        """Deserialize the value of a consumed Kafka message.

        Args:
            msg: Object exposing ``value()`` that returns the raw payload.

        Returns:
            Whatever the loaded deserializer returns for the payload.

        Raises:
            ErrorHandler: If no deserializer has been loaded, or if
                deserialization fails.
        """
        if self.deserializer is None:
            # Fail with an actionable message instead of an opaque NoneType error.
            raise ErrorHandler(
                "Error deserializing avro message. No deserializer loaded: "
                "call load_deserializer() first"
            )
        try:
            return self.deserializer(msg.value(), self.serial_context)
        except Exception as e:
            raise ErrorHandler(
                f"Error deserializing avro message. Check registry settings: {e}"
            ) from e