Skip to content

Commit

Permalink
DependencyUtilsTest improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
benjobs committed Jul 26, 2023
1 parent 764a8c3 commit cab165e
Showing 1 changed file with 72 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,101 @@
import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.MavenTool;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.Factory;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Scanner;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

@Slf4j
class DependencyUtilsTest {

@Test
public void resolveFlinkConnector() throws Exception {
Artifact artifact = new Artifact("org.apache.flink", "flink-connector-kafka", "1.17.1", null);

InternalConfigHolder.set(
CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "/Users/benjobs/Desktop/streamx_workspace");
Artifact artifact =
new Artifact("org.apache.flink", "flink-connector-hive_2.12", "1.17.1", null);

InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "~/tmp");

List<File> files = MavenTool.resolveArtifacts(artifact);
if (!files.isEmpty()) {
Class<Factory> className = Factory.class;
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
files.forEach(x -> ClassLoaderUtils.loadJar(x.getAbsolutePath()));
ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className, classLoader);
for (Factory factory : serviceLoader) {
String name = factory.getClass().getName();
String id = factory.factoryIdentifier();
System.out.println("id: " + id + " class :" + name);

Set<ConfigOption<?>> requiredOptions = factory.requiredOptions();
System.out.println(" ------------requiredOptions---------- ");
requiredOptions.forEach(
x -> System.out.println(x.key() + " defValue: " + x.defaultValue()));

System.out.println(" ------------optionalOptions---------- ");
Set<ConfigOption<?>> options = factory.optionalOptions();
options.forEach(
x -> System.out.println(x.key() + " defValue: " + x.defaultValue()));

System.out.println();
}
}
}

@Test
public void resolveFlinkConnector2() throws Exception {
Artifact artifact =
new Artifact("org.apache.flink", "flink-connector-elasticsearch6", "3.0.1-1.17", null);

InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "~/tmp");
List<File> files = MavenTool.resolveArtifacts(artifact);

String path = null;
if (!files.isEmpty()) {
String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
Optional<File> jarFile = files.stream().filter(x -> x.getName().equals(fileName)).findFirst();
if (jarFile.isPresent()) {
String jar = jarFile.get().getAbsolutePath();
Class<Factory> className = Factory.class;
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// 动态加载jar到classpath,
// 优点:简单,
// 缺点: 会污染当前主线程,每上传一个新的connector都会加载到当前的主线程中,导致加载太多的 class, ...
ClassLoaderUtils.loadJar(jar);

ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className, classLoader);
for (Factory factory : serviceLoader) {
String name = factory.getClass().getName();
String id = factory.factoryIdentifier();
System.out.println(id + " : " + name);
}
path = jarFile.get().getAbsolutePath();
}
}

String configFile = "META-INF/services/org.apache.flink.table.factories.Factory";

JarFile jarFile = new JarFile(path);
JarEntry entry = jarFile.getJarEntry(configFile);

List<String> factories = new ArrayList<>(0);
InputStream inputStream = jarFile.getInputStream(entry);
Scanner scanner = new Scanner(new InputStreamReader(inputStream));
while (scanner.hasNextLine()) {
String line = scanner.nextLine().trim();
if (line.length() > 0 && !line.startsWith("#")) {
factories.add(line);
}
}

factories.forEach(System.out::println);
for (String factory : factories) {
String packageName = factory.replace('.', '/') + ".class";
JarEntry classEntry = jarFile.getJarEntry(packageName);
InputStream in = jarFile.getInputStream(classEntry);

// TODO parse connector
System.out.println(in);
}
}
}

0 comments on commit cab165e

Please sign in to comment.