Skip to content

Commit

Permalink
[feature](cloud) introduce copy into (#32759)
Browse files Browse the repository at this point in the history
Co-authored-by: deardeng <[email protected]>
Co-authored-by: meiyi <[email protected]>
  • Loading branch information
3 people authored Apr 24, 2024
1 parent 550ed2b commit 79890c1
Show file tree
Hide file tree
Showing 63 changed files with 11,548 additions and 200 deletions.
8 changes: 7 additions & 1 deletion be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,13 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
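// Only check for load task. For query task, non-existent columns will be filled with "null".
// If the actual column number in the csv file is not equal to _file_slot_descs.size(),
// then filter this line. When ignore_csv_redundant_col is set, extra columns are
// tolerated and only lines with fewer columns than _file_slot_descs.size() are filtered.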
if (_split_values.size() != _file_slot_descs.size()) {
bool ignore_col = false;
ignore_col = _params.__isset.file_attributes &&
_params.file_attributes.__isset.ignore_csv_redundant_col &&
_params.file_attributes.ignore_csv_redundant_col;

if ((!ignore_col && _split_values.size() != _file_slot_descs.size()) ||
(ignore_col && _split_values.size() < _file_slot_descs.size())) {
std::string cmp_str =
_split_values.size() > _file_slot_descs.size() ? "more than" : "less than";
RETURN_IF_ERROR(_state->append_error_msg_to_file(
Expand Down
34 changes: 34 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2690,6 +2690,40 @@ public static boolean isNotCloudMode() {
@ConfField
public static int cloud_txn_tablet_batch_size = 50;

/**
* Default maximum number of waiting copy jobs for the whole cluster.
*/
@ConfField(mutable = true)
public static int cluster_max_waiting_copy_jobs = 100;

/**
* Default maximum number of files per copy into job.
*/
@ConfField(mutable = true)
public static int max_file_num_per_copy_into_job = 50;

/**
* Default maximum meta size per copy into job.
*/
@ConfField(mutable = true)
public static int max_meta_size_per_copy_into_job = 51200;

// 0 means no limit
@ConfField(mutable = true)
public static int cloud_max_copy_job_per_table = 10000;

@ConfField(mutable = true)
public static int cloud_filter_copy_file_num_limit = 100;

@ConfField(mutable = true, masterOnly = true)
public static boolean cloud_delete_loaded_internal_stage_files = false;

@ConfField(mutable = false)
public static int cloud_copy_txn_conflict_error_retry_num = 5;

@ConfField(mutable = false)
public static int cloud_copy_into_statement_submitter_threads_num = 64;

@ConfField
public static int drop_user_notify_ms_max_times = 86400;

Expand Down
99 changes: 97 additions & 2 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Version;
import org.apache.doris.cloud.analysis.UseCloudClusterStmt;
import org.apache.doris.cloud.proto.Cloud.StagePB;
import org.apache.doris.mysql.MysqlPassword;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.policy.PolicyTypeEnum;
Expand Down Expand Up @@ -605,6 +606,7 @@ terminal String
KW_SQL,
KW_SQL_BLOCK_RULE,
KW_STAGE,
KW_STAGES,
KW_START,
KW_STARTS,
KW_STATS,
Expand Down Expand Up @@ -714,7 +716,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt,
use_stmt, use_cloud_cluster_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
switch_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt, refresh_stmt, clean_stmt, analyze_stmt, show_mtmv_stmt, kill_analysis_job_stmt, insert_overwrite_stmt,
import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt, refresh_stmt, clean_stmt, analyze_stmt, show_mtmv_stmt, kill_analysis_job_stmt, insert_overwrite_stmt, copy_stmt,
warm_up_stmt;

nonterminal FromClause opt_using_clause;
Expand All @@ -733,7 +735,7 @@ nonterminal ValueList value_clause;
// No return.
nonterminal describe_command, opt_full, opt_inner, opt_outer, from_or_in, keys_or_index, opt_storage, opt_wild_where,
charset, equal, transaction_characteristics, isolation_level,
transaction_access_mode, isolation_types;
transaction_access_mode, isolation_types, opt_where;

// String
nonterminal String user, opt_user, opt_using_charset;
Expand Down Expand Up @@ -996,6 +998,12 @@ nonterminal Long opt_auto_inc_init_value;
// workload policy/group
nonterminal String policy_condition_op, policy_condition_value;

// copy into
nonterminal CopyFromParam copy_from_param;
nonterminal String stage_name;
nonterminal StageAndPattern stage_and_pattern;
nonterminal List<Expr> copy_select_expr_list;

precedence nonassoc COMMA;
precedence nonassoc STRING_LITERAL;
precedence nonassoc KW_COLUMNS;
Expand Down Expand Up @@ -1266,6 +1274,8 @@ stmt ::=
{:
RESULT = new EmptyStmt();
:}
| copy_stmt:stmt
{: RESULT = stmt; :}
;

refresh_stmt ::=
Expand Down Expand Up @@ -2035,6 +2045,11 @@ create_stmt ::=
{:
RESULT = new AlterTableStmt(tableName, Lists.newArrayList(new BuildIndexClause(tableName, new IndexDef(indexName, partitionNames, true), false)));
:}
/* stage */
| KW_CREATE KW_STAGE opt_if_not_exists:ifNotExists ident:stageName KW_PROPERTIES opt_key_value_map:properties
{:
RESULT = new CreateStageStmt(ifNotExists, stageName, properties);
:}
;

channel_desc_list ::=
Expand Down Expand Up @@ -2383,6 +2398,62 @@ job_label ::=
:}
;

// Copy statement:
//   KW_COPY KW_INTO opt_select_hints table_name opt_col_list KW_FROM copy_from_param
//       [KW_PROPERTIES opt_key_value_map]
// Both alternatives build a CopyStmt from the same pieces; they differ only in
// whether a PROPERTIES clause is present.
copy_stmt ::=
// With an explicit PROPERTIES clause: the parsed map is passed through unchanged.
KW_COPY KW_INTO opt_select_hints:hints table_name:name opt_col_list:cols KW_FROM copy_from_param:copyFromParam KW_PROPERTIES opt_key_value_map:properties
{:
RESULT = new CopyStmt(name, cols, copyFromParam, properties, hints);
:}
// Without PROPERTIES: a fresh empty map is passed in place of the properties.
| KW_COPY KW_INTO opt_select_hints:hints table_name:name opt_col_list:cols KW_FROM copy_from_param:copyFromParam
{:
// NOTE(review): raw HashMap; if CopyStmt takes a typed Map, a parameterized
// instance would avoid an unchecked warning - confirm against CopyStmt.
RESULT = new CopyStmt(name, cols, copyFromParam, new HashMap(), hints);
:}
;

// Source side of a copy statement (the part after KW_FROM). Two forms are accepted:
//   1. a bare stage reference (stage_and_pattern);
//   2. a parenthesized SELECT over a stage, which additionally carries the
//      select list and whatever where_clause produces.
copy_from_param ::=
// Form 1: stage only.
stage_and_pattern:stage
{:
RESULT = new CopyFromParam(stage);
:}
// Form 2: ( SELECT <list> FROM <stage> <where_clause> ).
// NOTE(review): where_clause is defined outside this view; presumably it may be
// empty and then yields null - confirm.
| LPAREN KW_SELECT copy_select_expr_list:exprList KW_FROM stage_and_pattern:stage where_clause:whereExpr RPAREN
{:
RESULT = new CopyFromParam(stage, exprList, whereExpr);
:}
;

// Stage reference: the AT token followed by either an identifier or the BITNOT token.
// Yields the stage name as a String.
stage_name ::=
// AT ident: the identifier text is the stage name.
AT ident:stage
{:
RESULT = stage;
:}
// AT BITNOT: yields the fixed name "~".
// NOTE(review): presumably "~" designates a special built-in stage - confirm
// against the consumers of StageAndPattern.
| AT BITNOT
{:
RESULT = "~";
:}
;

// A stage name with an optional parenthesized string-literal pattern.
// Yields a StageAndPattern; the pattern argument is null when none is written.
stage_and_pattern ::=
// Stage name only: no pattern.
stage_name:name
{:
RESULT = new StageAndPattern(name, null);
:}
// Stage name followed by ( 'pattern' ).
| stage_name:name LPAREN STRING_LITERAL:pattern RPAREN
{:
RESULT = new StageAndPattern(name, pattern);
:}
;

// Select list used by the parenthesized SELECT form of copy_from_param.
// An empty list and STAR both yield null; only an explicit expr_list yields
// a non-null List<Expr>.
copy_select_expr_list ::=
// Empty select list.
{:
RESULT = null;
:}
// STAR: same result as the empty list.
| STAR {:
RESULT = null;
:}
// Explicit expression list, passed through unchanged.
| expr_list:exprList {:
RESULT = exprList;
:};

data_desc_list ::=
data_desc:desc
{:
Expand Down Expand Up @@ -3190,6 +3261,10 @@ drop_stmt ::=
{:
RESULT = new DropAnalyzeJobStmt(job_id);
:}
| KW_DROP KW_STAGE opt_if_exists:ifExists ident:stageName
{:
RESULT = new DropStageStmt(ifExists, stageName);
:}
;

// Recover statement
Expand Down Expand Up @@ -4035,6 +4110,11 @@ show_stmt ::=
{:
RESULT = new ShowPolicyStmt(PolicyTypeEnum.STORAGE, null, null);
:}
/* show stage */
| KW_SHOW KW_STAGES
{:
RESULT = new ShowStageStmt();
:}
| KW_SHOW KW_STORAGE KW_VAULT
{:
RESULT = new ShowStorageVaultStmt();
Expand Down Expand Up @@ -4554,6 +4634,11 @@ show_param ::=
{:
RESULT = new DiagnoseTabletStmt(tabletId);
:}
/* Show copy statement */
| KW_COPY opt_db:db opt_where order_by_clause:orderByClause limit_clause:limitClause
{:
RESULT = new ShowCopyStmt(db, parser.where, orderByClause, limitClause);
:}
| KW_WARM KW_UP KW_JOB opt_wild_where
{:
RESULT = new ShowCloudWarmUpStmt(parser.where);
Expand Down Expand Up @@ -4680,6 +4765,14 @@ opt_wild_where ::=
:}
;

// Optional WHERE clause. This nonterminal is declared without a result type, so
// the parsed expression is handed over through the parser.where field, which the
// enclosing production reads (e.g. the KW_COPY alternative of show_param).
opt_where ::=
// No WHERE clause: there is no action, so parser.where is left untouched.
/* empty */
| KW_WHERE expr:where
{:
// Stash the predicate for the enclosing production.
parser.where = where;
:}
;

opt_id ::=
/* empty */
{:
Expand Down Expand Up @@ -8305,6 +8398,8 @@ keyword ::=
{: RESULT = id; :}
| KW_BELONG:id
{: RESULT = id; :}
| KW_STAGE:id
{: RESULT = id; :}
;

// Identifier that contain keyword
Expand Down
Loading

0 comments on commit 79890c1

Please sign in to comment.