Add testcontainer utils for test.
RocMarshal committed Jul 26, 2023
1 parent 5e0ce99 commit 63899f0
Showing 27 changed files with 1,089 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
@@ -88,7 +88,7 @@ jobs:
distribution: "adopt"
cache: "maven"
- name: Build with Maven
run: ./mvnw -B clean install -Pshaded -DskipTests
run: ./mvnw -B clean install -Pshaded,dist -DskipTests
- name: Test with Maven
run: ./mvnw -B test

1 change: 1 addition & 0 deletions pom.xml
@@ -81,6 +81,7 @@
<module>streampark-spark</module>
<module>streampark-storage</module>
<module>streampark-console</module>
<module>streampark-test-utils</module>
</modules>

<properties>
14 changes: 14 additions & 0 deletions streampark-console/streampark-console-service/pom.xml
@@ -426,6 +426,20 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.rauschig</groupId>
<artifactId>jarchivelib</artifactId>
<version>0.7.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-testcontainer</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!--Test dependencies end.-->

<!--log4j -->
@@ -17,6 +17,7 @@
<assembly>
<id>bin</id>
<formats>
<format>dir</format>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
@@ -1661,7 +1661,7 @@ public void start(Application appParam, boolean auto) throws Exception {
}

private Map<String, Object> getProperties(Application application) {
Map<String, Object> properties = application.getOptionMap();
Map<String, Object> properties = new HashMap<>(application.getOptionMap());
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
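Copying application.getOptionMap() into a new HashMap keeps the later property merges from mutating (or failing on) the map held by the application itself. A minimal sketch of the idea, using a hypothetical getOptionMap() that returns an unmodifiable map:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class OptionMapCopySketch {

  // Hypothetical stand-in for Application#getOptionMap(); a shared or read-only
  // option map behaves like this unmodifiable view.
  static Map<String, Object> getOptionMap() {
    Map<String, Object> options = new HashMap<>();
    options.put("parallelism.default", 2);
    return Collections.unmodifiableMap(options);
  }

  public static void main(String[] args) {
    Map<String, Object> properties = new HashMap<>(getOptionMap()); // defensive copy
    properties.put("rest.port", 8081); // mutating the raw map here would throw
    System.out.println(properties);    // prints both entries
  }
}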
@@ -137,6 +137,11 @@ public ResponseResult check(FlinkCluster cluster) {
@Override
public Boolean create(FlinkCluster flinkCluster) {
flinkCluster.setUserId(commonService.getUserId());
return internalCreate(flinkCluster);
}

@VisibleForTesting
public boolean internalCreate(FlinkCluster flinkCluster) {
boolean successful = validateQueueIfNeeded(flinkCluster);
ApiAlertException.throwIfFalse(
successful, String.format(ERROR_CLUSTER_QUEUE_HINT, flinkCluster.getYarnQueue()));
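Splitting create() this way lets a test exercise the validation and persistence path of internalCreate() without going through commonService.getUserId(). A hypothetical JUnit 5 sketch; the injected FlinkClusterServiceImpl type, the setters used, and the passing assertion are assumptions, not part of this commit:

package org.apache.streampark.console.core.service;

import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

class FlinkClusterInternalCreateSketch extends SpringUnitTestBase {

  @Autowired private FlinkClusterServiceImpl flinkClusterService;

  @Test
  void internalCreateAcceptsExplicitUserId() {
    FlinkCluster cluster = new FlinkCluster();
    cluster.setUserId(100000L);            // supplied directly; no commonService lookup
    cluster.setClusterName("sketch-cluster");
    Assertions.assertTrue(flinkClusterService.internalCreate(cluster));
  }
}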
@@ -39,6 +39,7 @@
import org.apache.streampark.console.core.service.alert.AlertService;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hc.client5.http.config.RequestConfig;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -50,6 +51,8 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@@ -85,7 +88,7 @@ public class FlinkHttpWatcher {
@Autowired private FlinkClusterWatcher flinkClusterWatcher;

// track interval every 5 seconds
private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);

// option interval within 10 seconds
private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
@@ -198,6 +201,12 @@ public void start() {
}
}

@VisibleForTesting
public @Nullable FlinkAppState tryQueryFlinkAppState(@Nonnull Long appId) {
Application app = WATCHING_APPS.get(appId);
return (app == null || app.getState() == null) ? null : FlinkAppState.of(app.getState());
}

private void watch(Long id, Application application) {
EXECUTOR.execute(
() -> {
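Making WATCHING_INTERVAL public and exposing tryQueryFlinkAppState() gives tests a state probe that can be polled on the watcher's own cadence. A hypothetical helper built only on those two members; Awaitility on the test classpath and the package/enum locations are assumptions:

package org.apache.streampark.console.core.task;

import org.apache.streampark.console.core.enums.FlinkAppState;

import static org.awaitility.Awaitility.await;

final class FlinkAppStatePolling {

  private FlinkAppStatePolling() {}

  // Poll tryQueryFlinkAppState on the watcher's interval until the expected state shows up.
  static void waitUntilState(FlinkHttpWatcher watcher, Long appId, FlinkAppState expected) {
    await()
        .atMost(FlinkHttpWatcher.WATCHING_INTERVAL.multipliedBy(12))
        .pollInterval(FlinkHttpWatcher.WATCHING_INTERVAL)
        .until(() -> expected == watcher.tryQueryFlinkAppState(appId));
  }
}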
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console;

import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.util.SystemPropertyUtils;

import org.apache.commons.io.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.io.IOUtils;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.junit.jupiter.MockitoExtension;
import org.rauschig.jarchivelib.Archiver;
import org.rauschig.jarchivelib.ArchiverFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import javax.annotation.Nonnull;

import java.io.File;
import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
* Integration test base. Note: all subclasses of this base must run after the
* project-level package phase.
*/
@Slf4j
@EnableScheduling
@ActiveProfiles("integration-test")
@AutoConfigureTestEntityManager
@AutoConfigureWebTestClient(timeout = "60000")
@TestPropertySource(locations = {"classpath:application-integration-test.yml"})
@ExtendWith({MockitoExtension.class, SpringExtension.class})
@SpringBootTest(
classes = StreamParkConsoleBootstrap.class,
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public abstract class SpringIntegrationTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(SpringIntegrationTestBase.class);

protected static final String RUN_PKG_SCRIPT_HINT =
"Please run the package script before running the test case.";

protected static final String DEFAULT_APP_HOME_DIR_NAME = "apache-streampark";
protected static final String DEFAULT_FLINK_OFFICIAL_RELEASES_DIR_NAME =
"flink-official-releases";
protected static final String DEFAULT_LOCAL_WORKSPACE_DIR_NAME = "localWorkspace";
protected static final String DEFAULT_FLINK_VERSION = "1.17.1";
protected static final String DEFAULT_FLINK_DOWNLOAD_URL =
"https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz";
protected static final FileFilter PKG_NAME_FILTER =
file -> file.getName().startsWith(DEFAULT_APP_HOME_DIR_NAME) && file.isDirectory();

protected static String defaultFlinkHome;
protected static String appHome;

@BeforeAll
public static void init(@TempDir File tempPath) throws IOException {

LOG.info("Start prepare the real running env.");
String tempAbsPath = tempPath.getAbsolutePath();
LOG.info("Integration test base tmp dir: {}", tempAbsPath);

FileUtils.copyDirectory(
tryFindStreamParkPackagedDirFile(), new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME));

final File flinkOfficialReleases =
new File(tempAbsPath, DEFAULT_FLINK_OFFICIAL_RELEASES_DIR_NAME);
Files.createDirectories(flinkOfficialReleases.toPath());

defaultFlinkHome =
prepareFlinkOfficialRelease(
DEFAULT_FLINK_DOWNLOAD_URL,
DEFAULT_FLINK_VERSION,
flinkOfficialReleases.getAbsolutePath());
Path localWorkspace =
Files.createDirectories(new File(tempAbsPath, DEFAULT_LOCAL_WORKSPACE_DIR_NAME).toPath());

appHome = new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME).getAbsolutePath();
System.setProperty(ConfigConst.KEY_APP_HOME(), appHome);
System.setProperty(
CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
localWorkspace.toAbsolutePath().toString());

LOG.info(
"Complete mock EnvInitializer init, app home: {}, {}: {}",
appHome,
CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
localWorkspace.toAbsolutePath());
}

/**
* @param httpUrl flink official release download url.
* @param flinkVersion flink version to prepare, e.g. 1.17.1.
* @param workDirAbsolutePath absolute path of the directory to download and unpack into.
* @return the absolute path of the unpacked flink home directory.
*/
public static String prepareFlinkOfficialRelease(
@Nonnull String httpUrl, @Nonnull String flinkVersion, String workDirAbsolutePath)
throws IOException {
String downloadedFilePath =
new File(workDirAbsolutePath, flinkVersion).getAbsolutePath() + ".tgz";
httpDownload(httpUrl, downloadedFilePath);
File archive = new File(downloadedFilePath);
File destination = new File(archive.getParentFile().getAbsolutePath());
Files.createDirectories(destination.toPath());

Archiver archiver = ArchiverFactory.createArchiver("tar", "gz");
archiver.extract(archive, destination);
Optional<File> first =
Arrays.stream(
requireNonNull(
destination.listFiles(
file -> file.getName().contains(flinkVersion) && file.isDirectory())))
.findFirst();
File file =
first.orElseThrow(
() ->
new RuntimeException(
String.format(
"Error in prepareFlinkOfficialRelease for httpUrl: %s, flinkVersion: %s",
httpUrl, flinkVersion)));

LOG.info("Prepared flink release: {}.", file.getAbsolutePath());
return file.getAbsolutePath();
}

public static void httpDownload(String httpUrl, String saveFile) {

try {
URL url = new URL(httpUrl);
URLConnection conn = url.openConnection();
InputStream inStream = conn.getInputStream();
FileOutputStream fs = new FileOutputStream(saveFile);
IOUtils.copyBytes(inStream, fs, 2048);
inStream.close();
fs.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static File tryFindStreamParkPackagedDirFile() {
String userDir = Preconditions.checkNotNull(SystemPropertyUtils.get("user.dir"));
File pkgTargetDirFile = new File(userDir, "target");
Preconditions.checkState(
pkgTargetDirFile.exists(),
"The target directory of %s doesn't exist. %s",
userDir,
RUN_PKG_SCRIPT_HINT);
Optional<File> availablePkgParentFileOpt =
Arrays.stream(requireNonNull(pkgTargetDirFile.listFiles(PKG_NAME_FILTER))).findFirst();
final File availablePkgParentFile =
availablePkgParentFileOpt.orElseThrow(() -> new RuntimeException(RUN_PKG_SCRIPT_HINT));
Optional<File> targetDirFile =
Arrays.stream(requireNonNull(availablePkgParentFile.listFiles(PKG_NAME_FILTER)))
.findFirst();
return targetDirFile.orElseThrow(() -> new RuntimeException(RUN_PKG_SCRIPT_HINT));
}
}
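A hypothetical subclass to show how the base is meant to be used: extend it and rely on appHome / defaultFlinkHome prepared in init(). The asserted directory layout (a conf dir inside the packaged app home, bin/flink under the prepared Flink home) is an assumption about the artifacts, not something this commit guarantees:

package org.apache.streampark.console;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.File;

class PackagedEnvIntegrationSketch extends SpringIntegrationTestBase {

  @Test
  void preparedHomesExist() {
    // Both paths are populated by SpringIntegrationTestBase#init before any test runs.
    Assertions.assertTrue(new File(appHome, "conf").isDirectory());
    Assertions.assertTrue(new File(defaultFlinkHome, "bin/flink").isFile());
  }
}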
@@ -24,6 +24,7 @@
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.YarnQueue;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
@@ -33,6 +34,7 @@
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@@ -44,6 +46,8 @@
import java.nio.file.Path;

/** Base class for Spring unit tests. */
@Slf4j
@EnableScheduling
@Transactional
@ActiveProfiles("test")
@AutoConfigureTestEntityManager
@@ -53,9 +57,9 @@
@SpringBootTest(
classes = StreamParkConsoleBootstrap.class,
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public abstract class SpringTestBase {
public abstract class SpringUnitTestBase {

protected static final Logger LOG = LoggerFactory.getLogger(SpringTestBase.class);
protected static final Logger LOG = LoggerFactory.getLogger(SpringUnitTestBase.class);

@BeforeAll
public static void init(@TempDir File tempPath) throws IOException {
@@ -17,7 +17,7 @@

package org.apache.streampark.console.core.service;

import org.apache.streampark.console.SpringTestBase;
import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.util.WebUtils;
@@ -33,7 +33,7 @@
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

public class AccessTokenServiceTest extends SpringTestBase {
public class AccessTokenServiceTest extends SpringUnitTestBase {

@Autowired private AccessTokenService accessTokenService;
