From 68154b7d882cdd474a98b15debc49080d1c3cb7f Mon Sep 17 00:00:00 2001
From: Nitin-Kashyap <66766227+Nitin-Kashyap@users.noreply.github.com>
Date: Fri, 12 Jan 2024 14:18:27 +0530
Subject: [PATCH] [feat](meta) Reuse HMS statistics analyzed by Spark engine
 for Analyze Task. (#28525)

Taking the idea further from PR #24853.

Column statistics analyzed by Spark are already available in HMS. This PR
proposes to reuse those analyzed stats from the external source when the
WITH SQL clause of the ANALYZE command is executed.

Spark stores the statistics it analyzes in the table properties instead of
in HiveColumnStatistics. In this PR, we read the statistics from these
properties and make them available to Doris.
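For illustration (the table name, column name, and values below are made
up; the key names follow Spark's
"spark.sql.statistics.colStats.<column>.<suffix>" convention that this
patch parses), after Spark runs
ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c1, the HMS table
parameters contain entries such as:

    spark.sql.statistics.colStats.c1.distinctCount=42
    spark.sql.statistics.colStats.c1.nullCount=0
    spark.sql.statistics.colStats.c1.min=1
    spark.sql.statistics.colStats.c1.max=100
    spark.sql.statistics.colStats.c1.avgLen=4
    spark.sql.statistics.colStats.c1.maxLen=4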
---
 .../catalog/external/HMSExternalTable.java    | 49 ++++++++++++++++++-
 .../doris/statistics/HMSAnalysisTask.java     | 41 ++++++++++++++++--
 2 files changed, 85 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 8c353320ba67a13..88c1889a80e6b92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -34,6 +34,7 @@
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.ColumnStatisticBuilder;
 import org.apache.doris.statistics.HMSAnalysisTask;
+import org.apache.doris.statistics.StatsType;
 import org.apache.doris.statistics.TableStatsMeta;
 import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.thrift.THiveTable;
@@ -41,6 +42,7 @@
 import org.apache.doris.thrift.TTableType;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -81,7 +83,7 @@ public class HMSExternalTable extends ExternalTable {
 
     private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
     private static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS;
-
+    private static final Map<StatsType, String> MAP_SPARK_STATS_TO_DORIS;
 
     private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
     private static final String TBL_PROP_INSERT_ONLY = "insert_only";
@@ -89,6 +91,15 @@ public class HMSExternalTable extends ExternalTable {
 
     private static final String NUM_ROWS = "numRows";
 
+    private static final String SPARK_COL_STATS = "spark.sql.statistics.colStats.";
+    private static final String SPARK_STATS_MAX = ".max";
+    private static final String SPARK_STATS_MIN = ".min";
+    private static final String SPARK_STATS_NDV = ".distinctCount";
+    private static final String SPARK_STATS_NULLS = ".nullCount";
+    private static final String SPARK_STATS_AVG_LEN = ".avgLen";
+    private static final String SPARK_STATS_MAX_LEN = ".maxLen";
+    private static final String SPARK_STATS_HISTOGRAM = ".histogram";
+
     static {
         SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
         SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
@@ -109,6 +120,17 @@ public class HMSExternalTable extends ExternalTable {
         SUPPORTED_HUDI_FILE_FORMATS.add("com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat");
     }
 
+    static {
+        MAP_SPARK_STATS_TO_DORIS = Maps.newHashMap();
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.NDV, SPARK_STATS_NDV);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.AVG_SIZE, SPARK_STATS_AVG_LEN);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.MAX_SIZE, SPARK_STATS_MAX_LEN);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.NUM_NULLS, SPARK_STATS_NULLS);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.MIN_VALUE, SPARK_STATS_MIN);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.MAX_VALUE, SPARK_STATS_MAX);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.HISTOGRAM, SPARK_STATS_HISTOGRAM);
+    }
+
     private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
 
     private List<Column> partitionColumns;
@@ -536,6 +558,31 @@ private void initPartitionColumns(List<Column> schema) {
         LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name);
     }
 
+    public boolean hasColumnStatistics(String colName) {
+        Map<String, String> parameters = remoteTable.getParameters();
+        return parameters.keySet().stream()
+                .anyMatch(k -> k.startsWith(SPARK_COL_STATS + colName + "."));
+    }
+
+    public boolean fillColumnStatistics(String colName, Map<StatsType, String> statsTypes, Map<String, String> stats) {
+        makeSureInitialized();
+        if (!hasColumnStatistics(colName)) {
+            return false;
+        }
+
+        Map<String, String> parameters = remoteTable.getParameters();
+        for (StatsType type : statsTypes.keySet()) {
+            String key = SPARK_COL_STATS + colName + MAP_SPARK_STATS_TO_DORIS.getOrDefault(type, "-");
+            if (parameters.containsKey(key)) {
+                stats.put(statsTypes.get(type), parameters.get(key));
+            } else {
+                // Should not happen: Spark stores every stats type here except histogram.
+                stats.put(statsTypes.get(type), "NULL");
+            }
+        }
+        return true;
+    }
+
     @Override
     public Optional<ColumnStatistic> getColumnStatistic(String colName) {
         makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
index 5053fc62a2d15fe..fd0a4c8253821ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
@@ -20,6 +20,7 @@
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -89,12 +90,18 @@ private void getTableStats() throws Exception {
      * Get column statistics and insert the result to __internal_schema.column_statistics
      */
     protected void getTableColumnStats() throws Exception {
-        if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
+        if (!info.usingSqlForPartitionColumn) {
             try {
-                getPartitionColumnStats();
+                if (isPartitionColumn()) {
+                    getPartitionColumnStats();
+                } else {
+                    getHmsColumnStats();
+                }
             } catch (Exception e) {
-                LOG.warn("Failed to collect stats for partition col {} using metadata, "
-                        + "fallback to normal collection", col.getName(), e);
+                LOG.warn("Failed to collect stats for {}col {} using metadata, "
+                        + "falling back to normal collection",
+                        isPartitionColumn() ? "partition " : "", col.getName(), e);
+                // Retry by collecting the stats with SQL.
                 getOrdinaryColumnStats();
             }
         } else {
@@ -209,6 +216,36 @@ private void getPartitionColumnStats() throws Exception {
         runQuery(sql);
     }
 
+    // Collect the column stats that Spark analyzed and stored in the HMS table properties.
+    private void getHmsColumnStats() throws Exception {
+        TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
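+        // Prefer the row count recorded by an earlier Doris analyze job;
+        // otherwise fall back to the table's own row-count estimate.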
+        long count = tableStatsStatus == null ? table.estimatedRowCount() : tableStatsStatus.rowCount;
+
+        Map<String, String> params = buildStatsParams("NULL");
+        Map<StatsType, String> statsParams = new HashMap<>();
+        statsParams.put(StatsType.NDV, "ndv");
+        statsParams.put(StatsType.NUM_NULLS, "null_count");
+        statsParams.put(StatsType.MIN_VALUE, "min");
+        statsParams.put(StatsType.MAX_VALUE, "max");
+        statsParams.put(StatsType.AVG_SIZE, "avg_len");
+
+        // Bail out (and let the caller fall back to SQL collection) if the
+        // Spark-analyzed stats are not present in the table properties.
+        if (!table.fillColumnStatistics(info.colName, statsParams, params)) {
+            throw new AnalysisException("some column stats not available");
+        }
+
+        long dataSize = Long.parseLong(params.get("avg_len")) * count;
+        params.put("row_count", String.valueOf(count));
+        params.put("data_size", String.valueOf(dataSize));
+
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+        String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
+        runQuery(sql);
+    }
+
     private String updateMinValue(String currentMin, String value) {
         if (currentMin == null) {
             return value;