Skip to content

Commit

Permalink
[feature](mtmv)MTMV pause and resume (apache#28887)
Browse files Browse the repository at this point in the history
- PAUSE MATERIALIZED VIEW JOB ON mv1
- RESUME MATERIALIZED VIEW JOB ON mv1
- fix when drop db,not drop job
- add lock for one materialized view can only run one task at a time
  • Loading branch information
zddr authored and HappenLee committed Jan 12, 2024
1 parent 1725e73 commit d574647
Show file tree
Hide file tree
Showing 20 changed files with 534 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ statement
| (REFRESH (refreshMethod | refreshTrigger | refreshMethod refreshTrigger))
| (SET LEFT_PAREN fileProperties=propertyItemList RIGHT_PAREN)) #alterMTMV
| DROP MATERIALIZED VIEW (IF EXISTS)? mvName=multipartIdentifier #dropMTMV
| PAUSE MATERIALIZED VIEW JOB ON mvName=multipartIdentifier #pauseMTMV
| RESUME MATERIALIZED VIEW JOB ON mvName=multipartIdentifier #resumeMTMV
| ALTER TABLE table=relation
ADD CONSTRAINT constraintName=errorCapturingIdentifier
constraint #addConstraint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,11 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
}
}
}
for (Table table : tableList) {
if (table.getType() == TableType.MATERIALIZED_VIEW) {
Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) table);
}
}
unprotectDropDb(db, stmt.isForceDrop(), false, 0);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
private ReentrantReadWriteLock jobRwLock;

private static final ShowResultSetMetaData JOB_META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("JobId", ScalarType.createVarchar(20)))
Expand Down Expand Up @@ -98,12 +101,14 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
private long mtmvId;

public MTMVJob() {
jobRwLock = new ReentrantReadWriteLock(true);
}

public MTMVJob(long dbId, long mtmvId) {
this.dbId = dbId;
this.mtmvId = mtmvId;
super.setCreateTimeMs(System.currentTimeMillis());
jobRwLock = new ReentrantReadWriteLock(true);
}

@Override
Expand Down Expand Up @@ -203,6 +208,22 @@ private MTMV getMTMV() throws DdlException, MetaNotFoundException {
return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
}

public void readLock() {
this.jobRwLock.readLock().lock();
}

public void readUnlock() {
this.jobRwLock.readLock().unlock();
}

public void writeLock() {
this.jobRwLock.writeLock().lock();
}

public void writeUnlock() {
this.jobRwLock.writeLock().unlock();
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,17 @@ public void before() throws JobException {
}
}

@Override
public void runTask() throws JobException {
MTMVJob job = (MTMVJob) getJobOrJobException();
try {
job.writeLock();
super.runTask();
} finally {
job.writeUnlock();
}
}

@Override
public TRow getTvfInfo() {
TRow trow = new TRow();
Expand Down Expand Up @@ -276,8 +287,10 @@ private TUniqueId generateQueryId() {
}

private void after() {
Env.getCurrentEnv()
.addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation);
if (mtmv != null) {
Env.getCurrentEnv()
.addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation);
}
mtmv = null;
relation = null;
executor = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,12 @@ public String getJobName() {
return job == null ? "" : job.getJobName();
}

public Job getJobOrJobException() throws JobException {
AbstractJob job = Env.getCurrentEnv().getJobManager().getJob(jobId);
if (job == null) {
throw new JobException("job not exist, jobId:" + jobId);
}
return job;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.persist.AlterMTMV;

/**
Expand Down Expand Up @@ -77,7 +80,7 @@ public interface MTMVHookService {
* @throws DdlException
* @throws MetaNotFoundException
*/
void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException;
void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException;

/**
* triggered when mtmv task finish
Expand All @@ -101,4 +104,16 @@ public interface MTMVHookService {
* @param table
*/
void alterTable(Table table);

/**
* Triggered when pause mtmv
* @param info
*/
void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException;

/**
* Triggered when resume mtmv
* @param info
*/
void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException;
}
48 changes: 31 additions & 17 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.qe.ConnectContext;

Expand Down Expand Up @@ -167,23 +170,11 @@ public void alterMTMV(MTMV mtmv, AlterMTMV alterMTMV) throws DdlException {
* @throws MetaNotFoundException
*/
@Override
public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getMvName().getDb());
MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getMvName().getTbl(), TableType.MATERIALIZED_VIEW);
List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
.queryJobs(JobType.MV, mtmv.getJobInfo().getJobName());
if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size());
}
try {
MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(),
info.isComplete());
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), mtmvTaskContext);

} catch (JobException e) {
e.printStackTrace();
throw new DdlException(e.getMessage());
}
public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException {
MTMVJob job = getJobByTableNameInfo(info.getMvName());
MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(),
info.isComplete());
Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(), mtmvTaskContext);
}

