From ca1470d5dd48f9238d13a350be99c174ff600b7b Mon Sep 17 00:00:00 2001 From: Hongshun Wang <125648852+loserwang1024@users.noreply.github.com> Date: Fri, 5 Jul 2024 14:06:57 +0800 Subject: [PATCH] [FLINK-35281][cdc-common] FlinkEnvironmentUtils#addJar add each jar only once (#3301) --- .../composer/flink/FlinkEnvironmentUtils.java | 28 ++++++++-- .../flink/FlinkEnvironmentUtilsTest.java | 53 +++++++++++++++++++ 2 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtilsTest.java diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java index 1f1034c09b..b00717cb0a 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java @@ -22,29 +22,51 @@ import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.lang.reflect.Field; import java.net.URL; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** Utilities for {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}. */ @Internal public class FlinkEnvironmentUtils { + private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvironmentUtils.class); + + private FlinkEnvironmentUtils() {} /** * Add the specified JAR to {@link StreamExecutionEnvironment} so that the JAR will be uploaded * together with the job graph. */ public static void addJar(StreamExecutionEnvironment env, URL jarUrl) { + addJar(env, Collections.singletonList(jarUrl)); + } + + /** + * Add the specified JARs to {@link StreamExecutionEnvironment} so that the JAR will be uploaded + * together with the job graph. + */ + public static void addJar(StreamExecutionEnvironment env, Collection jarUrls) { try { Class envClass = StreamExecutionEnvironment.class; Field field = envClass.getDeclaredField("configuration"); field.setAccessible(true); Configuration configuration = ((Configuration) field.get(env)); - List jars = + List previousJars = configuration.getOptional(PipelineOptions.JARS).orElse(new ArrayList<>()); - jars.add(jarUrl.toString()); - configuration.set(PipelineOptions.JARS, jars); + List currentJars = + Stream.concat(previousJars.stream(), jarUrls.stream().map(URL::toString)) + .distinct() + .collect(Collectors.toList()); + LOG.info("pipeline.jars is " + String.join(",", currentJars)); + configuration.set(PipelineOptions.JARS, currentJars); } catch (Exception e) { throw new RuntimeException("Failed to add JAR to Flink execution environment", e); } diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtilsTest.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtilsTest.java new file mode 100644 index 0000000000..70bf2fb3ef --- /dev/null +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtilsTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.composer.flink; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; + +import org.junit.Assert; +import org.junit.Test; + +import java.net.URL; +import java.util.Collections; +import java.util.List; + +/** Test for {@link FlinkEnvironmentUtils}. */ +public class FlinkEnvironmentUtilsTest { + + @Test + public void testAddJars() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(PipelineOptions.JARS, Collections.EMPTY_LIST); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.createLocalEnvironment(configuration); + + FlinkEnvironmentUtils.addJar( + env, Lists.newArrayList(new URL("file://a.jar"), new URL("file://a.jar"))); + List expectedJars = Lists.newArrayList("file://a.jar"); + Assert.assertEquals(expectedJars, env.getConfiguration().get(PipelineOptions.JARS)); + + FlinkEnvironmentUtils.addJar( + env, Lists.newArrayList(new URL("file://b.jar"), new URL("file://a.jar"))); + expectedJars.add("file://b.jar"); + Assert.assertEquals(expectedJars, env.getConfiguration().get(PipelineOptions.JARS)); + } +}