Skip to content

Commit

Permalink
Fix init (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
Haarolean authored Mar 28, 2024
1 parent c309c90 commit 09c77f5
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 38 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Pull Request Build

on:

pull_request:
types: [ "opened", "reopened", "synchronize", ]

jobs:
release:
runs-on: ubuntu-latest
outputs:
version: ${{steps.build.outputs.version}}
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'zulu'
cache: 'maven'

- name: Build with Maven
id: build
run: |
mvn -B -V -ntp clean package
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Glue Schema Registry serde for kafka-ui

This is pluggable serde implementation for [kafka-ui](https://github.com/provectus/kafka-ui/).
This is pluggable serde implementation for [kafka-ui](https://github.com/kafbat/kafka-ui/).

You can read about Glue Schema [registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) and how it can be applied for [Kafka usage](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html).

Expand Down
4 changes: 2 additions & 2 deletions docker-compose/setup-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:

kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
image: ghcr.io/kafbat/kafka-ui:latest
ports:
- 8080:8080
depends_on:
Expand All @@ -20,7 +20,7 @@ services:

kafka.clusters.0.serde.0.name: GlueSchemaRegistry
kafka.clusters.0.serde.0.filePath: /glue-serde/kafkaui-glue-serde-1.0-SNAPSHOT-jar-with-dependencies.jar
kafka.clusters.0.serde.0.className: com.provectus.kafka.ui.serdes.glue.GlueSerde
kafka.clusters.0.serde.0.className: io.kafbat.ui.serde.glue.GlueSerde
kafka.clusters.0.serde.0.properties.region: us-east-1 #required
kafka.clusters.0.serde.0.properties.registry: kui-test #required, name of Glue Schema Registry
# template that will be used to find schema name for topic key. Optional, default is null (not set).
Expand Down
16 changes: 10 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafkaui-glue-serde</artifactId>

<groupId>io.kafbat.ui.serde</groupId>
<artifactId>serde-glue</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Archetype - kafkaui-glue-serde</name>

<name>kafbat-ui-serde-glue</name>
<url>http://maven.apache.org</url>
<packaging>jar</packaging>

Expand Down Expand Up @@ -56,10 +59,11 @@
<version>7.0.1</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.provectus</groupId>
<artifactId>kafka-ui-serde-api</artifactId>
<version>1.1.0</version>
<groupId>io.kafbat.ui</groupId>
<artifactId>serde-api</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.provectus.kafka.ui.serdes.glue;
package io.kafbat.ui.serde.glue;

import com.amazonaws.services.schemaregistry.common.AWSDeserializerInput;
import com.amazonaws.services.schemaregistry.common.GlueSchemaRegistryDataFormatDeserializer;
Expand All @@ -15,34 +15,34 @@
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.DynamicMessage;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

import io.kafbat.ui.serde.api.DeserializeResult;
import io.kafbat.ui.serde.api.PropertyResolver;
import io.kafbat.ui.serde.api.RecordHeaders;
import io.kafbat.ui.serde.api.SchemaDescription;
import io.kafbat.ui.serde.api.Serde;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import lombok.NonNull;
import javax.validation.constraints.NotNull;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import software.amazon.awssdk.auth.credentials.*;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import com.provectus.kafka.ui.serde.api.PropertyResolver;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serde.api.Serde;

import java.util.Optional;

import software.amazon.awssdk.profiles.ProfileFile;
import software.amazon.awssdk.profiles.ProfileFileSystemSetting;
import software.amazon.awssdk.regions.Region;
Expand Down Expand Up @@ -155,10 +155,9 @@ static AwsCredentialsProvider createCredentialsProvider(PropertyResolver serdePr
Optional<String> awsSecretKey = serdeProperties.getProperty("awsSecretAccessKey", String.class);
Optional<String> awsSessionToken = serdeProperties.getProperty("awsSessionToken", String.class);
if (awsAccessKey.isPresent() && awsSecretKey.isPresent()) {
if (awsSessionToken.isEmpty()) {
return () -> AwsBasicCredentials.create(awsAccessKey.get(), awsSecretKey.get());
}
return () -> AwsSessionCredentials.create(awsAccessKey.get(), awsSecretKey.get(), awsSessionToken.get());
return awsSessionToken.<AwsCredentialsProvider>map(
s -> () -> AwsSessionCredentials.create(awsAccessKey.get(), awsSecretKey.get(), s))
.orElseGet(() -> () -> AwsBasicCredentials.create(awsAccessKey.get(), awsSecretKey.get()));
}

Optional<String> profileName = serdeProperties.getProperty("awsProfileName", String.class);
Expand Down Expand Up @@ -293,7 +292,8 @@ private Optional<GetSchemaVersionResponse> getSchemaDefinition(String schemaName
glueClient.getSchemaVersion(
GetSchemaVersionRequest.builder()
.schemaId(SchemaId.builder().registryName(registryName).schemaName(schemaName).build())
.schemaVersionNumber(SchemaVersionNumber.builder().versionNumber(schemaResponse.latestSchemaVersion()).build())
.schemaVersionNumber(
SchemaVersionNumber.builder().versionNumber(schemaResponse.latestSchemaVersion()).build())
.build())
);
} catch (EntityNotFoundException nfe) {
Expand All @@ -307,7 +307,8 @@ public Deserializer deserializer(String topic, Target target) {
return new Deserializer() {
@Override
public DeserializeResult deserialize(RecordHeaders recordHeaders, byte[] bytes) {
Object obj = deserializationFacade.deserialize(AWSDeserializerInput.builder().buffer(ByteBuffer.wrap(bytes)).build());
Object obj =
deserializationFacade.deserialize(AWSDeserializerInput.builder().buffer(ByteBuffer.wrap(bytes)).build());
String val = null;
if (obj instanceof GenericRecord) {
val = JsonUtil.avroRecordToJson((GenericRecord) obj);
Expand All @@ -334,8 +335,8 @@ private static class FixedDeserializerFactory extends GlueSchemaRegistryDeserial
private static final FixedJsonDeserializer JSON_DESERIALIZER = new FixedJsonDeserializer();

@Override
public GlueSchemaRegistryDataFormatDeserializer getInstance(@NonNull DataFormat dataFormat,
@NonNull GlueSchemaRegistryConfiguration configs) {
public GlueSchemaRegistryDataFormatDeserializer getInstance(@NotNull DataFormat dataFormat,
@NotNull GlueSchemaRegistryConfiguration configs) {
if (dataFormat == DataFormat.JSON) {
return JSON_DESERIALIZER;
} else {
Expand All @@ -352,8 +353,8 @@ private static class FixedJsonDeserializer implements GlueSchemaRegistryDataForm
GlueSchemaRegistryDeserializerDataParser.getInstance();

@Override
public Object deserialize(@NonNull ByteBuffer buffer,
@NonNull com.amazonaws.services.schemaregistry.common.Schema schemaObject) {
public Object deserialize(@NotNull ByteBuffer buffer,
@NotNull com.amazonaws.services.schemaregistry.common.Schema schemaObject) {
String schema = schemaObject.getSchemaDefinition();
byte[] data = DESERIALIZER_DATA_PARSER.getPlainData(buffer);
return JsonDataWithSchema.builder(schema, new String(data)).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.provectus.kafka.ui.serdes.glue;
package io.kafbat.ui.serde.glue;

import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.provectus.kafka.ui.serdes.glue;
package io.kafbat.ui.serde.glue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -20,10 +20,10 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.PropertyResolver;
import com.provectus.kafka.ui.serde.api.Serde;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.kafbat.ui.serde.api.DeserializeResult;
import io.kafbat.ui.serde.api.PropertyResolver;
import io.kafbat.ui.serde.api.Serde;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
Expand Down Expand Up @@ -307,7 +307,7 @@ void testAvroFormatSerdeCompatibility() throws Exception {
@Test
void testProtoFormatSerdeCompatibility() throws Exception {
var schema = "syntax = \"proto3\";\n"
+ "package com.provectus;\n"
+ "package io.kafbat;\n"
+ "\n"
+ "message TestProtoRecord {\n"
+ " string field1 = 1;\n"
Expand Down

0 comments on commit 09c77f5

Please sign in to comment.