From e7dfbc2ef135e28828e65ecdd8e20eb749e70482 Mon Sep 17 00:00:00 2001 From: de-lan Date: Mon, 21 Aug 2023 18:40:58 -0700 Subject: [PATCH] fix(CDAP-20786): should check Spanner schema equality after compatible conversion --- .../spanner/SpannerStructuredTableAdmin.java | 28 ++++++++++-- .../SpannerStructuredTableAdminTest.java | 44 ++++++++++++++----- .../spi/data/table/StructuredTableSchema.java | 15 ++++--- .../spi/data/StructuredTableAdminTest.java | 18 ++++++++ 4 files changed, 87 insertions(+), 18 deletions(-) diff --git a/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableAdmin.java b/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableAdmin.java index 851b9d68299f..414a90dfcfd8 100644 --- a/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableAdmin.java +++ b/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableAdmin.java @@ -26,6 +26,7 @@ import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; +import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -69,6 +70,12 @@ static String getIndexName(StructuredTableId tableId, String column) { return String.format("%s_%s_idx", tableId.getName(), column); } + /** + * Constructor for {@code SpannerStructuredTableAdmin}. + * + * @param spanner the gcp Spanner service. + * @param databaseId the ID of the Spanner instance database. + */ public SpannerStructuredTableAdmin(Spanner spanner, DatabaseId databaseId) { this.databaseId = databaseId; this.adminClient = spanner.getDatabaseAdminClient(); @@ -127,7 +134,7 @@ private void updateTable(StructuredTableSpecification spec) throws IOException, TableNotFoundException, TableSchemaIncompatibleException { StructuredTableId tableId = spec.getTableId(); StructuredTableSchema cachedTableSchema = getSchema(tableId); - StructuredTableSchema newTableSchema = new StructuredTableSchema(spec); + StructuredTableSchema newTableSchema = convertSpecToCompatibleSchema(spec); if (newTableSchema.equals(cachedTableSchema)) { LOG.trace("The table schema is already up to date: {}", tableId); @@ -165,6 +172,21 @@ private void updateTable(StructuredTableSpecification spec) } } + @VisibleForTesting + static StructuredTableSchema convertSpecToCompatibleSchema(StructuredTableSpecification spec) { + List convertedFieldTypes = + spec.getFieldTypes().stream() + .map( + fieldType -> { + String spannerType = getSpannerType(fieldType.getType()); + FieldType.Type convertedType = fromSpannerType(spannerType); + return new FieldType(fieldType.getName(), convertedType); + }) + .collect(Collectors.toList()); + return new StructuredTableSchema( + spec.getTableId(), convertedFieldTypes, spec.getPrimaryKeys(), spec.getIndexes()); + } + @Override public void drop(StructuredTableId tableId) throws IOException { List ddlStatements = new ArrayList<>(); @@ -286,7 +308,7 @@ private String getCreateIndexStatement(String idxColumn, StructuredTableSchema s return createIndex; } - private String getSpannerType(FieldType.Type fieldType) { + private static String getSpannerType(FieldType.Type fieldType) { switch (fieldType) { case INTEGER: case LONG: @@ -306,7 +328,7 @@ private String getSpannerType(FieldType.Type fieldType) { } } - private FieldType.Type fromSpannerType(String spannerType) { + private static FieldType.Type fromSpannerType(String spannerType) { switch (spannerType.toLowerCase()) { case "int64": return FieldType.Type.LONG; diff --git a/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableAdminTest.java b/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableAdminTest.java index 188deddc57ca..f436b7698301 100644 --- a/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableAdminTest.java +++ b/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableAdminTest.java @@ -16,6 +16,8 @@ package io.cdap.cdap.storage.spanner; +import static io.cdap.cdap.storage.spanner.SpannerStructuredTableAdmin.convertSpecToCompatibleSchema; + import io.cdap.cdap.api.metrics.MetricsCollector; import io.cdap.cdap.spi.data.StorageProviderContext; import io.cdap.cdap.spi.data.StructuredTableAdmin; @@ -59,13 +61,12 @@ public static void createSpannerStorageProvider() throws Exception { Assume.assumeNotNull(project, instance, database); Map configs = new HashMap<>(); - configs.put(SpannerStorageProvider.PROJECT, project); - configs.put(SpannerStorageProvider.INSTANCE, instance); - configs.put(SpannerStorageProvider.DATABASE, database); - if (credentialsPath != null) { configs.put(SpannerStorageProvider.CREDENTIALS_PATH, credentialsPath); } + configs.put(SpannerStorageProvider.PROJECT, project); + configs.put(SpannerStorageProvider.INSTANCE, instance); + configs.put(SpannerStorageProvider.DATABASE, database); StorageProviderContext context = new MockStorageProviderContext(configs); @@ -103,17 +104,38 @@ public void testAdmin() throws Exception { admin.createOrUpdate(SIMPLE_TABLE_SPEC); Assert.assertTrue(admin.exists(SIMPLE_TABLE)); - // Assert SIMPLE_TABLE schema - // ONLY checking compatibility because of INT/LONG to INT64 conversion in Spanner + // Assert SIMPLE_TABLE schema: checking equality after compatible conversion because of INT/LONG + // to INT64 conversion in Spanner StructuredTableSchema simpleTableSchema = admin.getSchema(SIMPLE_TABLE); - Assert.assertTrue(simpleTableSchema.isCompatible(SIMPLE_TABLE_SPEC)); + Assert.assertEquals(simpleTableSchema, convertSpecToCompatibleSchema(SIMPLE_TABLE_SPEC)); // Update SIMPLE_TABLE spec to UPDATED_SIMPLE_TABLE_SPEC admin.createOrUpdate(UPDATED_SIMPLE_TABLE_SPEC); // Assert UPDATED_SIMPLE_TABLE_SPEC schema StructuredTableSchema updateSimpleTableSchema = admin.getSchema(SIMPLE_TABLE); - Assert.assertTrue(updateSimpleTableSchema.isCompatible(UPDATED_SIMPLE_TABLE_SPEC)); + Assert.assertEquals( + updateSimpleTableSchema, convertSpecToCompatibleSchema(UPDATED_SIMPLE_TABLE_SPEC)); + } + + @Test + @Override + public void testCreateOrUpdateTwiceShouldSucceed() throws Exception { + StructuredTableAdmin admin = getStructuredTableAdmin(); + + // Assert SIMPLE_TABLE Empty + Assert.assertFalse(admin.exists(SIMPLE_TABLE)); + + // Calling to createOrUpdate the same SIMPLE_TABLE spec twice to mimic the scenario of + // connecting to an exsting DB and make sure the second time passes the equality check after + // schema compatibility conversion + admin.createOrUpdate(SIMPLE_TABLE_SPEC); + admin.createOrUpdate(SIMPLE_TABLE_SPEC); + Assert.assertTrue(admin.exists(SIMPLE_TABLE)); + + // Assert SIMPLE_TABLE schema + StructuredTableSchema simpleTableSchema = admin.getSchema(SIMPLE_TABLE); + Assert.assertEquals(simpleTableSchema, convertSpecToCompatibleSchema(SIMPLE_TABLE_SPEC)); } @Test @@ -130,8 +152,10 @@ public void testInconsistentKeyOrderInSchema() throws Exception { // Assert INCONSISTENT_PRIMARY_KEY_TABLE schema StructuredTableSchema tableSchema = admin.getSchema(INCONSISTENT_PRIMARY_KEY_TABLE); - // ONLY checking compatibility because of INT/LONG to INT64 conversion in Spanner - Assert.assertTrue(tableSchema.isCompatible(INCONSISTENT_PRIMARY_KEY_TABLE_SPEC)); + // Checking equality after compatible conversion because of INT/LONG to INT64 conversion in + // Spanner + Assert.assertEquals( + tableSchema, convertSpecToCompatibleSchema(INCONSISTENT_PRIMARY_KEY_TABLE_SPEC)); } private static final class MockStorageProviderContext implements StorageProviderContext { diff --git a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/table/StructuredTableSchema.java b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/table/StructuredTableSchema.java index 5539123bef0e..8fecb03b1e07 100644 --- a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/table/StructuredTableSchema.java +++ b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/table/StructuredTableSchema.java @@ -46,11 +46,16 @@ public StructuredTableSchema(StructuredTableSpecification spec) { this(spec.getTableId(), spec.getFieldTypes(), spec.getPrimaryKeys(), spec.getIndexes()); } - public StructuredTableSchema(StructuredTableId tableId, List fields, - List primaryKeys, Collection indexes) { + /** Constructor of {@code StructuredTableSchema} with table schema details. */ + public StructuredTableSchema( + StructuredTableId tableId, + List fields, + List primaryKeys, + Collection indexes) { this.tableId = tableId; - this.fields = Collections.unmodifiableMap(fields.stream().collect( - Collectors.toMap(FieldType::getName, FieldType::getType))); + this.fields = + Collections.unmodifiableMap( + fields.stream().collect(Collectors.toMap(FieldType::getName, FieldType::getType))); this.primaryKeys = Collections.unmodifiableList(new ArrayList<>(primaryKeys)); this.indexes = Collections.unmodifiableSet(new HashSet<>(indexes)); } @@ -185,7 +190,7 @@ public boolean isCompatible(StructuredTableSpecification spec) { public boolean isCompatible(StructuredTableSchema schema) { for (String field : getFieldNames()) { FieldType.Type type = schema.getType(field); - if (type == null || !getType(field).isCompatible(type)) { + if (type == null || !type.isCompatible(getType(field))) { return false; } } diff --git a/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableAdminTest.java b/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableAdminTest.java index 932d59f13dab..7819bbccaad6 100644 --- a/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableAdminTest.java +++ b/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableAdminTest.java @@ -126,6 +126,24 @@ public void testAdmin() throws Exception { Assert.assertEquals(updateSimpleTableSchema, expected); } + @Test + public void testCreateOrUpdateTwiceShouldSucceed() throws Exception { + StructuredTableAdmin admin = getStructuredTableAdmin(); + + // Assert SIMPLE_TABLE Empty + Assert.assertFalse(admin.exists(SIMPLE_TABLE)); + + // Calling to createOrUpdate the same SIMPLE_TABLE spec twice to mimic the scenario of + // connecting to an exsting DB + admin.createOrUpdate(SIMPLE_TABLE_SPEC); + admin.createOrUpdate(SIMPLE_TABLE_SPEC); + Assert.assertTrue(admin.exists(SIMPLE_TABLE)); + + // Assert SIMPLE_TABLE schema + StructuredTableSchema simpleTableSchema = admin.getSchema(SIMPLE_TABLE); + Assert.assertEquals(simpleTableSchema, new StructuredTableSchema(SIMPLE_TABLE_SPEC)); + } + @Test public void testBackwardCompatible() throws Exception { StructuredTableAdmin admin = getStructuredTableAdmin();