Skip to content

Commit

Permalink
[FLINK-35524][cdc-base] Clear connections pools when reader exist.
Browse files Browse the repository at this point in the history
  • Loading branch information
loserwang1024 committed Aug 1, 2024
1 parent 1388cf9 commit be3a689
Show file tree
Hide file tree
Showing 14 changed files with 69 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
Expand All @@ -37,7 +39,7 @@
* @param <C> The source config of data source.
*/
@Experimental
public interface DataSourceDialect<C extends SourceConfig> extends Serializable {
public interface DataSourceDialect<C extends SourceConfig> extends Serializable, Closeable {

/** Get the name of dialect. */
String getName();
Expand Down Expand Up @@ -78,4 +80,7 @@ default void notifyCheckpointComplete(long checkpointId, Offset offset) throws E

/** Check if the tableId is included in SourceConfig. */
boolean isIncludeDataCollection(C sourceConfig, TableId tableId);

@Override
default void close() throws IOException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPools;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;

import java.io.IOException;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -59,4 +61,8 @@ public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfi

@Override
FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase);

default void close() throws IOException {
JdbcConnectionPools.getInstance(getPooledDataSourceFactory()).clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -83,4 +84,14 @@ public String getJdbcUrl(
}
return jdbcConnectionPoolFactory.getJdbcUrl(sourceConfig);
}

public void clear() throws IOException {
if (instance != null) {
synchronized (instance.pools) {
instance.pools.values().stream().forEach(HikariDataSource::close);
instance.pools.clear();
POOL_FACTORY_MAP.clear();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
Expand Down Expand Up @@ -218,7 +219,7 @@ public boolean noMoreSplits() {
}

@Override
public void close() {
public void close() throws IOException {
snapshotSplitAssigner.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -398,7 +399,9 @@ && allSnapshotSplitsFinished()) {
}

@Override
public void close() {}
public void close() throws IOException {
dialect.close();
}

@Override
public boolean noMoreSplits() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -34,7 +36,7 @@
* determines split processing order.
*/
@Experimental
public interface SplitAssigner {
public interface SplitAssigner extends Closeable {

/**
* Called to open the assigner to acquire any resources, like threads or network connections.
Expand Down Expand Up @@ -120,5 +122,5 @@ public interface SplitAssigner {
* Called to close the assigner, in case it holds on to any resources, like threads or network
* connections.
*/
void close();
void close() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -133,7 +134,9 @@ public boolean noMoreSplits() {
}

@Override
public void close() {}
public void close() throws IOException {
dialect.close();
}

// ------------------------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -187,7 +188,7 @@ public void notifyCheckpointComplete(long checkpointId) {
}

@Override
public void close() {
public void close() throws IOException {
LOG.info("Closing enumerator...");
splitAssigner.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.BinlogPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.util.CollectionUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -121,7 +123,10 @@ public void startAssignNewlyAddedTables() {}
public void onBinlogSplitUpdated() {}

@Override
public void close() {}
public void close() throws IOException {
// clear jdbc connection pools
JdbcConnectionPools.getInstance().clear();
}

// ------------------------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
Expand Down Expand Up @@ -489,6 +490,8 @@ public void close() {
if (chunkSplitter != null) {
try {
chunkSplitter.close();
// clear jdbc connection pools
JdbcConnectionPools.getInstance().clear();
} catch (Exception e) {
LOG.warn("Fail to close the chunk splitter.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -34,7 +36,7 @@
* determines split processing order.
*/
@Internal
public interface MySqlSplitAssigner {
public interface MySqlSplitAssigner extends Closeable {

/**
* Called to open the assigner to acquire any resources, like threads or network connections.
Expand Down Expand Up @@ -120,5 +122,5 @@ public interface MySqlSplitAssigner {
* Called to close the assigner, in case it holds on to any resources, like threads or network
* connections.
*/
void close();
void close() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -51,4 +52,11 @@ public HikariDataSource getOrCreateConnectionPool(
return pools.get(poolId);
}
}

public synchronized void clear() throws IOException {
synchronized (INSTANCE.pools) {
INSTANCE.pools.values().stream().forEach(HikariDataSource::close);
INSTANCE.pools.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -190,7 +191,7 @@ public void notifyCheckpointComplete(long checkpointId) {
}

@Override
public void close() {
public void close() throws IOException {
LOG.info("Closing enumerator...");
splitAssigner.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.junit.Test;

import java.io.IOException;
import java.time.ZoneId;
import java.util.Optional;

Expand All @@ -40,36 +41,36 @@
public class MySqlBinlogSplitAssignerTest {

@Test
public void testStartFromEarliest() {
public void testStartFromEarliest() throws IOException {
checkAssignedBinlogOffset(StartupOptions.earliest(), BinlogOffset.ofEarliest());
}

@Test
public void testStartFromLatestOffset() {
public void testStartFromLatestOffset() throws IOException {
checkAssignedBinlogOffset(StartupOptions.latest(), BinlogOffset.ofLatest());
}

@Test
public void testStartFromTimestamp() {
public void testStartFromTimestamp() throws IOException {
checkAssignedBinlogOffset(
StartupOptions.timestamp(15213000L), BinlogOffset.ofTimestampSec(15213L));
}

@Test
public void testStartFromBinlogFile() {
public void testStartFromBinlogFile() throws IOException {
checkAssignedBinlogOffset(
StartupOptions.specificOffset("foo-file", 15213),
BinlogOffset.ofBinlogFilePosition("foo-file", 15213L));
}

@Test
public void testStartFromGtidSet() {
public void testStartFromGtidSet() throws IOException {
checkAssignedBinlogOffset(
StartupOptions.specificOffset("foo-gtid"), BinlogOffset.ofGtidSet("foo-gtid"));
}

private void checkAssignedBinlogOffset(
StartupOptions startupOptions, BinlogOffset expectedOffset) {
StartupOptions startupOptions, BinlogOffset expectedOffset) throws IOException {
// Set starting from the given option
MySqlBinlogSplitAssigner assigner = new MySqlBinlogSplitAssigner(getConfig(startupOptions));
// Get splits from assigner
Expand Down

0 comments on commit be3a689

Please sign in to comment.