diff --git a/docs/content.zh/docs/core-concept/route.md b/docs/content.zh/docs/core-concept/route.md index dc870dd474..75ae903bc8 100644 --- a/docs/content.zh/docs/core-concept/route.md +++ b/docs/content.zh/docs/core-concept/route.md @@ -30,12 +30,12 @@ under the License. # Parameters To describe a route, the follows are required: -| parameter | meaning | optional/required | -|----------------|----------------------------------------------------|-------------------| -| source-table | Source table id, supports regular expressions | required | -| sink-table | Sink table id, supports regular expressions | required | -| replace-symbol | Special symbol in sink-table for pattern replacing | optional | -| description | Routing rule description(a default value provided) | optional | +| parameter | meaning | optional/required | +|----------------|---------------------------------------------------------------------------------------------|-------------------| +| source-table | Source table id, supports regular expressions | required | +| sink-table | Sink table id, supports regular expressions | required | +| replace-symbol | Special symbol in sink-table for pattern replacing, will be replaced by original table name | optional | +| description | Routing rule description(a default value provided) | optional | A route module can contain a list of source-table/sink-table rules. diff --git a/docs/content/docs/core-concept/route.md b/docs/content/docs/core-concept/route.md index dc870dd474..75ae903bc8 100644 --- a/docs/content/docs/core-concept/route.md +++ b/docs/content/docs/core-concept/route.md @@ -30,12 +30,12 @@ under the License. # Parameters To describe a route, the follows are required: -| parameter | meaning | optional/required | -|----------------|----------------------------------------------------|-------------------| -| source-table | Source table id, supports regular expressions | required | -| sink-table | Sink table id, supports regular expressions | required | -| replace-symbol | Special symbol in sink-table for pattern replacing | optional | -| description | Routing rule description(a default value provided) | optional | +| parameter | meaning | optional/required | +|----------------|---------------------------------------------------------------------------------------------|-------------------| +| source-table | Source table id, supports regular expressions | required | +| sink-table | Sink table id, supports regular expressions | required | +| replace-symbol | Special symbol in sink-table for pattern replacing, will be replaced by original table name | optional | +| description | Routing rule description(a default value provided) | optional | A route module can contain a list of source-table/sink-table rules. diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index 2bbb06d96a..a8230fa483 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -71,6 +71,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -100,6 +101,7 @@ public class SchemaOperator extends AbstractStreamOperator private transient TaskOperatorEventGateway toCoordinator; private transient SchemaEvolutionClient schemaEvolutionClient; private transient LoadingCache cachedSchemas; + private transient LoadingCache> tableIdMappingCache; private final long rpcTimeOutInMillis; @@ -146,6 +148,16 @@ public Schema load(TableId tableId) { return getLatestSchema(tableId); } }); + tableIdMappingCache = + CacheBuilder.newBuilder() + .expireAfterAccess(CACHE_EXPIRE_DURATION) + .build( + new CacheLoader>() { + @Override + public List load(TableId tableId) { + return getRoutedTables(tableId); + } + }); } @Override @@ -164,7 +176,7 @@ public void initializeState(StateInitializationContext context) throws Exception */ @Override public void processElement(StreamRecord streamRecord) - throws InterruptedException, TimeoutException { + throws InterruptedException, TimeoutException, ExecutionException { Event event = streamRecord.getValue(); // Schema changes if (event instanceof SchemaChangeEvent) { @@ -175,15 +187,15 @@ public void processElement(StreamRecord streamRecord) handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event); // Update caches cachedSchemas.put(tableId, getLatestSchema(tableId)); - getRoutedTables(tableId) + tableIdMappingCache + .get(tableId) .forEach(routed -> cachedSchemas.put(routed, getLatestSchema(routed))); return; } // Data changes DataChangeEvent dataChangeEvent = (DataChangeEvent) event; - TableId tableId = dataChangeEvent.tableId(); - List optionalRoutedTable = getRoutedTables(tableId); + List optionalRoutedTable = tableIdMappingCache.get(dataChangeEvent.tableId()); if (optionalRoutedTable.isEmpty()) { output.collect(streamRecord); } else {