[cdc-connector][cdc-base] Shade guava31 to avoid dependency conflict with flink below 1.18

loserwang1024 authored and loserwang committed Aug 2, 2024
1 parent 8f2939e commit 9d4e42d
Showing 30 changed files with 326 additions and 128 deletions.
@@ -104,18 +104,6 @@ This guide provides a simple `pom.xml` example for packaging DataStream job JARs
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Checked the dependencies of the Flink project and below is a feasible reference. -->
<!-- Use flink shaded guava 18.0-13.0 for flink 1.13 -->
<!-- Use flink shaded guava 30.1.1-jre-14.0 for flink-1.14 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.15 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.16 -->
<!-- Use flink shaded guava 30.1.1-jre-16.1 for flink-1.17 -->
<!-- Use flink shaded guava 31.1-jre-17.0 for flink-1.18 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>30.1.1-jre-16.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
@@ -178,8 +166,6 @@ This guide provides a simple `pom.xml` example for packaging DataStream job JARs
<include>com.google.guava:*</include>
<include>com.esri.geometry:esri-geometry-api</include>
<include>com.zaxxer:HikariCP</include>
<!-- Include fixed version 30.1.1-jre-16.0 of flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
<relocations>

@@ -104,18 +104,6 @@ This guide provides a simple `pom.xml` example for packaging DataStream job JARs
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Checked the dependencies of the Flink project and below is a feasible reference. -->
<!-- Use flink shaded guava 18.0-13.0 for flink 1.13 -->
<!-- Use flink shaded guava 30.1.1-jre-14.0 for flink-1.14 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.15 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.16 -->
<!-- Use flink shaded guava 30.1.1-jre-16.1 for flink-1.17 -->
<!-- Use flink shaded guava 31.1-jre-17.0 for flink-1.18 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>30.1.1-jre-16.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
@@ -178,8 +166,6 @@ This guide provides a simple `pom.xml` example for packaging DataStream job JARs
<include>com.google.guava:*</include>
<include>com.esri.geometry:esri-geometry-api</include>
<include>com.zaxxer:HikariCP</include>
<!-- Include fixed version 30.1.1-jre-16.0 of flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
<relocations>
6 changes: 6 additions & 0 deletions flink-cdc-cli/pom.xml
@@ -55,6 +55,12 @@ limitations under the License.
<artifactId>commons-cli</artifactId>
<version>${commons-cli.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>31.1-jre-${flink.shaded.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -19,8 +19,6 @@

import org.apache.flink.cdc.common.annotation.PublicEvolving;

import org.apache.flink.shaded.guava31.com.google.common.base.Strings;

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
@@ -57,11 +55,6 @@ public static TextElement text(String text) {
return new TextElement(text, Collections.emptyList());
}

/** Wraps a list of {@link InlineElement}s into a single {@link TextElement}. */
public static InlineElement wrap(InlineElement... elements) {
return text(Strings.repeat("%s", elements.length), elements);
}

/**
* Creates a block of text formatted as code.
*
53 changes: 53 additions & 0 deletions flink-cdc-composer/pom.xml
@@ -56,6 +56,59 @@ limitations under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>31.1-jre-${flink.shaded.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink-guava</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Shading the test jar is broken in some earlier maven-shade-plugin versions, so it is
disabled here; see https://issues.apache.org/jira/browse/MSHADE-284 -->
<shadeTestJar>false</shadeTestJar>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters combine.children="append">
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>module-info.class</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<includes>
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>org.apache.flink.shaded.guava</pattern>
<shadedPattern>org.apache.cdc.flink.shaded.guava</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
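
The relocation above repackages the bundled Guava under a CDC-specific prefix, so the composer's copy can coexist with whatever flink-shaded-guava version the Flink runtime ships (which differs across Flink 1.13 through 1.18). As a rough sketch of what this means for consuming code, assuming the shaded artifact keeps flink-shaded-guava's internal guava31 package segment (an assumption, not confirmed by this diff), a relocated class would be referenced like this; the class and usage are illustrative, not from the commit:

// Illustrative only: the prefix follows the <shadedPattern> above, and the
// "guava31" segment is an assumption carried over from flink-shaded-guava 31.x.
import org.apache.cdc.flink.shaded.guava31.com.google.common.collect.ImmutableMap;

import java.util.Map;

public class RelocationSketch {
    public static void main(String[] args) {
        // Resolves against the relocated copy inside the shaded jar, so it cannot
        // clash with the Flink runtime's own org.apache.flink.shaded.guava* classes.
        Map<String, String> options = ImmutableMap.of("username", "root", "password", "");
        System.out.println(options);
    }
}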
@@ -170,8 +170,6 @@ limitations under the License.
<include>com.google.guava:*</include>
<include>com.esri.geometry:esri-geometry-api</include>
<include>com.zaxxer:HikariCP</include>
<!-- Include fixed version 30.1.1-jre-14.0 of flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
<filters>
@@ -25,11 +25,12 @@
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.table.api.ValidationException;

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;


import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -44,14 +45,12 @@ void testCreateDataSink() {
FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class);
Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class);

Configuration conf =
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
.put("load-url", "127.0.0.1:8030")
.put("username", "root")
.put("password", "")
.build());
Map<String, String> configMap = new HashMap<>();
configMap.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
configMap.put("load-url", "127.0.0.1:8030");
configMap.put("username", "root");
configMap.put("password", "");
Configuration conf = Configuration.fromMap(configMap);
DataSink dataSink =
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
@@ -30,8 +30,6 @@
import org.apache.flink.cdc.common.types.SmallIntType;
import org.apache.flink.cdc.common.types.TimestampType;

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;

import com.starrocks.connector.flink.catalog.StarRocksColumn;
import com.starrocks.connector.flink.catalog.StarRocksTable;
import org.junit.Before;
@@ -40,7 +38,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_CREATE_NUM_BUCKETS;
import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_SCHEMA_CHANGE_TIMEOUT;
@@ -55,13 +55,11 @@ public class StarRocksMetadataApplierTest {

@Before
public void setup() {
Configuration configuration =
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(TABLE_SCHEMA_CHANGE_TIMEOUT.key(), "100s")
.put(TABLE_CREATE_NUM_BUCKETS.key(), "10")
.put("table.create.properties.replication_num", "5")
.build());
Map<String, String> configMap = new HashMap<>();
configMap.put(TABLE_SCHEMA_CHANGE_TIMEOUT.key(), "100s");
configMap.put(TABLE_CREATE_NUM_BUCKETS.key(), "10");
configMap.put("table.create.properties.replication_num", "5");
Configuration configuration = Configuration.fromMap(configMap);
SchemaChangeConfig schemaChangeConfig = SchemaChangeConfig.from(configuration);
TableCreateConfig tableCreateConfig = TableCreateConfig.from(configuration);
this.catalog = new MockStarRocksCatalog();
@@ -42,8 +42,6 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -75,7 +73,7 @@ public class IncrementalSourceEnumerator

// using TreeSet to prefer assigning stream split to task-0 for easier debug
protected final TreeSet<Integer> readersAwaitingSplit;
private List<List<FinishedSnapshotSplitInfo>> finishedSnapshotSplitMeta;
private List<FinishedSnapshotSplitInfo> finishedSnapshotSplitMeta;

private Boundedness boundedness;

@@ -302,9 +300,7 @@ private void sendStreamMetaRequestEvent(int subTask, StreamSplitMetaRequestEvent
throw new FlinkRuntimeException(
"The assigner offer empty finished split information, this should not happen");
}
finishedSnapshotSplitMeta =
Lists.partition(
finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
finishedSnapshotSplitMeta = finishedSnapshotSplitInfos;
}
final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
@@ -324,7 +320,7 @@ private void sendStreamMetaRequestEvent(int subTask, StreamSplitMetaRequestEvent
context.sendEventToSourceReader(subTask, metadataEvent);
} else if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
List<FinishedSnapshotSplitInfo> metaToSend =
finishedSnapshotSplitMeta.get(requestMetaGroupId);
getPartition(requestMetaGroupId, sourceConfig.getSplitMetaGroupSize());
StreamSplitMetaEvent metadataEvent =
new StreamSplitMetaEvent(
requestEvent.getSplitId(),
Expand Down Expand Up @@ -353,4 +349,15 @@ private void handleLatestFinishedSplitNumberRequest(int subtaskId) {
splitAssigner.getFinishedSplitInfos().size()));
}
}

private List<FinishedSnapshotSplitInfo> getPartition(int index, int size) {
int start = index * size;
int end = Math.min(start + size, finishedSnapshotSplitMeta.size());
return finishedSnapshotSplitMeta.subList(start, end);
}

private int getPartitionNumber(int partitionSize) {
return finishedSnapshotSplitMeta.size() / partitionSize
+ (finishedSnapshotSplitMeta.size() % partitionSize == 0 ? 0 : 1);
}
}
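
The two helpers above stand in for Guava's Lists.partition, which this class used before the shaded dependency was dropped: instead of materializing all meta groups up front, each group is sliced on demand. A minimal standalone sketch of the same slicing logic follows; the class name and test values are illustrative, not from the commit:

import java.util.ArrayList;
import java.util.List;

public class PartitionSketch {
    // Mirrors getPartition above: returns the index-th chunk of the given size.
    static <T> List<T> getPartition(List<T> list, int index, int size) {
        int start = index * size;
        int end = Math.min(start + size, list.size());
        return list.subList(start, end);
    }

    public static void main(String[] args) {
        List<Integer> infos = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            infos.add(i);
        }
        // Same chunks Guava's Lists.partition(infos, 4) would expose:
        // [0, 1, 2, 3], [4, 5, 6, 7], [8, 9]
        System.out.println(getPartition(infos, 2, 4)); // prints [8, 9]
    }
}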
@@ -21,10 +21,9 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.utils.ThreadUtil;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import org.apache.kafka.connect.data.Struct;
Expand Down Expand Up @@ -74,11 +73,9 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
public IncrementalSourceScanFetcher(FetchTask.Context taskContext, int subtaskId) {
this.taskContext = taskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat("debezium-snapshot-reader-" + subtaskId)
.setUncaughtExceptionHandler(
(thread, throwable) -> setReadException(throwable))
.build();
ThreadUtil.buildThreadFactory(
"debezium-snapshot-reader-" + subtaskId,
(thread, throwable) -> setReadException(throwable));
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.hasNextElement = new AtomicBoolean(false);
this.reachEnd = new AtomicBoolean(false);
@@ -22,10 +22,9 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.utils.ThreadUtil;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
@@ -69,8 +68,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So

public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTaskId) {
this.taskContext = taskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
ThreadFactory threadFactory = ThreadUtil.buildThreadFactory("debezium-reader-" + subTaskId);
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = true;
this.pureStreamPhaseTables = new HashSet<>();
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.base.utils;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/** A utility class to help build thread factories. */
public class ThreadUtil {
private static final AtomicLong count = new AtomicLong(0);

public static ThreadFactory buildThreadFactory(String nameFormat) {
return buildThreadFactory(nameFormat, null);
}

public static ThreadFactory buildThreadFactory(
String nameFormat, Thread.UncaughtExceptionHandler eh) {
return r -> {
// Hand the Runnable to the new thread and give it a unique, readable name.
Thread t = new Thread(r, nameFormat + "-" + count.getAndIncrement());
if (eh != null) {
t.setUncaughtExceptionHandler(eh);
}
return t;
};
}
}
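
A minimal usage sketch of this helper, mirroring how the fetchers above build their executors; the thread name and task body are illustrative, and ThreadUtil is assumed to be on the classpath:

import org.apache.flink.cdc.connectors.base.utils.ThreadUtil;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ThreadUtilSketch {
    public static void main(String[] args) {
        // Name threads like the snapshot fetcher does, and surface async failures
        // via an uncaught-exception handler rather than losing them silently.
        ThreadFactory factory =
                ThreadUtil.buildThreadFactory(
                        "debezium-snapshot-reader-0",
                        (thread, throwable) ->
                                System.err.println(thread.getName() + " failed: " + throwable));
        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        executor.submit(() -> System.out.println(Thread.currentThread().getName()));
        executor.shutdown();
    }
}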