From d574647a8b25e98560e81a347429edb4176234ac Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Sat, 23 Dec 2023 14:30:54 +0800 Subject: [PATCH] [feature](mtmv)MTMV pause and resume (#28887) - PAUSE MATERIALIZED VIEW JOB ON mv1 - RESUME MATERIALIZED VIEW JOB ON mv1 - fix when drop db,not drop job - add lock for one materialized view can only run one task at a time --- .../org/apache/doris/nereids/DorisParser.g4 | 2 + .../doris/datasource/InternalCatalog.java | 5 ++ .../doris/job/extensions/mtmv/MTMVJob.java | 21 ++++++ .../doris/job/extensions/mtmv/MTMVTask.java | 17 ++++- .../apache/doris/job/task/AbstractTask.java | 8 +++ .../apache/doris/mtmv/MTMVHookService.java | 17 ++++- .../org/apache/doris/mtmv/MTMVJobManager.java | 48 ++++++++----- .../doris/mtmv/MTMVRelationManager.java | 13 ++++ .../org/apache/doris/mtmv/MTMVService.java | 21 +++++- .../nereids/parser/LogicalPlanBuilder.java | 18 +++++ .../doris/nereids/trees/plans/PlanType.java | 2 + .../plans/commands/PauseMTMVCommand.java | 50 +++++++++++++ .../plans/commands/ResumeMTMVCommand.java | 50 +++++++++++++ .../plans/commands/info/PauseMTMVInfo.java | 72 +++++++++++++++++++ .../plans/commands/info/ResumeMTMVInfo.java | 72 +++++++++++++++++++ .../trees/plans/visitor/CommandVisitor.java | 10 +++ regression-test/data/mtmv_p0/test_db_mtmv.out | 7 ++ .../data/mtmv_p0/test_pause_mtmv.out | 10 +++ .../suites/mtmv_p0/test_db_mtmv.groovy | 52 ++++++++++++++ .../suites/mtmv_p0/test_pause_mtmv.groovy | 60 ++++++++++++++++ 20 files changed, 534 insertions(+), 21 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseMTMVCommand.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeMTMVCommand.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/PauseMTMVInfo.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ResumeMTMVInfo.java create mode 100644 regression-test/data/mtmv_p0/test_db_mtmv.out create mode 100644 regression-test/data/mtmv_p0/test_pause_mtmv.out create mode 100644 regression-test/suites/mtmv_p0/test_db_mtmv.groovy create mode 100644 regression-test/suites/mtmv_p0/test_pause_mtmv.groovy diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index f8acaccb10fafb9..abef9ffa6d75552 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -99,6 +99,8 @@ statement | (REFRESH (refreshMethod | refreshTrigger | refreshMethod refreshTrigger)) | (SET LEFT_PAREN fileProperties=propertyItemList RIGHT_PAREN)) #alterMTMV | DROP MATERIALIZED VIEW (IF EXISTS)? mvName=multipartIdentifier #dropMTMV + | PAUSE MATERIALIZED VIEW JOB ON mvName=multipartIdentifier #pauseMTMV + | RESUME MATERIALIZED VIEW JOB ON mvName=multipartIdentifier #resumeMTMV | ALTER TABLE table=relation ADD CONSTRAINT constraintName=errorCapturingIdentifier constraint #addConstraint diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index a5bca6e46a29ed4..dea018b6425f513 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -519,6 +519,11 @@ public void dropDb(DropDbStmt stmt) throws DdlException { } } } + for (Table table : tableList) { + if (table.getType() == TableType.MATERIALIZED_VIEW) { + Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) table); + } + } unprotectDropDb(db, stmt.isForceDrop(), false, 0); } finally { MetaLockUtils.writeUnlockTables(tableList); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java index 5ee9b43fe1f9858..c500e6932959d9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java @@ -47,9 +47,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class MTMVJob extends AbstractJob { private static final Logger LOG = LogManager.getLogger(MTMVJob.class); + private ReentrantReadWriteLock jobRwLock; + private static final ShowResultSetMetaData JOB_META_DATA = ShowResultSetMetaData.builder() .addColumn(new Column("JobId", ScalarType.createVarchar(20))) @@ -98,12 +101,14 @@ public class MTMVJob extends AbstractJob { private long mtmvId; public MTMVJob() { + jobRwLock = new ReentrantReadWriteLock(true); } public MTMVJob(long dbId, long mtmvId) { this.dbId = dbId; this.mtmvId = mtmvId; super.setCreateTimeMs(System.currentTimeMillis()); + jobRwLock = new ReentrantReadWriteLock(true); } @Override @@ -203,6 +208,22 @@ private MTMV getMTMV() throws DdlException, MetaNotFoundException { return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); } + public void readLock() { + this.jobRwLock.readLock().lock(); + } + + public void readUnlock() { + this.jobRwLock.readLock().unlock(); + } + + public void writeLock() { + this.jobRwLock.writeLock().lock(); + } + + public void writeUnlock() { + this.jobRwLock.writeLock().unlock(); + } + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 115d4eba303b939..ab9b6f9fa94211c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -220,6 +220,17 @@ public void before() throws JobException { } } + @Override + public void runTask() throws JobException { + MTMVJob job = (MTMVJob) getJobOrJobException(); + try { + job.writeLock(); + super.runTask(); + } finally { + job.writeUnlock(); + } + } + @Override public TRow getTvfInfo() { TRow trow = new TRow(); @@ -276,8 +287,10 @@ private TUniqueId generateQueryId() { } private void after() { - Env.getCurrentEnv() - .addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation); + if (mtmv != null) { + Env.getCurrentEnv() + .addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation); + } mtmv = null; relation = null; executor = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index f887961230f211c..7327183e95eda27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -135,4 +135,12 @@ public String getJobName() { return job == null ? "" : job.getJobName(); } + public Job getJobOrJobException() throws JobException { + AbstractJob job = Env.getCurrentEnv().getJobManager().getJob(jobId); + if (job == null) { + throw new JobException("job not exist, jobId:" + jobId); + } + return job; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java index 63ae15fef1ba71a..41bc506f5c0d62c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java @@ -21,8 +21,11 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.mtmv.MTMVTask; +import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; +import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo; import org.apache.doris.persist.AlterMTMV; /** @@ -77,7 +80,7 @@ public interface MTMVHookService { * @throws DdlException * @throws MetaNotFoundException */ - void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException; + void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException; /** * triggered when mtmv task finish @@ -101,4 +104,16 @@ public interface MTMVHookService { * @param table */ void alterTable(Table table); + + /** + * Triggered when pause mtmv + * @param info + */ + void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException; + + /** + * Triggered when resume mtmv + * @param info + */ + void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index bdbc3231181d548..8cf225c59c92207 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -37,7 +37,10 @@ import org.apache.doris.job.extensions.mtmv.MTMVTaskContext; import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger; +import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; +import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo; +import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; import org.apache.doris.persist.AlterMTMV; import org.apache.doris.qe.ConnectContext; @@ -167,23 +170,11 @@ public void alterMTMV(MTMV mtmv, AlterMTMV alterMTMV) throws DdlException { * @throws MetaNotFoundException */ @Override - public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException { - Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getMvName().getDb()); - MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getMvName().getTbl(), TableType.MATERIALIZED_VIEW); - List jobs = Env.getCurrentEnv().getJobManager() - .queryJobs(JobType.MV, mtmv.getJobInfo().getJobName()); - if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) { - throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size()); - } - try { - MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(), - info.isComplete()); - Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), mtmvTaskContext); - - } catch (JobException e) { - e.printStackTrace(); - throw new DdlException(e.getMessage()); - } + public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException { + MTMVJob job = getJobByTableNameInfo(info.getMvName()); + MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(), + info.isComplete()); + Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(), mtmvTaskContext); } @Override @@ -201,4 +192,27 @@ public void alterTable(Table table) { } + @Override + public void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException { + MTMVJob job = getJobByTableNameInfo(info.getMvName()); + Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(), JobStatus.PAUSED); + } + + @Override + public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException { + MTMVJob job = getJobByTableNameInfo(info.getMvName()); + Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(), JobStatus.RUNNING); + } + + private MTMVJob getJobByTableNameInfo(TableNameInfo info) throws DdlException, MetaNotFoundException { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDb()); + MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getTbl(), TableType.MATERIALIZED_VIEW); + List jobs = Env.getCurrentEnv().getJobManager() + .queryJobs(JobType.MV, mtmv.getJobInfo().getJobName()); + if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) { + throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size()); + } + return jobs.get(0); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java index 11da5104255fc33..7be43771e442f8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java @@ -24,9 +24,12 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.job.common.TaskStatus; +import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; +import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; +import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; import org.apache.doris.persist.AlterMTMV; @@ -187,6 +190,16 @@ public void alterTable(Table table) { processBaseTableChange(table, "The base table has been updated:"); } + @Override + public void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException { + + } + + @Override + public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException { + + } + private void processBaseTableChange(Table table, String msgPrefix) { BaseTableInfo baseTableInfo = new BaseTableInfo(table); Set mtmvsByBaseTable = getMtmvsByBaseTable(baseTableInfo); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index 9530467dee75591..3ab05c92a463aaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -23,9 +23,12 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; +import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo; import org.apache.doris.persist.AlterMTMV; import com.google.common.collect.Maps; @@ -108,7 +111,7 @@ public void alterMTMV(MTMV mtmv, AlterMTMV alterMTMV) throws DdlException { } } - public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException { + public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException { Objects.requireNonNull(info); LOG.info("refreshMTMV, RefreshMTMVInfo: {}", info); for (MTMVHookService mtmvHookService : hooks.values()) { @@ -140,4 +143,20 @@ public void refreshComplete(MTMV mtmv, MTMVRelation cache, MTMVTask task) { mtmvHookService.refreshComplete(mtmv, cache, task); } } + + public void pauseMTMV(PauseMTMVInfo info) throws DdlException, MetaNotFoundException, JobException { + Objects.requireNonNull(info); + LOG.info("pauseMTMV, PauseMTMVInfo: {}", info); + for (MTMVHookService mtmvHookService : hooks.values()) { + mtmvHookService.pauseMTMV(info); + } + } + + public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException { + Objects.requireNonNull(info); + LOG.info("resumeMTMV, ResumeMTMVInfo: {}", info); + for (MTMVHookService mtmvHookService : hooks.values()) { + mtmvHookService.resumeMTMV(info); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index d230fa85d9c5158..dcbfb9ef3f04e00 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -123,6 +123,7 @@ import org.apache.doris.nereids.DorisParser.PartitionSpecContext; import org.apache.doris.nereids.DorisParser.PartitionValueDefContext; import org.apache.doris.nereids.DorisParser.PartitionsDefContext; +import org.apache.doris.nereids.DorisParser.PauseMTMVContext; import org.apache.doris.nereids.DorisParser.PlanTypeContext; import org.apache.doris.nereids.DorisParser.PredicateContext; import org.apache.doris.nereids.DorisParser.PredicatedContext; @@ -142,6 +143,7 @@ import org.apache.doris.nereids.DorisParser.RefreshTriggerContext; import org.apache.doris.nereids.DorisParser.RegularQuerySpecificationContext; import org.apache.doris.nereids.DorisParser.RelationContext; +import org.apache.doris.nereids.DorisParser.ResumeMTMVContext; import org.apache.doris.nereids.DorisParser.RollupDefContext; import org.apache.doris.nereids.DorisParser.RollupDefsContext; import org.apache.doris.nereids.DorisParser.RowConstructorContext; @@ -339,7 +341,9 @@ import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.commands.LoadCommand; +import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand; +import org.apache.doris.nereids.trees.plans.commands.ResumeMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.UpdateCommand; import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo; @@ -360,7 +364,9 @@ import org.apache.doris.nereids.trees.plans.commands.info.LessThanPartition; import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition; import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition.MaxValue; +import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; +import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RollupDefinition; import org.apache.doris.nereids.trees.plans.commands.info.SimpleColumnDefinition; import org.apache.doris.nereids.trees.plans.commands.info.StepPartition; @@ -688,6 +694,18 @@ public DropMTMVCommand visitDropMTMV(DropMTMVContext ctx) { return new DropMTMVCommand(new DropMTMVInfo(new TableNameInfo(nameParts), ctx.EXISTS() != null)); } + @Override + public PauseMTMVCommand visitPauseMTMV(PauseMTMVContext ctx) { + List nameParts = visitMultipartIdentifier(ctx.mvName); + return new PauseMTMVCommand(new PauseMTMVInfo(new TableNameInfo(nameParts))); + } + + @Override + public ResumeMTMVCommand visitResumeMTMV(ResumeMTMVContext ctx) { + List nameParts = visitMultipartIdentifier(ctx.mvName); + return new ResumeMTMVCommand(new ResumeMTMVInfo(new TableNameInfo(nameParts))); + } + @Override public AlterMTMVCommand visitAlterMTMV(AlterMTMVContext ctx) { List nameParts = visitMultipartIdentifier(ctx.mvName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index d75115a648ff6f4..3db4a43834507da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -133,5 +133,7 @@ public enum PlanType { DROP_CONSTRAINT_COMMAND, REFRESH_MTMV_COMMAND, DROP_MTMV_COMMAND, + PAUSE_MTMV_COMMAND, + RESUME_MTMV_COMMAND, CALL_COMMAND } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseMTMVCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseMTMVCommand.java new file mode 100644 index 000000000000000..f2a8cb233609e4f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseMTMVCommand.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Env; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import java.util.Objects; + +/** + * pause mtmv + */ +public class PauseMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback { + private final PauseMTMVInfo pauseMTMVInfo; + + public PauseMTMVCommand(PauseMTMVInfo pauseMTMVInfo) { + super(PlanType.PAUSE_MTMV_COMMAND); + this.pauseMTMVInfo = Objects.requireNonNull(pauseMTMVInfo, "require pauseMTMVInfo object"); + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + pauseMTMVInfo.analyze(ctx); + Env.getCurrentEnv().getMtmvService().pauseMTMV(pauseMTMVInfo); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitPauseMTMVCommand(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeMTMVCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeMTMVCommand.java new file mode 100644 index 000000000000000..a47c65eeb41a1a7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeMTMVCommand.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Env; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import java.util.Objects; + +/** + * resume mtmv + */ +public class ResumeMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback { + private final ResumeMTMVInfo resumeMTMVInfo; + + public ResumeMTMVCommand(ResumeMTMVInfo resumeMTMVInfo) { + super(PlanType.RESUME_MTMV_COMMAND); + this.resumeMTMVInfo = Objects.requireNonNull(resumeMTMVInfo, "require resumeMTMVInfo object"); + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + resumeMTMVInfo.analyze(ctx); + Env.getCurrentEnv().getMtmvService().resumeMTMV(resumeMTMVInfo); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitResumeMTMVCommand(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/PauseMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/PauseMTMVInfo.java new file mode 100644 index 000000000000000..15bbcff6f72f99c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/PauseMTMVInfo.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.info; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.qe.ConnectContext; + +import java.util.Objects; + +/** + * pause mtmv info + */ +public class PauseMTMVInfo { + private final TableNameInfo mvName; + + public PauseMTMVInfo(TableNameInfo mvName) { + this.mvName = Objects.requireNonNull(mvName, "require mvName object"); + } + + /** + * analyze pause info + * + * @param ctx ConnectContext + */ + public void analyze(ConnectContext ctx) { + mvName.analyze(ctx); + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), mvName.getDb(), + mvName.getTbl(), PrivPredicate.CREATE)) { + String message = ErrorCode.ERR_TABLEACCESS_DENIED_ERROR.formatErrorMsg("CREATE", + ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), + mvName.getDb() + ": " + mvName.getTbl()); + throw new AnalysisException(message); + } + try { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb()); + db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW); + } catch (MetaNotFoundException | DdlException e) { + throw new AnalysisException(e.getMessage()); + } + } + + /** + * getMvName + * + * @return TableNameInfo + */ + public TableNameInfo getMvName() { + return mvName; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ResumeMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ResumeMTMVInfo.java new file mode 100644 index 000000000000000..a7f23bedfc7500b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ResumeMTMVInfo.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.info; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.qe.ConnectContext; + +import java.util.Objects; + +/** + * resume mtmv info + */ +public class ResumeMTMVInfo { + private final TableNameInfo mvName; + + public ResumeMTMVInfo(TableNameInfo mvName) { + this.mvName = Objects.requireNonNull(mvName, "require mvName object"); + } + + /** + * analyze resume info + * + * @param ctx ConnectContext + */ + public void analyze(ConnectContext ctx) { + mvName.analyze(ctx); + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), mvName.getDb(), + mvName.getTbl(), PrivPredicate.CREATE)) { + String message = ErrorCode.ERR_TABLEACCESS_DENIED_ERROR.formatErrorMsg("CREATE", + ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), + mvName.getDb() + ": " + mvName.getTbl()); + throw new AnalysisException(message); + } + try { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb()); + db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW); + } catch (MetaNotFoundException | DdlException e) { + throw new AnalysisException(e.getMessage()); + } + } + + /** + * getMvName + * + * @return TableNameInfo + */ + public TableNameInfo getMvName() { + return mvName; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index e67b78bb4b1925b..a48a8aaf981f289 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -34,7 +34,9 @@ import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.commands.LoadCommand; +import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand; +import org.apache.doris.nereids.trees.plans.commands.ResumeMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.UpdateCommand; /** CommandVisitor. */ @@ -113,6 +115,14 @@ default R visitDropMTMVCommand(DropMTMVCommand dropMTMVCommand, C context) { return visitCommand(dropMTMVCommand, context); } + default R visitPauseMTMVCommand(PauseMTMVCommand pauseMTMVCommand, C context) { + return visitCommand(pauseMTMVCommand, context); + } + + default R visitResumeMTMVCommand(ResumeMTMVCommand resumeMTMVCommand, C context) { + return visitCommand(resumeMTMVCommand, context); + } + default R visitCallCommand(CallCommand callCommand, C context) { return visitCommand(callCommand, context); } diff --git a/regression-test/data/mtmv_p0/test_db_mtmv.out b/regression-test/data/mtmv_p0/test_db_mtmv.out new file mode 100644 index 000000000000000..ac76df5cf046bef --- /dev/null +++ b/regression-test/data/mtmv_p0/test_db_mtmv.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !count_init -- +1 + +-- !count_dropped -- +0 + diff --git a/regression-test/data/mtmv_p0/test_pause_mtmv.out b/regression-test/data/mtmv_p0/test_pause_mtmv.out new file mode 100644 index 000000000000000..bf257d835088e2a --- /dev/null +++ b/regression-test/data/mtmv_p0/test_pause_mtmv.out @@ -0,0 +1,10 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !status_init -- +RUNNING + +-- !status_pause -- +PAUSED + +-- !status_resume -- +RUNNING + diff --git a/regression-test/suites/mtmv_p0/test_db_mtmv.groovy b/regression-test/suites/mtmv_p0/test_db_mtmv.groovy new file mode 100644 index 000000000000000..fa19c2f7f97608f --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_db_mtmv.groovy @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_db_mtmv") { + def tableName = "t_test_db_mtmv_user" + def mvName = "multi_mv_test_db_mtmv" + def dbName = "regression_test_mtmv_db" + sql """drop database if exists `${dbName}`""" + sql """create database `${dbName}`""" + sql """use `${dbName}`""" + + sql """ + CREATE TABLE IF NOT EXISTS `${tableName}` ( + event_day DATE, + id BIGINT, + username VARCHAR(20) + ) + DISTRIBUTED BY HASH(id) BUCKETS 10 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH COMPLETE ON MANUAL + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${tableName}; + """ + def jobName = getJobName(dbName, mvName); + order_qt_count_init "select count(*) from jobs('type'='mv') where Name='${jobName}'" + sql """ + drop database `${dbName}` + """ + order_qt_count_dropped "select count(*) from jobs('type'='mv') where Name='${jobName}'" +} diff --git a/regression-test/suites/mtmv_p0/test_pause_mtmv.groovy b/regression-test/suites/mtmv_p0/test_pause_mtmv.groovy new file mode 100644 index 000000000000000..dcd9d56ff7700e4 --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_pause_mtmv.groovy @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_pause_mtmv") { + def tableName = "t_test_pause_mtmv_user" + def mvName = "multi_mv_test_pause_mtmv" + def dbName = "regression_test_mtmv_p0" + sql """drop table if exists `${tableName}`""" + + sql """ + CREATE TABLE IF NOT EXISTS `${tableName}` ( + event_day DATE, + id BIGINT, + username VARCHAR(20) + ) + DISTRIBUTED BY HASH(id) BUCKETS 10 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """drop materialized view if exists ${mvName};""" + + // IMMEDIATE MANUAL + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH COMPLETE ON MANUAL + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${tableName}; + """ + def jobName = getJobName("regression_test_mtmv_p0", mvName); + order_qt_status_init "select Status from jobs('type'='mv') where Name='${jobName}'" + sql """ + PAUSE MATERIALIZED VIEW JOB ON ${mvName} + """ + order_qt_status_pause "select Status from jobs('type'='mv') where Name='${jobName}'" + sql """ + RESUME MATERIALIZED VIEW JOB ON ${mvName} + """ + order_qt_status_resume "select Status from jobs('type'='mv') where Name='${jobName}'" + sql """ + DROP MATERIALIZED VIEW ${mvName} + """ +}