diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLRUFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLRUFunction.java index 06a7cd1b..179e2fc7 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLRUFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLRUFunction.java @@ -34,15 +34,17 @@ import java.text.SimpleDateFormat; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Arrays; import java.util.Calendar; import java.util.Date; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class StarRocksDynamicLRUFunction extends TableFunction { private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicLRUFunction.class); - + private final ColumnRichInfo[] filterRichInfos; private final StarRocksSourceOptions sourceOptions; private final ArrayList filterList; @@ -56,7 +58,7 @@ public class StarRocksDynamicLRUFunction extends TableFunction { private final long cacheExpireMs; private final int maxRetryTimes; - public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions, + public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions, ColumnRichInfo[] filterRichInfos, List columnRichInfos, SelectColumn[] selectColumns) { @@ -72,7 +74,7 @@ public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions, this.filterList = new ArrayList<>(); this.dataReaderList = new ArrayList<>(); } - + @Override public void open(FunctionContext context) throws Exception { super.open(context); @@ -101,14 +103,17 @@ public void eval(Object... keys) { } String filter = String.join(" and ", filterList); filterList.clear(); - String SQL = "select * from " + sourceOptions.getDatabaseName() + "." + sourceOptions.getTableName() + " where " + filter; + String columns = Arrays.stream(selectColumns) + .map(col -> "`" + col.getColumnName() + "`") + .collect(Collectors.joining(",")); + String SQL = "select " + columns + " from " + sourceOptions.getDatabaseName() + "." + sourceOptions.getTableName() + " where " + filter; LOG.info("LookUpFunction SQL [{}]", SQL); this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, SQL); List> lists = StarRocksSourceCommonFunc.splitQueryBeXTablets(1, queryInfo); lists.get(0).forEach(beXTablets -> { StarRocksSourceBeReader beReader = new StarRocksSourceBeReader(beXTablets.getBeNode(), columnRichInfos, - selectColumns, + selectColumns, sourceOptions); beReader.openScanner(beXTablets.getTabletIds(), queryInfo.getQueryPlan().getOpaqued_query_plan(), sourceOptions); beReader.startToRead(); @@ -132,7 +137,7 @@ public void eval(Object... keys) { }); rows.trimToSize(); cache.put(keyRow, rows); - } + } } private void getFieldValue(Object obj, ColumnRichInfo columnRichInfo) { @@ -147,9 +152,9 @@ private void getFieldValue(Object obj, ColumnRichInfo columnRichInfo) { filter = columnRichInfo.getColumnName() + " = '" + sdf.format(d).toString() + "'"; } if (flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE || - flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE || + flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE || flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE) { - + DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); String strDateTime = dtf.format(((TimestampData)obj).toLocalDateTime()); filter = columnRichInfo.getColumnName() + " = '" + strDateTime + "'"; diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java index 27dde932..14392f3d 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java @@ -25,11 +25,13 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.ResolvedExpression; import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo; import com.starrocks.connector.flink.table.source.struct.PushDownHolder; import com.starrocks.connector.flink.table.source.struct.SelectColumn; +import org.apache.flink.table.functions.TableFunction; import java.util.ArrayList; import java.util.Arrays; @@ -58,32 +60,77 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { StarRocksDynamicSourceFunction sourceFunction = new StarRocksDynamicSourceFunction( - options, flinkSchema, - this.pushDownHolder.getFilter(), - this.pushDownHolder.getLimit(), - this.pushDownHolder.getSelectColumns(), + options, flinkSchema, + this.pushDownHolder.getFilter(), + this.pushDownHolder.getLimit(), + this.pushDownHolder.getSelectColumns(), this.pushDownHolder.getQueryType()); return SourceFunctionProvider.of(sourceFunction, true); } @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { - int[] projectedFields = Arrays.stream(context.getKeys()).mapToInt(value -> value[0]).toArray(); + Map columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema); + List allColumnRichInfos = + StarRocksSourceCommonFunc.genColumnRichInfo(columnMap); + SelectColumn[] pushDownSelectColumns = pushDownHolder.getSelectColumns(); + SelectColumn[] selectColumns; + List columnRichInfos; + int[] projectedFields = + Arrays.stream(context.getKeys()).mapToInt(value -> value[0]).toArray(); ColumnRichInfo[] filerRichInfo = new ColumnRichInfo[projectedFields.length]; - for (int i = 0; i < projectedFields.length; i ++) { - ColumnRichInfo columnRichInfo = new ColumnRichInfo( - this.flinkSchema.getFieldName(projectedFields[i]).get(), - projectedFields[i], - this.flinkSchema.getFieldDataType(projectedFields[i]).get() - ); - filerRichInfo[i] = columnRichInfo; + StarRocksSourceQueryType queryType = pushDownHolder.getQueryType(); + if (queryType == StarRocksSourceQueryType.QuerySomeColumns) { + columnRichInfos = new ArrayList<>(); + selectColumns = new SelectColumn[pushDownSelectColumns.length]; + for (int i = 0; i < pushDownSelectColumns.length; i++) { + ColumnRichInfo columnRichInfo = + allColumnRichInfos.get( + pushDownSelectColumns[i].getColumnIndexInFlinkTable()); + columnRichInfos.add( + new ColumnRichInfo( + columnRichInfo.getColumnName(), i, columnRichInfo.getDataType())); + selectColumns[i] = new SelectColumn(columnRichInfo.getColumnName(), i); + } + for (int i = 0; i < projectedFields.length; i++) { + int columnIndexInFlinkTable = pushDownSelectColumns[i].getColumnIndexInFlinkTable(); + ColumnRichInfo columnRichInfo = + new ColumnRichInfo( + this.flinkSchema.getFieldName(columnIndexInFlinkTable).get(), + i, + this.flinkSchema.getFieldDataType(columnIndexInFlinkTable).get()); + + filerRichInfo[i] = columnRichInfo; + } + } else { + columnRichInfos = allColumnRichInfos; + selectColumns = + StarRocksSourceCommonFunc.genSelectedColumns( + columnMap, this.options, allColumnRichInfos); + for (int i = 0; i < projectedFields.length; i++) { + ColumnRichInfo columnRichInfo = + new ColumnRichInfo( + this.flinkSchema.getFieldName(i).get(), + projectedFields[i], + this.flinkSchema.getFieldDataType(i).get()); + filerRichInfo[i] = columnRichInfo; + } } - Map columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema); - List ColumnRichInfos = StarRocksSourceCommonFunc.genColumnRichInfo(columnMap); - SelectColumn[] selectColumns = StarRocksSourceCommonFunc.genSelectedColumns(columnMap, this.options, ColumnRichInfos); - - StarRocksDynamicLookupFunction tableFunction = new StarRocksDynamicLookupFunction(this.options, filerRichInfo, ColumnRichInfos, selectColumns); + TableFunction tableFunction = null; + StarRocksSourceOptions.CacheType lookupCacheType = options.getLookupCacheType(); + switch (lookupCacheType) { + case ALL: + tableFunction = + new StarRocksDynamicLookupFunction( + this.options, filerRichInfo, columnRichInfos, selectColumns); + break; + case LRU: + tableFunction = + new StarRocksDynamicLRUFunction( + this.options, filerRichInfo, columnRichInfos, selectColumns); + break; + } return TableFunctionProvider.of(tableFunction); } @@ -113,7 +160,7 @@ public void applyProjection(int[][] projectedFields) { this.pushDownHolder.setQueryType(StarRocksSourceQueryType.QuerySomeColumns); ArrayList columnList = new ArrayList<>(); - ArrayList selectColumns = new ArrayList(); + ArrayList selectColumns = new ArrayList(); for (int index : curProjectedFields) { String columnName = flinkSchema.getFieldName(index).get(); columnList.add(columnName); diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java index e340b2cd..40b295b1 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java @@ -79,6 +79,7 @@ public Set> optionalOptions() { options.add(StarRocksSourceOptions.LOOKUP_CACHE_TTL_MS); options.add(StarRocksSourceOptions.LOOKUP_CACHE_MAX_ROWS); options.add(StarRocksSourceOptions.LOOKUP_MAX_RETRIES); + options.add(StarRocksSourceOptions.LOOKUP_CACHE_TYPE); return options; } } diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceCommonFunc.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceCommonFunc.java index eb2db84a..c14124c2 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceCommonFunc.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceCommonFunc.java @@ -37,11 +37,11 @@ public class StarRocksSourceCommonFunc { - + private static volatile StarRocksQueryVisitor starrocksQueryVisitor; private static volatile StarRocksQueryPlanVisitor starRocksQueryPlanVisitor; - + private static StarRocksQueryVisitor getStarRocksQueryVisitor(StarRocksSourceOptions sourceOptions) { if (null == starrocksQueryVisitor) { @@ -84,7 +84,7 @@ public static List> splitQueryBeXTablets(int subTaskCount, curBeXTabletList.set(i, Collections.singletonList(queryInfo.getBeXTablets().get(i))); } return curBeXTabletList; - } + } if (subTaskCount < beXTabletsListCount) { for (int i = 0; i < beXTabletsListCount; i ++) { List tList = curBeXTabletList.get(i%subTaskCount); @@ -92,7 +92,7 @@ public static List> splitQueryBeXTablets(int subTaskCount, curBeXTabletList.set(i%subTaskCount, tList); } return curBeXTabletList; - } + } List beWithSingleTabletList = new ArrayList<>(); queryInfo.getBeXTablets().forEach(beXTablets -> { beXTablets.getTabletIds().forEach(tabletId -> { @@ -106,7 +106,7 @@ public static List> splitQueryBeXTablets(int subTaskCount, curBeXTabletList.set(i, Collections.singletonList(beWithSingleTabletList.get(i))); } return curBeXTabletList; - } + } long newx = Math.round(x); for (int i = 0; i < subTaskCount; i ++) { int start = (int)(i * newx); @@ -124,7 +124,7 @@ public static List> splitQueryBeXTablets(int subTaskCount, curBxTs = beWithSingleTabletList.subList(start, end); Map> beXTabletsMap = new HashMap<>(); curBxTs.forEach(curBxT -> { - List tablets = new ArrayList<>(); + List tablets = new ArrayList<>(); if (beXTabletsMap.containsKey(curBxT.getBeNode())) { tablets = beXTabletsMap.get(curBxT.getBeNode()); } else { @@ -174,8 +174,12 @@ public static List genColumnRichInfo(Map return columnMap.values().stream().sorted(Comparator.comparing(ColumnRichInfo::getColumnIndexInSchema)).collect(Collectors.toList()); } + public static List getSelectSql(Map columnMap) { + return columnMap.values().stream().sorted(Comparator.comparing(ColumnRichInfo::getColumnIndexInSchema)).collect(Collectors.toList()); + } + public static SelectColumn[] genSelectedColumns(Map columnMap, - StarRocksSourceOptions sourceOptions, + StarRocksSourceOptions sourceOptions, List columnRichInfos) { List selectedColumns = new ArrayList<>(); // user selected columns from sourceOptions diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java index cd739d76..cff69be3 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java @@ -53,24 +53,24 @@ public class StarRocksSourceOptions implements Serializable { public static final ConfigOption TABLE_NAME = ConfigOptions.key("table-name") .stringType().noDefaultValue().withDescription("Table name"); - - + + // optional Options public static final ConfigOption SCAN_CONNECT_TIMEOUT = ConfigOptions.key("scan.connect.timeout-ms") .intType().defaultValue(1000).withDescription("Connect timeout"); - + public static final ConfigOption SCAN_BATCH_ROWS = ConfigOptions.key("scan.params.batch-rows") .intType().defaultValue(1000).withDescription("Batch rows"); public static final ConfigOption SCAN_PROPERTIES = ConfigOptions.key("scan.params.properties") .stringType().noDefaultValue().withDescription("Reserved params for use"); - + public static final ConfigOption SCAN_LIMIT = ConfigOptions.key("scan.params.limit") .intType().defaultValue(1).withDescription("The query limit, if specified."); public static final ConfigOption SCAN_KEEP_ALIVE_MIN = ConfigOptions.key("scan.params.keep-alive-min") .intType().defaultValue(10).withDescription("Max keep alive time min"); - + public static final ConfigOption SCAN_QUERTY_TIMEOUT_S = ConfigOptions.key("scan.params.query-timeout-s") .intType().defaultValue(600).withDescription("Query timeout for a single query"); @@ -88,7 +88,7 @@ public class StarRocksSourceOptions implements Serializable { public static final ConfigOption SCAN_BE_HOST_MAPPING_LIST = ConfigOptions.key("scan.be-host-mapping-list") .stringType().defaultValue("").withDescription("List of be host mapping"); - + // lookup Options public static final ConfigOption LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows") .longType().defaultValue(-1L).withDescription( @@ -102,6 +102,12 @@ public class StarRocksSourceOptions implements Serializable { public static final ConfigOption LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries") .intType().defaultValue(1).withDescription("the max retry times if lookup database failed."); + public static final ConfigOption LOOKUP_CACHE_TYPE = + ConfigOptions.key("lookup.cache-type") + .enumType(CacheType.class) + .defaultValue(CacheType.ALL) + .withDescription("lookup type."); + public static final String SOURCE_PROPERTIES_PREFIX = "scan.params."; @@ -150,7 +156,7 @@ public String getScanUrl() { public String getJdbcUrl() { return tableOptions.get(JDBC_URL); } - + public String getUsername() { return tableOptions.get(USERNAME); } @@ -169,8 +175,8 @@ public String getTableName() { // optional Options - public int getConnectTimeoutMs() { - return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue(); + public int getConnectTimeoutMs() { + return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue(); } public int getBatchRows() { @@ -236,10 +242,23 @@ public int getLookupMaxRetries() { return tableOptions.get(LOOKUP_MAX_RETRIES).intValue(); } + public static Builder builder() { return new Builder(); } + public CacheType getLookupCacheType() { + return tableOptions.get(LOOKUP_CACHE_TYPE); + } + + /** + * Cache Type + */ + public enum CacheType { + LRU, + ALL + } + /** * Builder for {@link StarRocksSourceOptions}. */