Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](metatable) support table$partitions for hive table #40774

Merged
merged 14 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/runtime/memory/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MemTracker final {

// Returns a reference to the label stored in this tracker.
const std::string& label() const {
    return _label;
}
std::string log_usage() const {
return fmt::format("MemTracker Lame={}, Used={}({} B), Peak={}({} B)",
return fmt::format("MemTracker name={}, Used={}({} B), Peak={}({} B)", _label,
MemCounter::print_bytes(consumption()), consumption(),
MemCounter::print_bytes(peak_consumption()), peak_consumption());
}
Expand Down
170 changes: 107 additions & 63 deletions be/src/vec/exec/scan/vmeta_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,6 @@ Status VMetaScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
VLOG_CRITICAL << "VMetaScanner::prepare";
RETURN_IF_ERROR(VScanner::prepare(_state, conjuncts));
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
bool has_col_nullable =
std::any_of(std::begin(_tuple_desc->slots()), std::end(_tuple_desc->slots()),
[](SlotDescriptor* slot_desc) { return slot_desc->is_nullable(); });

if (has_col_nullable) {
// We do not allow any columns to be Nullable here, since FE can not
// transmit a NULL value to BE, so we can not distinguish a empty string
// from a NULL value.
return Status::InternalError("Logical error, VMetaScanner do not allow ColumnNullable");
}
return Status::OK();
}

Expand Down Expand Up @@ -146,60 +136,90 @@ Status VMetaScanner::_fill_block_with_remote_data(const std::vector<MutableColum
}

for (int _row_idx = 0; _row_idx < _batch_data.size(); _row_idx++) {
// No need to check nullable column since no nullable column is
// guaranteed in VMetaScanner::prepare
vectorized::IColumn* col_ptr = columns[col_idx].get();
switch (slot_desc->type().type) {
case TYPE_BOOLEAN: {
bool data = _batch_data[_row_idx].column_value[col_idx].boolVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)
->insert_value((uint8_t)data);
break;
}
case TYPE_INT: {
int64_t data = _batch_data[_row_idx].column_value[col_idx].intVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_BIGINT: {
int64_t data = _batch_data[_row_idx].column_value[col_idx].longVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_FLOAT: {
double data = _batch_data[_row_idx].column_value[col_idx].doubleVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_DOUBLE: {
double data = _batch_data[_row_idx].column_value[col_idx].doubleVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_DATETIMEV2: {
uint64_t data = _batch_data[_row_idx].column_value[col_idx].longVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_STRING:
case TYPE_CHAR:
case TYPE_VARCHAR: {
std::string data = _batch_data[_row_idx].column_value[col_idx].stringVal;
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data.c_str(),
data.length());
break;
}
default: {
std::string error_msg =
fmt::format("Invalid column type {} on column: {}.",
slot_desc->type().debug_string(), slot_desc->col_name());
return Status::InternalError(std::string(error_msg));
}
TCell& cell = _batch_data[_row_idx].column_value[col_idx];
if (cell.__isset.isNull && cell.isNull) {
DCHECK(slot_desc->is_nullable())
<< "cell is null but column is not nullable: " << slot_desc->col_name();
auto& null_col = reinterpret_cast<ColumnNullable&>(*col_ptr);
null_col.get_nested_column().insert_default();
null_col.get_null_map_data().push_back(1);
} else {
if (slot_desc->is_nullable()) {
auto& null_col = reinterpret_cast<ColumnNullable&>(*col_ptr);
null_col.get_null_map_data().push_back(0);
col_ptr = null_col.get_nested_column_ptr();
}
switch (slot_desc->type().type) {
case TYPE_BOOLEAN: {
bool data = cell.boolVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)
->insert_value((uint8_t)data);
break;
}
case TYPE_TINYINT: {
int8_t data = (int8_t)cell.intVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_SMALLINT: {
int16_t data = (int16_t)cell.intVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_INT: {
int32_t data = cell.intVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_BIGINT: {
int64_t data = cell.longVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_FLOAT: {
double data = cell.doubleVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_DOUBLE: {
double data = cell.doubleVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_DATEV2: {
uint32_t data = (uint32_t)cell.longVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt32>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_DATETIMEV2: {
uint64_t data = cell.longVal;
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)
->insert_value(data);
break;
}
case TYPE_STRING:
case TYPE_CHAR:
case TYPE_VARCHAR: {
std::string data = cell.stringVal;
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(
data.c_str(), data.length());
break;
}
default: {
std::string error_msg =
fmt::format("Invalid column type {} on column: {}.",
slot_desc->type().debug_string(), slot_desc->col_name());
return Status::InternalError(std::string(error_msg));
}
}
}
}
}
Expand Down Expand Up @@ -241,6 +261,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
case TMetadataType::TASKS:
RETURN_IF_ERROR(_build_tasks_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::PARTITION_VALUES:
RETURN_IF_ERROR(_build_partition_values_metadata_request(meta_scan_range, &request));
break;
default:
_meta_eos = true;
return Status::OK();
Expand Down Expand Up @@ -461,6 +484,27 @@ Status VMetaScanner::_build_tasks_metadata_request(const TMetaScanRange& meta_sc
return Status::OK();
}

// Fills `request` for a PARTITION_VALUES metadata-table fetch.
//
// Returns Status::InternalError when `meta_scan_range` does not have
// `partition_values_params` set. Otherwise marks the request as targeting
// TSchemaTableName::METADATA_TABLE, attaches a TMetadataTableRequestParams
// carrying the scan range's partition-values params, and returns Status::OK().
Status VMetaScanner::_build_partition_values_metadata_request(
        const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) {
    VLOG_CRITICAL << "VMetaScanner::_build_partition_values_metadata_request";

    const bool has_params = meta_scan_range.__isset.partition_values_params;
    if (has_params) {
        // Describe which metadata table is wanted and forward the params
        // supplied with the scan range.
        TMetadataTableRequestParams table_params;
        table_params.__set_partition_values_metadata_params(
                meta_scan_range.partition_values_params);
        table_params.__set_metadata_type(TMetadataType::PARTITION_VALUES);

        request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
        // Setter name is spelled "metada" here exactly as in the call being replaced.
        request->__set_metada_table_params(table_params);
        return Status::OK();
    }

    return Status::InternalError(
            "Can not find TPartitionValuesMetadataParams from meta_scan_range.");
}

Status VMetaScanner::close(RuntimeState* state) {
VLOG_CRITICAL << "VMetaScanner::close";
RETURN_IF_ERROR(VScanner::close(state));
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vmeta_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class VMetaScanner : public VScanner {
TFetchSchemaTableDataRequest* request);
Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_partition_values_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
bool _meta_eos;
TupleId _tuple_id;
TUserIdentity _user_identity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1267,12 +1267,12 @@ optScanParams

// A primary relation is one of four labelled alternatives: a (possibly
// decorated) table name, a parenthesised query with an alias, a table valued
// function call with an alias, or a parenthesised list of relations.
relationPrimary
    : multipartIdentifier optScanParams? materializedViewName? tableSnapshot? specifiedPartition?
       tabletList? tableAlias sample? relationHint? lateralView*                 #tableName
    | LEFT_PAREN query RIGHT_PAREN tableAlias lateralView*                       #aliasedQuery
    | tvfName=identifier LEFT_PAREN
      (properties=propertyItemList)?
      RIGHT_PAREN tableAlias                                                     #tableValuedFunction
    | LEFT_PAREN relations RIGHT_PAREN                                           #relationList
    ;

materializedViewName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.IndexSchemaProcNode;
import org.apache.doris.common.proc.ProcNodeInterface;
Expand All @@ -44,6 +45,7 @@
import org.apache.doris.qe.ShowResultSetMetaData;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -55,6 +57,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class DescribeStmt extends ShowStmt implements NotFallbackInParser {
Expand Down Expand Up @@ -123,6 +126,25 @@ public boolean isAllTables() {

@Override
public void analyze(Analyzer analyzer) throws UserException {
// First handle meta table.
// It will convert this to corresponding table valued functions
// eg: DESC table$partitions -> partition_values(...)
if (dbTableName != null) {
dbTableName.analyze(analyzer);
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogOrAnalysisException(dbTableName.getCtl());
Pair<String, String> sourceTableNameWithMetaName = catalog.getSourceTableNameWithMetaTableName(
dbTableName.getTbl());
if (!Strings.isNullOrEmpty(sourceTableNameWithMetaName.second)) {
isTableValuedFunction = true;
Optional<TableValuedFunctionRef> optTvfRef = catalog.getMetaTableFunctionRef(
dbTableName.getDb(), dbTableName.getTbl());
if (!optTvfRef.isPresent()) {
throw new AnalysisException("meta table not found: " + sourceTableNameWithMetaName.second);
}
tableValuedFunctionRef = optTvfRef.get();
}
}

if (!isAllTables && isTableValuedFunction) {
tableValuedFunctionRef.analyze(analyzer);
List<Column> columns = tableValuedFunctionRef.getTable().getBaseSchema();
Expand All @@ -148,8 +170,6 @@ public void analyze(Analyzer analyzer) throws UserException {
}
}

dbTableName.analyze(analyzer);

if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), dbTableName, PrivPredicate.SHOW)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "DESCRIBE",
Expand All @@ -159,8 +179,7 @@ public void analyze(Analyzer analyzer) throws UserException {

CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogOrAnalysisException(dbTableName.getCtl());
DatabaseIf db = catalog.getDbOrAnalysisException(dbTableName.getDb());
TableIf table = db.getTableOrAnalysisException(dbTableName.getTbl());

TableIf table = db.getTableOrDdlException(dbTableName.getTbl());
table.readLock();
try {
if (!isAllTables) {
Expand Down Expand Up @@ -387,3 +406,4 @@ private static List<String> initEmptyRow() {
return emptyRow;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
import org.apache.doris.nereids.trees.expressions.functions.table.PartitionValues;
import org.apache.doris.nereids.trees.expressions.functions.table.Partitions;
import org.apache.doris.nereids.trees.expressions.functions.table.Query;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
Expand Down Expand Up @@ -59,7 +60,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
tableValued(Partitions.class, "partitions"),
tableValued(Jobs.class, "jobs"),
tableValued(Tasks.class, "tasks"),
tableValued(Query.class, "query")
tableValued(Query.class, "query"),
tableValued(PartitionValues.class, "partition_values")
);

public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ default boolean tryReadLock(long timeout, TimeUnit unit) {
// Default implementation does nothing.
default void readUnlock() {
}

;

// Default implementation does nothing.
default void writeLock() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,39 @@ private static String formatDateStr(String dateStr) {
parts.length > 3 ? String.format(" %02d:%02d:%02d", Integer.parseInt(parts[3]),
Integer.parseInt(parts[4]), Integer.parseInt(parts[5])) : "");
}


// Refer to be/src/vec/runtime/vdatetime_value.h
// Packs the date-time fields into one long by OR-ing each field at a fixed bit
// offset: microsecond at bit 0, second at 20, minute at 26, hour at 32,
// day at 37, month at 42 and year at 46. No range checking is performed.
public static long convertToDateTimeV2(
        int year, int month, int day, int hour, int minute, int second, int microsecond) {
    long packed = microsecond;
    packed |= ((long) second) << 20;
    packed |= ((long) minute) << 26;
    packed |= ((long) hour) << 32;
    packed |= ((long) day) << 37;
    packed |= ((long) month) << 42;
    packed |= ((long) year) << 46;
    return packed;
}

// Refer to be/src/vec/runtime/vdatetime_value.h
// Packs a calendar date into one long by OR-ing the fields at fixed bit
// offsets: day at bit 0, month at bit 5 and year at bit 9. No range checking
// is performed.
public static long convertToDateV2(
        int year, int month, int day) {
    long packed = day;
    packed |= ((long) month) << 5;
    packed |= ((long) year) << 9;
    return packed;
}

// Parses dateTimeStr with the pattern "yyyy-MM-dd HH:mm:ss", extended by a
// '.' and `scale` fraction-of-second letters ('S') when scale is positive,
// then packs the parsed fields with convertToDateTimeV2. The microsecond
// field passed on is the parsed nanosecond value divided by 1000.
public static long convertStringToDateTimeV2(String dateTimeStr, int scale) {
    StringBuilder pattern = new StringBuilder("yyyy-MM-dd HH:mm:ss");
    if (scale > 0) {
        pattern.append(".");
        for (int remaining = scale; remaining > 0; remaining--) {
            pattern.append("S");
        }
    }
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern.toString());
    LocalDateTime parsed = TimeUtils.formatDateTimeAndFullZero(dateTimeStr, formatter);
    int microsecond = parsed.getNano() / 1000;
    return convertToDateTimeV2(
            parsed.getYear(), parsed.getMonthValue(), parsed.getDayOfMonth(),
            parsed.getHour(), parsed.getMinute(), parsed.getSecond(), microsecond);
}

// Parses dateStr with the pattern "yyyy-MM-dd" through
// TimeUtils.formatDateTimeAndFullZero and packs the resulting year, month and
// day of month with convertToDateV2.
public static long convertStringToDateV2(String dateStr) {
    LocalDateTime parsed = TimeUtils.formatDateTimeAndFullZero(
            dateStr, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
    int year = parsed.getYear();
    int month = parsed.getMonthValue();
    int dayOfMonth = parsed.getDayOfMonth();
    return convertToDateV2(year, month, dayOfMonth);
}
}

Loading
Loading