Skip to content

Commit

Permalink
Merge branch 'apache:dev' into Refactor_the_lifecycle_control_of_Flin…
Browse files Browse the repository at this point in the history
…k_application-mode_
  • Loading branch information
caicancai authored Sep 1, 2023
2 parents d2b3c7a + fd3f2e0 commit 09f1e2d
Show file tree
Hide file tree
Showing 30 changed files with 1,017 additions and 40 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
distribution: "adopt"
cache: "maven"
- name: Build with Maven
run: ./mvnw -B clean install -Pshaded -DskipTests
run: ./mvnw -B clean install -Pshaded,dist -DskipTests
- name: Test with Maven
run: ./mvnw -B test
run: wget -c https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz -P /tmp/ && tar -zxvf /tmp/flink-1.17.1-bin-scala_2.12.tgz -C /tmp/ && ./mvnw -B test

1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
<module>streampark-common</module>
<module>streampark-flink</module>
<module>streampark-console</module>
<module>streampark-test-utils</module>
</modules>

<properties>
Expand Down
10 changes: 8 additions & 2 deletions streampark-console/streampark-console-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@
<version>${project.version}</version>
</dependency>


<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -412,6 +411,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-testcontainer</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!--Test dependencies end.-->

<!--log4j -->
Expand Down Expand Up @@ -447,7 +453,7 @@
<groupId>dev.zio</groupId>
<artifactId>zio-logging_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-streams_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<assembly>
<id>bin</id>
<formats>
<format>dir</format>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1676,7 +1676,7 @@ public void start(Application appParam, boolean auto) throws Exception {
}

private Map<String, Object> getProperties(Application application) {
Map<String, Object> properties = application.getOptionMap();
Map<String, Object> properties = new HashMap<>(application.getOptionMap());
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public ResponseResult check(FlinkCluster cluster) {
@Override
public Boolean create(FlinkCluster flinkCluster) {
flinkCluster.setUserId(commonService.getUserId());
return internalCreate(flinkCluster);
}

@VisibleForTesting
public boolean internalCreate(FlinkCluster flinkCluster) {
boolean successful = validateQueueIfNeeded(flinkCluster);
ApiAlertException.throwIfFalse(
successful, String.format(ERROR_CLUSTER_QUEUE_HINT, flinkCluster.getYarnQueue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.streampark.console.core.service.alert.AlertService;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hc.client5.http.config.RequestConfig;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
Expand All @@ -49,6 +50,8 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

Expand Down Expand Up @@ -84,7 +87,7 @@ public class FlinkHttpWatcher {
@Autowired private FlinkClusterWatcher flinkClusterWatcher;

// track interval every 5 seconds
private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);

// option interval within 10 seconds
private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
Expand Down Expand Up @@ -197,6 +200,12 @@ public void start() {
}
}

@VisibleForTesting
public @Nullable FlinkAppState tryQueryFlinkAppState(@Nonnull Long appId) {
Application app = WATCHING_APPS.get(appId);
return (app == null || app.getState() == null) ? null : FlinkAppState.of(app.getState());
}

private void watch(Long id, Application application) {
EXECUTOR.execute(
() -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console;

import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.util.SystemPropertyUtils;

import org.apache.commons.io.FileUtils;
import org.apache.flink.util.Preconditions;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
* Integration base tester. Note: The all children classes of the base must run after the
* project-level package phrase.
*/
@Slf4j
@EnableScheduling
@ActiveProfiles("integration-test")
@AutoConfigureTestEntityManager
@AutoConfigureWebTestClient(timeout = "60000")
@TestPropertySource(locations = {"classpath:application-integration-test.yml"})
@ExtendWith({MockitoExtension.class, SpringExtension.class})
@SpringBootTest(
classes = StreamParkConsoleBootstrap.class,
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public abstract class SpringIntegrationTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(SpringIntegrationTestBase.class);

protected static final String RUN_PKG_SCRIPT_HINT =
"Please run package script before running the test case.";

protected static final String DEFAULT_APP_HOME_DIR_NAME = "apache-streampark";
protected static final String DEFAULT_LOCAL_WORKSPACE_DIR_NAME = "localWorkspace";
protected static final String DEFAULT_FLINK_VERSION = "1.17.1";
protected static final FileFilter PKG_NAME_FILTER =
file -> file.getName().startsWith(DEFAULT_APP_HOME_DIR_NAME) && file.isDirectory();
protected static String defaultFlinkHome = "/tmp/flink-1.17.1";
protected static String appHome;

@BeforeAll
public static void init(@TempDir File tempPath) throws IOException {

LOG.info("Start prepare the real running env.");
String tempAbsPath = tempPath.getAbsolutePath();
LOG.info("Integration test base tmp dir: {}", tempAbsPath);

FileUtils.copyDirectory(
tryFindStreamParkPackagedDirFile(), new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME));

Path localWorkspace =
Files.createDirectories(new File(tempAbsPath, DEFAULT_LOCAL_WORKSPACE_DIR_NAME).toPath());

appHome = new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME).getAbsolutePath();
System.setProperty(ConfigConst.KEY_APP_HOME(), appHome);
System.setProperty(
CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
localWorkspace.toAbsolutePath().toString());

LOG.info(
"Complete mock EnvInitializer init, app home: {}, {}: {}",
appHome,
CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
localWorkspace.toAbsolutePath());
}

private static File tryFindStreamParkPackagedDirFile() {
String userDir = Preconditions.checkNotNull(SystemPropertyUtils.get("user.dir"));
File pkgTargetDirFile = new File(userDir, "target");
Preconditions.checkState(
pkgTargetDirFile.exists(),
"The target directory of %s doesn't exist. %s",
userDir,
RUN_PKG_SCRIPT_HINT);
Optional<File> availablePkgParentFileOpt =
Arrays.stream(requireNonNull(pkgTargetDirFile.listFiles(PKG_NAME_FILTER))).findFirst();
final File availablePkgParentFile =
availablePkgParentFileOpt.orElseThrow(() -> new RuntimeException(RUN_PKG_SCRIPT_HINT));
Optional<File> targetDirFile =
Arrays.stream(requireNonNull(availablePkgParentFile.listFiles(PKG_NAME_FILTER)))
.findFirst();
return targetDirFile.orElseThrow(() -> new RuntimeException(RUN_PKG_SCRIPT_HINT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.YarnQueue;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -33,18 +34,19 @@
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.transaction.annotation.Transactional;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** base tester. */
@Transactional
@Slf4j
@EnableScheduling
@ActiveProfiles("test")
@AutoConfigureTestEntityManager
@AutoConfigureWebTestClient(timeout = "60000")
Expand All @@ -53,9 +55,9 @@
@SpringBootTest(
classes = StreamParkConsoleBootstrap.class,
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public abstract class SpringTestBase {
public abstract class SpringUnitTestBase {

protected static final Logger LOG = LoggerFactory.getLogger(SpringTestBase.class);
protected static final Logger LOG = LoggerFactory.getLogger(SpringUnitTestBase.class);

@BeforeAll
public static void init(@TempDir File tempPath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.streampark.console.core.service;

import org.apache.streampark.console.SpringTestBase;
import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.util.WebUtils;
Expand All @@ -33,7 +33,7 @@
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

public class AccessTokenServiceTest extends SpringTestBase {
public class AccessTokenServiceTest extends SpringUnitTestBase {

@Autowired private AccessTokenService accessTokenService;

Expand Down
Loading

0 comments on commit 09f1e2d

Please sign in to comment.