Refactor KeyOrValueSpecs into impl classes (#4241)
devinrsmith authored Jul 28, 2023
1 parent 0131a17 commit 76fa655
Showing 6 changed files with 490 additions and 393 deletions.
178 changes: 178 additions & 0 deletions extensions/kafka/src/main/java/io/deephaven/kafka/AvroImpl.java
@@ -0,0 +1,178 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.kafka;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.engine.table.Table;
import io.deephaven.kafka.KafkaTools.Consume;
import io.deephaven.kafka.KafkaTools.DataFormat;
import io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.commons.lang3.mutable.MutableObject;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Predicate;

import static io.deephaven.kafka.KafkaTools.createSchemaRegistryClient;

class AvroImpl {

/**
* Avro spec.
*/
static final class AvroConsume extends Consume.KeyOrValueSpec {
final Schema schema;
final String schemaName;
final String schemaVersion;
/** Fields mapped to null are skipped. */
final Function<String, String> fieldPathToColumnName;

AvroConsume(final Schema schema, final Function<String, String> fieldPathToColumnName) {
this.schema = schema;
this.schemaName = null;
this.schemaVersion = null;
this.fieldPathToColumnName = fieldPathToColumnName;
}

AvroConsume(final String schemaName,
final String schemaVersion,
final Function<String, String> fieldPathToColumnName) {
this.schema = null;
this.schemaName = schemaName;
this.schemaVersion = schemaVersion;
this.fieldPathToColumnName = fieldPathToColumnName;
}

@Override
DataFormat dataFormat() {
return DataFormat.AVRO;
}
}

/**
* Avro spec.
*/
static final class AvroProduce extends KeyOrValueSpec {
Schema schema;
final String schemaName;
final String schemaVersion;
final Map<String, String> fieldToColumnMapping;
final String timestampFieldName;
final Predicate<String> includeOnlyColumns;
final Predicate<String> excludeColumns;
final boolean publishSchema;
final String schemaNamespace;
final MutableObject<Properties> columnProperties;

AvroProduce(final Schema schema,
final String schemaName,
final String schemaVersion,
final Map<String, String> fieldToColumnMapping,
final String timestampFieldName,
final Predicate<String> includeOnlyColumns,
final Predicate<String> excludeColumns,
final boolean publishSchema,
final String schemaNamespace,
final Properties columnProperties) {
this.schema = schema;
this.schemaName = schemaName;
this.schemaVersion = schemaVersion;
this.fieldToColumnMapping = fieldToColumnMapping;
this.timestampFieldName = timestampFieldName;
this.includeOnlyColumns = includeOnlyColumns;
this.excludeColumns = excludeColumns;
this.publishSchema = publishSchema;
this.schemaNamespace = schemaNamespace;
this.columnProperties = new MutableObject<>(columnProperties);
if (publishSchema) {
if (schemaVersion != null && !KafkaTools.AVRO_LATEST_VERSION.equals(schemaVersion)) {
throw new IllegalArgumentException(
String.format("schemaVersion must be null or \"%s\" when publishSchema=true",
KafkaTools.AVRO_LATEST_VERSION));
}
}
}

@Override
DataFormat dataFormat() {
return DataFormat.AVRO;
}

void ensureSchema(final Table t, final Properties kafkaProperties) {
if (schema != null) {
return;
}
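// no explicit schema was provided: either derive and publish one, or fetch an existing one from the registry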
if (publishSchema) {
schema = KafkaTools.columnDefinitionsToAvroSchema(t,
schemaName, schemaNamespace, columnProperties.getValue(), includeOnlyColumns,
excludeColumns, columnProperties);
try {
putAvroSchema(kafkaProperties, schemaName, schema);
} catch (RestClientException | IOException e) {
throw new UncheckedDeephavenException(e);
}
} else {
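// look up the existing schema in the registry by name and version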
schema = KafkaTools.getAvroSchema(kafkaProperties, schemaName, schemaVersion);
}
}

String[] getColumnNames(final Table t, final Properties kafkaProperties) {
ensureSchema(t, kafkaProperties);
final List<Field> fields = schema.getFields();
// ensure we got timestampFieldName right
if (timestampFieldName != null) {
boolean found = false;
for (final Field field : fields) {
final String fieldName = field.name();
if (fieldName.equals(timestampFieldName)) {
found = true;
break;
}
}
if (!found) {
throw new IllegalArgumentException(
"timestampFieldName=" + timestampFieldName +
" is not a field name in the provided schema.");
}
}
final int timestampFieldCount = ((timestampFieldName != null) ? 1 : 0);
final List<String> columnNames = new ArrayList<>();
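// map each remaining field to a column name, honoring fieldToColumnMapping and the include/exclude predicates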
for (final Field field : fields) {
final String fieldName = field.name();
if (timestampFieldName != null && fieldName.equals(timestampFieldName)) {
continue;
}
final String candidateColumnName;
if (fieldToColumnMapping == null) {
candidateColumnName = fieldName;
} else {
candidateColumnName = fieldToColumnMapping.getOrDefault(fieldName, fieldName);
}
if (excludeColumns != null && excludeColumns.test(candidateColumnName)) {
continue;
}
if (includeOnlyColumns != null && !includeOnlyColumns.test(candidateColumnName)) {
continue;
}
columnNames.add(candidateColumnName);
}
return columnNames.toArray(new String[columnNames.size()]);
}
}

private static int putAvroSchema(Properties kafkaProperties, String schemaName, Schema schema)
throws RestClientException, IOException {
final SchemaRegistryClient registryClient = createSchemaRegistryClient(kafkaProperties);
return registryClient.register(schemaName, new AvroSchema(schema));
}
}
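For orientation, a minimal sketch of how the consume-side impl class is constructed. This lives within the io.deephaven.kafka package (the classes above are package-private); the "orders-value" subject name is illustrative, and external callers would go through the public KafkaTools.Consume factory methods rather than these impl classes.

package io.deephaven.kafka;

import java.util.function.Function;

import org.apache.avro.Schema;

class AvroConsumeExample {
    static AvroImpl.AvroConsume fromExplicitSchema(Schema schema) {
        // keep Avro field names as Deephaven column names
        return new AvroImpl.AvroConsume(schema, Function.identity());
    }

    static AvroImpl.AvroConsume fromRegistry() {
        // resolve the schema from the registry at the latest version
        // ("orders-value" is an illustrative subject name)
        return new AvroImpl.AvroConsume(
                "orders-value", KafkaTools.AVRO_LATEST_VERSION, Function.identity());
    }
}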
24 changes: 24 additions & 0 deletions extensions/kafka/src/main/java/io/deephaven/kafka/IgnoreImpl.java
@@ -0,0 +1,24 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.kafka;

import io.deephaven.kafka.KafkaTools.Consume;
import io.deephaven.kafka.KafkaTools.DataFormat;
import io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec;

class IgnoreImpl {
static final class IgnoreConsume extends Consume.KeyOrValueSpec {
@Override
DataFormat dataFormat() {
return DataFormat.IGNORE;
}
}

static final class IgnoreProduce extends KeyOrValueSpec {
@Override
DataFormat dataFormat() {
return DataFormat.IGNORE;
}
}
}
149 changes: 149 additions & 0 deletions extensions/kafka/src/main/java/io/deephaven/kafka/JsonImpl.java
@@ -0,0 +1,149 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.kafka.KafkaTools.Consume;
import io.deephaven.kafka.KafkaTools.DataFormat;
import io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class JsonImpl {
/**
* JSON spec.
*/
static final class JsonConsume extends Consume.KeyOrValueSpec {
@Nullable
final ObjectMapper objectMapper;
final ColumnDefinition<?>[] columnDefinitions;
final Map<String, String> fieldToColumnName;

JsonConsume(
@NotNull final ColumnDefinition<?>[] columnDefinitions,
@Nullable final Map<String, String> fieldNameToColumnName,
@Nullable final ObjectMapper objectMapper) {
this.columnDefinitions = columnDefinitions;
this.fieldToColumnName = mapNonPointers(fieldNameToColumnName);
this.objectMapper = objectMapper;
}

@Override
DataFormat dataFormat() {
return DataFormat.JSON;
}

private static Map<String, String> mapNonPointers(final Map<String, String> fieldNameToColumnName) {
if (fieldNameToColumnName == null) {
return null;
}
final boolean needsMapping =
fieldNameToColumnName.keySet().stream().anyMatch(key -> !key.startsWith("/"));
if (!needsMapping) {
return fieldNameToColumnName;
}
final Map<String, String> ans = new HashMap<>(fieldNameToColumnName.size());
for (Map.Entry<String, String> entry : fieldNameToColumnName.entrySet()) {
final String key = entry.getKey();
if (key.startsWith("/")) {
ans.put(key, entry.getValue());
} else {
ans.put(mapFieldNameToJsonPointerStr(key), entry.getValue());
}
}
return ans;
}

/**
* JSON field names (or "keys") can be any string, so in principle they can contain the '/' character. JSON
* Pointers assign special meaning to '/', so an actual '/' in a key needs to be encoded differently. The
* JSON Pointer spec (RFC 6901) encodes '/' as "~1" and the '~' character itself as "~0". This method
* applies that simple JSON Pointer encoding.
*
* @param key an arbitrary JSON field name, which may contain the '/' or '~' characters.
* @return a JSON Pointer encoded as a string for the provided key.
*/
public static String mapFieldNameToJsonPointerStr(final String key) {
return "/" + key.replace("~", "~0").replace("/", "~1");
}
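// worked examples of the encoding above:
// mapFieldNameToJsonPointerStr("price") returns "/price"
// mapFieldNameToJsonPointerStr("a/b") returns "/a~1b"
// mapFieldNameToJsonPointerStr("x~y") returns "/x~0y"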
}

/**
* JSON spec.
*/
static final class JsonProduce extends KeyOrValueSpec {
final String[] includeColumns;
final Predicate<String> excludeColumns;
final Map<String, String> columnNameToFieldName;
final String nestedObjectDelimiter;
final boolean outputNulls;
final String timestampFieldName;

JsonProduce(final String[] includeColumns,
final Predicate<String> excludeColumns,
final Map<String, String> columnNameToFieldName,
final String nestedObjectDelimiter,
final boolean outputNulls,
final String timestampFieldName) {
this.includeColumns = includeColumns;
this.excludeColumns = excludeColumns;
this.columnNameToFieldName = columnNameToFieldName;
this.nestedObjectDelimiter = nestedObjectDelimiter;
this.outputNulls = outputNulls;
this.timestampFieldName = timestampFieldName;
}

@Override
DataFormat dataFormat() {
return DataFormat.JSON;
}

String[] getColumnNames(final Table t) {
if (excludeColumns != null && includeColumns != null) {
throw new IllegalArgumentException(
"Can't have both excludeColumns and includeColumns not null");
}
final String[] tableColumnNames = t.getDefinition().getColumnNamesArray();
if (excludeColumns == null && includeColumns == null) {
return tableColumnNames;
}
final Set<String> tableColumnsSet = new HashSet<>(Arrays.asList(tableColumnNames));
if (includeColumns != null) {
// Validate includes
final List<String> missing = Arrays.stream(includeColumns)
.filter(cn -> !tableColumnsSet.contains(cn)).collect(Collectors.toList());
if (missing.size() > 0) {
throw new IllegalArgumentException(
"includeColumns contains names not found in table columns: " + missing);
}
return includeColumns;
}
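// only excludeColumns was provided: keep every table column that is not excluded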
return Arrays.stream(tableColumnNames)
.filter(cn -> !excludeColumns.test(cn)).toArray(String[]::new);
}

String[] getFieldNames(final String[] columnNames) {
final String[] fieldNames = new String[columnNames.length];
for (int i = 0; i < columnNames.length; ++i) {
if (columnNameToFieldName == null) {
fieldNames[i] = columnNames[i];
} else {
fieldNames[i] = columnNameToFieldName.getOrDefault(columnNames[i], columnNames[i]);
}
}
return fieldNames;
}
}
}
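And a corresponding sketch for the JSON consume spec, again package-private and illustrative. Note that the bare field names below are normalized to JSON Pointers by mapNonPointers (e.g. "sym" becomes "/sym"), and the null ObjectMapper is permitted per the @Nullable annotation; a default mapper is presumably used in that case.

package io.deephaven.kafka;

import java.util.Map;

import io.deephaven.engine.table.ColumnDefinition;

class JsonConsumeExample {
    static JsonImpl.JsonConsume example() {
        return new JsonImpl.JsonConsume(
                new ColumnDefinition<?>[] {
                        ColumnDefinition.ofString("Sym"),
                        ColumnDefinition.ofDouble("Price")},
                Map.of("sym", "Sym", "price", "Price"),
                null); // @Nullable ObjectMapper; null presumably selects a default mapper
    }
}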