From cab165ec11df06e588f56722e4256bfa8678f9f1 Mon Sep 17 00:00:00 2001 From: benjobs Date: Wed, 26 Jul 2023 14:54:38 +0800 Subject: [PATCH] DependencyUtilsTest improvement --- .../base/util/DependencyUtilsTest.java | 89 +++++++++++++++---- 1 file changed, 72 insertions(+), 17 deletions(-) diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java index c70d5be797..3570486d11 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java @@ -23,46 +23,101 @@ import org.apache.streampark.flink.packer.maven.Artifact; import org.apache.streampark.flink.packer.maven.MavenTool; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.factories.Factory; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import java.io.File; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.Scanner; import java.util.ServiceLoader; +import java.util.Set; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; @Slf4j class DependencyUtilsTest { @Test public void resolveFlinkConnector() throws Exception { - Artifact artifact = new Artifact("org.apache.flink", "flink-connector-kafka", "1.17.1", null); - InternalConfigHolder.set( - CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "/Users/benjobs/Desktop/streamx_workspace"); + Artifact artifact = + new Artifact("org.apache.flink", "flink-connector-hive_2.12", "1.17.1", null); + + InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "~/tmp"); List files = MavenTool.resolveArtifacts(artifact); + if (!files.isEmpty()) { + Class className = Factory.class; + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + files.forEach(x -> ClassLoaderUtils.loadJar(x.getAbsolutePath())); + ServiceLoader serviceLoader = ServiceLoader.load(className, classLoader); + for (Factory factory : serviceLoader) { + String name = factory.getClass().getName(); + String id = factory.factoryIdentifier(); + System.out.println("id: " + id + " class :" + name); + + Set> requiredOptions = factory.requiredOptions(); + System.out.println(" ------------requiredOptions---------- "); + requiredOptions.forEach( + x -> System.out.println(x.key() + " defValue: " + x.defaultValue())); + + System.out.println(" ------------optionalOptions---------- "); + Set> options = factory.optionalOptions(); + options.forEach( + x -> System.out.println(x.key() + " defValue: " + x.defaultValue())); + + System.out.println(); + } + } + } + + @Test + public void resolveFlinkConnector2() throws Exception { + Artifact artifact = + new Artifact("org.apache.flink", "flink-connector-elasticsearch6", "3.0.1-1.17", null); + + InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "~/tmp"); + List files = MavenTool.resolveArtifacts(artifact); + + String path = null; if (!files.isEmpty()) { String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version()); Optional jarFile = files.stream().filter(x -> x.getName().equals(fileName)).findFirst(); if (jarFile.isPresent()) { - String jar = jarFile.get().getAbsolutePath(); - Class className = Factory.class; - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - // 动态加载jar到classpath, - // 优点:简单, - // 缺点: 会污染当前主线程,每上传一个新的connector都会加载到当前的主线程中,导致加载太多的 class, ... - ClassLoaderUtils.loadJar(jar); - - ServiceLoader serviceLoader = ServiceLoader.load(className, classLoader); - for (Factory factory : serviceLoader) { - String name = factory.getClass().getName(); - String id = factory.factoryIdentifier(); - System.out.println(id + " : " + name); - } + path = jarFile.get().getAbsolutePath(); + } + } + + String configFile = "META-INF/services/org.apache.flink.table.factories.Factory"; + + JarFile jarFile = new JarFile(path); + JarEntry entry = jarFile.getJarEntry(configFile); + + List factories = new ArrayList<>(0); + InputStream inputStream = jarFile.getInputStream(entry); + Scanner scanner = new Scanner(new InputStreamReader(inputStream)); + while (scanner.hasNextLine()) { + String line = scanner.nextLine().trim(); + if (line.length() > 0 && !line.startsWith("#")) { + factories.add(line); } } + + factories.forEach(System.out::println); + for (String factory : factories) { + String packageName = factory.replace('.', '/') + ".class"; + JarEntry classEntry = jarFile.getJarEntry(packageName); + InputStream in = jarFile.getInputStream(classEntry); + + // TODO parse connector + System.out.println(in); + } } }