diff --git a/athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/handlers/CompositeHandler.java b/athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/handlers/CompositeHandler.java index 78c4f51ba6..a3af2e5b90 100644 --- a/athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/handlers/CompositeHandler.java +++ b/athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/handlers/CompositeHandler.java @@ -95,18 +95,27 @@ public final void handleRequest(InputStream inputStream, OutputStream outputStre throws IOException { try (BlockAllocatorImpl allocator = new BlockAllocatorImpl()) { - ObjectMapper objectMapper = VersionedObjectMapperFactory.create(allocator, SerDeVersion.SERDE_VERSION); - FederationRequest rawReq; + int resolvedSerDeVersion = SerDeVersion.SERDE_VERSION; byte[] allInputBytes = com.google.common.io.ByteStreams.toByteArray(inputStream); - - try { - rawReq = objectMapper.readValue(allInputBytes, FederationRequest.class); + FederationRequest rawReq = null; + ObjectMapper objectMapper = null; + while (resolvedSerDeVersion >= 1) { + try { + objectMapper = VersionedObjectMapperFactory.create(allocator, resolvedSerDeVersion); + rawReq = objectMapper.readValue(allInputBytes, FederationRequest.class); + break; + } + catch (IllegalStateException e) { // if client has not upgraded to our latest, fallback to lower version + logger.warn("Client's SerDe mis-matched with connector version:, attempt with lower version: '{}'", --resolvedSerDeVersion, e); + } } - catch (IllegalStateException e) { // if client has not upgraded to our latest, fallback to v4 - logger.warn("Client's SerDe is not upgraded to latest version, defaulting to V4:", e); - objectMapper = VersionedObjectMapperFactory.create(allocator, 4); - rawReq = objectMapper.readValue(allInputBytes, FederationRequest.class); + + if (rawReq == null || objectMapper == null) { + throw new RuntimeException(String.format("FederationRequest/ObjectMapper is null with SerDeVersion: '%d'", resolvedSerDeVersion)); } + + logger.info("Parsing request with resolvedSerDeVersion: '{}', connector SerDeVersion: '{}'", resolvedSerDeVersion, SerDeVersion.SERDE_VERSION); + if (rawReq instanceof MetadataRequest) { ((MetadataRequest) rawReq).setContext(context); }