Skip to content

Commit

Permalink
Cache tableId route mappings & Update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Jul 5, 2024
1 parent 68580e5 commit c7ad18e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 16 deletions.
12 changes: 6 additions & 6 deletions docs/content.zh/docs/core-concept/route.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ under the License.
# Parameters
To describe a route, the follows are required:

| parameter | meaning | optional/required |
|----------------|----------------------------------------------------|-------------------|
| source-table | Source table id, supports regular expressions | required |
| sink-table | Sink table id, supports regular expressions | required |
| replace-symbol | Special symbol in sink-table for pattern replacing | optional |
| description | Routing rule description(a default value provided) | optional |
| parameter | meaning | optional/required |
|----------------|---------------------------------------------------------------------------------------------|-------------------|
| source-table | Source table id, supports regular expressions | required |
| sink-table | Sink table id, supports regular expressions | required |
| replace-symbol | Special symbol in sink-table for pattern replacing, will be replaced by original table name | optional |
| description | Routing rule description(a default value provided) | optional |

A route module can contain a list of source-table/sink-table rules.

Expand Down
12 changes: 6 additions & 6 deletions docs/content/docs/core-concept/route.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ under the License.
# Parameters
To describe a route, the follows are required:

| parameter | meaning | optional/required |
|----------------|----------------------------------------------------|-------------------|
| source-table | Source table id, supports regular expressions | required |
| sink-table | Sink table id, supports regular expressions | required |
| replace-symbol | Special symbol in sink-table for pattern replacing | optional |
| description | Routing rule description(a default value provided) | optional |
| parameter | meaning | optional/required |
|----------------|---------------------------------------------------------------------------------------------|-------------------|
| source-table | Source table id, supports regular expressions | required |
| sink-table | Sink table id, supports regular expressions | required |
| replace-symbol | Special symbol in sink-table for pattern replacing, will be replaced by original table name | optional |
| description | Routing rule description(a default value provided) | optional |

A route module can contain a list of source-table/sink-table rules.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -100,6 +101,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
private transient TaskOperatorEventGateway toCoordinator;
private transient SchemaEvolutionClient schemaEvolutionClient;
private transient LoadingCache<TableId, Schema> cachedSchemas;
private transient LoadingCache<TableId, List<TableId>> tableIdMappingCache;

private final long rpcTimeOutInMillis;

Expand Down Expand Up @@ -146,6 +148,16 @@ public Schema load(TableId tableId) {
return getLatestSchema(tableId);
}
});
tableIdMappingCache =
CacheBuilder.newBuilder()
.expireAfterAccess(CACHE_EXPIRE_DURATION)
.build(
new CacheLoader<TableId, List<TableId>>() {
@Override
public List<TableId> load(TableId tableId) {
return getRoutedTables(tableId);
}
});
}

@Override
Expand All @@ -164,7 +176,7 @@ public void initializeState(StateInitializationContext context) throws Exception
*/
@Override
public void processElement(StreamRecord<Event> streamRecord)
throws InterruptedException, TimeoutException {
throws InterruptedException, TimeoutException, ExecutionException {
Event event = streamRecord.getValue();
// Schema changes
if (event instanceof SchemaChangeEvent) {
Expand All @@ -175,15 +187,15 @@ public void processElement(StreamRecord<Event> streamRecord)
handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event);
// Update caches
cachedSchemas.put(tableId, getLatestSchema(tableId));
getRoutedTables(tableId)
tableIdMappingCache
.get(tableId)
.forEach(routed -> cachedSchemas.put(routed, getLatestSchema(routed)));
return;
}

// Data changes
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
TableId tableId = dataChangeEvent.tableId();
List<TableId> optionalRoutedTable = getRoutedTables(tableId);
List<TableId> optionalRoutedTable = tableIdMappingCache.get(dataChangeEvent.tableId());
if (optionalRoutedTable.isEmpty()) {
output.collect(streamRecord);
} else {
Expand Down

0 comments on commit c7ad18e

Please sign in to comment.