Skip to content

Commit

Permalink
#567 修复,作业配置在DB和ZK数据库的相关问题
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaopeng-he committed Jan 7, 2019
1 parent 13714fe commit 19ee9a9
Showing 1 changed file with 44 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -694,11 +694,18 @@ private void validateAndUpdateStream(JobConfig jobConfig, Set<String> stream, Li
continue;
}
if (isDownStream) {
otherJob.setUpStream(removeFromStream(jobName, otherJob.getUpStream()));
String upStream = removeFromStreamIfNecessary(jobName, otherJob.getUpStream());
if (upStream != null) {
otherJob.setUpStream(upStream);
streamChangedJobs.add(otherJob);
}
} else {
otherJob.setDownStream(removeFromStream(jobName, otherJob.getDownStream()));
String downStream = removeFromStreamIfNecessary(jobName, otherJob.getDownStream());
if (downStream != null) {
otherJob.setDownStream(downStream);
streamChangedJobs.add(otherJob);
}
}
streamChangedJobs.add(otherJob);
}
}

Expand All @@ -710,12 +717,14 @@ private String appendToStream(String jobName, String stream) {
return formatStream(streamSet);
}

private String removeFromStream(String jobName, String stream) {
private String removeFromStreamIfNecessary(String jobName, String stream) {
Set<String> streamSet = parseStreamToList(stream);
if (StringUtils.isNotBlank(jobName)) {
streamSet.remove(jobName);
if (streamSet.remove(jobName)) {
return formatStream(streamSet);
}
}
return formatStream(streamSet);
return null;
}

private String formatStream(Set<String> streamSet) {
Expand Down Expand Up @@ -1121,9 +1130,16 @@ private void createJobConfigToZk(JobConfig jobConfig, Set<JobConfig> streamChang
// 添加作业根节点和config结点
curatorFrameworkOp.create(JobNodePath.getConfigNodePath(jobName), "");
CuratorFrameworkOp.CuratorTransactionOp curatorTransactionOp = curatorFrameworkOp.inTransaction();
// 数据库有可能有重复作业的数据,去重,zk无需更新两次
Collection<JobConfig> streamChangedJobsNew = removeDuplicateByJobName(streamChangedJobs);
// 更新关联作业的上下游
for (JobConfig streamChangedJob : streamChangedJobs) {
for (JobConfig streamChangedJob : streamChangedJobsNew) {
String changedJobName = streamChangedJob.getJobName();
if (!curatorFrameworkOp.checkExists(JobNodePath.getConfigNodePath(changedJobName))) {
// 数据库存在该作业,但是zk不存在该作业,为垃圾数据
log.warn("the job({}) config node is not existing in zk", changedJobName);
continue;
}
curatorTransactionOp
.replaceIfChanged(JobNodePath.getConfigNodePath(changedJobName, CONFIG_ITEM_UPSTREAM),
streamChangedJob.getUpStream())
Expand Down Expand Up @@ -2216,9 +2232,16 @@ private void updateJobConfigToZk(JobConfig jobConfig, Set<JobConfig> streamChang
}
}
CuratorFrameworkOp.CuratorTransactionOp curatorTransactionOp = curatorFrameworkOp.inTransaction();
// 数据库有可能有重复作业的数据,去重,zk无需更新两次
Collection<JobConfig> streamChangedJobsNew = removeDuplicateByJobName(streamChangedJobs);
// 更新关联作业的上下游
for (JobConfig streamChangedJob : streamChangedJobs) {
for (JobConfig streamChangedJob : streamChangedJobsNew) {
String changedJobName = streamChangedJob.getJobName();
if (!curatorFrameworkOp.checkExists(JobNodePath.getConfigNodePath(changedJobName))) {
// 数据库存在该作业,但是zk不存在该作业,为垃圾数据
log.warn("the job({}) config node is not existing in ZK", changedJobName);
continue;
}
curatorTransactionOp
.replaceIfChanged(JobNodePath.getConfigNodePath(changedJobName, CONFIG_ITEM_UPSTREAM),
streamChangedJob.getUpStream())
Expand Down Expand Up @@ -2297,6 +2320,19 @@ private void updateJobConfigToZk(JobConfig jobConfig, Set<JobConfig> streamChang
}
}

private Collection<JobConfig> removeDuplicateByJobName(Set<JobConfig> streamChangedJobs) {
Map<String, JobConfig> streamChangedJobsMap = new HashMap<>();
for (JobConfig streamChangedJob : streamChangedJobs) {
String jobName = streamChangedJob.getJobName();
if (streamChangedJobsMap.containsKey(jobName)) {
log.warn("the DB have duplicated jobName({})", jobName);
} else {
streamChangedJobsMap.put(jobName, streamChangedJob);
}
}
return streamChangedJobsMap.values();
}

@Override
public List<String> getAllJobNamesFromZK(String namespace) throws SaturnJobConsoleException {
CuratorRepository.CuratorFrameworkOp curatorFrameworkOp = registryCenterService
Expand Down

1 comment on commit 19ee9a9

@lzw2006
Copy link
Contributor

@lzw2006 lzw2006 commented on 19ee9a9 Jan 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reviewed

Please sign in to comment.