Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MGDCTRS-2214: Use Kamelet data types #644

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.TreeMap;
import java.util.function.Consumer;

import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.text.CaseUtils;
Expand Down Expand Up @@ -306,72 +307,34 @@ public ConnectorContainer build() {
}

if (consumes != null) {
switch (consumes) {
case "application/json": {
ObjectNode step = steps.addObject();
step.with("to").put("uri", "kamelet:cos-decoder-json-action");
if (meta.has("consumes_class")) {
step.with("to").with("parameters").set("contentClass", meta.get("consumes_class"));
}
}
break;
case "avro/binary": {
ObjectNode step = steps.addObject();
step.with("to").put("uri", "kamelet:cos-decoder-avro-action");
if (meta.has("consumes_class")) {
step.with("to").with("parameters").set("contentClass", meta.get("consumes_class"));
}
}
break;
case "application/x-java-object": {
ObjectNode step = steps.addObject();
step.with("to").put("uri", "kamelet:cos-decoder-pojo-action");
if (meta.has("consumes_class")) {
step.with("to").with("parameters").set("mimeType", meta.get("consumes_class"));
}
}
break;
case "text/plain":
case "application/octet-stream":
break;
default:
throw new IllegalArgumentException("Unsupported value format " + consumes);
ObjectNode step = steps.addObject();
step.with("to").put("uri", "kamelet:cos-resolve-schema-action");
step.with("to").with("parameters").set("mimeType", meta.requiredAt("/consumes"));
if (meta.has("consumes_class")) {
step.with("to").with("parameters").set("contentClass", meta.get("consumes_class"));
}
if (produces != null) {
step.with("to").with("parameters").set("targetMimeType", meta.requiredAt("/consumes"));
}

step = steps.addObject();
step.with("to").put("uri", "kamelet:cos-data-type-action");
step.with("to").with("parameters").set("format", new TextNode(consumes.replaceAll("/", "-")));
}

configureProcessors(mapper, meta, steps, properties);

if (produces != null) {
switch (produces) {
case "application/json": {
ObjectNode step = steps.addObject();
step.with("to").put("uri", "kamelet:cos-encoder-json-action");
if (meta.has("consumes_class")) {
step.with("to").with("parameters").set("contentClass", meta.get("consumes_class"));
}
}
break;
case "avro/binary": {
ObjectNode step = steps.addObject();
step.with("to").put("uri", "kamelet:cos-decoder-json-action");
if (meta.has("consumes_class")) {
step.with("to").with("parameters").set("contentClass", meta.get("consumes_class"));
}
}
break;
case "text/plain": {
ObjectNode step = steps.addObject();
step.with("to").put("uri", "kamelet:cos-encoder-string-action");
}
break;
case "application/octet-stream": {
ObjectNode step = steps.addObject();
step.with("to").put("uri", "kamelet:cos-encoder-bytearray-action");
}
break;
default:
throw new IllegalArgumentException("Unsupported value format " + produces);
ObjectNode step = steps.addObject();
step.with("to").put("uri", "kamelet:cos-resolve-schema-action");
step.with("to").with("parameters").set("mimeType", meta.requiredAt("/produces"));
if (meta.has("produces_class")) {
step.with("to").with("parameters").set("contentClass", meta.get("produces_class"));
}

step = steps.addObject();
step.with("to").put("uri", "kamelet:cos-data-type-action");
step.with("to").with("parameters").set("format", new TextNode(produces.replaceAll("/", "-")));
}

steps.addObject().with("removeHeader").put("name", "X-Content-Schema");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
public enum MimeType {
JSON("application/json"),
AVRO("avro/binary"),
AVRO_STRUCT("avro/x-struct"),
BINARY("application/octet-stream"),
TEXT("text/plain"),
JAVA_OBJECT("application/x-java-object"),
STRUCT("application/x-struct");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bf2.cos.connector.camel.serdes.avro;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.bf2.cos.connector.camel.serdes.MimeType;
import org.bf2.cos.connector.camel.serdes.Serdes;
import org.bf2.cos.connector.camel.serdes.format.spi.DataTypeConverter;
import org.bf2.cos.connector.camel.serdes.format.spi.annotations.DataType;
import org.bf2.cos.connector.camel.serdes.json.Json;

/**
* Data type uses Jackson Avro data format to marshal given JsonNode in Exchange body to a binary (byte array) representation.
* Uses given Avro schema from the Exchange properties when marshalling the payload (usually already resolved via schema
* resolver Kamelet action).
*/
@DataType(name = "avro-binary", mediaType = "avro/binary")
public class AvroBinaryDataType implements DataTypeConverter {

@Override
public void convert(Exchange exchange) {
AvroSchema schema = exchange.getProperty(Serdes.CONTENT_SCHEMA, AvroSchema.class);

if (schema == null) {
throw new CamelExecutionException("Missing proper avro schema for data type processing", exchange);
}

try {
byte[] marshalled = Avro.MAPPER.writer().forType(JsonNode.class).with(schema)
.writeValueAsBytes(exchange.getMessage().getBody());
exchange.getMessage().setBody(marshalled);

exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.AVRO.type());
exchange.getMessage().setHeader(Serdes.CONTENT_SCHEMA,
exchange.getProperty(Serdes.CONTENT_SCHEMA, "", String.class));
} catch (JsonProcessingException e) {
throw new CamelExecutionException("Failed to apply Json output data type on exchange", exchange, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bf2.cos.connector.camel.serdes.avro;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadException;
import org.bf2.cos.connector.camel.serdes.MimeType;
import org.bf2.cos.connector.camel.serdes.Serdes;
import org.bf2.cos.connector.camel.serdes.format.spi.DataTypeConverter;
import org.bf2.cos.connector.camel.serdes.format.spi.annotations.DataType;
import org.bf2.cos.connector.camel.serdes.json.Json;

/**
* Data type uses Avro Jackson data format to unmarshal Exchange body to generic JsonNode.
* Uses given Avro schema from the Exchange properties when unmarshalling the payload (usually already resolved via schema
* resolver Kamelet action).
*/
@DataType(name = "avro-x-struct", mediaType = "application/x-struct")
public class AvroStructDataType implements DataTypeConverter {

@Override
public void convert(Exchange exchange) {
AvroSchema schema = exchange.getProperty(Serdes.CONTENT_SCHEMA, AvroSchema.class);

if (schema == null) {
throw new CamelExecutionException("Missing proper avro schema for data type processing", exchange);
}

try {
Object unmarshalled = Avro.MAPPER.reader().forType(JsonNode.class).with(schema)
.readValue(getBodyAsStream(exchange));
exchange.getMessage().setBody(unmarshalled);

exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.STRUCT);
} catch (InvalidPayloadException | IOException e) {
throw new CamelExecutionException("Failed to apply Json input data type on exchange", exchange, e);
}
}

private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException {
InputStream bodyStream = exchange.getMessage().getBody(InputStream.class);

if (bodyStream == null) {
bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
}

return bodyStream;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bf2.cos.connector.camel.serdes.bytes;

import org.apache.camel.Exchange;
import org.bf2.cos.connector.camel.serdes.MimeType;
import org.bf2.cos.connector.camel.serdes.format.DefaultDataTypeConverter;
import org.bf2.cos.connector.camel.serdes.format.spi.DataTypeConverter;
import org.bf2.cos.connector.camel.serdes.format.spi.annotations.DataType;

/**
* Generic binary data type uses Camel message body converter mechanism to convert content to byte array representation.
*/
@DataType(name = "application-octet-stream", mediaType = "application/octet-stream")
public class ByteArrayDataType implements DataTypeConverter {

private static final DataTypeConverter DELEGATE = new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "binary",
MimeType.BINARY.type(), byte[].class);

@Override
public void convert(Exchange exchange) {
DELEGATE.convert(exchange);

exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.BINARY);
}
}
Loading