Skip to content

Commit

Permalink
improve serde logic for decoding FederationRequest
Browse files Browse the repository at this point in the history
change log level
  • Loading branch information
chngpe committed May 22, 2024
1 parent 68528ef commit d3f701a
Showing 1 changed file with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,27 @@ public final void handleRequest(InputStream inputStream, OutputStream outputStre
throws IOException
{
try (BlockAllocatorImpl allocator = new BlockAllocatorImpl()) {
ObjectMapper objectMapper = VersionedObjectMapperFactory.create(allocator, SerDeVersion.SERDE_VERSION);
FederationRequest rawReq;
int resolvedSerDeVersion = SerDeVersion.SERDE_VERSION;
byte[] allInputBytes = com.google.common.io.ByteStreams.toByteArray(inputStream);

try {
rawReq = objectMapper.readValue(allInputBytes, FederationRequest.class);
FederationRequest rawReq = null;
ObjectMapper objectMapper = null;
while (resolvedSerDeVersion >= 1) {
try {
objectMapper = VersionedObjectMapperFactory.create(allocator, resolvedSerDeVersion);
rawReq = objectMapper.readValue(allInputBytes, FederationRequest.class);
break;
}
catch (IllegalStateException e) { // if client has not upgraded to our latest, fallback to lower version
logger.warn("Client's SerDe mis-matched with connector version:, attempt with lower version: '{}'", --resolvedSerDeVersion, e);
}
}
catch (IllegalStateException e) { // if client has not upgraded to our latest, fallback to v4
logger.warn("Client's SerDe is not upgraded to latest version, defaulting to V4:", e);
objectMapper = VersionedObjectMapperFactory.create(allocator, 4);
rawReq = objectMapper.readValue(allInputBytes, FederationRequest.class);

if (rawReq == null || objectMapper == null) {
throw new RuntimeException(String.format("FederationRequest/ObjectMapper is null with SerDeVersion: '%d'", resolvedSerDeVersion));
}

logger.info("Parsing request with resolvedSerDeVersion: '{}', connector SerDeVersion: '{}'", resolvedSerDeVersion, SerDeVersion.SERDE_VERSION);

if (rawReq instanceof MetadataRequest) {
((MetadataRequest) rawReq).setContext(context);
}
Expand Down

0 comments on commit d3f701a

Please sign in to comment.