Refactoring chunking in the HTTP source to improve the performance. (#4950)

The HTTP source was parsing the entire message and then serializing from strings, which created memory churn and some duplicate processing. The new approach chunks the message from the start, which lets us stream the reading and reduce the number of data copies. This change also adds a JMH benchmark, which shows that the new approach doubles the number of operations per second.
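
For context, a JMH benchmark for this codec could be structured roughly as follows. This is a minimal sketch; the class name, payload construction, and split length are illustrative assumptions, not the benchmark file added by this commit.

import com.linecorp.armeria.common.HttpData;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@State(Scope.Benchmark)
public class JsonCodecBenchmark {
    private JsonCodec jsonCodec;
    private HttpData httpData;

    @Setup
    public void setUp() {
        jsonCodec = new JsonCodec();
        // Build a JSON array payload similar to what the HTTP source receives (hypothetical size).
        final StringBuilder json = new StringBuilder("[");
        for (int i = 0; i < 1_000; i++) {
            if (i > 0) {
                json.append(',');
            }
            json.append("{\"key").append(i).append("\":\"value").append(i).append("\"}");
        }
        json.append(']');
        httpData = HttpData.ofUtf8(json.toString());
    }

    @Benchmark
    public List<String> serializeSplit() throws IOException {
        // Chunk the body into serialized JSON arrays of at most ~1 MiB each.
        final List<String> chunks = new ArrayList<>();
        jsonCodec.serializeSplit(httpData, chunks::add, 1024 * 1024);
        return chunks;
    }
}

With the me.champeau.jmh Gradle plugin applied to the http-source module (see the build.gradle change below), a benchmark like this runs through the plugin's jmh task.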

Signed-off-by: David Venable <[email protected]>
dlvenable authored Sep 20, 2024
1 parent 3550162 commit 915c7b1
Showing 9 changed files with 453 additions and 158 deletions.
@@ -24,31 +24,25 @@ public interface Codec<T> {
T parse(HttpData httpData) throws IOException;

/**
* Serializes parsed data back into a UTF-8 string.
* Validates the content of the HTTP request.
*
* @param content The content of the original HTTP request
* @throws IOException A failure validating data.
*/
void validate(HttpData content) throws IOException;

/**
* Serializes the HttpData and splits it into multiple bodies based on splitLength.
* <p>
* The serialized bodies are passed to the serializedBodyConsumer.
* <p>
* This API will split into multiple bodies based on splitLength. Note that if a single
* item is larger than this, it will be output and exceed that length.
*
* @param parsedData The parsed data
* @param content The content of the original HTTP request
* @param serializedBodyConsumer A {@link Consumer} to accept each serialized body
* @param splitLength The length at which to split serialized bodies.
* @throws IOException A failure writing data.
*/
void serialize(final T parsedData,
final Consumer<String> serializedBodyConsumer,
final int splitLength) throws IOException;


/**
* Serializes parsed data back into a UTF-8 string.
* <p>
* This API will not split the data into chunks.
*
* @param parsedData The parsed data
* @param serializedBodyConsumer A {@link Consumer} to accept the serialized body
* @throws IOException A failure writing data.
*/
default void serialize(final T parsedData, final Consumer<String> serializedBodyConsumer) throws IOException {
serialize(parsedData, serializedBodyConsumer, Integer.MAX_VALUE);
}
void serializeSplit(HttpData content, Consumer<String> serializedBodyConsumer, int splitLength) throws IOException;
}
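
To illustrate the reshaped interface, a caller might combine validate and serializeSplit along these lines; the helper class, wildcard generic, and chunk size here are assumptions for the sketch, not code from this change.

import com.linecorp.armeria.common.HttpData;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class CodecUsageSketch {
    // Validates the raw request body, then splits it into serialized chunks
    // of at most maxChunkBytes for downstream processing.
    static List<String> chunkRequestBody(final Codec<?> codec, final HttpData content, final int maxChunkBytes) throws IOException {
        codec.validate(content);
        final List<String> chunks = new ArrayList<>();
        codec.serializeSplit(content, chunks::add, maxChunkBytes);
        return chunks;
    }
}

Because validation and splitting now both work directly from the HttpData, the caller never needs the intermediate List of parsed strings that the old serialize API required.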
@@ -8,16 +8,17 @@
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.CountingOutputStream;
import com.linecorp.armeria.common.HttpData;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@ public class JsonCodec implements Codec<List<String>> {
private static final TypeReference<List<Map<String, Object>>> LIST_OF_MAP_TYPE_REFERENCE =
new TypeReference<List<Map<String, Object>>>() {
};
private static final JsonFactory JSON_FACTORY = new JsonFactory();


@Override
@@ -48,38 +50,56 @@ public List<String> parse(final HttpData httpData) throws IOException {
return jsonList;
}

public void serialize(final List<String> jsonList,
final Consumer<String> serializedBodyConsumer,
final int splitLength) throws IOException {
if (splitLength < 0)
throw new IllegalArgumentException("The splitLength must be greater than or equal to 0.");
@Override
public void validate(final HttpData content) throws IOException {
mapper.readValue(content.toInputStream(),
LIST_OF_MAP_TYPE_REFERENCE);
}

@Override
public void serializeSplit(final HttpData content, final Consumer<String> serializedBodyConsumer, final int splitLength) throws IOException {
final InputStream contentInputStream = content.toInputStream();
if (splitLength == 0) {
performSerialization(jsonList, serializedBodyConsumer, Integer.MAX_VALUE);
performSerialization(contentInputStream, serializedBodyConsumer, Integer.MAX_VALUE);
} else {
performSerialization(jsonList, serializedBodyConsumer, splitLength);
performSerialization(contentInputStream, serializedBodyConsumer, splitLength);
}
}

private void performSerialization(final List<String> jsonList,

private void performSerialization(final InputStream inputStream,
final Consumer<String> serializedBodyConsumer,
final int splitLength) throws IOException {

JsonArrayWriter jsonArrayWriter = new JsonArrayWriter(splitLength, serializedBodyConsumer);
try (final JsonParser jsonParser = JSON_FACTORY.createParser(inputStream)) {
if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
throw new RuntimeException("Input is not a valid JSON array.");
}

JsonArrayWriter jsonArrayWriter = new JsonArrayWriter(splitLength, serializedBodyConsumer);

for (final String individualJsonLine : jsonList) {
if (jsonArrayWriter.willExceedByWriting(individualJsonLine)) {
jsonArrayWriter.close();
while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final JsonGenerator objectJsonGenerator = JSON_FACTORY
.createGenerator(outputStream, JsonEncoding.UTF8);
objectJsonGenerator.copyCurrentStructure(jsonParser);
objectJsonGenerator.close();

jsonArrayWriter = new JsonArrayWriter(splitLength, serializedBodyConsumer);

if (jsonArrayWriter.willExceedByWriting(outputStream)) {
jsonArrayWriter.close();

jsonArrayWriter = new JsonArrayWriter(splitLength, serializedBodyConsumer);

}
jsonArrayWriter.write(outputStream);
}
jsonArrayWriter.write(individualJsonLine);
}

jsonArrayWriter.close();
jsonArrayWriter.close();
}
}


private static class JsonArrayWriter {
private static final JsonFactory JSON_FACTORY = new JsonFactory().setCodec(mapper);
private static final int BUFFER_SIZE = 16 * 1024;
@@ -100,15 +120,15 @@ private static class JsonArrayWriter {
generator.writeStartArray();
}

boolean willExceedByWriting(final String individualJsonLine) {
final int lengthToWrite = individualJsonLine.getBytes(StandardCharsets.UTF_8).length;
boolean willExceedByWriting(final ByteArrayOutputStream byteArrayOutputStream) {
final int lengthToWrite = byteArrayOutputStream.size();
final long lengthOfDataWritten = countingOutputStream.getCount();
return lengthToWrite + lengthOfDataWritten + NECESSARY_CHARACTERS_TO_WRITE.length() > splitLength;
}

void write(final String individualJsonLine) throws IOException {
final JsonNode jsonNode = mapper.readTree(individualJsonLine);
generator.writeTree(jsonNode);
void write(final ByteArrayOutputStream individualJsonLine) throws IOException {
final String jsonLineString = individualJsonLine.toString(Charset.defaultCharset());
generator.writeRawValue(jsonLineString);
generator.flush();
hasItem = true;
}
@@ -126,5 +146,4 @@ void close() throws IOException {
outputStream.close();
}
}

}
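
The core of the refactor is the streaming pattern above: walk the JSON array with a JsonParser and copy each element into its own small buffer via copyCurrentStructure, so the request body is never materialized as a List of strings. A stripped-down sketch of that pattern outside the codec, with illustrative class and method names, might look like this:

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

class StreamingArrayElements {
    private static final JsonFactory JSON_FACTORY = new JsonFactory();

    // Streams a JSON array and hands each element to the consumer as its own
    // serialized JSON object, without parsing the whole array into memory.
    static void forEachElement(final InputStream inputStream, final Consumer<String> elementConsumer) throws IOException {
        try (final JsonParser jsonParser = JSON_FACTORY.createParser(inputStream)) {
            if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
                throw new IOException("Input is not a JSON array.");
            }
            while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
                final ByteArrayOutputStream elementBuffer = new ByteArrayOutputStream();
                try (final JsonGenerator generator = JSON_FACTORY.createGenerator(elementBuffer, JsonEncoding.UTF8)) {
                    // Copies the current array element from the parser to the generator.
                    generator.copyCurrentStructure(jsonParser);
                }
                elementConsumer.accept(elementBuffer.toString(StandardCharsets.UTF_8.name()));
            }
        }
    }
}

Each element reaches the consumer as already-serialized JSON, which is why JsonArrayWriter can append it with writeRawValue instead of re-parsing it into a JsonNode as the old code did.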
@@ -38,19 +38,6 @@ class JsonCodecTest {
private static final HttpData GOOD_TEST_DATA = HttpData.ofUtf8("[{\"a\":\"b\"}, {\"c\":\"d\"}]");
private static final HttpData GOOD_LARGE_TEST_DATA = HttpData.ofUtf8("[{\"a1\":\"b1\"}, {\"a2\":\"b2\"}, {\"a3\":\"b3\"}, {\"a4\":\"b4\"}, {\"a5\":\"b5\"}]");
private static final HttpData GOOD_LARGE_TEST_DATA_UNICODE = HttpData.ofUtf8("[{\"ὊὊὊ1\":\"ὊὊὊ1\"}, {\"ὊὊὊ2\":\"ὊὊὊ2\"}, {\"a3\":\"b3\"}, {\"ὊὊὊ4\":\"ὊὊὊ4\"}]");
public static final List<String> JSON_BODIES_LIST = List.of(
"{\"a1\":\"b1\"}",
"{\"a2\":\"b2\"}",
"{\"a3\":\"b3\"}",
"{\"a4\":\"b4\"}",
"{\"a5\":\"b5\"}"
);
public static final List<String> JSON_BODIES_UNICODE_MIXED_LIST = List.of(
"{\"ὊὊὊ1\":\"ὊὊὊ1\"}",
"{\"ὊὊὊ2\":\"ὊὊὊ2\"}",
"{\"a3\":\"b3\"}",
"{\"ὊὊὊ4\":\"ὊὊὊ4\"}"
);
private final HttpData badTestDataJsonLine = HttpData.ofUtf8("{\"a\":\"b\"}");
private final HttpData badTestDataMultiJsonLines = HttpData.ofUtf8("{\"a\":\"b\"}{\"c\":\"d\"}");
private final HttpData badTestDataNonJson = HttpData.ofUtf8("non json content");
@@ -84,16 +71,16 @@ public void testParseSuccessWithMaxSize() throws IOException {

@ParameterizedTest
@ValueSource(ints = {-1, -2, Integer.MIN_VALUE})
void serialize_with_invalid_splitLength(final int splitLength) {
void serializeSplit_with_invalid_splitLength(final int splitLength) {
final Consumer<String> serializedBodyConsumer = mock(Consumer.class);
assertThrows(IllegalArgumentException.class, () -> objectUnderTest.serialize(JSON_BODIES_LIST, serializedBodyConsumer, splitLength));
assertThrows(IllegalArgumentException.class, () -> objectUnderTest.serializeSplit(GOOD_LARGE_TEST_DATA, serializedBodyConsumer, splitLength));
}

@ParameterizedTest
@ValueSource(ints = {1, 2, 24})
void serialize_with_split_length_leading_to_groups_of_one(final int splitLength) throws IOException {
void serializeSplit_with_split_length_leading_to_groups_of_one(final int splitLength) throws IOException {
final Consumer<String> serializedBodyConsumer = mock(Consumer.class);
objectUnderTest.serialize(JSON_BODIES_LIST, serializedBodyConsumer, splitLength);
objectUnderTest.serializeSplit(GOOD_LARGE_TEST_DATA, serializedBodyConsumer, splitLength);

final ArgumentCaptor<String> actualSerializedBodyCaptor = ArgumentCaptor.forClass(String.class);
verify(serializedBodyConsumer, times(5)).accept(actualSerializedBodyCaptor.capture());
@@ -109,9 +96,9 @@ void serialize_with_split_length_leading_to_groups_of_one(final int splitLength)

@ParameterizedTest
@ValueSource(ints = {25, 30, 36})
void serialize_with_split_length_leading_to_groups_of_two(final int splitLength) throws IOException {
void serializeSplit_with_split_length_leading_to_groups_of_two(final int splitLength) throws IOException {
final Consumer<String> serializedBodyConsumer = mock(Consumer.class);
objectUnderTest.serialize(JSON_BODIES_LIST, serializedBodyConsumer, splitLength);
objectUnderTest.serializeSplit(GOOD_LARGE_TEST_DATA, serializedBodyConsumer, splitLength);

final ArgumentCaptor<String> actualSerializedBodyCaptor = ArgumentCaptor.forClass(String.class);
verify(serializedBodyConsumer, times(3)).accept(actualSerializedBodyCaptor.capture());
@@ -129,9 +116,9 @@ void serialize_with_split_length_leading_to_groups_of_two(final int splitLength)

@ParameterizedTest
@ValueSource(ints = {37, 48})
void serialize_with_split_length_leading_to_groups_up_to_three(final int splitLength) throws IOException {
void serializeSplit_with_split_length_leading_to_groups_up_to_three(final int splitLength) throws IOException {
final Consumer<String> serializedBodyConsumer = mock(Consumer.class);
objectUnderTest.serialize(JSON_BODIES_LIST, serializedBodyConsumer, splitLength);
objectUnderTest.serializeSplit(GOOD_LARGE_TEST_DATA, serializedBodyConsumer, splitLength);

final ArgumentCaptor<String> actualSerializedBodyCaptor = ArgumentCaptor.forClass(String.class);
verify(serializedBodyConsumer, times(2)).accept(actualSerializedBodyCaptor.capture());
@@ -147,9 +134,9 @@ void serialize_with_split_length_leading_to_groups_up_to_three(final int splitLe

@ParameterizedTest
@ValueSource(ints = {0, Integer.MAX_VALUE})
void serialize_with_split_size_that_does_not_split(final int splitLength) throws IOException {
void serializeSplit_with_split_size_that_does_not_split(final int splitLength) throws IOException {
final Consumer<String> serializedBodyConsumer = mock(Consumer.class);
objectUnderTest.serialize(JSON_BODIES_LIST, serializedBodyConsumer, splitLength);
objectUnderTest.serializeSplit(GOOD_LARGE_TEST_DATA, serializedBodyConsumer, splitLength);

final ArgumentCaptor<String> actualSerializedBodyCaptor = ArgumentCaptor.forClass(String.class);
verify(serializedBodyConsumer, times(1)).accept(actualSerializedBodyCaptor.capture());
@@ -160,9 +147,9 @@ void serialize_with_split_size_that_does_not_split(final int splitLength) throws

@ParameterizedTest
@ValueSource(ints = {58, 68})
void serialize_with_split_length_unicode(final int splitLength) throws IOException {
void serializeSplit_with_split_length_unicode(final int splitLength) throws IOException {
final Consumer<String> serializedBodyConsumer = mock(Consumer.class);
objectUnderTest.serialize(JSON_BODIES_UNICODE_MIXED_LIST, serializedBodyConsumer, splitLength);
objectUnderTest.serializeSplit(GOOD_LARGE_TEST_DATA_UNICODE, serializedBodyConsumer, splitLength);

final ArgumentCaptor<String> actualSerializedBodyCaptor = ArgumentCaptor.forClass(String.class);
verify(serializedBodyConsumer, times(2)).accept(actualSerializedBodyCaptor.capture());
@@ -178,28 +165,28 @@ void serialize_with_split_length_unicode(final int splitLength) throws IOExcepti

@ParameterizedTest
@ArgumentsSource(GoodTestData.class)
void parse_and_serialize_symmetry(final HttpData httpData) throws IOException {
final List<String> parsedList = objectUnderTest.parse(httpData);

void serializeSplit_and_parse_symmetry(final HttpData httpData) throws IOException {
final List<String> parsedFromOriginal = objectUnderTest.parse(httpData);
final Consumer<String> serializedBodyConsumer = mock(Consumer.class);
objectUnderTest.serialize(parsedList, serializedBodyConsumer);
objectUnderTest.serializeSplit(httpData, serializedBodyConsumer, Integer.MAX_VALUE);
final ArgumentCaptor<String> actualSerializedBodyCaptor = ArgumentCaptor.forClass(String.class);
verify(serializedBodyConsumer, times(1)).accept(actualSerializedBodyCaptor.capture());
final String actualString = actualSerializedBodyCaptor.getValue();

final String expectedJsonString = httpData.toStringUtf8().replace(" ", "");
assertThat(actualString, equalTo(expectedJsonString));

final List<String> parsedFromRewritten = objectUnderTest.parse(HttpData.ofUtf8(actualString));
assertThat(parsedFromRewritten, equalTo(parsedFromOriginal));
}


@ParameterizedTest
@ArgumentsSource(JsonArrayWithKnownFirstArgumentsProvider.class)
public void parse_should_return_lists_smaller_than_provided_length(
final String inputJsonArray, final String knownFirstPart, final int maxSize, final List<List<String>> expectedChunks, final List<Boolean> exceedsMaxSize) throws IOException {
List<String> individualJsonLines = objectUnderTest.parse(HttpData.ofUtf8(inputJsonArray));

Consumer<String> serializedBodyConsumer = mock(Consumer.class);
objectUnderTest.serialize(individualJsonLines, serializedBodyConsumer, maxSize);
objectUnderTest.serializeSplit(HttpData.ofUtf8(inputJsonArray), serializedBodyConsumer, maxSize);
ArgumentCaptor<String> actualSerializedBodyCaptor = ArgumentCaptor.forClass(String.class);
verify(serializedBodyConsumer, times(expectedChunks.size())).accept(actualSerializedBodyCaptor.capture());

@@ -301,4 +288,26 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext extensionCo
);
}
}


@ParameterizedTest
@ArgumentsSource(GoodTestData.class)
void validate_with_known_good_Json(final HttpData httpData) throws IOException {
objectUnderTest.validate(httpData);
}

@Test
void validate_with_valid_JSON_but_not_array_should_throw() {
assertThrows(IOException.class, () -> objectUnderTest.validate(badTestDataJsonLine));
}

@Test
void validate_with_multiline_JSON_should_throw() {
assertThrows(IOException.class, () -> objectUnderTest.validate(badTestDataMultiJsonLines));
}

@Test
void validate_with_invalid_JSON_should_throw() {
assertThrows(IOException.class, () -> objectUnderTest.validate(badTestDataNonJson));
}
}
1 change: 1 addition & 0 deletions data-prepper-plugins/http-source/build.gradle
@@ -5,6 +5,7 @@

plugins {
id 'java'
id 'me.champeau.jmh' version '0.7.2'
}

dependencies {