[feat](meta) Reuse HMS statistics analyzed by Spark engine for Analyze Task. (apache#28525)

Taking the idea further from PR apache#24853: column statistics already analyzed by Spark are available in HMS, and this PR proposes to reuse those analyzed stats from the external source when the ANALYZE command is executed with its WITH SQL clause.

Spark stores the statistics it analyzes in the table properties instead of in HiveColumnStatistics. In this PR, we read the statistics from those properties and make them available to Doris.
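For illustration, Spark writes per-column statistics into the HMS table parameters using its spark.sql.statistics.colStats.<column>.<stat> key scheme (the same keys the constants in this diff are built from). A hypothetical column user_id might carry entries like the following (values made up):

    spark.sql.statistics.colStats.user_id.distinctCount = 98742
    spark.sql.statistics.colStats.user_id.nullCount = 13
    spark.sql.statistics.colStats.user_id.min = 1
    spark.sql.statistics.colStats.user_id.max = 100000
    spark.sql.statistics.colStats.user_id.avgLen = 8
    spark.sql.statistics.colStats.user_id.maxLen = 8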
Nitin-Kashyap authored and seawinde committed Jan 15, 2024
1 parent 50fab50 commit 68154b7
Showing 2 changed files with 85 additions and 5 deletions.
File: HMSExternalTable.java
@@ -34,13 +34,15 @@
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.ColumnStatisticBuilder;
 import org.apache.doris.statistics.HMSAnalysisTask;
+import org.apache.doris.statistics.StatsType;
 import org.apache.doris.statistics.TableStatsMeta;
 import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -81,14 +83,23 @@ public class HMSExternalTable extends ExternalTable {
 
     private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
     private static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS;
 
+    private static final Map<StatsType, String> MAP_SPARK_STATS_TO_DORIS;
     private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
     private static final String TBL_PROP_INSERT_ONLY = "insert_only";
 
     private static final String TBL_PROP_TRANSIENT_LAST_DDL_TIME = "transient_lastDdlTime";
 
     private static final String NUM_ROWS = "numRows";
 
+    private static final String SPARK_COL_STATS = "spark.sql.statistics.colStats.";
+    private static final String SPARK_STATS_MAX = ".max";
+    private static final String SPARK_STATS_MIN = ".min";
+    private static final String SPARK_STATS_NDV = ".distinctCount";
+    private static final String SPARK_STATS_NULLS = ".nullCount";
+    private static final String SPARK_STATS_AVG_LEN = ".avgLen";
+    private static final String SPARK_STATS_MAX_LEN = ".maxLen";
+    private static final String SPARK_STATS_HISTOGRAM = ".histogram";
 
     static {
         SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
         SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
@@ -109,6 +120,17 @@ public class HMSExternalTable extends ExternalTable {
         SUPPORTED_HUDI_FILE_FORMATS.add("com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat");
     }
 
+    static {
+        MAP_SPARK_STATS_TO_DORIS = Maps.newHashMap();
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.NDV, SPARK_STATS_NDV);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.AVG_SIZE, SPARK_STATS_AVG_LEN);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.MAX_SIZE, SPARK_STATS_MAX_LEN);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.NUM_NULLS, SPARK_STATS_NULLS);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.MIN_VALUE, SPARK_STATS_MIN);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.MAX_VALUE, SPARK_STATS_MAX);
+        MAP_SPARK_STATS_TO_DORIS.put(StatsType.HISTOGRAM, SPARK_STATS_HISTOGRAM);
+    }
+
     private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
     private List<Column> partitionColumns;
 
@@ -536,6 +558,31 @@ private void initPartitionColumns(List<Column> schema) {
         LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name);
     }
 
+    public boolean hasColumnStatistics(String colName) {
+        Map<String, String> parameters = remoteTable.getParameters();
+        return parameters.keySet().stream()
+                .anyMatch(k -> k.startsWith(SPARK_COL_STATS + colName + "."));
+    }
+
+    public boolean fillColumnStatistics(String colName, Map<StatsType, String> statsTypes, Map<String, String> stats) {
+        makeSureInitialized();
+        if (!hasColumnStatistics(colName)) {
+            return false;
+        }
+
+        Map<String, String> parameters = remoteTable.getParameters();
+        for (StatsType type : statsTypes.keySet()) {
+            String key = SPARK_COL_STATS + colName + MAP_SPARK_STATS_TO_DORIS.getOrDefault(type, "-");
+            if (parameters.containsKey(key)) {
+                stats.put(statsTypes.get(type), parameters.get(key));
+            } else {
+                // Should not happen: Spark stores every stats type except histogram.
+                stats.put(statsTypes.get(type), "NULL");
+            }
+        }
+        return true;
+    }
+
     @Override
     public Optional<ColumnStatistic> getColumnStatistic(String colName) {
         makeSureInitialized();
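The lookup these two new methods perform can be pictured with a minimal standalone sketch, using a plain map in place of remoteTable.getParameters(); the class name, column name, and values here are hypothetical:

import java.util.HashMap;
import java.util.Map;

public class SparkStatsLookupSketch {
    private static final String SPARK_COL_STATS = "spark.sql.statistics.colStats.";

    public static void main(String[] args) {
        // Stand-in for the HMS table parameters written by a Spark ANALYZE run.
        Map<String, String> parameters = new HashMap<>();
        parameters.put(SPARK_COL_STATS + "user_id.distinctCount", "98742");
        parameters.put(SPARK_COL_STATS + "user_id.nullCount", "13");
        parameters.put(SPARK_COL_STATS + "user_id.avgLen", "8");

        String colName = "user_id";

        // hasColumnStatistics: any key under the per-column prefix means Spark analyzed it.
        boolean present = parameters.keySet().stream()
                .anyMatch(k -> k.startsWith(SPARK_COL_STATS + colName + "."));
        System.out.println("stats present for " + colName + ": " + present);

        // fillColumnStatistics: copy each requested stat into the caller's map,
        // defaulting to "NULL" when a key is missing (e.g. histogram).
        Map<String, String> stats = new HashMap<>();
        stats.put("ndv", parameters.getOrDefault(SPARK_COL_STATS + colName + ".distinctCount", "NULL"));
        stats.put("null_count", parameters.getOrDefault(SPARK_COL_STATS + colName + ".nullCount", "NULL"));
        System.out.println(stats);
    }
}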
File: HMSAnalysisTask.java

@@ -20,6 +20,7 @@
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -89,12 +90,18 @@ private void getTableStats() throws Exception {
      * Get column statistics and insert the result to __internal_schema.column_statistics
      */
     protected void getTableColumnStats() throws Exception {
-        if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
+        if (!info.usingSqlForPartitionColumn) {
             try {
-                getPartitionColumnStats();
+                if (isPartitionColumn()) {
+                    getPartitionColumnStats();
+                } else {
+                    getHmsColumnStats();
+                }
             } catch (Exception e) {
-                LOG.warn("Failed to collect stats for partition col {} using metadata, "
-                        + "fallback to normal collection", col.getName(), e);
+                LOG.warn("Failed to collect stats for {}col {} using metadata, "
+                        + "fallback to normal collection",
+                        isPartitionColumn() ? "partition " : "", col.getName(), e);
+                /* Retry using the SQL-based path. */
                 getOrdinaryColumnStats();
             }
         } else {
@@ -209,6 +216,32 @@ private void getPartitionColumnStats() throws Exception {
         runQuery(sql);
     }
 
+    // Collect the Spark-analyzed column stats through HMS metadata.
+    private void getHmsColumnStats() throws Exception {
+        TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
+        long count = tableStatsStatus == null ? table.estimatedRowCount() : tableStatsStatus.rowCount;
+
+        Map<String, String> params = buildStatsParams("NULL");
+        Map<StatsType, String> statsParams = new HashMap<>();
+        statsParams.put(StatsType.NDV, "ndv");
+        statsParams.put(StatsType.NUM_NULLS, "null_count");
+        statsParams.put(StatsType.MIN_VALUE, "min");
+        statsParams.put(StatsType.MAX_VALUE, "max");
+        statsParams.put(StatsType.AVG_SIZE, "avg_len");
+
+        if (!table.fillColumnStatistics(info.colName, statsParams, params)) {
+            throw new AnalysisException("some column stats not available");
+        }
+
+        long dataSize = Long.valueOf(params.get("avg_len")) * count;
+        params.put("row_count", String.valueOf(count));
+        params.put("data_size", String.valueOf(dataSize));
+
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+        String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
+        runQuery(sql);
+    }
+
     private String updateMinValue(String currentMin, String value) {
         if (currentMin == null) {
             return value;
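To make the last step concrete, here is a small sketch of the arithmetic and template substitution that getHmsColumnStats ends with, assuming a made-up template string in place of the real ANALYZE_PARTITION_COLUMN_TEMPLATE and made-up stat values:

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.text.StringSubstitutor;

public class StatsTemplateSketch {
    // Hypothetical stand-in for ANALYZE_PARTITION_COLUMN_TEMPLATE.
    private static final String TEMPLATE =
            "SELECT ${ndv} AS ndv, ${null_count} AS null_count, "
                    + "${row_count} AS row_count, ${data_size} AS data_size";

    public static void main(String[] args) {
        // Values as they would arrive from fillColumnStatistics and TableStatsMeta.
        Map<String, String> params = new HashMap<>();
        params.put("ndv", "98742");
        params.put("null_count", "13");

        long rowCount = 1_000_000L;                   // row count from stats meta or the estimate
        long dataSize = Long.valueOf("8") * rowCount; // data_size = avg_len * row_count
        params.put("row_count", String.valueOf(rowCount));
        params.put("data_size", String.valueOf(dataSize));

        // Substitute the collected values into the SQL template, as the task does.
        String sql = new StringSubstitutor(params).replace(TEMPLATE);
        System.out.println(sql);
    }
}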
