[FLINK-35524][cdc-base] Clear connection pools when the reader exits. #3388

Merged (2 commits) on Aug 6, 2024

File: DataSourceDialect.java

@@ -27,6 +27,8 @@
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;

+ import java.io.Closeable;
+ import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -37,7 +39,7 @@
* @param <C> The source config of data source.
*/
@Experimental
- public interface DataSourceDialect<C extends SourceConfig> extends Serializable {
+ public interface DataSourceDialect<C extends SourceConfig> extends Serializable, Closeable {

/** Get the name of dialect. */
String getName();
@@ -78,4 +80,7 @@ default void notifyCheckpointComplete(long checkpointId, Offset offset) throws E

/** Check if the tableId is included in SourceConfig. */
boolean isIncludeDataCollection(C sourceConfig, TableId tableId);

+ @Override
+ default void close() throws IOException {}
}
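
A note for readers of the change above: DataSourceDialect now extends Closeable with a no-op default close(), so only dialects that actually hold resources need to override it, and every dialect can be closed uniformly when the source shuts down. The following is a minimal, self-contained sketch of that pattern; the class names (DialectClosePattern, PooledDialect) are illustrative and not part of this PR.

    import java.io.Closeable;
    import java.io.IOException;

    // Illustrative only: the resource-ownership pattern introduced above.
    // Stateless dialects inherit the no-op default close(); a dialect that
    // owns something (e.g. a connection pool) overrides it.
    final class DialectClosePattern {

        interface Dialect extends Closeable {
            String getName();

            @Override
            default void close() throws IOException {} // nothing to release by default
        }

        static final class PooledDialect implements Dialect {
            private boolean poolOpen = true;

            @Override
            public String getName() {
                return "pooled-dialect";
            }

            @Override
            public void close() throws IOException {
                // A real JDBC dialect would clear its connection pool here,
                // as the JdbcDataSourceDialect change in this PR does.
                poolOpen = false;
            }
        }

        public static void main(String[] args) throws IOException {
            Dialect dialect = new PooledDialect();
            try {
                // ... enumerate and assign splits ...
            } finally {
                dialect.close(); // mirrors the assigners delegating to dialect.close()
            }
        }
    }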

File: JdbcDataSourceDialect.java

@@ -21,13 +21,15 @@
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
+ import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPools;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;

+ import java.io.IOException;
import java.util.List;
import java.util.Map;

@@ -59,4 +61,8 @@ public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfi

@Override
FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase);

+ default void close() throws IOException {
+     JdbcConnectionPools.getInstance(getPooledDataSourceFactory()).clear();
+ }
}

File: (path not shown in this capture; likely the cdc-base JdbcConnectionPools)

@@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@@ -83,4 +84,12 @@ public String getJdbcUrl(
}
return jdbcConnectionPoolFactory.getJdbcUrl(sourceConfig);
}

+ public void clear() throws IOException {
+     synchronized (pools) {
+         pools.values().stream().forEach(HikariDataSource::close);
+         pools.clear();
+         POOL_FACTORY_MAP.clear();
+     }
+ }
}
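
The clear() added above closes every pooled HikariDataSource under the same lock that guards pool creation, then empties the maps so the singleton cannot hand out an already-closed pool. Below is a small standalone sketch of that registry shape, using plain Closeable values so it runs without a database; the names (PoolRegistrySketch, getOrCreate) are illustrative, not the connector's API.

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative registry mirroring the clear() added above: creation and
    // clearing synchronize on the same map, so a pool cannot be handed out
    // while it is being closed, and cleared entries are dropped so a later
    // reader builds a fresh pool instead of reusing a closed one.
    final class PoolRegistrySketch {
        private final Map<String, Closeable> pools = new HashMap<>();

        Closeable getOrCreate(String poolId, Closeable freshPool) {
            synchronized (pools) {
                return pools.computeIfAbsent(poolId, id -> freshPool);
            }
        }

        void clear() throws IOException {
            synchronized (pools) {
                for (Closeable pool : pools.values()) {
                    pool.close(); // HikariDataSource#close() in the real code
                }
                pools.clear();
            }
        }
    }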

File: (path not shown in this capture; likely HybridSplitAssigner)

@@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -218,7 +219,7 @@ public boolean noMoreSplits() {
}

@Override
- public void close() {
+ public void close() throws IOException {
snapshotSplitAssigner.close();
}


File: (path not shown in this capture; likely SnapshotSplitAssigner)

@@ -37,6 +37,7 @@

import javax.annotation.Nullable;

+ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -398,7 +399,9 @@ && allSnapshotSplitsFinished()) {
}

@Override
- public void close() {}
+ public void close() throws IOException {
+     dialect.close();
+ }

@Override
public boolean noMoreSplits() {

File: SplitAssigner.java

@@ -24,6 +24,8 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;

+ import java.io.Closeable;
+ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -34,7 +36,7 @@
* determines split processing order.
*/
@Experimental
- public interface SplitAssigner {
+ public interface SplitAssigner extends Closeable {

/**
* Called to open the assigner to acquire any resources, like threads or network connections.
@@ -120,5 +122,5 @@ public interface SplitAssigner {
* Called to close the assigner, in case it holds on to any resources, like threads or network
* connections.
*/
- void close();
+ void close() throws IOException;
}
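
Since SplitAssigner#close() now declares a checked IOException, callers either propagate it, as the changed enumerators below do, or wrap it. A hedged sketch of the wrapping variant follows, for call sites whose own close() cannot throw checked exceptions; the helper name closeOrWrap is hypothetical and not part of this PR.

    import java.io.IOException;
    import java.io.UncheckedIOException;

    // Hypothetical helper for call sites that cannot propagate the new checked
    // IOException; the enumerators touched by this PR simply declare
    // "throws IOException" on their own close() instead.
    final class AssignerCloseHelper {

        interface SplitAssigner {
            void close() throws IOException;
        }

        static void closeOrWrap(SplitAssigner assigner) {
            try {
                assigner.close();
            } catch (IOException e) {
                throw new UncheckedIOException("Failed to close split assigner", e);
            }
        }

        public static void main(String[] args) {
            closeOrWrap(() -> System.out.println("connection pools cleared"));
        }
    }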

File: (path not shown in this capture; likely StreamSplitAssigner)

@@ -28,6 +28,7 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;

+ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -133,7 +134,9 @@ public boolean noMoreSplits() {
}

@Override
- public void close() {}
+ public void close() throws IOException {
+     dialect.close();
+ }

// ------------------------------------------------------------------------------------------


File: (path not shown in this capture; likely IncrementalSourceEnumerator)

@@ -49,6 +49,7 @@

import javax.annotation.Nullable;

+ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -187,7 +188,7 @@ public void notifyCheckpointComplete(long checkpointId) {
}

@Override
- public void close() {
+ public void close() throws IOException {
LOG.info("Closing enumerator...");
splitAssigner.close();
}

File: (path not shown in this capture; likely MySqlBinlogSplitAssigner)

@@ -20,12 +20,14 @@
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.BinlogPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+ import org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.util.CollectionUtil;

+ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -121,7 +123,10 @@ public void startAssignNewlyAddedTables() {}
public void onBinlogSplitUpdated() {}

@Override
- public void close() {}
+ public void close() throws IOException {
+     // clear jdbc connection pools
+     JdbcConnectionPools.getInstance().clear();
+ }

// ------------------------------------------------------------------------------------------


File: (path not shown in this capture; likely MySqlSnapshotSplitAssigner)

@@ -23,6 +23,7 @@
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+ import org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
@@ -489,6 +490,8 @@ public void close() {
if (chunkSplitter != null) {
try {
chunkSplitter.close();
+ // clear jdbc connection pools
+ JdbcConnectionPools.getInstance().clear();
} catch (Exception e) {
LOG.warn("Fail to close the chunk splitter.");
}

File: MySqlSplitAssigner.java

@@ -24,6 +24,8 @@
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;

+ import java.io.Closeable;
+ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -34,7 +36,7 @@
* determines split processing order.
*/
@Internal
- public interface MySqlSplitAssigner {
+ public interface MySqlSplitAssigner extends Closeable {

/**
* Called to open the assigner to acquire any resources, like threads or network connections.
@@ -120,5 +122,5 @@ public interface MySqlSplitAssigner {
* Called to close the assigner, in case it holds on to any resources, like threads or network
* connections.
*/
- void close();
+ void close() throws IOException;
}

File: (path not shown in this capture; likely the MySQL connector's JdbcConnectionPools)

@@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@@ -51,4 +52,11 @@ public HikariDataSource getOrCreateConnectionPool(
return pools.get(poolId);
}
}

+ public void clear() throws IOException {
+     synchronized (pools) {
+         pools.values().stream().forEach(HikariDataSource::close);
+         pools.clear();
+     }
+ }
}

File: (path not shown in this capture; likely MySqlSourceEnumerator)

@@ -49,6 +49,7 @@

import javax.annotation.Nullable;

+ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -190,7 +191,7 @@ public void notifyCheckpointComplete(long checkpointId) {
}

@Override
- public void close() {
+ public void close() throws IOException {
LOG.info("Closing enumerator...");
splitAssigner.close();
}

File: MySqlBinlogSplitAssignerTest.java

@@ -26,6 +26,7 @@

import org.junit.Test;

+ import java.io.IOException;
import java.time.ZoneId;
import java.util.Optional;

@@ -40,36 +41,36 @@
public class MySqlBinlogSplitAssignerTest {

@Test
- public void testStartFromEarliest() {
+ public void testStartFromEarliest() throws IOException {
checkAssignedBinlogOffset(StartupOptions.earliest(), BinlogOffset.ofEarliest());
}

@Test
- public void testStartFromLatestOffset() {
+ public void testStartFromLatestOffset() throws IOException {
checkAssignedBinlogOffset(StartupOptions.latest(), BinlogOffset.ofLatest());
}

@Test
- public void testStartFromTimestamp() {
+ public void testStartFromTimestamp() throws IOException {
checkAssignedBinlogOffset(
StartupOptions.timestamp(15213000L), BinlogOffset.ofTimestampSec(15213L));
}

@Test
- public void testStartFromBinlogFile() {
+ public void testStartFromBinlogFile() throws IOException {
checkAssignedBinlogOffset(
StartupOptions.specificOffset("foo-file", 15213),
BinlogOffset.ofBinlogFilePosition("foo-file", 15213L));
}

@Test
- public void testStartFromGtidSet() {
+ public void testStartFromGtidSet() throws IOException {
checkAssignedBinlogOffset(
StartupOptions.specificOffset("foo-gtid"), BinlogOffset.ofGtidSet("foo-gtid"));
}

private void checkAssignedBinlogOffset(
- StartupOptions startupOptions, BinlogOffset expectedOffset) {
+ StartupOptions startupOptions, BinlogOffset expectedOffset) throws IOException {
// Set starting from the given option
MySqlBinlogSplitAssigner assigner = new MySqlBinlogSplitAssigner(getConfig(startupOptions));
// Get splits from assigner