Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pick](nereids) runtime filter with probe expr should be pushed though set operator #33010 for Branch-2.1 #33782

Closed
wants to merge 102 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
dcd6909
[Fix](inverted index) fix fast execute problem when need read data op…
airborne12 Apr 11, 2024
1d7f367
[refactor](Nereids): compute unique and uniform property respectively…
keanji-x Apr 11, 2024
6545c83
[fix](mtmv)add logs for mv_infos() (#33485)
zddr Apr 11, 2024
0423419
[fix](nereids) do not transpose semi join agg when mark join (#32475)
iwanttobepowerful Apr 11, 2024
844e34e
(Chore)[regression-test] fix unstable output variant case (#33520)
eldenmoon Apr 11, 2024
12358e8
[opt](log) refactor the log dir config (#32933)
morningman Mar 29, 2024
441704a
[fix](memory) Fix compaction destructor memory tracking #33549 (#33572)
xinyiZzz Apr 12, 2024
882e23f
[opt](Nereids) support set operation minus (#33582)
morrySnow Apr 12, 2024
dab4c6e
[Improvementation](stream-load) improve streamLoadPut log warning det…
BiteTheDDDDt Apr 11, 2024
91b7506
[Fix](inverted index) fix build index error status when batch_next (#…
airborne12 Apr 11, 2024
cf03e69
[bug](not in) if not in (null) could eos early (#33482)
zhangstar333 Apr 11, 2024
f0c60d7
[improve](fold) support complex type for constant folding (#32867)
zhangstar333 Apr 11, 2024
4bacd20
[Chore](code comment) add comment for #32999 (#33548)
eldenmoon Apr 11, 2024
9681f34
[FIX](regresstest) fix array_range case for regress test (#33337)
amorynan Apr 12, 2024
4b40323
[refactor](heap sort) Simplify sorted block view (#33477)
Gabriel39 Apr 12, 2024
e837013
[pipelineX](fix) Fix data pooling judgement for bucket join (#33533)
Gabriel39 Apr 12, 2024
34fafca
[Feature](Variant) support aggregation model for Variant type (#33493)
eldenmoon Apr 12, 2024
3a74fdb
[fix](fs) Close local file writer when downloading finished (#33556)
w41ter Apr 12, 2024
296421d
[chore](Nereids) better way to return error (#33507)
morrySnow Apr 12, 2024
0dd9d8f
[feat](nereids) support create view in nereids (#32743)
feiniaofeiafei Apr 12, 2024
c5b7a8c
[opt](Nereids) support select async mv partition (#33560)
morrySnow Apr 12, 2024
29f5640
[chore](test) update one join case for nereids because ambiguous name…
morrySnow Apr 12, 2024
7b8a1e7
[fix](routine-load) fix consumer hang when kafka exception causing ca…
sollhui Apr 12, 2024
d383884
[fix](timeout) query timeout was not correctly set (#33444)
Mryange Apr 12, 2024
474aa81
[feature](profile) add non-zero counter in profile(#33342)
Mryange Apr 12, 2024
585aeb5
[Fix] fix compile problem (#33578)
feiniaofeiafei Apr 12, 2024
90c69b0
[Fix](Nereids) fix leading hint should have all tables in one query b…
LiBinfeng-01 Apr 12, 2024
76502fd
[pipelineX](broadcast) Set dependency ready if a limited exchange ret…
Gabriel39 Apr 12, 2024
7ea5e1d
[fix](schema change) CastStringConverter is compiled failed in g++ (#…
AshinGau Apr 12, 2024
cd9eb6d
[test](bi) add dbeaver and datagrip connect doris test (#33487)
zy-kkk Apr 12, 2024
add2805
[k8s](improve)add docker resource script for k8s (#33329)
LemonLiTree Apr 12, 2024
77b1cfe
[enhancement](merge-iterator) catch exception to avoid coredump when …
liaoxin01 Apr 12, 2024
3ea6587
[fix](feservice) remove connect context (#33583)
mymeiyi Apr 12, 2024
bdd5440
[neride][foldconstant] disable fold constant by be
Doris-Extras Apr 13, 2024
24c3294
[feat](nereids) add session var to turn on/off common sub expressoin…
englefly Apr 13, 2024
6c2e684
[testcases](auto-partition) Add and fix testcases in P0 #33588
zclllyybb Apr 12, 2024
af556cf
[Fix](timezone) fix miss of expected rounding of Date type with timez…
zclllyybb Apr 12, 2024
3e0082c
[feature](backup) ignore table that not support type when backup, and…
nextdreamblue Apr 13, 2024
2dfe8cf
[Fix](executor)reset remote scan thread num #33579
wangbo Apr 13, 2024
ecc3c60
[case](regression) Add backup restore test for hdfs repo (#33581)
w41ter Apr 13, 2024
3aa3604
[bugfix](feoom) add timeout to send be request to avoid too long to o…
yiguolei Apr 13, 2024
9c48618
[enhancement](mysql compatible) add user and procs_priv tables to mys…
zy-kkk Apr 2, 2024
12ee922
[enhancement](plsql) Support select * from routines (#32866)
Vallishp Mar 28, 2024
2ae3313
[test](mtmv)Add column name case sensitive test (#33538)
zfr9527 Apr 13, 2024
3cebd37
fix compile
Doris-Extras Apr 13, 2024
96ed07c
[fix](test) fix some p2 external table test cases (#33624)
morningman Apr 13, 2024
0e5226e
[fix](Nereids) create view should forward to master (#33626)
morrySnow Apr 14, 2024
565de9d
[fix](create table) Fix create table exception without cleaning the e…
deardeng Apr 14, 2024
80f25cb
[fix](schema change) follow fe set sc fail replicas as bad (#33569)
yujun777 Apr 14, 2024
9154b54
[fix](merge-iterator) Fix mem leak when get next batch failed (#33627)
liaoxin01 Apr 14, 2024
6ce61eb
[Chore](status) change unknow filter error to internal error (#33633)
BiteTheDDDDt Apr 14, 2024
00e082f
remove is cloud mode
Doris-Extras Apr 14, 2024
1832dc2
[fix](test) fix some unstable p2 test cases (#33637) (#33655)
morningman Apr 15, 2024
4b3339a
[opt](scan) read scan ranges in the order of partitions (#33515) (#33…
AshinGau Apr 15, 2024
894f9d5
[enhancement](merge-on-write) compaction should not check correctness…
zhannngchen Apr 15, 2024
54c3851
[branch-2.1](cherry-pick) Pick some partial-update PR from master (#3…
Yukang-Lian Apr 15, 2024
0f1adc9
[fix](merge-on-write) schema change may cause mow duplicate key (#335…
zhannngchen Apr 16, 2024
b5c39c4
[Bug](runtime-filter) make need_local_merge unrelated with broadcast …
BiteTheDDDDt Apr 16, 2024
c7d93e6
[runtime filter](fix) Fix wrong results caused by IN_OR_BLOOM filter …
Gabriel39 Apr 16, 2024
aaba87d
[improvement](binlog)Support inverted index format v2 in CCR (#33415)
qidaye Apr 15, 2024
669bdec
[improve](move-memtable) add more info in LoadStreamStub errors (#33618)
kaijchen Apr 15, 2024
8cfbc0e
[feature](Nereids): date literal suppose Zone (#33534)
jackwener Apr 15, 2024
a5c7e76
[docker](script)add --grace to be_prestop.sh (#33599)
LemonLiTree Apr 15, 2024
bd9d1e7
[fix](nereids) ExtractAndNormalizeWindowExpression should only normal…
starocean999 Apr 15, 2024
bc69024
[fix](planner) fix bug of InlineViewRef's tableNameToSql method (#33575)
starocean999 Apr 15, 2024
c7e9927
[refine](Operator) When _stop_emplace_flag is not set to true, perfor…
Mryange Apr 15, 2024
973d94e
[opt](Nereids) prefer slot type to support delete task better (#33559)
morrySnow Apr 15, 2024
5b0d357
[fix](catalog) Remove unexpected cleanup when reading jdbc data (#33529)
zy-kkk Apr 15, 2024
3a75f02
[feature](inverted index) add slop functionality to match_phrase (#33…
zzzxl1993 Apr 15, 2024
7522799
[testcases](auto-partition) fix data sync (#33635)
zclllyybb Apr 15, 2024
486b450
[fix](nereids)EliminateGroupBy should keep the output's datatype same…
starocean999 Apr 15, 2024
75f2ef7
[Chore](log) adjust output order on PrintInstanceStandardInfo and red…
BiteTheDDDDt Apr 15, 2024
5f6eaac
[fix](routine-load) fix get kafka offset timeout may too long (#33502)
sollhui Apr 15, 2024
0a31a3c
[refactor](pipelineX) Reduce prepare overhead (PART I) (#33550)
Gabriel39 Apr 15, 2024
db4cb33
[refactor](refresh-catalog) refactor the refresh catalog code (#33653)
morningman Apr 15, 2024
684c987
[fix](nereids) Use correct PREAGGREGATION in agg(filter(scan)) (#33454)
liutang123 Apr 15, 2024
3a486df
[case](regression) Add backup temp partition case (#33646)
w41ter Apr 15, 2024
491d6f3
[fix](fe) Fix finalizeCommand `sendAndFlush` NullPointerException (#3…
SWJTU-ZhangLei Apr 15, 2024
7436904
[Enhencement](Nereids) add rule of agg(case when) to agg(filter) (#33…
LiBinfeng-01 Apr 16, 2024
596d8df
[Update](cloud) skip show data size assertion in cloud mode (#33677)
airborne12 Apr 16, 2024
ceedc26
[improvement](mow) Add profile for delete_bitmap get_agg function (#3…
hust-hhb Apr 16, 2024
8ff4d55
[refactor](pipelineX) Reduce prepare overhead (PART II) (#33681)
Gabriel39 Apr 16, 2024
d3752d2
[fix](testcase) fix miss used global variables in index testcases (#3…
xiaokang Apr 16, 2024
a374dfa
[fix](routine-load) fix data lost when FE leader change (#33678)
sollhui Apr 16, 2024
6f38426
[feature](proc)Add table's indexes info in show proc interface (#33438)
qidaye Apr 16, 2024
de209c1
[fix](analyze) avoid java.util.ConcurrentModificationException (#33674)
morningman Apr 16, 2024
06494d2
[minor](Nereids): remove useless override (#33651)
jackwener Apr 16, 2024
24edd6e
[opt](Nereids) date literal support basic format with timezone (#33662)
morrySnow Apr 16, 2024
0aad1f2
[FIX](cast)fix full/right out join for cast array (#33475)
amorynan Apr 16, 2024
c753b56
[fix](nereids)InSubquery's withChildren method lost typeCoercionExpr …
starocean999 Apr 16, 2024
dd98f98
[improvement](spill) improve config and fix spill bugs (#33519)
jacktengg Apr 16, 2024
d00ca28
[feature](agg) support aggregate function group_array_intersect (#33265)
superdiaodiao Apr 16, 2024
8640246
[feature](function) support hll functions hll_from_base64, hll_to_bas…
superdiaodiao Apr 16, 2024
775ffdf
[docker](hive) add hive3 docker compose and modify scripts (#33115)
suxiaogang223 Apr 16, 2024
71a1866
[fix](Nereids) could not query variant that not from table (#33704)
morrySnow Apr 16, 2024
5e30989
[opt](Nereids) auto fallback when meet udf override (#33708)
morrySnow Apr 16, 2024
2181a29
(Fix)(nereids) modify create view privilege check error message (#33669)
feiniaofeiafei Apr 16, 2024
09a0c07
[improve](CI)Core modules require maintainer review (#32468)
caoliang-web Apr 16, 2024
ec2bf8b
fix compile
Doris-Extras Apr 16, 2024
7a04785
[opt](inverted index) topn opt reads only limit number of records (#3…
zzzxl1993 Apr 17, 2024
e61848d
[opt](meta-cache) refine the meta cache (#33449) (#33754)
morningman Apr 17, 2024
2fcda99
[fix](nereids) runtime filter with probe expr should be pushed thoug…
englefly Apr 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
51 changes: 17 additions & 34 deletions .github/workflows/pr-approve-status.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,25 @@ name: Need_2_Approval

on:
pull_request_review:
types: [submitted]
types: [ submitted ]

jobs:
Need_2_Approval:
runs-on: ubuntu-latest
timeout-minutes: 3
timeout-minutes: 5
steps:
- uses: actions/checkout@v3
- run: |
pr_num=${{ github.event.pull_request.number }}
echo $pr_num
if [ -z "$pr_num" ]; then
echo "PR number is not set"
exit 1
fi
response=$(curl -s -H "Authorization: token ${{ secrets.GITHUB_TOKEN }} " "https://api.github.com/repos/apache/doris/pulls/${pr_num}/reviews?per_page=100")
# shellcheck disable=SC2207
reviewers=($(echo $response | jq -r '.[] | .user.login'))
# shellcheck disable=SC2207
statuses=($(echo $response | jq -r '.[] | .state'))
echo "${reviewers[@]}"
echo "${statuses[@]}"
approves=()
reviewers_unique=()
for ((i=${#reviewers[@]}-1;i>=0;i--)); do
if ! echo "${reviewers_unique[@]}" | grep -q -w "${reviewers[$i]}" && [ "${statuses[$i]}" != "COMMENTED" ]; then
reviewers_unique+=( "${reviewers[$i]}" )
if [ "${statuses[$i]}" == "APPROVED" ]; then
approves+=( "${reviewers[$i]}" )
fi
fi
done
echo "${approves[@]}"
if [ ${#approves[@]} -lt 2 ]; then
echo "PR ${pr_num} has not been approved by at least 2 reviewers"
# shellcheck disable=SC2242
exit 1
fi
echo "Thanks for your contribution to Doris."
exit 0
- name: Install Python dependencies
uses: actions/setup-python@v5
with:
python-version: '3.10' # Adjust if needed
- name: Install match library
run: |
pip install --upgrade pip
pip install match
pip install requests
- name: Run Python script
run: |
python tools/maintainers/check_review.py ${{ github.event.pull_request.number }} ${{secrets.GITHUB_TOKEN}}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
10 changes: 4 additions & 6 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,8 @@ DEFINE_mInt32(download_low_speed_limit_kbps, "50");
// download low speed time(seconds)
DEFINE_mInt32(download_low_speed_time, "300");

// log dir
DEFINE_String(sys_log_dir, "${DORIS_HOME}/log");
DEFINE_String(sys_log_dir, "");
DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
DEFINE_String(pipeline_tracing_log_dir, "${DORIS_HOME}/log/tracing");
// INFO, WARNING, ERROR, FATAL
DEFINE_mString(sys_log_level, "INFO");
// TIME-DAY, TIME-HOUR, SIZE-MB-nnn
Expand Down Expand Up @@ -1165,13 +1163,13 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15");
DEFINE_Int32(partition_disk_index_lru_size, "10000");
// limit the storage space that query spill files can use
DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage");
DEFINE_mInt64(spill_storage_limit, "10737418240"); // 10G
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
DEFINE_String(spill_storage_limit, "20%"); // 20%
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
DEFINE_mInt32(spill_gc_file_count, "2000");
DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2");
DEFINE_Int32(spill_io_thread_pool_queue_size, "1024");
DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2");
DEFINE_Int32(spill_async_task_thread_pool_queue_size, "1024");
DEFINE_mInt32(spill_mem_warning_water_mark_multiplier, "2");

DEFINE_mBool(check_segment_when_build_rowset_meta, "false");

Expand Down
15 changes: 11 additions & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ DECLARE_mInt32(download_low_speed_limit_kbps);
// download low speed time(seconds)
DECLARE_mInt32(download_low_speed_time);

// log dir
// deprecated, use env var LOG_DIR in be.conf
DECLARE_String(sys_log_dir);
// for udf
DECLARE_String(user_function_dir);
DECLARE_String(pipeline_tracing_log_dir);
// INFO, WARNING, ERROR, FATAL
DECLARE_String(sys_log_level);
// TIME-DAY, TIME-HOUR, SIZE-MB-nnn
Expand Down Expand Up @@ -1238,13 +1238,20 @@ DECLARE_mDouble(high_disk_avail_level_diff_usages);
// create tablet in partition random robin idx lru size, default 10000
DECLARE_Int32(partition_disk_index_lru_size);
DECLARE_String(spill_storage_root_path);
DECLARE_mInt64(spill_storage_limit);
// Spill storage limit specified as number of bytes
// ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
// or percentage of capaity ('<int>%').
// Defaults to bytes if no unit is given.
// Must larger than 0.
// If specified as percentage, the final limit value is:
// disk_capacity_bytes * storage_flood_stage_usage_percent * spill_storage_limit
DECLARE_String(spill_storage_limit);
DECLARE_mInt32(spill_gc_interval_ms);
DECLARE_mInt32(spill_gc_file_count);
DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_Int32(spill_async_task_thread_pool_thread_num);
DECLARE_Int32(spill_async_task_thread_pool_queue_size);
DECLARE_mInt32(spill_mem_warning_water_mark_multiplier);

DECLARE_mBool(check_segment_when_build_rowset_meta);

Expand Down
7 changes: 6 additions & 1 deletion be/src/common/logconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ bool init_glog(const char* basename) {
// so fatal log can output to be.out .
FLAGS_stderrthreshold = google::FATAL;
// set glog log dir
FLAGS_log_dir = config::sys_log_dir;
// ATTN: sys_log_dir is deprecated, this is just for compatibility
std::string log_dir = config::sys_log_dir;
if (log_dir == "") {
log_dir = getenv("LOG_DIR");
}
FLAGS_log_dir = log_dir;
// 0 means buffer INFO only
FLAGS_logbuflevel = 0;
// buffer log messages for at most this many seconds
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
#include "exec/schema_scanner/schema_partitions_scanner.h"
#include "exec/schema_scanner/schema_processlist_scanner.h"
#include "exec/schema_scanner/schema_profiling_scanner.h"
#include "exec/schema_scanner/schema_routine_scanner.h"
#include "exec/schema_scanner/schema_rowsets_scanner.h"
#include "exec/schema_scanner/schema_schema_privileges_scanner.h"
#include "exec/schema_scanner/schema_schemata_scanner.h"
#include "exec/schema_scanner/schema_table_privileges_scanner.h"
#include "exec/schema_scanner/schema_tables_scanner.h"
#include "exec/schema_scanner/schema_user_privileges_scanner.h"
#include "exec/schema_scanner/schema_user_scanner.h"
#include "exec/schema_scanner/schema_variables_scanner.h"
#include "exec/schema_scanner/schema_views_scanner.h"
#include "exec/schema_scanner/schema_workload_groups_scanner.h"
Expand Down Expand Up @@ -161,6 +163,10 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return SchemaWorkloadGroupsScanner::create_unique();
case TSchemaTableType::SCH_PROCESSLIST:
return SchemaProcessListScanner::create_unique();
case TSchemaTableType::SCH_PROCEDURES:
return SchemaRoutinesScanner::create_unique();
case TSchemaTableType::SCH_USER:
return SchemaUserScanner::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
Expand Down
10 changes: 10 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class TShowVariableRequest;
class TShowVariableResult;
class TShowProcessListRequest;
class TShowProcessListResult;
class TShowUserRequest;
class TShowUserResult;

Status SchemaHelper::get_db_names(const std::string& ip, const int32_t port,
const TGetDbsParams& request, TGetDbsResult* result) {
Expand Down Expand Up @@ -134,4 +136,12 @@ Status SchemaHelper::show_process_list(const std::string& ip, const int32_t port
});
}

Status SchemaHelper::show_user(const std::string& ip, const int32_t port,
const TShowUserRequest& request, TShowUserResult* result) {
return ThriftRpcHelper::rpc<FrontendServiceClient>(
ip, port, [&request, &result](FrontendServiceConnection& client) {
client->showUser(*result, request);
});
}

} // namespace doris
4 changes: 4 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class TShowVariableRequest;
class TShowVariableResult;
class TShowProcessListRequest;
class TShowProcessListResult;
class TShowUserRequest;
class TShowUserResult;

// this class is a helper for getting schema info from FE
class SchemaHelper {
Expand Down Expand Up @@ -82,6 +84,8 @@ class SchemaHelper {
static Status show_process_list(const std::string& ip, const int32_t port,
const TShowProcessListRequest& request,
TShowProcessListResult* result);
static Status show_user(const std::string& ip, const int32_t port,
const TShowUserRequest& request, TShowUserResult* result);
};

} // namespace doris
172 changes: 172 additions & 0 deletions be/src/exec/schema_scanner/schema_routine_scanner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/schema_scanner/schema_routine_scanner.h"

#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/thrift_rpc_helper.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
std::vector<SchemaScanner::ColumnDesc> SchemaRoutinesScanner::_s_tbls_columns = {
{"SPECIFIC_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
{"ROUTINE_CATALOG", TYPE_VARCHAR, sizeof(StringRef), true},
{"ROUTINE_SCHEMA", TYPE_VARCHAR, sizeof(StringRef), true},
{"ROUTINE_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
{"ROUTINE_TYPE", TYPE_VARCHAR, sizeof(StringRef), true},
{"DTD_IDENTIFIER", TYPE_VARCHAR, sizeof(StringRef), true},
{"ROUTINE_BODY", TYPE_VARCHAR, sizeof(StringRef), true},
{"ROUTINE_DEFINITION", TYPE_VARCHAR, sizeof(StringRef), true},
{"EXTERNAL_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
{"EXTERNAL_LANGUAGE", TYPE_VARCHAR, sizeof(StringRef), true},
{"PARAMETER_STYLE", TYPE_VARCHAR, sizeof(StringRef), true},
{"IS_DETERMINISTIC", TYPE_VARCHAR, sizeof(StringRef), true},
{"SQL_DATA_ACCESS", TYPE_VARCHAR, sizeof(StringRef), true},
{"SQL_PATH", TYPE_VARCHAR, sizeof(StringRef), true},
{"SECURITY_TYPE", TYPE_VARCHAR, sizeof(StringRef), true},
{"CREATED", TYPE_DATETIME, sizeof(int64_t), true},
{"LAST_ALTERED", TYPE_DATETIME, sizeof(int64_t), true},
{"SQL_MODE", TYPE_VARCHAR, sizeof(StringRef), true},
{"ROUTINE_COMMENT", TYPE_VARCHAR, sizeof(StringRef), true},
{"DEFINER", TYPE_VARCHAR, sizeof(StringRef), true},
{"CHARACTER_SET_CLIENT", TYPE_VARCHAR, sizeof(StringRef), true},
{"COLLATION_CONNECTION", TYPE_VARCHAR, sizeof(StringRef), true},
{"DATABASE_COLLATION", TYPE_VARCHAR, sizeof(StringRef), true},
};

SchemaRoutinesScanner::SchemaRoutinesScanner()
: SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_PROCEDURES) {}

Status SchemaRoutinesScanner::start(RuntimeState* state) {
_block_rows_limit = state->batch_size();
_rpc_timeout = state->execution_timeout() * 1000;
return Status::OK();
}

Status SchemaRoutinesScanner::get_block_from_fe() {
TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
TSchemaTableRequestParams schema_table_request_params;
for (int i = 0; i < _s_tbls_columns.size(); i++) {
schema_table_request_params.__isset.columns_name = true;
schema_table_request_params.columns_name.emplace_back(_s_tbls_columns[i].name);
}
schema_table_request_params.__set_current_user_ident(*_param->common_param->current_user_ident);
TFetchSchemaTableDataRequest request;
request.__set_schema_table_name(TSchemaTableName::ROUTINES_INFO);
request.__set_schema_table_params(schema_table_request_params);
TFetchSchemaTableDataResult result;
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->fetchSchemaTableData(result, request);
},
_rpc_timeout));
Status status(Status::create(result.status));
if (!status.ok()) {
LOG(WARNING) << "fetch routines from FE failed, errmsg=" << status;
return status;
}
std::vector<TRow> result_data = result.data_batch;
_routines_block = vectorized::Block::create_unique();
for (int i = 0; i < _s_tbls_columns.size(); ++i) {
TypeDescriptor descriptor(_s_tbls_columns[i].type);
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
_routines_block->insert(vectorized::ColumnWithTypeAndName(
data_type->create_column(), data_type, _s_tbls_columns[i].name));
}
_routines_block->reserve(_block_rows_limit);
if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>("routine table schema is not match for FE and BE");
}
}
auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
str_val.size());
nullable_column->get_null_map_data().emplace_back(0);
};
auto insert_datetime_value = [&](int col_index, const std::vector<void*>& datas,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
auto data = datas[0];
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(data), 0);
nullable_column->get_null_map_data().emplace_back(0);
};

for (int i = 0; i < result_data.size(); i++) {
TRow row = result_data[i];

for (int j = 0; j < _s_tbls_columns.size(); j++) {
if (_s_tbls_columns[j].type == TYPE_DATETIME) {
std::vector<void*> datas(1);
VecDateTimeValue src[1];
src[0].from_date_str(row.column_value[j].stringVal.data(),
row.column_value[j].stringVal.size());
datas[0] = src;
insert_datetime_value(j, datas, _routines_block.get());
} else {
insert_string_value(j, row.column_value[j].stringVal, _routines_block.get());
}
}
}
return Status::OK();
}

Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}

if (nullptr == block || nullptr == eos) {
return Status::InternalError("input pointer is nullptr.");
}

if (_routines_block == nullptr) {
RETURN_IF_ERROR(get_block_from_fe());
_total_rows = _routines_block->rows();
}

if (_row_idx == _total_rows) {
*eos = true;
return Status::OK();
}

int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
mblock.add_rows(_routines_block.get(), _row_idx, current_batch_rows);
_row_idx += current_batch_rows;

*eos = _row_idx == _total_rows;
return Status::OK();
}

} // namespace doris
Loading
Loading