Skip to content

Commit

Permalink
Merge pull request #15296 from cdapio/bug/CDAP-20786
Browse files Browse the repository at this point in the history
fix(CDAP-20786): should check Spanner schema equality after compatibility conversion
  • Loading branch information
de-lan authored Aug 23, 2023
2 parents c842322 + e7dfbc2 commit dc007fa
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -69,6 +70,12 @@ static String getIndexName(StructuredTableId tableId, String column) {
return String.format("%s_%s_idx", tableId.getName(), column);
}

/**
* Constructor for {@code SpannerStructuredTableAdmin}.
*
* @param spanner the gcp Spanner service.
* @param databaseId the ID of the Spanner instance database.
*/
public SpannerStructuredTableAdmin(Spanner spanner, DatabaseId databaseId) {
this.databaseId = databaseId;
this.adminClient = spanner.getDatabaseAdminClient();
Expand Down Expand Up @@ -127,7 +134,7 @@ private void updateTable(StructuredTableSpecification spec)
throws IOException, TableNotFoundException, TableSchemaIncompatibleException {
StructuredTableId tableId = spec.getTableId();
StructuredTableSchema cachedTableSchema = getSchema(tableId);
StructuredTableSchema newTableSchema = new StructuredTableSchema(spec);
StructuredTableSchema newTableSchema = convertSpecToCompatibleSchema(spec);

if (newTableSchema.equals(cachedTableSchema)) {
LOG.trace("The table schema is already up to date: {}", tableId);
Expand Down Expand Up @@ -165,6 +172,21 @@ private void updateTable(StructuredTableSpecification spec)
}
}

@VisibleForTesting
static StructuredTableSchema convertSpecToCompatibleSchema(StructuredTableSpecification spec) {
List<FieldType> convertedFieldTypes =
spec.getFieldTypes().stream()
.map(
fieldType -> {
String spannerType = getSpannerType(fieldType.getType());
FieldType.Type convertedType = fromSpannerType(spannerType);
return new FieldType(fieldType.getName(), convertedType);
})
.collect(Collectors.toList());
return new StructuredTableSchema(
spec.getTableId(), convertedFieldTypes, spec.getPrimaryKeys(), spec.getIndexes());
}

@Override
public void drop(StructuredTableId tableId) throws IOException {
List<String> ddlStatements = new ArrayList<>();
Expand Down Expand Up @@ -286,7 +308,7 @@ private String getCreateIndexStatement(String idxColumn, StructuredTableSchema s
return createIndex;
}

private String getSpannerType(FieldType.Type fieldType) {
private static String getSpannerType(FieldType.Type fieldType) {
switch (fieldType) {
case INTEGER:
case LONG:
Expand All @@ -306,7 +328,7 @@ private String getSpannerType(FieldType.Type fieldType) {
}
}

private FieldType.Type fromSpannerType(String spannerType) {
private static FieldType.Type fromSpannerType(String spannerType) {
switch (spannerType.toLowerCase()) {
case "int64":
return FieldType.Type.LONG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.cdap.cdap.storage.spanner;

import static io.cdap.cdap.storage.spanner.SpannerStructuredTableAdmin.convertSpecToCompatibleSchema;

import io.cdap.cdap.api.metrics.MetricsCollector;
import io.cdap.cdap.spi.data.StorageProviderContext;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
Expand Down Expand Up @@ -59,13 +61,12 @@ public static void createSpannerStorageProvider() throws Exception {
Assume.assumeNotNull(project, instance, database);

Map<String, String> configs = new HashMap<>();
configs.put(SpannerStorageProvider.PROJECT, project);
configs.put(SpannerStorageProvider.INSTANCE, instance);
configs.put(SpannerStorageProvider.DATABASE, database);

if (credentialsPath != null) {
configs.put(SpannerStorageProvider.CREDENTIALS_PATH, credentialsPath);
}
configs.put(SpannerStorageProvider.PROJECT, project);
configs.put(SpannerStorageProvider.INSTANCE, instance);
configs.put(SpannerStorageProvider.DATABASE, database);

StorageProviderContext context = new MockStorageProviderContext(configs);

Expand Down Expand Up @@ -103,17 +104,38 @@ public void testAdmin() throws Exception {
admin.createOrUpdate(SIMPLE_TABLE_SPEC);
Assert.assertTrue(admin.exists(SIMPLE_TABLE));

// Assert SIMPLE_TABLE schema
// ONLY checking compatibility because of INT/LONG to INT64 conversion in Spanner
// Assert SIMPLE_TABLE schema: checking equality after compatible conversion because of INT/LONG
// to INT64 conversion in Spanner
StructuredTableSchema simpleTableSchema = admin.getSchema(SIMPLE_TABLE);
Assert.assertTrue(simpleTableSchema.isCompatible(SIMPLE_TABLE_SPEC));
Assert.assertEquals(simpleTableSchema, convertSpecToCompatibleSchema(SIMPLE_TABLE_SPEC));

// Update SIMPLE_TABLE spec to UPDATED_SIMPLE_TABLE_SPEC
admin.createOrUpdate(UPDATED_SIMPLE_TABLE_SPEC);

// Assert UPDATED_SIMPLE_TABLE_SPEC schema
StructuredTableSchema updateSimpleTableSchema = admin.getSchema(SIMPLE_TABLE);
Assert.assertTrue(updateSimpleTableSchema.isCompatible(UPDATED_SIMPLE_TABLE_SPEC));
Assert.assertEquals(
updateSimpleTableSchema, convertSpecToCompatibleSchema(UPDATED_SIMPLE_TABLE_SPEC));
}

@Test
@Override
public void testCreateOrUpdateTwiceShouldSucceed() throws Exception {
StructuredTableAdmin admin = getStructuredTableAdmin();

// Assert SIMPLE_TABLE Empty
Assert.assertFalse(admin.exists(SIMPLE_TABLE));

// Calling to createOrUpdate the same SIMPLE_TABLE spec twice to mimic the scenario of
// connecting to an exsting DB and make sure the second time passes the equality check after
// schema compatibility conversion
admin.createOrUpdate(SIMPLE_TABLE_SPEC);
admin.createOrUpdate(SIMPLE_TABLE_SPEC);
Assert.assertTrue(admin.exists(SIMPLE_TABLE));

// Assert SIMPLE_TABLE schema
StructuredTableSchema simpleTableSchema = admin.getSchema(SIMPLE_TABLE);
Assert.assertEquals(simpleTableSchema, convertSpecToCompatibleSchema(SIMPLE_TABLE_SPEC));
}

@Test
Expand All @@ -130,8 +152,10 @@ public void testInconsistentKeyOrderInSchema() throws Exception {

// Assert INCONSISTENT_PRIMARY_KEY_TABLE schema
StructuredTableSchema tableSchema = admin.getSchema(INCONSISTENT_PRIMARY_KEY_TABLE);
// ONLY checking compatibility because of INT/LONG to INT64 conversion in Spanner
Assert.assertTrue(tableSchema.isCompatible(INCONSISTENT_PRIMARY_KEY_TABLE_SPEC));
// Checking equality after compatible conversion because of INT/LONG to INT64 conversion in
// Spanner
Assert.assertEquals(
tableSchema, convertSpecToCompatibleSchema(INCONSISTENT_PRIMARY_KEY_TABLE_SPEC));
}

private static final class MockStorageProviderContext implements StorageProviderContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,16 @@ public StructuredTableSchema(StructuredTableSpecification spec) {
this(spec.getTableId(), spec.getFieldTypes(), spec.getPrimaryKeys(), spec.getIndexes());
}

public StructuredTableSchema(StructuredTableId tableId, List<FieldType> fields,
List<String> primaryKeys, Collection<String> indexes) {
/** Constructor of {@code StructuredTableSchema} with table schema details. */
public StructuredTableSchema(
StructuredTableId tableId,
List<FieldType> fields,
List<String> primaryKeys,
Collection<String> indexes) {
this.tableId = tableId;
this.fields = Collections.unmodifiableMap(fields.stream().collect(
Collectors.toMap(FieldType::getName, FieldType::getType)));
this.fields =
Collections.unmodifiableMap(
fields.stream().collect(Collectors.toMap(FieldType::getName, FieldType::getType)));
this.primaryKeys = Collections.unmodifiableList(new ArrayList<>(primaryKeys));
this.indexes = Collections.unmodifiableSet(new HashSet<>(indexes));
}
Expand Down Expand Up @@ -185,7 +190,7 @@ public boolean isCompatible(StructuredTableSpecification spec) {
public boolean isCompatible(StructuredTableSchema schema) {
for (String field : getFieldNames()) {
FieldType.Type type = schema.getType(field);
if (type == null || !getType(field).isCompatible(type)) {
if (type == null || !type.isCompatible(getType(field))) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,24 @@ public void testAdmin() throws Exception {
Assert.assertEquals(updateSimpleTableSchema, expected);
}

@Test
public void testCreateOrUpdateTwiceShouldSucceed() throws Exception {
StructuredTableAdmin admin = getStructuredTableAdmin();

// Assert SIMPLE_TABLE Empty
Assert.assertFalse(admin.exists(SIMPLE_TABLE));

// Calling to createOrUpdate the same SIMPLE_TABLE spec twice to mimic the scenario of
// connecting to an exsting DB
admin.createOrUpdate(SIMPLE_TABLE_SPEC);
admin.createOrUpdate(SIMPLE_TABLE_SPEC);
Assert.assertTrue(admin.exists(SIMPLE_TABLE));

// Assert SIMPLE_TABLE schema
StructuredTableSchema simpleTableSchema = admin.getSchema(SIMPLE_TABLE);
Assert.assertEquals(simpleTableSchema, new StructuredTableSchema(SIMPLE_TABLE_SPEC));
}

@Test
public void testBackwardCompatible() throws Exception {
StructuredTableAdmin admin = getStructuredTableAdmin();
Expand Down

0 comments on commit dc007fa

Please sign in to comment.