Skip to content

Commit

Permalink
Merge pull request #395 from WeDataSphere/dev-1.1.1-webank-fix
Browse files Browse the repository at this point in the history
Synchronize the latest code to the dev-1.1.1 branch
  • Loading branch information
jefftlin authored Jul 12, 2023
2 parents 7718c48 + cf6d229 commit 9938d58
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 35 deletions.
2 changes: 2 additions & 0 deletions assembly-package/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<version>2.4</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.3</version>
<executions>
<execution>
<phase>package</phase>
Expand Down
19 changes: 16 additions & 3 deletions exchangis-engines/engineconn-plugins/datax/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>linkis</artifactId>
<groupId>org.apache.linkis</groupId>
<artifactId>exchangis-engines</artifactId>
<groupId>com.webank.wedatasphere.exchangis</groupId>
<version>1.1.1</version>
<relativePath/>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>linkis-engineplugin-datax</artifactId>

<properties>
<linkis.version>1.1.6-webank</linkis.version>
<datax.engine.version>3.0.0-Plus-2</datax.engine.version>
<datax.version>3.0.0</datax.version>
<hadoop.version>2.7.2</hadoop.version>
Expand Down Expand Up @@ -104,6 +105,18 @@
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
Expand Down
25 changes: 5 additions & 20 deletions exchangis-engines/engineconn-plugins/sqoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>linkis</artifactId>
<groupId>org.apache.linkis</groupId>
<artifactId>exchangis-engines</artifactId>
<groupId>com.webank.wedatasphere.exchangis</groupId>
<version>1.1.1</version>
<relativePath/>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>linkis-engineplugin-sqoop</artifactId>
<properties>
<sqoop.version>1.4.6</sqoop.version>
<hive.version>3.1.2</hive.version>
<hadoop.version>2.7.2</hadoop.version>
<linkis.version>1.1.6-webank</linkis.version>
</properties>

