Merge branch 'master' into fix-var-inv
eldenmoon authored Aug 28, 2024
2 parents 9c29378 + d09a39b commit 56eaf8a
Showing 13 changed files with 175 additions and 42 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_creator.cpp
@@ -83,7 +83,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
return Status::OK();
}

Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) {
size_t num_rows = block.rows();
if (num_rows == 0) {
return Status::OK();
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_creator.h
@@ -141,7 +141,11 @@ class SegmentFlusher {
bool need_buffering();

private:
Status _parse_variant_columns(vectorized::Block& block);
// This method catches the exception thrown when memory allocation fails
Status _parse_variant_columns(vectorized::Block& block) {
RETURN_IF_CATCH_EXCEPTION({ return _internal_parse_variant_columns(block); });
}
Status _internal_parse_variant_columns(vectorized::Block& block);
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
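The change above splits parsing in two: the private _internal_parse_variant_columns keeps the original logic, while the public _parse_variant_columns wraps it so that an exception thrown mid-parse (for example, a failed allocation) comes back to the caller as a Status instead of unwinding further. Below is a minimal sketch of what such a wrapper macro can look like; the Status type and the catch clauses are assumptions for illustration, not Doris's actual definitions.

#include <exception>
#include <new>
#include <string>
#include <utility>

// Minimal stand-in for doris::Status, for illustration only.
struct Status {
    static Status OK() { return {""}; }
    static Status MemoryLimitExceeded(std::string m) { return {std::move(m)}; }
    static Status InternalError(std::string m) { return {std::move(m)}; }
    bool ok() const { return msg.empty(); }
    std::string msg;
};

// Run a block and convert anything thrown (e.g. std::bad_alloc from a failed
// allocation) into an error Status. The block may itself return a Status, as
// in the header above, or fall through to the code after the macro.
#define RETURN_IF_CATCH_EXCEPTION(stmt)                              \
    do {                                                             \
        try {                                                        \
            stmt;                                                    \
        } catch (const std::bad_alloc&) {                            \
            return Status::MemoryLimitExceeded("allocation failed"); \
        } catch (const std::exception& e) {                          \
            return Status::InternalError(e.what());                  \
        }                                                            \
    } while (false)

// Example use: a Status-returning function whose body may throw.
Status parse(bool fail) {
    RETURN_IF_CATCH_EXCEPTION({
        if (fail) {
            throw std::bad_alloc(); // stand-in for a real allocation failure
        }
    });
    return Status::OK();
}

Expanding to do { ... } while (false) keeps the macro usable as a single statement after an if or else.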
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/page_io.cpp
@@ -111,8 +111,8 @@ Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body
return Status::OK();
}

Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
opts.sanity_check();
opts.stats->total_pages_num++;

11 changes: 10 additions & 1 deletion be/src/olap/rowset/segment_v2/page_io.h
@@ -123,8 +123,17 @@ class PageIO {
// `handle' holds the memory of page data,
// `body' points to page body,
// `footer' stores the page footer.
// This method is exception safe; it returns an error status when memory allocation fails.
static Status read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer);
Slice* body, PageFooterPB* footer) {
RETURN_IF_CATCH_EXCEPTION(
{ return read_and_decompress_page_(opts, handle, body, footer); });
}

private:
// An internal method that does not handle exceptions.
static Status read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer);
};

} // namespace segment_v2
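This is the same wrapper pattern as in SegmentFlusher above: the public read_and_decompress_page becomes an inline shim that traps exceptions, while the original implementation moves to the private read_and_decompress_page_ (hence the renamed definition in page_io.cpp earlier in this diff).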
3 changes: 2 additions & 1 deletion be/src/util/faststring.h
@@ -85,7 +85,8 @@ class faststring : private Allocator<false, false, false, DefaultMemoryAllocator
OwnedSlice build() {
uint8_t* ret = data_;
if (ret == initial_data_) {
ret = reinterpret_cast<uint8_t*>(Allocator::alloc(len_));
ret = reinterpret_cast<uint8_t*>(Allocator::alloc(capacity_));
DCHECK(len_ <= capacity_);
memcpy(ret, data_, len_);
}
OwnedSlice result(ret, len_, capacity_);
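The one-character change above fixes a size mismatch: OwnedSlice is constructed with capacity_, and its destructor frees with that same capacity, so build() must also allocate capacity_ bytes; allocating only len_ would pair an alloc of one size with a free of another. Below is a toy illustration of why the two sizes must agree, using a hypothetical TrackingAllocator rather than the Doris Allocator.

#include <cassert>
#include <cstddef>
#include <cstdlib>

// Hypothetical sized allocator: free() must be told how many bytes the
// matching alloc() requested, as with sized allocators that keep accounting.
struct TrackingAllocator {
    static inline std::size_t live_bytes = 0;
    static void* alloc(std::size_t n) {
        live_bytes += n;
        return std::malloc(n);
    }
    static void free(void* p, std::size_t n) {
        live_bytes -= n; // a wrong n here underflows the counter
        std::free(p);
    }
};

int main() {
    constexpr std::size_t len = 16, capacity = 64;
    // Before the fix: alloc(len) paired with free(..., capacity) would add 16
    // and later subtract 64, underflowing live_bytes.
    void* buf = TrackingAllocator::alloc(capacity); // alloc capacity, not len
    TrackingAllocator::free(buf, capacity);         // sizes now match
    assert(TrackingAllocator::live_bytes == 0);
    (void)len;
    return 0;
}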
7 changes: 6 additions & 1 deletion be/src/util/slice.h
@@ -362,7 +362,12 @@ class OwnedSlice : private Allocator<false, false, false, DefaultMemoryAllocator
OwnedSlice(const OwnedSlice&) = delete;
void operator=(const OwnedSlice&) = delete;

~OwnedSlice() { Allocator::free(_slice.data, _capacity); }
~OwnedSlice() {
if (_slice.data != nullptr) {
DCHECK(_capacity != 0);
Allocator::free(_slice.data, _capacity);
}
}

const Slice& slice() const { return _slice; }

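The nullptr check makes destroying a moved-from OwnedSlice a no-op, and the DCHECK documents the invariant that null data implies zero capacity. A condensed sketch of the interaction follows, assuming (as the check implies) that a move leaves the source with null data and zero capacity; OwnedBuffer is a stand-in, not the real class.

#include <cstddef>
#include <utility>

struct OwnedBuffer { // simplified stand-in for OwnedSlice
    char* data = nullptr;
    std::size_t capacity = 0;

    OwnedBuffer(char* d, std::size_t c) : data(d), capacity(c) {}
    OwnedBuffer(OwnedBuffer&& other) noexcept
            : data(std::exchange(other.data, nullptr)),
              capacity(std::exchange(other.capacity, 0)) {}

    ~OwnedBuffer() {
        if (data != nullptr) { // moved-from instances skip the free
            delete[] data;
        }
    }
};

int main() {
    OwnedBuffer a(new char[64], 64);
    OwnedBuffer b(std::move(a)); // a.data is now nullptr, a.capacity is 0
    return 0;                    // only b frees; a's destructor is a no-op
}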
14 changes: 4 additions & 10 deletions be/src/vec/common/schema_util.cpp
@@ -530,16 +530,10 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,

Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
const ParseConfig& config) {
try {
// Parse each variant column from raw string column
RETURN_IF_ERROR(
vectorized::schema_util::_parse_variant_columns(block, variant_pos, config));
} catch (const doris::Exception& e) {
// TODO more graceful, max_filter_ratio
LOG(WARNING) << "encounter execption " << e.to_string();
return Status::InternalError(e.to_string());
}
return Status::OK();
// Parse each variant column from its raw string column
RETURN_IF_CATCH_EXCEPTION({
return vectorized::schema_util::_parse_variant_columns(block, variant_pos, config);
});
}

Status encode_variant_sparse_subcolumns(ColumnObject& column) {
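Here the hand-written try/catch, which logged a warning and converted the exception to Status::InternalError, is replaced by the same RETURN_IF_CATCH_EXCEPTION wrapper used elsewhere in this commit, so variant-parsing failures surface through one consistent path.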
9 changes: 4 additions & 5 deletions be/test/io/fs/s3_file_writer_test.cpp
@@ -944,10 +944,9 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_2) {
sp->set_call_back("S3FileWriter::_complete:2", [](auto&& outcome) {
// Deliberately make one upload-part task fail to test whether the s3 file
// writer can handle IO errors
auto* parts = try_any_cast<std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>(
outcome.back());
auto* parts = try_any_cast<std::vector<io::ObjectCompleteMultiPart>*>(outcome.back());
size_t size = parts->size();
parts->back()->SetPartNumber(size + 2);
parts->back().part_num = (size + 2);
});
Defer defer {[&]() { sp->clear_call_back("S3FileWriter::_complete:2"); }};
auto client = s3_fs->client_holder();
@@ -992,8 +991,8 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_1) {
sp->set_call_back("S3FileWriter::_complete:1", [](auto&& outcome) {
// Deliberately make one upload-part task fail to test whether the s3 file
// writer can handle IO errors
const auto& points = try_any_cast<const std::pair<
std::atomic_bool*, std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>&>(
const auto& points = try_any_cast<
const std::pair<std::atomic_bool*, std::vector<io::ObjectCompleteMultiPart>*>&>(
outcome.back());
(*points.first) = false;
points.second->pop_back();
6 changes: 5 additions & 1 deletion build-for-release.sh
@@ -164,7 +164,11 @@ cp -R "${ORI_OUTPUT}"/apache_hdfs_broker "${OUTPUT_EXT}"/apache_hdfs_broker
cp -R "${ORI_OUTPUT}"/be/* "${OUTPUT_BE}"/

# CLOUD
cp -R "${ORI_OUTPUT}"/ms/* "${OUTPUT_CLOUD}"/
if [[ "${ARCH}" == "arm64" ]]; then
echo "WARNING: Cloud module is not supported on ARM platform, will skip building it."
else
cp -R "${ORI_OUTPUT}"/ms/* "${OUTPUT_CLOUD}"/
fi

if [[ "${TAR}" -eq 1 ]]; then
echo "Begin to compress"
@@ -237,7 +237,7 @@ addRollupClause

alterTableClause
: ADD COLUMN columnDef columnPosition? toRollup? properties=propertyClause? #addColumnClause
| ADD COLUMN LEFT_PAREN columnDef (COMMA columnDef) RIGHT_PAREN
| ADD COLUMN LEFT_PAREN columnDef (COMMA columnDef)* RIGHT_PAREN
toRollup? properties=propertyClause? #addColumnsClause
| DROP COLUMN name=identifier fromRollup? properties=propertyClause? #dropColumnClause
| MODIFY COLUMN columnDef columnPosition? fromRollup?
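The added * is the substantive fix: the old rule columnDef (COMMA columnDef) matched exactly two column definitions inside the parentheses, so a statement such as ALTER TABLE t ADD COLUMN (c1 INT) failed to parse. With (COMMA columnDef)* the clause accepts one or more definitions; the add_column.groovy regression test at the end of this commit exercises the single-column form.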
@@ -31,6 +31,7 @@
import org.apache.doris.analysis.LargeIntLiteral;
import org.apache.doris.analysis.LikePredicate;
import org.apache.doris.analysis.LikePredicate.Operator;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.EsResource;
import org.apache.doris.thrift.TExprOpcode;
@@ -127,9 +127,24 @@ public static QueryBuilder toEsDsl(Expr expr) {
.build());
}

private static QueryBuilder parseBinaryPredicate(Expr expr, TExprOpcode opCode, String column,
private static TExprOpcode flipOpCode(TExprOpcode opCode) {
switch (opCode) {
case GE:
return TExprOpcode.LE;
case GT:
return TExprOpcode.LT;
case LE:
return TExprOpcode.GE;
case LT:
return TExprOpcode.GT;
default:
return opCode;
}
}

private static QueryBuilder parseBinaryPredicate(LiteralExpr expr, TExprOpcode opCode, String column,
boolean needDateCompat) {
Object value = toDorisLiteral(expr.getChild(1));
Object value = toDorisLiteral(expr);
if (needDateCompat) {
value = compatDefaultDate(value);
}
@@ -223,6 +239,20 @@ private static QueryBuilder parseFunctionCallExpr(Expr expr) {
return new QueryBuilders.EsQueryBuilder(stringValue);
}

private static String getColumnFromExpr(Expr expr) {
// A cast that changes the type cannot be pushed down
if (expr instanceof CastExpr) {
Expr withoutCastExpr = exprWithoutCast(expr);
if (withoutCastExpr.getType().equals(expr.getType())
|| (withoutCastExpr.getType().isFloatingPointType() && expr.getType().isFloatingPointType())) {
return ((SlotRef) withoutCastExpr).getColumnName();
}
} else if (expr instanceof SlotRef) {
return ((SlotRef) expr).getColumnName();
}
return null;
}

/**
* Doris expr to es dsl.
**/
@@ -241,32 +271,43 @@ public static QueryBuilder toEsDsl(Expr expr, List<Expr> notPushDownList, Map<St
return toCompoundEsDsl(expr, notPushDownList, fieldsContext, builderOptions);
}
TExprOpcode opCode = expr.getOpcode();
String column;
boolean isFlip = false;
Expr leftExpr = expr.getChild(0);
// Type transformed cast can not pushdown
if (leftExpr instanceof CastExpr) {
Expr withoutCastExpr = exprWithoutCast(leftExpr);
// pushdown col(float) >= 3
if (withoutCastExpr.getType().equals(leftExpr.getType()) || (withoutCastExpr.getType().isFloatingPointType()
&& leftExpr.getType().isFloatingPointType())) {
column = ((SlotRef) withoutCastExpr).getColumnName();
} else {
notPushDownList.add(expr);
return null;
}
} else if (leftExpr instanceof SlotRef) {
column = ((SlotRef) leftExpr).getColumnName();
} else {
String column = getColumnFromExpr(leftExpr);

if (StringUtils.isEmpty(column)) {
Expr rightExpr = expr.getChild(1);
column = getColumnFromExpr(rightExpr);
opCode = flipOpCode(opCode);
isFlip = true;
}

if (StringUtils.isEmpty(column)) {
notPushDownList.add(expr);
return null;
}

// Check whether the date type need compat, it must before keyword replace.
List<String> needCompatDateFields = builderOptions.getNeedCompatDateFields();
boolean needDateCompat = needCompatDateFields != null && needCompatDateFields.contains(column);
// Replace col with col.keyword if mapping exist.
column = fieldsContext.getOrDefault(column, column);
if (expr instanceof BinaryPredicate) {
return parseBinaryPredicate(expr, opCode, column, needDateCompat);
BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
Expr value;
if (isFlip) {
value = binaryPredicate.getChild(0);
} else {
value = binaryPredicate.getChild(1);
}
// only push down literal expr to ES
if (value instanceof LiteralExpr) {
LiteralExpr literalExpr = (LiteralExpr) value;
return parseBinaryPredicate(literalExpr, opCode, column, needDateCompat);
} else {
notPushDownList.add(expr);
return null;
}
}
if (expr instanceof IsNullPredicate) {
return parseIsNullPredicate(expr, column);
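The net effect of these changes is that a binary predicate with the literal on the left-hand side can now be pushed down: when getColumnFromExpr finds no column in the left child, the column is taken from the right child and the comparison is mirrored via flipOpCode, so 3.0 >= k4 becomes the range query k4 <= 3.0. The isFlip flag then selects the child that carries the literal, and any non-literal value still lands in notPushDownList.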
@@ -209,6 +209,33 @@ public void testCastConvertEsDsl() {
new FloatLiteral(3.0, Type.DOUBLE));
QueryBuilders.toEsDsl(castDoublePredicate, notPushDownList, fieldsContext, builderOptions);
Assertions.assertEquals(3, notPushDownList.size());

SlotRef k4 = new SlotRef(null, "k4");
k4.setType(Type.FLOAT);
CastExpr castFloatExpr = new CastExpr(Type.FLOAT, k4);
BinaryPredicate castFloatPredicate = new BinaryPredicate(Operator.GE, new FloatLiteral(3.0, Type.FLOAT),
castFloatExpr);
QueryBuilders.QueryBuilder queryBuilder = QueryBuilders.toEsDsl(castFloatPredicate, notPushDownList, fieldsContext, builderOptions);
Assertions.assertEquals("{\"range\":{\"k4\":{\"lte\":3.0}}}", queryBuilder.toJson());
Assertions.assertEquals(3, notPushDownList.size());

castFloatPredicate = new BinaryPredicate(Operator.LE, new FloatLiteral(3.0, Type.FLOAT),
castFloatExpr);
queryBuilder = QueryBuilders.toEsDsl(castFloatPredicate, notPushDownList, fieldsContext, builderOptions);
Assertions.assertEquals("{\"range\":{\"k4\":{\"gte\":3.0}}}", queryBuilder.toJson());
Assertions.assertEquals(3, notPushDownList.size());

castFloatPredicate = new BinaryPredicate(Operator.LT, new FloatLiteral(3.0, Type.FLOAT),
castFloatExpr);
queryBuilder = QueryBuilders.toEsDsl(castFloatPredicate, notPushDownList, fieldsContext, builderOptions);
Assertions.assertEquals("{\"range\":{\"k4\":{\"gt\":3.0}}}", queryBuilder.toJson());
Assertions.assertEquals(3, notPushDownList.size());

castFloatPredicate = new BinaryPredicate(Operator.GT, new FloatLiteral(3.0, Type.FLOAT),
castFloatExpr);
queryBuilder = QueryBuilders.toEsDsl(castFloatPredicate, notPushDownList, fieldsContext, builderOptions);
Assertions.assertEquals("{\"range\":{\"k4\":{\"lt\":3.0}}}", queryBuilder.toJson());
Assertions.assertEquals(3, notPushDownList.size());
}


49 changes: 49 additions & 0 deletions regression-test/suites/nereids_syntax_p0/ddl/add_column.groovy
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("add_column") {
def tableName ="test"

sql 'set enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'

sql """
drop table if exists test
"""

sql """
CREATE TABLE IF NOT EXISTS `test` (
`id` bigint(20) NOT NULL,
`count` bigint(20) NOT NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 5
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
"""

sql """
ALTER TABLE `test`
ADD COLUMN (`cost` VARCHAR(256) DEFAULT "add");
"""
}
