[FLINK-24379][Formats] Avro Glue Schema Registry table format #122

Open · wants to merge 7 commits into `main`
3 changes: 2 additions & 1 deletion .gitignore
@@ -36,4 +36,5 @@ tools/flink
tools/flink-*
tools/releasing/release
tools/japicmp-output
*/.idea/
*/.idea/
.java-version
226 changes: 226 additions & 0 deletions docs/content/docs/connectors/table/formats/avro-glue.md
@@ -0,0 +1,226 @@
---
title: Avro Glue Schema Registry
weight: 5
type: docs
aliases:
- /dev/table/connectors/formats/avro-glue.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# AWS Glue Avro Format

{{< label "Format: Serialization Schema" >}}
{{< label "Format: Deserialization Schema" >}}

The AWS Glue Schema Registry (``avro-glue``) format allows you to read records that were serialized by ``com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer`` and to write records that can in turn be read by ``com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer``.
These records have their schemas stored out-of-band in the configured [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas).

When reading (deserializing) a record with this format, the Avro writer schema is fetched from the configured AWS Glue Schema Registry, based on the schema version id encoded in the record, while the reader schema is inferred from the table schema.

When writing (serializing) a record with this format, the Avro schema is inferred from the table schema and used to retrieve the schema version id to be encoded with the data.
The lookup is performed against the configured AWS Glue Schema Registry under the [schema name](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas) given in `avro-glue.schema.name`.
Optionally, you can enable schema auto-registration, allowing the writer to register a new schema version in the schema registry directly. The new schema is accepted only if it does not violate the compatibility mode that was set when the schema was first created.

The AWS Glue Avro format can only be used in conjunction with the [Apache Kafka SQL connector]({{< ref "docs/connectors/table/kafka" >}}) or the [Upsert Kafka SQL Connector]({{< ref "docs/connectors/table/upsert-kafka" >}}).

Dependencies
------------

{{< sql_download_table "avro-glue" >}}

How to create tables with the Avro-Glue format
--------------


Here is an example of a table using a raw UTF-8 string as the Kafka key and Avro records registered in the Schema Registry as Kafka values:

```sql
CREATE TABLE user_created (

-- one column mapped to the Kafka raw UTF-8 key
the_kafka_key STRING,

-- a few columns mapped to the Avro fields of the Kafka value
id STRING,
name STRING,
email STRING

) WITH (

'connector' = 'kafka',
'topic' = 'user_events_example1',
'properties.bootstrap.servers' = 'localhost:9092',

-- UTF-8 string as Kafka keys, using the 'the_kafka_key' table column
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',

'value.format' = 'avro-glue',
'value.avro-glue.aws.region' = 'us-east-1',
'value.avro-glue.registry.name' = 'my-schema-registry',
'value.avro-glue.schema.name' = 'my-schema-name',
'value.avro-glue.schema.autoRegistration' = 'true',
'value.fields-include' = 'EXCEPT_KEY'
)
```

You can write data into the Kafka table as follows:

```sql
INSERT INTO user_created
SELECT
-- replicating the user id into a column mapped to the kafka key
id AS the_kafka_key,

-- all values
id, name, email
FROM some_table
```
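
Reading works symmetrically: a query against the table deserializes each Kafka value using the writer schema fetched from the registry and the reader schema derived from the table columns. For example:

```sql
-- reads records back; the writer schema is fetched from the Glue
-- Schema Registry using the schema version id encoded in each record
SELECT id, name, email
FROM user_created;
```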

Format Options
----------------

Note that the naming of the properties diverges slightly from the [AWS Glue client code](https://github.com/awslabs/aws-glue-schema-registry/blob/master/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java#L20) properties, to match the conventions used by other Flink formats.

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-center" style="width: 8%">Required</th>
<th class="text-center" style="width: 7%">Default</th>
<th class="text-center" style="width: 10%">Type</th>
<th class="text-center" style="width: 50%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>format</h5></td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what format to use; here it should be <code>'avro-glue'</code>.</td>
</tr>
<tr>
<td><h5>avro-glue.aws.region</h5></td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify the AWS region in which the Glue Schema Registry resides, such as <code>'us-east-1'</code>.</td>
</tr>
<tr>
<td><h5>avro-glue.aws.endpoint</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The HTTP endpoint to use for AWS calls.</td>
</tr>
<tr>
<td><h5>avro-glue.registry.name</h5></td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The name (not the ARN) of the Glue schema registry in which to store the schemas.</td>
</tr>
<tr>
<td><h5>avro-glue.schema.name</h5></td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The name under which to store the schema in the registry.</td>
</tr>
<tr>
<td><h5>avro-glue.schema.autoRegistration</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether new schemas should be automatically registered rather than treated as errors. Only used when writing (serializing); ignored when reading (deserializing).</td>
</tr>
<tr>
<td><h5>avro-glue.schema.compression</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">NONE</td>
<td>String</td>
<td>What kind of compression to use. Valid values are <code>'NONE'</code> and <code>'ZLIB'</code>.</td>
</tr>
<tr>
<td><h5>avro-glue.schema.compatibility</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">BACKWARD</td>
<td>String</td>
<td>The schema compatibility mode under which to store the schema. Valid values are:
<code>'NONE'</code>,
<code>'DISABLED'</code>,
<code>'BACKWARD'</code>,
<code>'BACKWARD_ALL'</code>,
<code>'FORWARD'</code>,
<code>'FORWARD_ALL'</code>,
<code>'FULL'</code>, and
<code>'FULL_ALL'</code>.
Only used when schema auto-registration is enabled and when the schema is registered in the first place.
Ignored when reading or when a new schema version is auto-registered in an existing schema.
</td>
</tr>
<tr>
<td><h5>avro-glue.cache.size</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">200</td>
<td>Integer</td>
<td>The size (in number of items, not bytes) of the cache the Glue client code should manage.</td>
</tr>
<tr>
<td><h5>avro-glue.cache.ttlMs</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">1 day (24 * 60 * 60 * 1000)</td>
<td>Long</td>
<td>The time to live (in milliseconds) for cache entries. Defaults to 1 day.</td>
</tr>
</tbody>
</table>

Note that the schema type (generic or specific record) cannot be specified when using the Table API.
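
For reference, the following sketch wires most of these options together through the plain `format` option (topic, registry, and schema names are hypothetical). When the format is configured via `key.format` or `value.format` instead, prefix each `avro-glue.*` key with `key.` or `value.`, as in the earlier example:

```sql
CREATE TABLE orders (
    order_id STRING,
    amount DOUBLE
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',

    'format' = 'avro-glue',
    'avro-glue.aws.region' = 'us-east-1',
    'avro-glue.registry.name' = 'my-schema-registry',
    'avro-glue.schema.name' = 'orders-value',

    -- optional tuning: record compression and schema-lookup caching
    'avro-glue.schema.compression' = 'ZLIB',
    'avro-glue.cache.size' = '500',
    'avro-glue.cache.ttlMs' = '3600000'
)
```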

Schema Auto-registration
------------------------

By default, schema auto-registration is disabled. When writing to a Kafka table, new records are accepted only if a schema version that exactly matches the table schema is already registered in the Schema Registry under the configured `registry.name` and `schema.name`. Otherwise, an exception is thrown.

You can enable schema auto-registration by setting the property `avro-glue.schema.autoRegistration` to `true`.

When auto-registration is enabled, Flink first checks whether a schema matching the table schema is already registered in the Schema Registry. If it is, the writer reuses its schema version id.
If the table schema does not match any schema version registered under the specified `registry.name` and `schema.name`, the writer tries to auto-register a new schema version.

When auto-registering a new schema version, there are two different cases:

1. No schema is registered under the specified `registry.name` and `schema.name`: a new schema matching the table schema is registered, with its compatibility mode set to the value of the `schema.compatibility` property.
2. A different schema version is already registered under the specified `registry.name` and `schema.name`: in this case the new schema version is accepted only if it does not violate the schema evolution rules defined by the compatibility mode that was set when the schema was first created.

When auto-registering a new schema, the schema compatibility mode is set based on the `avro-glue.schema.compatibility` property.

Note that `avro-glue.schema.compatibility` is used only when a new schema is registered in the first place. When a new schema version is auto-registered in an existing schema, the compatibility mode of the schema is never changed and `avro-glue.schema.compatibility` is ignored.
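
For example, a writer table with auto-registration enabled could look like the following sketch (topic, registry, and schema names are hypothetical):

```sql
CREATE TABLE user_updated (
    id STRING,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_updates',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'avro-glue',
    'avro-glue.aws.region' = 'us-east-1',
    'avro-glue.registry.name' = 'my-schema-registry',
    'avro-glue.schema.name' = 'user-updated-value',
    -- register a new schema version instead of failing on an unknown schema
    'avro-glue.schema.autoRegistration' = 'true',
    -- applied only if no schema exists yet under the given registry and name
    'avro-glue.schema.compatibility' = 'FORWARD'
)
```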

Data Type Mapping
----------------

Currently, Apache Flink always uses the table schema to derive the Avro reader schema during deserialization and the Avro writer schema during serialization. Explicitly defining an Avro schema is not supported yet.
See the [Apache Avro Format]({{< ref "docs/connectors/table/formats/avro" >}}#data-type-mapping) for the mapping between Avro and Flink DataTypes.

In addition to the types listed there, Flink supports reading and writing nullable types. Flink maps a nullable type to Avro `union(something, null)`, where `something` is the Avro type converted from the non-null Flink type.

You can refer to [Avro Specification](https://avro.apache.org/docs/current/spec.html) for more information about Avro types.
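
For example, column nullability in the table DDL carries over to the derived Avro schema, as sketched below (topic, registry, and schema names are hypothetical):

```sql
CREATE TABLE typed_events (
    id STRING NOT NULL,  -- derived Avro type: "string"
    score DOUBLE,        -- derived Avro type: union of "double" and null
    note STRING          -- derived Avro type: union of "string" and null
) WITH (
    'connector' = 'kafka',
    'topic' = 'typed_events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'avro-glue',
    'avro-glue.aws.region' = 'us-east-1',
    'avro-glue.registry.name' = 'my-schema-registry',
    'avro-glue.schema.name' = 'typed-events-value'
)
```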
44 changes: 42 additions & 2 deletions flink-formats-aws/flink-avro-glue-schema-registry/pom.xml
Original file line number Diff line number Diff line change
@@ -39,6 +39,18 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
@@ -50,7 +62,7 @@
<version>${project.version}</version>
</dependency>
<dependency>
<!-- This has a transitive dependency on mbknor-jackson-jsonschema which needs scala -->
<!-- This has a transitive dependency on mbknor-jackson-jsonschema which needs scala 2.12 -->
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-serde</artifactId>
<version>${glue.schema.registry.version}</version>
@@ -61,8 +73,36 @@ under the License.
<version>${glue.schema.registry.version}</version>
</dependency>

<!-- ArchUit test dependencies -->
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<!-- ArchUnit test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-architecture-tests-test</artifactId>
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro.glue.schema.registry;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import software.amazon.awssdk.services.glue.model.Compatibility;

import java.time.Duration;

/** Options for AWS Glue Schema Registry Avro format. */
@PublicEvolving
public class AvroGlueFormatOptions {
public static final ConfigOption<String> AWS_REGION =
ConfigOptions.key("aws.region")
.stringType()
.noDefaultValue()
.withDescription("AWS region");

public static final ConfigOption<String> AWS_ENDPOINT =
ConfigOptions.key("aws.endpoint")
.stringType()
.noDefaultValue()
.withDescription("HTTP endpoint to use for AWS calls");

public static final ConfigOption<Integer> CACHE_SIZE =
ConfigOptions.key("cache.size")
.intType()
.defaultValue(200)
.withDescription("Cache maximum size in *items*. Defaults to 200");

public static final ConfigOption<Long> CACHE_TTL_MS =
ConfigOptions.key("cache.ttlMs")
.longType()
.defaultValue(Duration.ofDays(1L).toMillis())
.withDescription("Cache TTL in milliseconds. Defaults to 1 day");

public static final ConfigOption<String> REGISTRY_NAME =
ConfigOptions.key("registry.name")
.stringType()
.noDefaultValue()
.withDescription("Registry name");

public static final ConfigOption<Boolean> SCHEMA_AUTO_REGISTRATION =
ConfigOptions.key("schema.autoRegistration")
.booleanType()
.defaultValue(false)
.withDescription("Whether auto-registration is enabled. Defaults to false.");

public static final ConfigOption<Compatibility> SCHEMA_COMPATIBILITY =
ConfigOptions.key("schema.compatibility")
.enumType(Compatibility.class)
.defaultValue(AWSSchemaRegistryConstants.DEFAULT_COMPATIBILITY_SETTING)
.withDescription(
"Schema compatibility mode, applied only when a new schema is registered");

public static final ConfigOption<AWSSchemaRegistryConstants.COMPRESSION> SCHEMA_COMPRESSION =
ConfigOptions.key("schema.compression")
.enumType(AWSSchemaRegistryConstants.COMPRESSION.class)
.defaultValue(AWSSchemaRegistryConstants.COMPRESSION.NONE)
.withDescription("Compression type");

public static final ConfigOption<String> SCHEMA_NAME =
ConfigOptions.key("schema.name")
.stringType()
.noDefaultValue()
.withDescription(
"The Schema name under which to register the schema used by this format during serialization.");

public static final ConfigOption<AvroRecordType> SCHEMA_TYPE =
ConfigOptions.key("schema.type")
.enumType(AvroRecordType.class)
.defaultValue(AvroRecordType.GENERIC_RECORD)
.withDescription("Record type");

private AvroGlueFormatOptions() {}
}