@Override
Expand All @@ -201,4 +192,27 @@ public void alterTable(Table table) {

}

@Override
public void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {
MTMVJob job = getJobByTableNameInfo(info.getMvName());
Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(), JobStatus.PAUSED);
}

@Override
public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {
MTMVJob job = getJobByTableNameInfo(info.getMvName());
Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(), JobStatus.RUNNING);
}

private MTMVJob getJobByTableNameInfo(TableNameInfo info) throws DdlException, MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDb());
MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getTbl(), TableType.MATERIALIZED_VIEW);
List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
.queryJobs(JobType.MV, mtmv.getJobInfo().getJobName());
if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size());
}
return jobs.get(0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterMTMV;

Expand Down Expand Up @@ -187,6 +190,16 @@ public void alterTable(Table table) {
processBaseTableChange(table, "The base table has been updated:");
}

@Override
public void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {

}

@Override
public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {

}

private void processBaseTableChange(Table table, String msgPrefix) {
BaseTableInfo baseTableInfo = new BaseTableInfo(table);
Set<BaseTableInfo> mtmvsByBaseTable = getMtmvsByBaseTable(baseTableInfo);
Expand Down
21 changes: 20 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.persist.AlterMTMV;

import com.google.common.collect.Maps;
Expand Down Expand Up @@ -108,7 +111,7 @@ public void alterMTMV(MTMV mtmv, AlterMTMV alterMTMV) throws DdlException {
}
}

public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException {
public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException {
Objects.requireNonNull(info);
LOG.info("refreshMTMV, RefreshMTMVInfo: {}", info);
for (MTMVHookService mtmvHookService : hooks.values()) {
Expand Down Expand Up @@ -140,4 +143,20 @@ public void refreshComplete(MTMV mtmv, MTMVRelation cache, MTMVTask task) {
mtmvHookService.refreshComplete(mtmv, cache, task);
}
}

public void pauseMTMV(PauseMTMVInfo info) throws DdlException, MetaNotFoundException, JobException {
Objects.requireNonNull(info);
LOG.info("pauseMTMV, PauseMTMVInfo: {}", info);
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.pauseMTMV(info);
}
}

public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {
Objects.requireNonNull(info);
LOG.info("resumeMTMV, ResumeMTMVInfo: {}", info);
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.resumeMTMV(info);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import org.apache.doris.nereids.DorisParser.PartitionSpecContext;
import org.apache.doris.nereids.DorisParser.PartitionValueDefContext;
import org.apache.doris.nereids.DorisParser.PartitionsDefContext;
import org.apache.doris.nereids.DorisParser.PauseMTMVContext;
import org.apache.doris.nereids.DorisParser.PlanTypeContext;
import org.apache.doris.nereids.DorisParser.PredicateContext;
import org.apache.doris.nereids.DorisParser.PredicatedContext;
Expand All @@ -142,6 +143,7 @@
import org.apache.doris.nereids.DorisParser.RefreshTriggerContext;
import org.apache.doris.nereids.DorisParser.RegularQuerySpecificationContext;
import org.apache.doris.nereids.DorisParser.RelationContext;
import org.apache.doris.nereids.DorisParser.ResumeMTMVContext;
import org.apache.doris.nereids.DorisParser.RollupDefContext;
import org.apache.doris.nereids.DorisParser.RollupDefsContext;
import org.apache.doris.nereids.DorisParser.RowConstructorContext;
Expand Down Expand Up @@ -339,7 +341,9 @@
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.ResumeMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo;
Expand All @@ -360,7 +364,9 @@
import org.apache.doris.nereids.trees.plans.commands.info.LessThanPartition;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition.MaxValue;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RollupDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.SimpleColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.StepPartition;
Expand Down Expand Up @@ -688,6 +694,18 @@ public DropMTMVCommand visitDropMTMV(DropMTMVContext ctx) {
return new DropMTMVCommand(new DropMTMVInfo(new TableNameInfo(nameParts), ctx.EXISTS() != null));
}

@Override
public PauseMTMVCommand visitPauseMTMV(PauseMTMVContext ctx) {
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
return new PauseMTMVCommand(new PauseMTMVInfo(new TableNameInfo(nameParts)));
}

@Override
public ResumeMTMVCommand visitResumeMTMV(ResumeMTMVContext ctx) {
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
return new ResumeMTMVCommand(new ResumeMTMVInfo(new TableNameInfo(nameParts)));
}

@Override
public AlterMTMVCommand visitAlterMTMV(AlterMTMVContext ctx) {
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,7 @@ public enum PlanType {
DROP_CONSTRAINT_COMMAND,
REFRESH_MTMV_COMMAND,
DROP_MTMV_COMMAND,
PAUSE_MTMV_COMMAND,
RESUME_MTMV_COMMAND,
CALL_COMMAND
}
Loading

0 comments on commit d574647

Please sign in to comment.