diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 8c353320ba67a13..88c1889a80e6b92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -34,6 +34,7 @@
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.ColumnStatisticBuilder;
 import org.apache.doris.statistics.HMSAnalysisTask;
+import org.apache.doris.statistics.StatsType;
 import org.apache.doris.statistics.TableStatsMeta;
 import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.thrift.THiveTable;
@@ -41,6 +42,7 @@
 import org.apache.doris.thrift.TTableType;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -81,7 +83,7 @@ public class HMSExternalTable extends ExternalTable {
     private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
     private static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS;
-
+    private static final Map<StatsType, String> MAP_SPARK_STATS_TO_DORIS;
     private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
     private static final String TBL_PROP_INSERT_ONLY = "insert_only";
@@ -89,6 +91,15 @@ public class HMSExternalTable extends ExternalTable {
 
     private static final String NUM_ROWS = "numRows";
 
+    private static final String SPARK_COL_STATS = "spark.sql.statistics.colStats.";
+    private static final String SPARK_STATS_MAX = ".max";
+    private static final String SPARK_STATS_MIN = ".min";
+    private static final String SPARK_STATS_NDV = ".distinctCount";
+    private static final String SPARK_STATS_NULLS = ".nullCount";
+    private static final String SPARK_STATS_AVG_LEN = ".avgLen";
+    private static final String SPARK_STATS_MAX_LEN = ".maxLen";
+    private static final String SPARK_STATS_HISTOGRAM = ".histogram";
+
     static {
         SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
         SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
@@ -109,6 +120,17 @@ public class HMSExternalTable extends ExternalTable {
         SUPPORTED_HUDI_FILE_FORMATS.add("com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat");
     }
 
+    static {
+        MAP_SPARK_STATS_TO_DORIS = Maps.newHashMap();
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.NDV, SPARK_STATS_NDV);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.AVG_SIZE, SPARK_STATS_AVG_LEN);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.MAX_SIZE, SPARK_STATS_MAX_LEN);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.NUM_NULLS, SPARK_STATS_NULLS);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.MIN_VALUE, SPARK_STATS_MIN);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.MAX_VALUE, SPARK_STATS_MAX);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.HISTOGRAM, SPARK_STATS_HISTOGRAM);
+    }
+
     private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
     private List<Column> partitionColumns;
@@ -536,6 +558,31 @@ private void initPartitionColumns(List<Column> schema) {
         LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name);
     }
 
+    public boolean hasColumnStatistics(String colName) {
+        Map<String, String> parameters = remoteTable.getParameters();
+        return parameters.keySet().stream()
+                .filter(k -> k.startsWith(SPARK_COL_STATS + colName + ".")).findAny().isPresent();
+    }
+
+    public boolean fillColumnStatistics(String colName, Map<StatsType, String> statsTypes, Map<String, String> stats) {
+        makeSureInitialized();
+        if (!hasColumnStatistics(colName)) {
+            return false;
+        }
+
+        Map<String, String> parameters = remoteTable.getParameters();
+        for (StatsType type : statsTypes.keySet()) {
+            String key = SPARK_COL_STATS + colName + MAP_SPARK_STATS_TO_DORIS.getOrDefault(type, "-");
+            if (parameters.containsKey(key)) {
+                stats.put(statsTypes.get(type), parameters.get(key));
+            } else {
+                // Should not happen: Spark writes every stats type except histogram.
+                stats.put(statsTypes.get(type), "NULL");
+            }
+        }
+        return true;
+    }
+
     @Override
     public Optional<ColumnStatistic> getColumnStatistic(String colName) {
         makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
index 5053fc62a2d15fe..fd0a4c8253821ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
@@ -20,6 +20,7 @@
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -89,12 +90,18 @@ private void getTableStats() throws Exception {
      * Get column statistics and insert the result to __internal_schema.column_statistics
      */
     protected void getTableColumnStats() throws Exception {
-        if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
+        if (!info.usingSqlForPartitionColumn) {
             try {
-                getPartitionColumnStats();
+                if (isPartitionColumn()) {
+                    getPartitionColumnStats();
+                } else {
+                    getHmsColumnStats();
+                }
             } catch (Exception e) {
-                LOG.warn("Failed to collect stats for partition col {} using metadata, "
-                        + "fallback to normal collection", col.getName(), e);
+                LOG.warn("Failed to collect stats for {}col {} using metadata, "
+                        + "fallback to normal collection",
+                        isPartitionColumn() ? "partition " : "", col.getName(), e);
+                // Retry using the SQL-based collection path.
                 getOrdinaryColumnStats();
             }
         } else {
@@ -209,6 +216,32 @@ private void getPartitionColumnStats() throws Exception {
         runQuery(sql);
     }
 
+    // Collect the Spark-analyzed column stats stored in HMS metadata.
+    private void getHmsColumnStats() throws Exception {
+        TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
+        long count = tableStatsStatus == null ? table.estimatedRowCount() : tableStatsStatus.rowCount;
+
+        Map<String, String> params = buildStatsParams("NULL");
+        Map<StatsType, String> statsParams = new HashMap<>();
+        statsParams.put(StatsType.NDV, "ndv");
+        statsParams.put(StatsType.NUM_NULLS, "null_count");
+        statsParams.put(StatsType.MIN_VALUE, "min");
+        statsParams.put(StatsType.MAX_VALUE, "max");
+        statsParams.put(StatsType.AVG_SIZE, "avg_len");
+
+        if (!table.fillColumnStatistics(info.colName, statsParams, params)) {
+            throw new AnalysisException("some column stats not available");
+        }
+
+        long dataSize = Long.valueOf(params.get("avg_len")) * count;
+        params.put("row_count", String.valueOf(count));
+        params.put("data_size", String.valueOf(dataSize));
+
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+        String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
+        runQuery(sql);
+    }
+
     private String updateMinValue(String currentMin, String value) {
         if (currentMin == null) {
             return value;