Skip to content

Commit

Permalink
[fix](routine load) write edit log when rescheduled job (#40728)
Browse files Browse the repository at this point in the history
```
2024-09-11 20:00:53,079 ERROR (replayer|105) [RoutineLoadManager.replayChangeRoutineLoadJob():836] should not happened
org.apache.doris.common.DdlException: errCode = 2, detailMessage = Could not transform PAUSED to PAUSED
	at org.apache.doris.load.routineload.RoutineLoadJob.checkStateTransform(RoutineLoadJob.java:855) ~[doris-fe.jar:1.2-SNAPSHOT]
	at org.apache.doris.load.routineload.RoutineLoadJob.unprotectUpdateState(RoutineLoadJob.java:1407) ~[doris-fe.jar:1.2-SNAPSHOT]
	at org.apache.doris.load.routineload.RoutineLoadJob.updateState(RoutineLoadJob.java:1394) ~[doris-fe.jar:1.2-SNAPSHOT]
	at org.apache.doris.load.routineload.RoutineLoadManager.replayChangeRoutineLoadJob(RoutineLoadManager.java:834) ~[doris-fe.jar:1.2-SNAPSHOT]
	at org.apache.doris.persist.EditLog.loadJournal(EditLog.java:717) ~[doris-fe.jar:1.2-SNAPSHOT]
	at org.apache.doris.catalog.Env.replayJournal(Env.java:2913) ~[doris-fe.jar:1.2-SNAPSHOT]
	at org.apache.doris.catalog.Env$4.runOneCycle(Env.java:2675) ~[doris-fe.jar:1.2-SNAPSHOT]
	at org.apache.doris.common.util.Daemon.run(Daemon.java:116) ~[doris-fe.jar:1.2-SNAPSHOT]
```

`unprotectNeedReschedule()` will change job state to
`JobState.NEED_SCHEDULE` without `logOpRoutineLoadJob`.If job is paused
then rescheduled and paused finally, the record of two consecutive edit
logs will be 'PAUSED', the correct
replay sequence should be: `PAUSED` -> `NEED_SCHEDULE` ->` PAUSED`. 

Therefore, it is need to write edit log when rescheduled job.
  • Loading branch information
sollhui committed Sep 23, 2024
1 parent 76d62c4 commit ec68f92
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,7 @@ public void update() throws UserException {
.add("msg", "Job need to be rescheduled")
.build());
unprotectUpdateProgress();
executeNeedSchedule();
unprotectUpdateState(JobState.NEED_SCHEDULE, null, false);
}
} finally {
writeUnlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ public List<Pair<Integer, Long>> getRealOffsets(String brokerList, String topic,
};

RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress);
routineLoadJob.update();

Expand Down

0 comments on commit ec68f92

Please sign in to comment.