Skip to content

Commit

Permalink
GH-1808 | Proposition to fix conversion logical type decimal from byt…
Browse files Browse the repository at this point in the history
…es to json as string
  • Loading branch information
MateuszDobrowolski committed Oct 31, 2024
1 parent da6d83a commit 5006bbc
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package pl.allegro.tech.hermes.common.message.converter;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.InputStream;
import org.apache.avro.Conversion;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -11,6 +9,11 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.allegro.schema.json2avro.converter.AvroConversionException;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.InputStream;
import java.util.List;

public class AvroBinaryDecoders {

private static ThreadLocal<InputStream> threadLocalEmptyInputStream =
Expand All @@ -20,11 +23,13 @@ public class AvroBinaryDecoders {
ThreadLocal.withInitial(
() -> DecoderFactory.get().binaryDecoder(threadLocalEmptyInputStream.get(), null));

static GenericRecord decodeReusingThreadLocalBinaryDecoder(byte[] message, Schema schema) {
static GenericRecord decodeReusingThreadLocalBinaryDecoder(byte[] message, Schema schema, List<Conversion<?>> logicalTypeConversions) {
try (FlushableBinaryDecoderHolder holder = new FlushableBinaryDecoderHolder()) {
BinaryDecoder binaryDecoder =
DecoderFactory.get().binaryDecoder(message, holder.getBinaryDecoder());
return new GenericDatumReader<GenericRecord>(schema).read(null, binaryDecoder);
GenericDatumReader<GenericRecord> genericDatumWriter = new GenericDatumReader<>(schema);
logicalTypeConversions.forEach(conversion -> genericDatumWriter.getData().addLogicalTypeConversion(conversion));
return genericDatumWriter.read(null, binaryDecoder);
} catch (Exception e) {
String reason =
e.getMessage() == null ? ExceptionUtils.getRootCauseMessage(e) : e.getMessage();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package pl.allegro.tech.hermes.common.message.converter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Conversion;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.List;

public interface AvroRecordToBytesConverter {

static GenericRecord bytesToRecord(byte[] data, Schema schema) {
return AvroBinaryDecoders.decodeReusingThreadLocalBinaryDecoder(data, schema);
return bytesToRecord(data, schema, Collections.emptyList());
}

static GenericRecord bytesToRecord(byte[] data, Schema schema, List<Conversion<?>> logicalTypeConversions) {
return AvroBinaryDecoders.decodeReusingThreadLocalBinaryDecoder(data, schema, logicalTypeConversions);
}

static byte[] recordToBytes(GenericRecord genericRecord, Schema schema) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import pl.allegro.tech.hermes.consumers.consumer.Message;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

import java.util.List;

public class AvroToJsonMessageConverter implements MessageConverter {

private final JsonAvroConverter converter;
Expand All @@ -35,7 +37,7 @@ public Message convert(Message message, Topic topic) {
}

private GenericRecord recordWithoutMetadata(byte[] data, Schema schema) {
GenericRecord original = bytesToRecord(data, schema);
GenericRecord original = bytesToRecord(data, schema, List.of(new DecimalToStringConversion()));
Schema schemaWithoutMetadata = removeMetadataField(schema);
GenericRecordBuilder builder = new GenericRecordBuilder(schemaWithoutMetadata);
schemaWithoutMetadata
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package pl.allegro.tech.hermes.consumers.consumer.converter;

import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;

import java.math.BigDecimal;
import java.nio.ByteBuffer;

class DecimalToStringConversion extends Conversion<String> {
private final Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();

@Override
public Class<String> getConvertedType() {
return String.class;
}

@Override
public String fromBytes(ByteBuffer value, Schema schema, LogicalType type) {
return decimalConversion.fromBytes(value, schema, type).toString();
}

@Override
public ByteBuffer toBytes(String value, Schema schema, LogicalType type) {
return decimalConversion.toBytes(new BigDecimal(value), schema, type);
}

@Override
public String getLogicalTypeName() {
return "decimal";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package pl.allegro.tech.hermes.consumers.consumer.converter;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import java.math.BigDecimal;
import java.nio.ByteBuffer;

@RunWith(MockitoJUnitRunner.class)
public class DecimalToStringConversionTest {

private Schema schema;
private LogicalType logicalType;

@Before
public void setup() {
schema = Schema.create(Schema.Type.BYTES);
schema.addProp("logicalType", "decimal");
schema.addProp("precision", 10);
schema.addProp("scale", 2);
logicalType = LogicalTypes.fromSchema(schema);
}

@Test
public void toFromBytes() {
// given
final String value = "19.91";
final DecimalToStringConversion conversion = new DecimalToStringConversion();

//when
final ByteBuffer byteBuffer = conversion.toBytes(value, schema, logicalType);
final String result = conversion.fromBytes(byteBuffer, schema, logicalType);

//then
Assert.assertEquals(result, value);
}

@Test
public void fromToBytes() {
// given
final ByteBuffer value = ByteBuffer.wrap(new BigDecimal("19.91").unscaledValue().toByteArray());
final DecimalToStringConversion conversion = new DecimalToStringConversion();

//when
final String decimal = conversion.fromBytes(value, schema, logicalType);
final ByteBuffer result = conversion.toBytes(decimal, schema, logicalType);

//then
Assert.assertEquals(result, value);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@

import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.javacrumbs.jsonunit.core.Option;
import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -55,6 +60,63 @@ public class PublishingAvroTest {

private static final AvroUser user = new AvroUser("Bob", 50, "blue");

private static final ObjectMapper objMapper = new ObjectMapper();

@Test
public void shouldConsumeJsonMessageWithDecimalFromAvroTopic() throws JsonProcessingException {
// given
String schema = """
{
"namespace": "pl.allegro",
"type": "record",
"name": "User",
"fields": [
{
"name": "__metadata",
"type": ["null", {"type": "map", "values": "string"}],
"default": null
},
{"name": "name", "type": "string"},
{"name": "balance","type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 2
}
}
]
}""";


TopicWithSchema topicWithSchema = topicWithSchema(topicWithRandomName()
.withContentType(AVRO)
.build(), schema);
Topic topic = hermes.initHelper().createTopicWithSchema(topicWithSchema);

TestSubscriber subscriber = subscribers.createSubscriber();

hermes.initHelper().createSubscription(
subscription(topic.getQualifiedName(), "subscription", subscriber.getEndpoint()).build()
);

Map<String, Object> map = Map.of(
"name", "Bob", "balance", "1.20"
);

String userWithBalance = objMapper.writeValueAsString(map);

// when
hermes.api().publishJSONUntilSuccess(topic.getQualifiedName(), userWithBalance);

// then
subscriber.waitUntilAnyMessageReceived();

Map<String, Object> actual = objMapper.readValue(
subscriber.getLastReceivedRequest().getBodyAsString(),
new TypeReference<HashMap<String,Object>>() {});
assertThat(actual.get("balance")).isEqualTo("1.20");
}

@Test
public void shouldPublishAvroAndConsumeJsonMessage() {
// given
Expand Down

0 comments on commit 5006bbc

Please sign in to comment.