<dependencies>
Expand All @@ -46,12 +48,6 @@
<classifier>hadoop200</classifier>
<version>${sqoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<scope>provided</scope>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
Expand Down Expand Up @@ -213,17 +209,6 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-bml-engine-hook</artifactId>
<version>${linkis.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,22 @@ public TaskStatus getStatus() throws ExchangisTaskLaunchException {
String linkisJobStatus = this.onceJob.getStatus(this.jobInfo);
if ("success".equalsIgnoreCase(linkisJobStatus)) {
this.status = TaskStatus.Success;
} else if ("failed".equalsIgnoreCase(linkisJobStatus)) {
} else if ("failed".equalsIgnoreCase(linkisJobStatus)){
this.status = TaskStatus.Failed;
} else if ("shuttingdown".equalsIgnoreCase(linkisJobStatus)) {
LOG.warn("Will retry on linkis job status: [{}]", linkisJobStatus);
// Retry on shutting down status
this.status = TaskStatus.WaitForRetry;
} else {
this.status = TaskStatus.Running;
}
// Init the error count
this.reqError.set(0);
} catch (Exception e){
try {
dealException(e);
} catch (ExchangisTaskNotExistException ne){
LOG.warn("Not find the launcher task in exchangis", e);
this.status = TaskStatus.Failed;
}
}
Expand All @@ -151,7 +158,10 @@ public Map<String, Object> getMetricsInfo() throws ExchangisTaskLaunchException
try{
// Invoke getStatus() to get real time status
if(!TaskStatus.isCompleted(getStatus())){
return (Map<String, Object>)this.metricsOperator.apply();
Map<String, Object> metrics = (Map<String, Object>)this.metricsOperator.apply();
// Init the error count
this.reqError.set(0);
return metrics;
}
}catch(Exception e){
dealException(e);
Expand Down Expand Up @@ -184,6 +194,8 @@ public TaskProgressInfo getProgressInfo() throws ExchangisTaskLaunchException {
}
this.progressInfo.setProgress(1.0f);
}
// Init the error count
this.reqError.set(0);
} catch(Exception e){
dealException(e);
}
Expand All @@ -197,6 +209,8 @@ public void kill() throws ExchangisTaskLaunchException {
try{
this.onceJob.kill();
this.status = TaskStatus.Cancelled;
// Init the error count
this.reqError.set(0);
}catch(Exception e){
dealException(e);
}
Expand All @@ -223,6 +237,8 @@ public LogResult queryLogs(LogQuery query) throws ExchangisTaskLaunchException {
if (isEnd){
isEnd = TaskStatus.isCompleted(getStatus());
}
// Init the error count
this.reqError.set(0);
return new LogResult(logs.endLine(), isEnd, logs.logs());
} catch (Exception e){
dealException(e);
Expand All @@ -239,6 +255,10 @@ public synchronized void submit() throws ExchangisTaskLaunchException {
}
try {
((SubmittableOnceJob) this.onceJob).submit();
TaskStatus status = getStatus();
if (status == TaskStatus.Undefined || status == TaskStatus.WaitForRetry){
throw new ExchangisTaskLaunchException("Fail to submit to linkis server with unexpected final status: [" + status + "]", null);
}
// New the operators for job
prepareOperators(this.onceJob);
Map<String, Object> jobInfo = getJobInfo(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public TaskDequeueEvent(String taskId) {
this.taskId = taskId;
}

@Override
public String eventId() {
return "_TaskExecution_" + this.taskId;
}

public String getTaskId() {
return taskId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ protected void schedule() throws ExchangisSchedulerException, ExchangisScheduler
launchedExchangisTask = launcher.launch(this.launchableExchangisTask);
// launchedExchangisTask = new LaunchedExchangisTask(launchableExchangisTask);
launchedExchangisTask.setLaunchTime(launchTime);
info(jobExecutionId, "Success to submit task:[name:{}, id:{}] to Linkis [linkis_id: {}, info: {}]",
launchedExchangisTask.getName(), launchedExchangisTask.getId(), launchedExchangisTask.getLinkisJobId(), launchedExchangisTask.getLinkisJobInfo());
} catch (Exception e) {
info(jobExecutionId, "Launch task:[name:{} ,id:{}] fail, possible reason is: [{}]",
launchableExchangisTask.getName(), launchableExchangisTask.getId(), getActualCause(e).getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public List<T> onPublish(int batchSize) throws ExchangisTaskObserverException {
}
int fetchTaskSize = cacheTasks.size();
int restBatchSize = batchSize - fetchTaskSize;
if (restBatchSize > 0 && (this.lastPublishTime + this.publishInterval < System.currentTimeMillis())) {
if (restBatchSize > 0 && (this.lastPublishTime + this.publishInterval <= System.currentTimeMillis())) {
Optional.ofNullable(onPublishNext(restBatchSize)).ifPresent(cacheTasks::addAll);
}
return cacheTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public Message ExecutedJobKill(@PathVariable(value = "jobExecutionId") String jo
ExchangisJobProgressVo jobStatus = executeService.getJobStatus(jobExecutionId);
Message message = null;
String loginUser = SecurityFilter.getLoginUsername(request);
String oringinUser = SecurityFilter.getLoginUsername(request);
if(!JobAuthorityUtils.hasJobExecuteSituationAuthority(loginUser, jobExecutionId, OperationType.JOB_EXECUTE)) {
return Message.error("You have no permission to get kill job (没有权限去杀死任务)");
}
Expand Down
5 changes: 5 additions & 0 deletions exchangis-plugins/exchangis-appconn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@
<version>${dss.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.10</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public Message queryProjects(HttpServletRequest request,
@RequestMapping( value = "projects/{projectId:\\d+}", method = RequestMethod.GET)
public Message queryProjectDetail(HttpServletRequest request,
@PathVariable("projectId") Long projectId) {
String username = UserUtils.getLoginUser(request);
String username = SecurityFilter.getLoginUsername(request);
try {
ExchangisProjectInfo project = projectService.getProjectDetailById(projectId);
if (Objects.isNull(project)){
Expand Down Expand Up @@ -124,14 +124,13 @@ public Message createProject(@Validated @RequestBody ExchangisProjectInfo projec
String username = UserUtils.getLoginUser(request);
String oringinUser = SecurityFilter.getLoginUsername(request);
if (StringUtils.isBlank(projectVo.getViewUsers()) || !StringUtils.contains(projectVo.getViewUsers(), username)) {
projectVo.setViewUsers(username + projectVo.getViewUsers());
projectVo.setViewUsers(username + "," + projectVo.getViewUsers());
}
if (StringUtils.isBlank(projectVo.getEditUsers()) || !StringUtils.contains(projectVo.getEditUsers(), username)) {
projectVo.setEditUsers(username + projectVo.getEditUsers());
projectVo.setEditUsers(username + "," + projectVo.getEditUsers());
}
if (StringUtils.isBlank(projectVo.getExecUsers()) || !StringUtils.contains(projectVo.getExecUsers(), username)) {
projectVo.setExecUsers(username + projectVo.getExecUsers());

projectVo.setExecUsers(username + "," + projectVo.getExecUsers());
}

try {
Expand All @@ -149,7 +148,6 @@ public Message createProject(@Validated @RequestBody ExchangisProjectInfo projec
return Message.error("Fail to create project (创建项目失败)");
}
}

/**
* check project name
* @param request http request
Expand All @@ -169,6 +167,7 @@ public Message getProjectByName(HttpServletRequest request, @PathVariable("name"
}
}


/**
* Update project
* @param request request
Expand All @@ -184,10 +183,9 @@ public Message updateProject(@Validated({UpdateGroup.class, Default.class}) @Req
if (result.hasErrors()){
return Message.error(result.getFieldErrors().get(0).getDefaultMessage());
}

String oringinUser = SecurityFilter.getLoginUsername(request);
String username = UserUtils.getLoginUser(request);

//String username = SecurityFilter.getLoginUsername(request);
if (StringUtils.isBlank(projectVo.getViewUsers()) || !StringUtils.contains(projectVo.getViewUsers(), username)) {
projectVo.setViewUsers(username + "," + projectVo.getViewUsers());
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
<reflections.version>0.9.10</reflections.version>
<jersey-bean-validation.version>2.21</jersey-bean-validation.version>
<aspectj.version>1.9.5</aspectj.version>
<xstream.version>1.4.15</xstream.version>
<xstream.version>1.4.19</xstream.version>
<jobmanager.version>0.1.0-SNAPSHOT</jobmanager.version>
</properties>

Expand Down

0 comments on commit 9938d58

Please sign in to comment.