Skip to content

Commit

Permalink
Improve Flink connector retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Jul 26, 2023
1 parent cab165e commit b084ca6
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,20 @@ public boolean eq(Dependency other) {
}

public DependencyInfo toJarPackDeps() {
List<Artifact> mvnArts =
this.pom.stream()
.map(
pom ->
new Artifact(
pom.getGroupId(),
pom.getArtifactId(),
pom.getVersion(),
pom.getClassifier()))
.collect(Collectors.toList());
List<Artifact> mvnArts = toArtifact();
List<String> extJars =
this.jar.stream()
.map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
.collect(Collectors.toList());
return new DependencyInfo(mvnArts, extJars);
}

public List<Artifact> toArtifact() {
return this.pom.stream()
.map(
pom ->
new Artifact(
pom.getGroupId(), pom.getArtifactId(), pom.getVersion(), pom.getClassifier()))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.bean;

import org.apache.flink.configuration.ConfigOption;

import lombok.Data;

import java.util.Set;

/**
 * Describes a Flink connector {@code Factory} implementation discovered inside a connector
 * resource jar: its class name, factory identifier, and the config options it declares.
 */
@Data
public class FlinkConnectorResource {

  /** Fully qualified class name of the discovered factory implementation. */
  private String className;

  /** Identifier reported by {@code Factory#factoryIdentifier()}. */
  private String factoryIdentifier;

  // NOTE(review): these two fields were package-private while the two above were private;
  // made private for consistency — accessors are generated by Lombok's @Data either way.
  /** Config options the connector requires. */
  private Set<ConfigOption<?>> requiredOptions;

  /** Config options the connector accepts optionally. */
  private Set<ConfigOption<?>> optionalOptions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.bean.FlinkConnectorResource;
import org.apache.streampark.console.core.entity.Resource;

import com.baomidou.mybatisplus.core.metadata.IPage;
Expand Down Expand Up @@ -95,5 +96,5 @@ public interface ResourceService extends IService<Resource> {

RestResponse checkResource(Resource resource);

List<String> getConnectorId(Resource resource) throws Exception;
List<FlinkConnectorResource> getConnectorResource(Resource resource) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.Dependency;
import org.apache.streampark.console.core.bean.FlinkConnectorResource;
import org.apache.streampark.console.core.bean.Pom;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkSql;
Expand All @@ -42,6 +43,7 @@

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.factories.Factory;
import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
Expand All @@ -58,10 +60,15 @@

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.jar.Manifest;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -262,18 +269,23 @@ public RestResponse checkResource(Resource resource) {
return RestResponse.success().data(0);
case CONNECTOR:
// 1) get connector id
List<String> connectorIds;
List<FlinkConnectorResource> connectorResources;
try {
connectorIds = getConnectorId(resource);
connectorResources = getConnectorResource(resource);
} catch (Exception e) {
return RestResponse.success().data(1);
}

if (Utils.isEmpty(connectorIds)) {
if (Utils.isEmpty(connectorResources)) {
// connector id is null
return RestResponse.success().data(2);
}
// 2) check connector exists
List<String> connectorIds =
connectorResources.stream()
.map(FlinkConnectorResource::getFactoryIdentifier)
.collect(Collectors.toList());

boolean exists = existsResourceByConnectorIds(connectorIds);
if (exists) {
return RestResponse.success(3);
Expand All @@ -288,17 +300,53 @@ private boolean existsResourceByConnectorIds(List<String> connectorIds) {
}

@Override
public List<String> getConnectorId(Resource resource) throws Exception {
public List<FlinkConnectorResource> getConnectorResource(Resource resource) throws Exception {

ApiAlertException.throwIfFalse(
!ResourceType.CONNECTOR.equals(resource.getResourceType()),
"getConnectorId method error, resource not flink connector.");
File connector = getResourceJar(resource);
if (connector != null) {
String spi = "META-INF/services/org.apache.flink.table.factories.Factory";

// TODO parse connector get connectorId
Dependency dependency = Dependency.toDependency(resource.getResource());
List<File> jars;
if (!dependency.getPom().isEmpty()) {
// 1) pom
Artifact artifact = dependency.toArtifact().get(0);
jars = MavenTool.resolveArtifacts(artifact);
} else {
// 2) jar
String jar = dependency.getJar().get(0);
jars = Collections.singletonList(new File(WebUtils.getAppTempDir(), jar));
}

Class<Factory> className = Factory.class;
URL[] array =
jars.stream()
.map(
x -> {
try {
return x.toURI().toURL();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
})
.toArray(URL[]::new);

try (URLClassLoader urlClassLoader = URLClassLoader.newInstance(array)) {
ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className, urlClassLoader);
List<FlinkConnectorResource> connectorResources = new ArrayList<>();
for (Factory factory : serviceLoader) {
String factoryClassName = factory.getClass().getName();
if (!factoryClassName.equals("org.apache.flink.table.module.CoreModuleFactory")) {
FlinkConnectorResource connectorResource = new FlinkConnectorResource();
connectorResource.setClassName(factoryClassName);
connectorResource.setFactoryIdentifier(factory.factoryIdentifier());
connectorResource.setRequiredOptions(factory.requiredOptions());
connectorResource.setOptionalOptions(factory.optionalOptions());
connectorResources.add(connectorResource);
}
}
return connectorResources;
}
return null;
}

private File getResourceJar(Resource resource) throws Exception {
Expand All @@ -310,9 +358,7 @@ private File getResourceJar(Resource resource) throws Exception {
String jar = dependency.getJar().get(0);
return new File(WebUtils.getAppTempDir(), jar);
} else {
Pom pom = dependency.getPom().get(0);
Artifact artifact =
new Artifact(pom.getGroupId(), pom.getArtifactId(), pom.getVersion(), null);
Artifact artifact = dependency.toArtifact().get(0);
List<File> files = MavenTool.resolveArtifacts(artifact);
if (!files.isEmpty()) {
String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,105 +19,68 @@

import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.util.ClassLoaderUtils;
import org.apache.streampark.console.core.bean.FlinkConnectorResource;
import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.MavenTool;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.Factory;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Scanner;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

@Slf4j
class DependencyUtilsTest {

@Test
public void resolveFlinkConnector() throws Exception {

Artifact artifact =
new Artifact("org.apache.flink", "flink-connector-hive_2.12", "1.17.1", null);
Artifact artifact = new Artifact("com.ververica", "flink-connector-mysql-cdc", "2.4.1", null);

InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "~/tmp");

List<File> files = MavenTool.resolveArtifacts(artifact);
if (!files.isEmpty()) {
Class<Factory> className = Factory.class;
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
files.forEach(x -> ClassLoaderUtils.loadJar(x.getAbsolutePath()));
ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className, classLoader);
for (Factory factory : serviceLoader) {
String name = factory.getClass().getName();
String id = factory.factoryIdentifier();
System.out.println("id: " + id + " class :" + name);

Set<ConfigOption<?>> requiredOptions = factory.requiredOptions();
System.out.println(" ------------requiredOptions---------- ");
requiredOptions.forEach(
x -> System.out.println(x.key() + " defValue: " + x.defaultValue()));

System.out.println(" ------------optionalOptions---------- ");
Set<ConfigOption<?>> options = factory.optionalOptions();
options.forEach(
x -> System.out.println(x.key() + " defValue: " + x.defaultValue()));

System.out.println();
}
}
}

@Test
public void resolveFlinkConnector2() throws Exception {
Artifact artifact =
new Artifact("org.apache.flink", "flink-connector-elasticsearch6", "3.0.1-1.17", null);

InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "~/tmp");
List<File> files = MavenTool.resolveArtifacts(artifact);

String path = null;
if (!files.isEmpty()) {
String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
Optional<File> jarFile = files.stream().filter(x -> x.getName().equals(fileName)).findFirst();
if (jarFile.isPresent()) {
path = jarFile.get().getAbsolutePath();
}
if (files.isEmpty()) {
return;
}

String configFile = "META-INF/services/org.apache.flink.table.factories.Factory";

JarFile jarFile = new JarFile(path);
JarEntry entry = jarFile.getJarEntry(configFile);

List<String> factories = new ArrayList<>(0);
InputStream inputStream = jarFile.getInputStream(entry);
Scanner scanner = new Scanner(new InputStreamReader(inputStream));
while (scanner.hasNextLine()) {
String line = scanner.nextLine().trim();
if (line.length() > 0 && !line.startsWith("#")) {
factories.add(line);
Class<Factory> className = Factory.class;
URL[] array =
files.stream()
.map(
x -> {
try {
return x.toURI().toURL();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
})
.toArray(URL[]::new);

URLClassLoader urlClassLoader = URLClassLoader.newInstance(array);
ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className, urlClassLoader);

List<FlinkConnectorResource> connectorResources = new ArrayList<>();

for (Factory factory : serviceLoader) {
String factoryClassName = factory.getClass().getName();
if (!factoryClassName.equals("org.apache.flink.table.module.CoreModuleFactory")) {
FlinkConnectorResource connectorResource = new FlinkConnectorResource();
connectorResource.setClassName(factoryClassName);
connectorResource.setFactoryIdentifier(factory.factoryIdentifier());
connectorResource.setRequiredOptions(factory.requiredOptions());
connectorResource.setOptionalOptions(factory.optionalOptions());
connectorResources.add(connectorResource);
}
}

factories.forEach(System.out::println);
for (String factory : factories) {
String packageName = factory.replace('.', '/') + ".class";
JarEntry classEntry = jarFile.getJarEntry(packageName);
InputStream in = jarFile.getInputStream(classEntry);

// TODO parse connector
System.out.println(in);
}
urlClassLoader.close();
System.out.println(connectorResources);
}
}

0 comments on commit b084ca6

Please sign in to comment.