Skip to content

Commit

Permalink
[GOBBLIN-2163] Add Iceberg-Distcp table metadata validation (for part…
Browse files Browse the repository at this point in the history
…ition copy) (#4064)
  • Loading branch information
Blazer-007 authored Oct 29, 2024
1 parent 4b639f6 commit b4e4d4a
Show file tree
Hide file tree
Showing 3 changed files with 308 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder {
public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value";
public static final String ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY = ICEBERG_DATASET_PREFIX + "partition.validate.strict.equality";
// when set to true, validation requires equality of the partition specs' specId as well as the partition fields' fieldId
public static final String DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY= "true";

public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) {
super(sourceFs, properties);
Expand All @@ -46,7 +49,12 @@ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties)
@Override
protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable,
Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException {
// TODO: Add Validator for source and destination tables later

boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY,
DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY));

IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality);

String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
ICEBERG_PARTITION_NAME_KEY);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;

import lombok.extern.slf4j.Slf4j;

/**
* Validator for Iceberg table metadata, ensuring that the given tables metadata have same schema and partition spec.
*/
@Slf4j
public class IcebergTableMetadataValidatorUtils {

private IcebergTableMetadataValidatorUtils() {
// Do not instantiate
}

/**
* Compares the metadata of the given two iceberg tables.
* <ul>
* <li>First compares the schema of the metadata.</li>
* <li>Then compares the partition spec of the metadata.</li>
* </ul>
* @param tableMetadataA the metadata of the first table
* @param tableMetadataB the metadata of the second table
* @param validateStrictPartitionEquality boolean value to control strictness of partition spec comparison
* @throws IOException if the schemas or partition spec do not match
*/
public static void failUnlessCompatibleStructure(TableMetadata tableMetadataA,
TableMetadata tableMetadataB, boolean validateStrictPartitionEquality) throws IOException {
log.info("Starting comparison between iceberg tables with metadata file location : {} and {}",
tableMetadataA.metadataFileLocation(),
tableMetadataB.metadataFileLocation());

Schema schemaA = tableMetadataA.schema();
Schema schemaB = tableMetadataB.schema();
// TODO: Need to add support for schema evolution
// This check needs to be broken down into multiple checks to support schema evolution
// Possible cases - schemaA == schemaB,
// - schemaA is subset of schemaB [ schemaB Evolved ],
// - schemaA is superset of schemaB [ schemaA Evolved ],
// - Other cases?
// Also consider using Strategy or any other design pattern for this to make it a better solution
if (!schemaA.sameSchema(schemaB)) {
String errMsg = String.format(
"Schema Mismatch between Metadata{%s} - SchemaId{%d} and Metadata{%s} - SchemaId{%d}",
tableMetadataA.metadataFileLocation(),
schemaA.schemaId(),
tableMetadataB.metadataFileLocation(),
schemaB.schemaId()
);
log.error(errMsg);
throw new IOException(errMsg);
}

PartitionSpec partitionSpecA = tableMetadataA.spec();
PartitionSpec partitionSpecB = tableMetadataB.spec();
// .compatibleWith() doesn't match for specId of partition spec and fieldId of partition fields while .equals() does
boolean partitionSpecMatch = validateStrictPartitionEquality ? partitionSpecA.equals(partitionSpecB)
: partitionSpecA.compatibleWith(partitionSpecB);
if (!partitionSpecMatch) {
String errMsg = String.format(
"Partition Spec Mismatch between Metadata{%s} - PartitionSpecId{%d} and Metadata{%s} - PartitionSpecId{%d}",
tableMetadataA.metadataFileLocation(),
partitionSpecA.specId(),
tableMetadataB.metadataFileLocation(),
partitionSpecB.specId()
);
log.error(errMsg);
throw new IOException(errMsg);
}

log.info("Comparison completed successfully between iceberg tables with metadata file location : {} and {}",
tableMetadataA.metadataFileLocation(),
tableMetadataB.metadataFileLocation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.util.HashMap;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Tests for {@link IcebergTableMetadataValidatorUtils#failUnlessCompatibleStructure}, covering schema mismatches,
 * partition spec mismatches, and the difference between strict and non-strict partition spec comparison.
 */
public class IcebergTableMetadataValidatorUtilsTest {
  private static final PartitionSpec unpartitionedPartitionSpec = PartitionSpec.unpartitioned();
  private static final Schema schema1 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema1")
      .fields()
      .requiredString("field1")
      .requiredString("field2")
      .endRecord());
  // Same fields as schema1 but declared in the opposite order
  private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema2")
      .fields()
      .requiredString("field2")
      .requiredString("field1")
      .endRecord());
  // schema1 plus one extra int field
  private static final Schema schema3 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema3")
      .fields()
      .requiredString("field1")
      .requiredString("field2")
      .requiredInt("field3")
      .endRecord());
  // Same field names as schema3 but field1 is an int instead of a string
  private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema4")
      .fields()
      .requiredInt("field1")
      .requiredString("field2")
      .requiredInt("field3")
      .endRecord());
  private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1)
      .identity("field1")
      .build();
  private static final TableMetadata tableMetadataWithSchema1AndUnpartitionedSpec = TableMetadata.newTableMetadata(
      schema1, unpartitionedPartitionSpec, "tableLocationForSchema1WithUnpartitionedSpec", new HashMap<>());
  private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1 = TableMetadata.newTableMetadata(
      schema1, partitionSpec1, "tableLocationForSchema1WithPartitionSpec1", new HashMap<>());
  private static final TableMetadata tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata(
      schema3, unpartitionedPartitionSpec, "tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>());
  // Only the prefix of the exception message is asserted; the rest carries metadata locations and ids
  private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch between Metadata";
  private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition Spec Mismatch between Metadata";
  private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = true;
  private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = false;

  @Test
  public void testValidateSameSchema() throws IOException {
    // Passes as long as no exception is thrown
    IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
        tableMetadataWithSchema1AndUnpartitionedSpec, tableMetadataWithSchema1AndUnpartitionedSpec,
        VALIDATE_STRICT_PARTITION_EQUALITY_TRUE
    );
  }

  @Test
  public void testValidateDifferentSchemaFails() {
    // Schema 1 and Schema 2 have different field order

    TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema2IsNotSchema1Compat,
        unpartitionedPartitionSpec, "tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>());

    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
        tableMetadataWithSchema2AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
  }

  @Test
  public void testValidateSchemaWithDifferentTypesFails() {
    // schema 3 and schema 4 have different field types for field1

    TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat,
        unpartitionedPartitionSpec, "tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>());

    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
        tableMetadataWithSchema4AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
  }

  @Test
  public void testValidateSchemaWithEvolvedSchemaIFails() {
    // Schema 3 has one more extra field as compared to Schema 1
    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
        tableMetadataWithSchema3AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
  }

  @Test
  public void testValidateSchemaWithEvolvedSchemaIIFails() {
    // TODO: This test should pass in the future when we support schema evolution
    // Schema 3 has one more extra field as compared to Schema 1
    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
        tableMetadataWithSchema1AndUnpartitionedSpec, SCHEMA_MISMATCH_EXCEPTION);
  }

  @Test
  public void testValidateOneSchemaEvolvedFromIntToLongTypeFails() {
    // Adding this test to verify that partition copy doesn't proceed further for this case:
    // during the poc and testing the final commit was seen to fail if there is a mismatch in field type,
    // specially from int to long
    Schema schema5EvolvedFromSchema4 = AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema5")
        .fields()
        .requiredLong("field1")
        .requiredString("field2")
        .requiredInt("field3")
        .endRecord());

    // Both sides are partitioned on field1 and differ ONLY in the type of field1 (int vs long), so the
    // mismatch reported below is attributable to the int -> long change and nothing else
    PartitionSpec partitionSpecForSchema4 = PartitionSpec.builderFor(schema4IsNotSchema3Compat)
        .identity("field1")
        .build();
    TableMetadata tableMetadataWithSchema4AndPartitionSpec = TableMetadata.newTableMetadata(schema4IsNotSchema3Compat,
        partitionSpecForSchema4, "tableLocationForSchema4WithPartitionSpec", new HashMap<>());

    PartitionSpec partitionSpecForSchema5 = PartitionSpec.builderFor(schema5EvolvedFromSchema4)
        .identity("field1")
        .build();
    TableMetadata tableMetadataWithSchema5AndPartitionSpec = TableMetadata.newTableMetadata(schema5EvolvedFromSchema4,
        partitionSpecForSchema5, "tableLocationForSchema5WithPartitionSpec", new HashMap<>());

    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema4AndPartitionSpec,
        tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION);
  }

  @Test
  public void testValidateSamePartitionSpec() throws IOException {
    // Passes as long as no exception is thrown
    IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
        tableMetadataWithSchema1AndPartitionSpec1, tableMetadataWithSchema1AndPartitionSpec1,
        VALIDATE_STRICT_PARTITION_EQUALITY_TRUE
    );
  }

  @Test
  public void testValidatePartitionSpecWithDiffNameFails() {
    // Same schema, but partitioned on field2 instead of field1
    PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1)
        .identity("field2")
        .build();
    TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12,
        "tableLocationForSchema1WithPartitionSpec12", new HashMap<>());
    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1,
        tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION);
  }

  @Test
  public void testValidatePartitionSpecWithUnpartitionedFails() {
    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
        tableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION);
  }

  @Test
  public void testPartitionSpecWithDifferentTransformFails() {
    // Same source column (field1), but truncate(4) instead of identity
    PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1)
        .truncate("field1", 4)
        .build();
    TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = TableMetadata.newTableMetadata(schema1, partitionSpec12,
        "tableLocationForSchema1WithPartitionSpec12", new HashMap<>());
    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1,
        tableMetadataWithSchema1AndPartitionSpec12, PARTITION_SPEC_MISMATCH_EXCEPTION);
  }

  @Test
  public void testStrictPartitionSpecEqualityOffVsOn() throws IOException {
    PartitionSpec partitionSpecWithTwoCols = PartitionSpec.builderFor(schema1)
        .identity("field1")
        .identity("field2")
        .build();

    TableMetadata tableMetadataWithSchema1AndPartitionSpecWithTwoCols = TableMetadata.newTableMetadata(schema1,
        partitionSpecWithTwoCols, "tableLocationForSchema1WithPartitionSpecWithTwoCols", new HashMap<>());
    // Reach the same two-column partitioning by updating the spec of a table that started with partitionSpec1
    TableMetadata updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1 = tableMetadataWithSchema1AndPartitionSpec1
        .updatePartitionSpec(tableMetadataWithSchema1AndPartitionSpecWithTwoCols.spec());

    // passes w/ non-strict equality (no exception thrown)...
    IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
        tableMetadataWithSchema1AndPartitionSpecWithTwoCols,
        updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1,
        VALIDATE_STRICT_PARTITION_EQUALITY_FALSE);
    // but fails when strict equality
    verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpecWithTwoCols,
        updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, PARTITION_SPEC_MISMATCH_EXCEPTION);
  }

  /**
   * Asserts that validating the two metadata with strict partition equality throws an {@link IOException}
   * whose message starts with {@code expectedMessage}.
   */
  private void verifyStrictFailUnlessCompatibleStructureThrows(TableMetadata tableAMetadata,
      TableMetadata tableBMetadata, String expectedMessage) {
    IOException exception = Assert.expectThrows(IOException.class, () -> {
      IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(tableAMetadata, tableBMetadata,
          VALIDATE_STRICT_PARTITION_EQUALITY_TRUE);
    });
    Assert.assertTrue(exception.getMessage().startsWith(expectedMessage),
        "Expected message starting with [" + expectedMessage + "] but got [" + exception.getMessage() + "]");
  }
}

0 comments on commit b4e4d4a

Please sign in to comment.