diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index e5e45a722c6621..ff9233381cffc1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -804,6 +804,9 @@ DEFINE_mInt32(segment_compression_threshold_kb, "256"); // The connection timeout when connecting to external table such as odbc table. DEFINE_mInt32(external_table_connect_timeout_sec, "30"); +// Time to clean up useless JDBC connection pool cache +DEFINE_mInt32(jdbc_connection_pool_cache_clear_time_sec, "28800"); + // Global bitmap cache capacity for aggregation cache, size in bytes DEFINE_Int64(delete_bitmap_agg_cache_capacity, "104857600"); DEFINE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec, "1800"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 4813490e94943e..ff703680336797 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -858,6 +858,9 @@ DECLARE_mInt32(segment_compression_threshold_kb); // The connection timeout when connecting to external table such as odbc table. DECLARE_mInt32(external_table_connect_timeout_sec); +// Time to clean up useless JDBC connection pool cache +DECLARE_mInt32(jdbc_connection_pool_cache_clear_time_sec); + // Global bitmap cache capacity for aggregation cache, size in bytes DECLARE_Int64(delete_bitmap_agg_cache_capacity); DECLARE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec); diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 1f70c8e281b066..0f6f4319189716 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -258,6 +258,7 @@ std::string ODBCTableDescriptor::debug_string() const { JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc), + _jdbc_catalog_id(tdesc.jdbcTable.catalog_id), _jdbc_resource_name(tdesc.jdbcTable.jdbc_resource_name), _jdbc_driver_url(tdesc.jdbcTable.jdbc_driver_url), _jdbc_driver_class(tdesc.jdbcTable.jdbc_driver_class), @@ -266,24 +267,26 @@ JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc) _jdbc_table_name(tdesc.jdbcTable.jdbc_table_name), _jdbc_user(tdesc.jdbcTable.jdbc_user), _jdbc_passwd(tdesc.jdbcTable.jdbc_password), - _jdbc_min_pool_size(tdesc.jdbcTable.jdbc_min_pool_size), - _jdbc_max_pool_size(tdesc.jdbcTable.jdbc_max_pool_size), - _jdbc_max_idle_time(tdesc.jdbcTable.jdbc_max_idle_time), - _jdbc_max_wait_time(tdesc.jdbcTable.jdbc_max_wait_time), - _jdbc_keep_alive(tdesc.jdbcTable.jdbc_keep_alive) {} + _connection_pool_min_size(tdesc.jdbcTable.connection_pool_min_size), + _connection_pool_max_size(tdesc.jdbcTable.connection_pool_max_size), + _connection_pool_max_wait_time(tdesc.jdbcTable.connection_pool_max_wait_time), + _connection_pool_max_life_time(tdesc.jdbcTable.connection_pool_max_life_time), + _connection_pool_keep_alive(tdesc.jdbcTable.connection_pool_keep_alive) {} std::string JdbcTableDescriptor::debug_string() const { fmt::memory_buffer buf; - fmt::format_to(buf, - "JDBCTable({} ,_jdbc_resource_name={} ,_jdbc_driver_url={} " - ",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} " - ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={} ,_jdbc_min_pool_size={} " - ",_jdbc_max_pool_size={} ,_jdbc_max_idle_time={} ,_jdbc_max_wait_time={} " - ",_jdbc_keep_alive={})", - TableDescriptor::debug_string(), _jdbc_resource_name, _jdbc_driver_url, - _jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url, _jdbc_table_name, - _jdbc_user, _jdbc_passwd, _jdbc_min_pool_size, _jdbc_max_pool_size, - _jdbc_max_idle_time, _jdbc_max_wait_time, _jdbc_keep_alive); + fmt::format_to( + buf, + "JDBCTable({} ,_jdbc_catalog_id = {}, _jdbc_resource_name={} ,_jdbc_driver_url={} " + ",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} " + ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={} ,_connection_pool_min_size={} " + ",_connection_pool_max_size={} ,_connection_pool_max_wait_time={} " + ",_connection_pool_max_life_time={} ,_connection_pool_keep_alive={})", + TableDescriptor::debug_string(), _jdbc_catalog_id, _jdbc_resource_name, + _jdbc_driver_url, _jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url, + _jdbc_table_name, _jdbc_user, _jdbc_passwd, _connection_pool_min_size, + _connection_pool_max_size, _connection_pool_max_wait_time, + _connection_pool_max_life_time, _connection_pool_keep_alive); return fmt::to_string(buf); } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 42fa67a0f84f52..4aa3d28e47dd5e 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -310,6 +310,7 @@ class JdbcTableDescriptor : public TableDescriptor { public: JdbcTableDescriptor(const TTableDescriptor& tdesc); std::string debug_string() const override; + int64_t jdbc_catalog_id() const { return _jdbc_catalog_id; } const std::string& jdbc_resource_name() const { return _jdbc_resource_name; } const std::string& jdbc_driver_url() const { return _jdbc_driver_url; } const std::string& jdbc_driver_class() const { return _jdbc_driver_class; } @@ -318,13 +319,14 @@ class JdbcTableDescriptor : public TableDescriptor { const std::string& jdbc_table_name() const { return _jdbc_table_name; } const std::string& jdbc_user() const { return _jdbc_user; } const std::string& jdbc_passwd() const { return _jdbc_passwd; } - int32_t jdbc_min_pool_size() const { return _jdbc_min_pool_size; } - int32_t jdbc_max_pool_size() const { return _jdbc_max_pool_size; } - int32_t jdbc_max_idle_time() const { return _jdbc_max_idle_time; } - int32_t jdbc_max_wait_time() const { return _jdbc_max_wait_time; } - bool jdbc_keep_alive() const { return _jdbc_keep_alive; } + int32_t connection_pool_min_size() const { return _connection_pool_min_size; } + int32_t connection_pool_max_size() const { return _connection_pool_max_size; } + int32_t connection_pool_max_wait_time() const { return _connection_pool_max_wait_time; } + int32_t connection_pool_max_life_time() const { return _connection_pool_max_life_time; } + bool connection_pool_keep_alive() const { return _connection_pool_keep_alive; } private: + int64_t _jdbc_catalog_id; std::string _jdbc_resource_name; std::string _jdbc_driver_url; std::string _jdbc_driver_class; @@ -333,11 +335,11 @@ class JdbcTableDescriptor : public TableDescriptor { std::string _jdbc_table_name; std::string _jdbc_user; std::string _jdbc_passwd; - int32_t _jdbc_min_pool_size; - int32_t _jdbc_max_pool_size; - int32_t _jdbc_max_idle_time; - int32_t _jdbc_max_wait_time; - bool _jdbc_keep_alive; + int32_t _connection_pool_min_size; + int32_t _connection_pool_max_size; + int32_t _connection_pool_max_wait_time; + int32_t _connection_pool_max_life_time; + bool _connection_pool_keep_alive; }; class TupleDescriptor { diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index f403dad6c73aac..e0c64b58fb5073 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -85,6 +85,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con if (jdbc_table == nullptr) { return Status::InternalError("jdbc table pointer is NULL of VJdbcScanNode::prepare."); } + _jdbc_param.catalog_id = jdbc_table->jdbc_catalog_id(); _jdbc_param.driver_class = jdbc_table->jdbc_driver_class(); _jdbc_param.driver_path = jdbc_table->jdbc_driver_url(); _jdbc_param.resource_name = jdbc_table->jdbc_resource_name(); @@ -95,11 +96,11 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con _jdbc_param.tuple_desc = _tuple_desc; _jdbc_param.query_string = std::move(_query_string); _jdbc_param.table_type = _table_type; - _jdbc_param.min_pool_size = jdbc_table->jdbc_min_pool_size(); - _jdbc_param.max_pool_size = jdbc_table->jdbc_max_pool_size(); - _jdbc_param.max_idle_time = jdbc_table->jdbc_max_idle_time(); - _jdbc_param.max_wait_time = jdbc_table->jdbc_max_wait_time(); - _jdbc_param.keep_alive = jdbc_table->jdbc_keep_alive(); + _jdbc_param.connection_pool_min_size = jdbc_table->connection_pool_min_size(); + _jdbc_param.connection_pool_max_size = jdbc_table->connection_pool_max_size(); + _jdbc_param.connection_pool_max_life_time = jdbc_table->connection_pool_max_life_time(); + _jdbc_param.connection_pool_max_wait_time = jdbc_table->connection_pool_max_wait_time(); + _jdbc_param.connection_pool_keep_alive = jdbc_table->connection_pool_keep_alive(); if (get_parent() != nullptr) { get_parent()->_scanner_profile->add_info_string("JdbcDriverClass", diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index f20df0af98f8ec..e6419ec95e73bc 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -135,6 +135,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { TJdbcExecutorCtorParams ctor_params; ctor_params.__set_statement(_sql_str); + ctor_params.__set_catalog_id(_conn_param.catalog_id); ctor_params.__set_jdbc_url(_conn_param.jdbc_url); ctor_params.__set_jdbc_user(_conn_param.user); ctor_params.__set_jdbc_password(_conn_param.passwd); @@ -143,11 +144,13 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { ctor_params.__set_batch_size(read ? state->batch_size() : 0); ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE); ctor_params.__set_table_type(_conn_param.table_type); - ctor_params.__set_min_pool_size(_conn_param.min_pool_size); - ctor_params.__set_max_pool_size(_conn_param.max_pool_size); - ctor_params.__set_max_idle_time(_conn_param.max_idle_time); - ctor_params.__set_max_wait_time(_conn_param.max_wait_time); - ctor_params.__set_keep_alive(_conn_param.keep_alive); + ctor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size); + ctor_params.__set_connection_pool_max_size(_conn_param.connection_pool_max_size); + ctor_params.__set_connection_pool_max_wait_time(_conn_param.connection_pool_max_wait_time); + ctor_params.__set_connection_pool_max_life_time(_conn_param.connection_pool_max_life_time); + ctor_params.__set_connection_pool_cache_clear_time( + config::jdbc_connection_pool_cache_clear_time_sec); + ctor_params.__set_connection_pool_keep_alive(_conn_param.connection_pool_keep_alive); jbyteArray ctor_params_bytes; // Pushed frame will be popped when jni_frame goes out-of-scope. diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index 5d8ac121323319..2ecdf210fac630 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -43,6 +43,7 @@ class IColumn; class VExprContext; struct JdbcConnectorParam { + int64_t catalog_id; std::string driver_path; std::string driver_class; std::string resource_name; @@ -54,11 +55,11 @@ struct JdbcConnectorParam { std::string table_name; bool use_transaction; TOdbcTableType::type table_type; - int32_t min_pool_size; - int32_t max_pool_size; - int32_t max_idle_time; - int32_t max_wait_time; - bool keep_alive; + int32_t connection_pool_min_size; + int32_t connection_pool_max_size; + int32_t connection_pool_max_wait_time; + int32_t connection_pool_max_life_time; + bool connection_pool_keep_alive; const TupleDescriptor* tuple_desc = nullptr; }; diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp b/be/src/vec/sink/writer/vjdbc_table_writer.cpp index f7e4941892356f..9493bfbf0723b0 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp +++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp @@ -35,6 +35,7 @@ JdbcConnectorParam VJdbcTableWriter::create_connect_param(const doris::TDataSink JdbcConnectorParam jdbc_param; + jdbc_param.catalog_id = t_jdbc_sink.jdbc_table.catalog_id; jdbc_param.jdbc_url = t_jdbc_sink.jdbc_table.jdbc_url; jdbc_param.user = t_jdbc_sink.jdbc_table.jdbc_user; jdbc_param.passwd = t_jdbc_sink.jdbc_table.jdbc_password; @@ -46,11 +47,11 @@ JdbcConnectorParam VJdbcTableWriter::create_connect_param(const doris::TDataSink jdbc_param.query_string = t_jdbc_sink.insert_sql; jdbc_param.table_name = t_jdbc_sink.jdbc_table.jdbc_table_name; jdbc_param.use_transaction = t_jdbc_sink.use_transaction; - jdbc_param.min_pool_size = t_jdbc_sink.jdbc_table.jdbc_min_pool_size; - jdbc_param.max_pool_size = t_jdbc_sink.jdbc_table.jdbc_max_pool_size; - jdbc_param.max_idle_time = t_jdbc_sink.jdbc_table.jdbc_max_idle_time; - jdbc_param.max_wait_time = t_jdbc_sink.jdbc_table.jdbc_max_wait_time; - jdbc_param.keep_alive = t_jdbc_sink.jdbc_table.jdbc_keep_alive; + jdbc_param.connection_pool_min_size = t_jdbc_sink.jdbc_table.connection_pool_min_size; + jdbc_param.connection_pool_max_size = t_jdbc_sink.jdbc_table.connection_pool_max_size; + jdbc_param.connection_pool_max_wait_time = t_jdbc_sink.jdbc_table.connection_pool_max_wait_time; + jdbc_param.connection_pool_max_life_time = t_jdbc_sink.jdbc_table.connection_pool_max_life_time; + jdbc_param.connection_pool_keep_alive = t_jdbc_sink.jdbc_table.connection_pool_keep_alive; return jdbc_param; } diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java index 2fd0acf436d32b..3c8ac38cf7dcd7 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java @@ -18,27 +18,77 @@ package org.apache.doris.jdbc; import com.alibaba.druid.pool.DruidDataSource; +import org.apache.log4j.Logger; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; public class JdbcDataSource { + private static final Logger LOG = Logger.getLogger(JdbcDataSource.class); private static final JdbcDataSource jdbcDataSource = new JdbcDataSource(); private final Map sourcesMap = new ConcurrentHashMap<>(); + private final Map lastAccessTimeMap = new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private long cleanupInterval = 8 * 60 * 60 * 1000; // 8 hours + private ScheduledFuture cleanupTask = null; + + private JdbcDataSource() { + startCleanupTask(); + } public static JdbcDataSource getDataSource() { return jdbcDataSource; } public DruidDataSource getSource(String cacheKey) { + lastAccessTimeMap.put(cacheKey, System.currentTimeMillis()); return sourcesMap.get(cacheKey); } public void putSource(String cacheKey, DruidDataSource ds) { sourcesMap.put(cacheKey, ds); + lastAccessTimeMap.put(cacheKey, System.currentTimeMillis()); } public Map getSourcesMap() { return sourcesMap; } + + public void setCleanupInterval(long interval) { + this.cleanupInterval = interval * 1000L; + restartCleanupTask(); + } + + private synchronized void restartCleanupTask() { + if (cleanupTask != null && !cleanupTask.isCancelled()) { + cleanupTask.cancel(false); + } + cleanupTask = executor.scheduleAtFixedRate(() -> { + try { + long now = System.currentTimeMillis(); + lastAccessTimeMap.forEach((key, lastAccessTime) -> { + if (now - lastAccessTime > cleanupInterval) { + DruidDataSource ds = sourcesMap.remove(key); + if (ds != null) { + ds.close(); + } + lastAccessTimeMap.remove(key); + LOG.info("remove jdbc data source: " + key.split("jdbc")[0]); + } + }); + } catch (Exception e) { + LOG.error("failed to cleanup jdbc data source", e); + } + }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS); + } + + private void startCleanupTask() { + if (cleanupTask == null || cleanupTask.isCancelled()) { + restartCleanupTask(); + } + } } diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java index be32568bd2ec5a..dcf576986fef53 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java @@ -21,6 +21,7 @@ import org.apache.doris.thrift.TOdbcTableType; public class JdbcDataSourceConfig { + private Long catalogId; private String jdbcUrl; private String jdbcUser; private String jdbcPassword; @@ -29,16 +30,25 @@ public class JdbcDataSourceConfig { private int batchSize; private TJdbcOperation op; private TOdbcTableType tableType; - private int minPoolSize; - private int maxPoolSize; - private int minIdleSize; - private int maxIdleTime; - private int maxWaitTime; - private boolean keepAlive; + private int connectionPoolMinSize; + private int connectionPoolMaxSize; + private int connectionPoolMaxWaitTime; + private int connectionPoolMaxLifeTime; + private boolean connectionPoolKeepAlive; public String createCacheKey() { - return jdbcUrl + jdbcUser + jdbcPassword + jdbcDriverUrl + jdbcDriverClass - + minPoolSize + maxPoolSize + minIdleSize + maxIdleTime + maxWaitTime + keepAlive; + return catalogId + jdbcUrl + jdbcUser + jdbcPassword + jdbcDriverUrl + jdbcDriverClass + + connectionPoolMinSize + connectionPoolMaxSize + connectionPoolMaxLifeTime + connectionPoolMaxWaitTime + + connectionPoolKeepAlive; + } + + public long getCatalogId() { + return catalogId; + } + + public JdbcDataSourceConfig setCatalogId(long catalogId) { + this.catalogId = catalogId; + return this; } public String getJdbcUrl() { @@ -113,57 +123,48 @@ public JdbcDataSourceConfig setTableType(TOdbcTableType tableType) { return this; } - public int getMinPoolSize() { - return minPoolSize; - } - - public JdbcDataSourceConfig setMinPoolSize(int minPoolSize) { - this.minPoolSize = minPoolSize; - return this; - } - - public int getMaxPoolSize() { - return maxPoolSize; + public int getConnectionPoolMinSize() { + return connectionPoolMinSize; } - public JdbcDataSourceConfig setMaxPoolSize(int maxPoolSize) { - this.maxPoolSize = maxPoolSize; + public JdbcDataSourceConfig setConnectionPoolMinSize(int connectionPoolMinSize) { + this.connectionPoolMinSize = connectionPoolMinSize; return this; } - public int getMinIdleSize() { - return minIdleSize; + public int getConnectionPoolMaxSize() { + return connectionPoolMaxSize; } - public JdbcDataSourceConfig setMinIdleSize(int minIdleSize) { - this.minIdleSize = minIdleSize; + public JdbcDataSourceConfig setConnectionPoolMaxSize(int connectionPoolMaxSize) { + this.connectionPoolMaxSize = connectionPoolMaxSize; return this; } - public int getMaxIdleTime() { - return maxIdleTime; + public int getConnectionPoolMaxWaitTime() { + return connectionPoolMaxWaitTime; } - public JdbcDataSourceConfig setMaxIdleTime(int maxIdleTime) { - this.maxIdleTime = maxIdleTime; + public JdbcDataSourceConfig setConnectionPoolMaxWaitTime(int connectionPoolMaxWaitTime) { + this.connectionPoolMaxWaitTime = connectionPoolMaxWaitTime; return this; } - public int getMaxWaitTime() { - return maxWaitTime; + public int getConnectionPoolMaxLifeTime() { + return connectionPoolMaxLifeTime; } - public JdbcDataSourceConfig setMaxWaitTime(int maxWaitTime) { - this.maxWaitTime = maxWaitTime; + public JdbcDataSourceConfig setConnectionPoolMaxLifeTime(int connectionPoolMaxLifeTime) { + this.connectionPoolMaxLifeTime = connectionPoolMaxLifeTime; return this; } - public boolean isKeepAlive() { - return keepAlive; + public boolean isConnectionPoolKeepAlive() { + return connectionPoolKeepAlive; } - public JdbcDataSourceConfig setKeepAlive(boolean keepAlive) { - this.keepAlive = keepAlive; + public JdbcDataSourceConfig setConnectionPoolKeepAlive(boolean connectionPoolKeepAlive) { + this.connectionPoolKeepAlive = connectionPoolKeepAlive; return this; } } diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java index 73773cd1b04787..8c684219af3cbc 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java @@ -93,7 +93,7 @@ public class JdbcExecutor { private int curBlockRows = 0; private static final byte[] emptyBytes = new byte[0]; private DruidDataSource druidDataSource = null; - private byte[] druidDataSourceLock = new byte[0]; + private final byte[] druidDataSourceLock = new byte[0]; private TOdbcTableType tableType; private JdbcDataSourceConfig config; @@ -107,6 +107,7 @@ public JdbcExecutor(byte[] thriftParams) throws Exception { } tableType = request.table_type; this.config = new JdbcDataSourceConfig() + .setCatalogId(request.catalog_id) .setJdbcUser(request.jdbc_user) .setJdbcPassword(request.jdbc_password) .setJdbcUrl(request.jdbc_url) @@ -115,42 +116,54 @@ public JdbcExecutor(byte[] thriftParams) throws Exception { .setBatchSize(request.batch_size) .setOp(request.op) .setTableType(request.table_type) - .setMinPoolSize(request.min_pool_size) - .setMaxPoolSize(request.max_pool_size) - .setMaxIdleTime(request.max_idle_time) - .setMaxWaitTime(request.max_wait_time) - .setMinIdleSize(request.min_pool_size > 0 ? 1 : 0) - .setKeepAlive(request.keep_alive); + .setConnectionPoolMinSize(request.connection_pool_min_size) + .setConnectionPoolMaxSize(request.connection_pool_max_size) + .setConnectionPoolMaxWaitTime(request.connection_pool_max_wait_time) + .setConnectionPoolMaxLifeTime(request.connection_pool_max_life_time) + .setConnectionPoolKeepAlive(request.connection_pool_keep_alive); + JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time); init(config, request.statement); } public void close() throws Exception { try { if (stmt != null) { - stmt.cancel(); + try { + stmt.cancel(); + } catch (SQLException e) { + LOG.error("Error cancelling statement", e); + } } if (conn != null && resultSet != null) { abortReadConnection(conn, resultSet, tableType); - } - if (config.getMinIdleSize() == 0) { - // it can be immediately closed if there is no need to maintain the cache of datasource - druidDataSource.close(); - JdbcDataSource.getDataSource().getSourcesMap().clear(); - druidDataSource = null; + try { + resultSet.close(); + } catch (SQLException e) { + LOG.error("Error closing resultSet", e); + } + try { + stmt.close(); + } catch (SQLException e) { + LOG.error("Error closing statement", e); + } } } finally { - if (stmt != null) { - stmt.close(); - } - if (resultSet != null) { - resultSet.close(); + if (conn != null && !conn.isClosed()) { + try { + conn.close(); + } catch (SQLException e) { + LOG.error("Error closing connection", e); + } } - if (conn != null) { - conn.close(); + } + + if (config.getConnectionPoolMinSize() == 0) { + // Close and remove the datasource if necessary + if (druidDataSource != null) { + druidDataSource.close(); + JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey()); + druidDataSource = null; } - resultSet = null; - stmt = null; - conn = null; } } @@ -322,26 +335,30 @@ private void init(JdbcDataSourceConfig config, String sql) throws UdfRuntimeExce ds.setUrl(config.getJdbcUrl()); ds.setUsername(config.getJdbcUser()); ds.setPassword(config.getJdbcPassword()); - ds.setMinIdle(config.getMinIdleSize()); - ds.setInitialSize(config.getMinPoolSize()); - ds.setMaxActive(config.getMaxPoolSize()); - ds.setMaxWait(config.getMaxWaitTime()); + ds.setMinIdle(config.getConnectionPoolMinSize()); // default 1 + ds.setInitialSize(config.getConnectionPoolMinSize()); // default 1 + ds.setMaxActive(config.getConnectionPoolMaxSize()); // default 10 + ds.setMaxWait(config.getConnectionPoolMaxWaitTime()); // default 5000 ds.setTestWhileIdle(true); ds.setTestOnBorrow(false); setValidationQuery(ds, config.getTableType()); - ds.setTimeBetweenEvictionRunsMillis(config.getMaxIdleTime() / 5); - ds.setMinEvictableIdleTimeMillis(config.getMaxIdleTime()); - ds.setKeepAlive(config.isKeepAlive()); + // default 3 min + ds.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime() / 10L); + // default 15 min + ds.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime() / 2L); + // default 30 min + ds.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime()); + ds.setKeepAlive(config.isConnectionPoolKeepAlive()); + // default 6 min + ds.setKeepAliveBetweenTimeMillis(config.getConnectionPoolMaxLifeTime() / 5L); druidDataSource = ds; - // and the default datasource init = 1, min = 1, max = 100, if one of connection idle - // time greater than 10 minutes. then connection will be retrieved. JdbcDataSource.getDataSource().putSource(druidDataSourceKey, ds); - LOG.info("JdbcExecutor set minPoolSize = " + config.getMinPoolSize() - + ", maxPoolSize = " + config.getMaxPoolSize() - + ", maxIdleTime = " + config.getMaxIdleTime() - + ", maxWaitTime = " + config.getMaxWaitTime() - + ", minIdleSize = " + config.getMinIdleSize() - + ", keepAlive = " + config.isKeepAlive()); + LOG.info("JdbcClient set" + + " ConnectionPoolMinSize = " + config.getConnectionPoolMinSize() + + ", ConnectionPoolMaxSize = " + config.getConnectionPoolMaxSize() + + ", ConnectionPoolMaxWaitTime = " + config.getConnectionPoolMaxWaitTime() + + ", ConnectionPoolMaxLifeTime = " + config.getConnectionPoolMaxLifeTime() + + ", ConnectionPoolKeepAlive = " + config.isConnectionPoolKeepAlive()); LOG.info("init datasource [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + ( System.currentTimeMillis() - start) + " ms"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index ce0805bfb7d566..d95f77b0b7e598 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -40,12 +40,14 @@ import java.net.URISyntaxException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.Map; /** * External JDBC Catalog resource for external table query. - * + *

* create external resource jdbc_mysql * properties ( * "type"="jdbc", @@ -55,7 +57,7 @@ * "driver_url"="http://127.0.0.1:8888/mysql-connector-java-5.1.47.jar", * "driver_class"="com.mysql.jdbc.Driver" * ); - * + *

* DROP RESOURCE "jdbc_mysql"; */ public class JdbcResource extends Resource { @@ -94,12 +96,13 @@ public class JdbcResource extends Resource { public static final String TYPE = "type"; public static final String ONLY_SPECIFIED_DATABASE = "only_specified_database"; public static final String LOWER_CASE_TABLE_NAMES = "lower_case_table_names"; - public static final String MIN_POOL_SIZE = "min_pool_size"; - public static final String MAX_POOL_SIZE = "max_pool_size"; - public static final String MAX_IDLE_TIME = "max_idle_time"; - public static final String MAX_WAIT_TIME = "max_wait_time"; - public static final String KEEP_ALIVE = "keep_alive"; + public static final String CONNECTION_POOL_MIN_SIZE = "connection_pool_min_size"; + public static final String CONNECTION_POOL_MAX_SIZE = "connection_pool_max_size"; + public static final String CONNECTION_POOL_MAX_WAIT_TIME = "connection_pool_max_wait_time"; + public static final String CONNECTION_POOL_MAX_LIFE_TIME = "connection_pool_max_life_time"; + public static final String CONNECTION_POOL_KEEP_ALIVE = "connection_pool_keep_alive"; public static final String CHECK_SUM = "checksum"; + public static final String CREATE_TIME = "create_time"; private static final ImmutableList ALL_PROPERTIES = new ImmutableList.Builder().add( JDBC_URL, USER, @@ -107,21 +110,27 @@ public class JdbcResource extends Resource { DRIVER_CLASS, DRIVER_URL, TYPE, + CREATE_TIME, ONLY_SPECIFIED_DATABASE, LOWER_CASE_TABLE_NAMES, INCLUDE_DATABASE_LIST, - EXCLUDE_DATABASE_LIST + EXCLUDE_DATABASE_LIST, + CONNECTION_POOL_MIN_SIZE, + CONNECTION_POOL_MAX_SIZE, + CONNECTION_POOL_MAX_LIFE_TIME, + CONNECTION_POOL_MAX_WAIT_TIME, + CONNECTION_POOL_KEEP_ALIVE ).build(); private static final ImmutableList OPTIONAL_PROPERTIES = new ImmutableList.Builder().add( ONLY_SPECIFIED_DATABASE, LOWER_CASE_TABLE_NAMES, INCLUDE_DATABASE_LIST, EXCLUDE_DATABASE_LIST, - MIN_POOL_SIZE, - MAX_POOL_SIZE, - MAX_IDLE_TIME, - MAX_WAIT_TIME, - KEEP_ALIVE + CONNECTION_POOL_MIN_SIZE, + CONNECTION_POOL_MAX_SIZE, + CONNECTION_POOL_MAX_LIFE_TIME, + CONNECTION_POOL_MAX_WAIT_TIME, + CONNECTION_POOL_KEEP_ALIVE ).build(); // The default value of optional properties @@ -133,11 +142,11 @@ public class JdbcResource extends Resource { OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, ""); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, ""); - OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MIN_POOL_SIZE, "1"); - OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MAX_POOL_SIZE, "100"); - OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MAX_IDLE_TIME, "30000"); - OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MAX_WAIT_TIME, "5000"); - OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(KEEP_ALIVE, "false"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MIN_SIZE, "1"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_SIZE, "10"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME, "1800000"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME, "5000"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE, "false"); } // timeout for both connection and read. 10 seconds is long enough. @@ -153,7 +162,7 @@ public JdbcResource(String name) { this(name, Maps.newHashMap()); } - private JdbcResource(String name, Map configs) { + public JdbcResource(String name, Map configs) { super(name, ResourceType.JDBC); this.configs = configs; } @@ -183,13 +192,11 @@ public void checkProperties(Map properties) throws AnalysisExcep @Override protected void setProperties(Map properties) throws DdlException { Preconditions.checkState(properties != null); - for (String key : properties.keySet()) { - if (!ALL_PROPERTIES.contains(key)) { - throw new DdlException("JDBC resource Property of " + key + " is unknown"); - } - } + validateProperties(properties); configs = properties; - handleOptionalArguments(); + applyDefaultProperties(); + String currentDateTime = LocalDateTime.now(ZoneId.systemDefault()).toString().replace("T", " "); + configs.put(CREATE_TIME, currentDateTime); // check properties for (String property : ALL_PROPERTIES) { String value = configs.get(property); @@ -205,7 +212,9 @@ protected void setProperties(Map properties) throws DdlException * This function used to handle optional arguments * eg: only_specified_database态lower_case_table_names */ - private void handleOptionalArguments() { + + @Override + public void applyDefaultProperties() { for (String s : OPTIONAL_PROPERTIES) { if (!configs.containsKey(s)) { configs.put(s, OPTIONAL_PROPERTIES_DEFAULT_VALUE.get(s)); @@ -245,7 +254,7 @@ public static String computeObjectChecksum(String driverPath) throws DdlExceptio String fullDriverUrl = getFullDriverUrl(driverPath); try (InputStream inputStream = - Util.getInputStreamFromUrl(fullDriverUrl, null, HTTP_TIMEOUT_MS, HTTP_TIMEOUT_MS)) { + Util.getInputStreamFromUrl(fullDriverUrl, null, HTTP_TIMEOUT_MS, HTTP_TIMEOUT_MS)) { MessageDigest digest = MessageDigest.getInstance("MD5"); byte[] buf = new byte[4096]; int bytesRead = 0; @@ -399,4 +408,56 @@ private static String getDelimiter(String jdbcUrl, String dbType) { } } + public static String getDefaultPropertyValue(String propertyName) { + return OPTIONAL_PROPERTIES_DEFAULT_VALUE.getOrDefault(propertyName, ""); + } + + public static void validateProperties(Map properties) throws DdlException { + for (String key : properties.keySet()) { + if (!ALL_PROPERTIES.contains(key)) { + throw new DdlException("JDBC resource Property of " + key + " is unknown"); + } + } + } + + public static void checkBooleanProperty(String propertyName, String propertyValue) throws DdlException { + if (!propertyValue.equalsIgnoreCase("true") && !propertyValue.equalsIgnoreCase("false")) { + throw new DdlException(propertyName + " must be true or false"); + } + } + + public static void checkDatabaseListProperties(String onlySpecifiedDatabase, + Map includeDatabaseList, Map excludeDatabaseList) throws DdlException { + if (!onlySpecifiedDatabase.equalsIgnoreCase("true")) { + if ((includeDatabaseList != null && !includeDatabaseList.isEmpty()) || (excludeDatabaseList != null + && !excludeDatabaseList.isEmpty())) { + throw new DdlException( + "include_database_list and exclude_database_list " + + "cannot be set when only_specified_database is false"); + } + } + } + + public static void checkConnectionPoolProperties(int minSize, int maxSize, int maxWaitTime, int maxLifeTime) + throws DdlException { + if (minSize < 0) { + throw new DdlException("connection_pool_min_size must be greater than or equal to 0"); + } + if (maxSize < 1) { + throw new DdlException("connection_pool_max_size must be greater than or equal to 1"); + } + if (maxSize < minSize) { + throw new DdlException( + "connection_pool_max_size must be greater than or equal to connection_pool_min_size"); + } + if (maxWaitTime < 0) { + throw new DdlException("connection_pool_max_wait_time must be greater than or equal to 0"); + } + if (maxWaitTime > 30000) { + throw new DdlException("connection_pool_max_wait_time must be less than or equal to 30000"); + } + if (maxLifeTime < 150000) { + throw new DdlException("connection_pool_max_life_time must be greater than or equal to 150000"); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java index 06d42b158f894b..38aa8a73bddd61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java @@ -51,6 +51,7 @@ public class JdbcTable extends Table { private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String CATALOG_ID = "catalog_id"; private static final String TABLE = "table"; private static final String REAL_DATABASE = "real_database"; private static final String REAL_TABLE = "real_table"; @@ -81,11 +82,13 @@ public class JdbcTable extends Table { private String driverUrl; private String checkSum; - private int minPoolSize = 1; - private int maxPoolSize = 100; - private int maxIdleTime = 30000; - private int maxWaitTime = 5000; - private boolean keepAlive = false; + private long catalogId = -1; + + private int connectionPoolMinSize; + private int connectionPoolMaxSize; + private int connectionPoolMaxWaitTime; + private int connectionPoolMaxLifeTime; + private boolean connectionPoolKeepAlive; static { Map tempMap = new CaseInsensitiveMap(); @@ -169,24 +172,33 @@ public String getDriverUrl() { return getFromJdbcResourceOrDefault(JdbcResource.DRIVER_URL, driverUrl); } - public int getMinPoolSize() { - return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MIN_POOL_SIZE, String.valueOf(minPoolSize))); + public long getCatalogId() { + return catalogId; + } + + public int getConnectionPoolMinSize() { + return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MIN_SIZE, + String.valueOf(connectionPoolMinSize))); } - public int getMaxPoolSize() { - return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MAX_POOL_SIZE, String.valueOf(maxPoolSize))); + public int getConnectionPoolMaxSize() { + return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_SIZE, + String.valueOf(connectionPoolMaxSize))); } - public int getMaxIdleTime() { - return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MAX_IDLE_TIME, String.valueOf(maxIdleTime))); + public int getConnectionPoolMaxWaitTime() { + return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME, + String.valueOf(connectionPoolMaxWaitTime))); } - public int getMaxWaitTime() { - return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MAX_WAIT_TIME, String.valueOf(maxWaitTime))); + public int getConnectionPoolMaxLifeTime() { + return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME, + String.valueOf(connectionPoolMaxLifeTime))); } - public boolean getKeepAlive() { - return Boolean.parseBoolean(getFromJdbcResourceOrDefault(JdbcResource.KEEP_ALIVE, String.valueOf(keepAlive))); + public boolean isConnectionPoolKeepAlive() { + return Boolean.parseBoolean(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_KEEP_ALIVE, + String.valueOf(connectionPoolKeepAlive))); } private String getFromJdbcResourceOrDefault(String key, String defaultVal) { @@ -203,6 +215,7 @@ private String getFromJdbcResourceOrDefault(String key, String defaultVal) { @Override public TTableDescriptor toThrift() { TJdbcTable tJdbcTable = new TJdbcTable(); + tJdbcTable.setCatalogId(catalogId); tJdbcTable.setJdbcUrl(getJdbcUrl()); tJdbcTable.setJdbcUser(getJdbcUser()); tJdbcTable.setJdbcPassword(getJdbcPasswd()); @@ -211,11 +224,11 @@ public TTableDescriptor toThrift() { tJdbcTable.setJdbcDriverUrl(getDriverUrl()); tJdbcTable.setJdbcResourceName(resourceName); tJdbcTable.setJdbcDriverChecksum(checkSum); - tJdbcTable.setJdbcMinPoolSize(getMinPoolSize()); - tJdbcTable.setJdbcMaxPoolSize(getMaxPoolSize()); - tJdbcTable.setJdbcMaxIdleTime(getMaxIdleTime()); - tJdbcTable.setJdbcMaxWaitTime(getMaxWaitTime()); - tJdbcTable.setJdbcKeepAlive(getKeepAlive()); + tJdbcTable.setConnectionPoolMinSize(getConnectionPoolMinSize()); + tJdbcTable.setConnectionPoolMaxSize(getConnectionPoolMaxSize()); + tJdbcTable.setConnectionPoolMaxWaitTime(getConnectionPoolMaxWaitTime()); + tJdbcTable.setConnectionPoolMaxLifeTime(getConnectionPoolMaxLifeTime()); + tJdbcTable.setConnectionPoolKeepAlive(isConnectionPoolKeepAlive()); TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.JDBC_TABLE, fullSchema.size(), 0, getName(), ""); tTableDescriptor.setJdbcTable(tJdbcTable); @@ -226,6 +239,7 @@ public TTableDescriptor toThrift() { public void write(DataOutput out) throws IOException { super.write(out); Map serializeMap = Maps.newHashMap(); + serializeMap.put(CATALOG_ID, String.valueOf(catalogId)); serializeMap.put(TABLE, externalTableName); serializeMap.put(RESOURCE, resourceName); serializeMap.put(TABLE_TYPE, jdbcTypeName); @@ -263,6 +277,7 @@ public void readFields(DataInput in) throws IOException { String value = Text.readString(in); serializeMap.put(key, value); } + catalogId = serializeMap.get(CATALOG_ID) != null ? Long.parseLong(serializeMap.get(CATALOG_ID)) : -1; externalTableName = serializeMap.get(TABLE); resourceName = serializeMap.get(RESOURCE); jdbcTypeName = serializeMap.get(TABLE_TYPE); @@ -393,6 +408,14 @@ private void validate(Map properties) throws DdlException { driverClass = jdbcResource.getProperty(DRIVER_CLASS); driverUrl = jdbcResource.getProperty(DRIVER_URL); checkSum = jdbcResource.getProperty(CHECK_SUM); + connectionPoolMinSize = Integer.parseInt(jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MIN_SIZE)); + connectionPoolMaxSize = Integer.parseInt(jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_SIZE)); + connectionPoolMaxWaitTime = Integer.parseInt( + jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME)); + connectionPoolMaxLifeTime = Integer.parseInt( + jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME)); + connectionPoolKeepAlive = Boolean.parseBoolean( + jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_KEEP_ALIVE)); String urlType = jdbcUrl.split(":")[1]; if (!jdbcTypeName.equalsIgnoreCase(urlType)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java index 781c5fb3f6159a..1a51d42f57a5c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java @@ -296,4 +296,6 @@ private void notifyUpdate(Map properties) { } }); } + + public void applyDefaultProperties() {} } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java index 0a0b43adf602a3..34dfb3fa919c52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java @@ -96,6 +96,7 @@ public boolean createResource(Resource resource, boolean ifNotExists) throws Ddl } public void replayCreateResource(Resource resource) { + resource.applyDefaultProperties(); nameToResource.put(resource.getName(), resource); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java index 100fa2280981d9..b0a0654e908182 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java @@ -82,6 +82,7 @@ private JdbcTable toJdbcTable() { JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog; String fullDbName = this.dbName + "." + this.name; JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, TableType.JDBC_EXTERNAL_TABLE); + jdbcTable.setCatalogId(jdbcCatalog.getId()); jdbcTable.setExternalTableName(fullDbName); jdbcTable.setRealDatabaseName(((JdbcExternalCatalog) catalog).getJdbcClient().getRealDatabaseName(this.dbName)); jdbcTable.setRealTableName( @@ -96,11 +97,11 @@ private JdbcTable toJdbcTable() { jdbcTable.setDriverUrl(jdbcCatalog.getDriverUrl()); jdbcTable.setResourceName(jdbcCatalog.getResource()); jdbcTable.setCheckSum(jdbcCatalog.getCheckSum()); - jdbcTable.setMinPoolSize(jdbcCatalog.getMinPoolSize()); - jdbcTable.setMaxPoolSize(jdbcCatalog.getMaxPoolSize()); - jdbcTable.setMaxIdleTime(jdbcCatalog.getMaxIdleTime()); - jdbcTable.setMaxWaitTime(jdbcCatalog.getMaxWaitTime()); - jdbcTable.setKeepAlive(jdbcCatalog.getKeepAlive()); + jdbcTable.setConnectionPoolMinSize(jdbcCatalog.getConnectionPoolMinSize()); + jdbcTable.setConnectionPoolMaxSize(jdbcCatalog.getConnectionPoolMaxSize()); + jdbcTable.setConnectionPoolMaxLifeTime(jdbcCatalog.getConnectionPoolMaxLifeTime()); + jdbcTable.setConnectionPoolMaxWaitTime(jdbcCatalog.getConnectionPoolMaxWaitTime()); + jdbcTable.setConnectionPoolKeepAlive(jdbcCatalog.isConnectionPoolKeepAlive()); return jdbcTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index c5b33f7200e89d..e99174c1c209d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -65,6 +65,17 @@ public void checkProperties() throws DdlException { throw new DdlException("Required property '" + requiredProperty + "' is missing"); } } + + Map propertiesWithoutCheckSum = Maps.newHashMap(catalogProperty.getProperties()); + propertiesWithoutCheckSum.remove(JdbcResource.CHECK_SUM); + JdbcResource.validateProperties(propertiesWithoutCheckSum); + + JdbcResource.checkBooleanProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, getOnlySpecifiedDatabase()); + JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_TABLE_NAMES, getLowerCaseTableNames()); + JdbcResource.checkDatabaseListProperties(getOnlySpecifiedDatabase(), getIncludeDatabaseMap(), + getExcludeDatabaseMap()); + JdbcResource.checkConnectionPoolProperties(getConnectionPoolMinSize(), getConnectionPoolMaxSize(), + getConnectionPoolMaxWaitTime(), getConnectionPoolMaxLifeTime()); } @Override @@ -130,7 +141,8 @@ public String getCheckSum() { } public String getOnlySpecifiedDatabase() { - return catalogProperty.getOrDefault(JdbcResource.ONLY_SPECIFIED_DATABASE, "false"); + return catalogProperty.getOrDefault(JdbcResource.ONLY_SPECIFIED_DATABASE, JdbcResource.getDefaultPropertyValue( + JdbcResource.ONLY_SPECIFIED_DATABASE)); } public String getLowerCaseTableNames() { @@ -140,27 +152,33 @@ public String getLowerCaseTableNames() { } // Otherwise, it defaults to false - return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, "false"); + return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, JdbcResource.getDefaultPropertyValue( + JdbcResource.LOWER_CASE_TABLE_NAMES)); } - public int getMinPoolSize() { - return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MIN_POOL_SIZE, "1")); + public int getConnectionPoolMinSize() { + return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MIN_SIZE, JdbcResource + .getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MIN_SIZE))); } - public int getMaxPoolSize() { - return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MAX_POOL_SIZE, "100")); + public int getConnectionPoolMaxSize() { + return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_SIZE, JdbcResource + .getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_SIZE))); } - public int getMaxIdleTime() { - return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MAX_IDLE_TIME, "300000")); + public int getConnectionPoolMaxWaitTime() { + return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME, JdbcResource + .getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME))); } - public int getMaxWaitTime() { - return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MAX_WAIT_TIME, "5000")); + public int getConnectionPoolMaxLifeTime() { + return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME, JdbcResource + .getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME))); } - public boolean getKeepAlive() { - return Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.KEEP_ALIVE, "false")); + public boolean isConnectionPoolKeepAlive() { + return Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_KEEP_ALIVE, JdbcResource + .getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE))); } @Override @@ -176,12 +194,11 @@ protected void initLocalObjectsImpl() { .setIsLowerCaseTableNames(getLowerCaseTableNames()) .setIncludeDatabaseMap(getIncludeDatabaseMap()) .setExcludeDatabaseMap(getExcludeDatabaseMap()) - .setMinPoolSize(getMinPoolSize()) - .setMaxPoolSize(getMaxPoolSize()) - .setMinIdleSize(getMinPoolSize() > 0 ? 1 : 0) - .setMaxIdleTime(getMaxIdleTime()) - .setMaxWaitTime(getMaxWaitTime()) - .setKeepAlive(getKeepAlive()); + .setConnectionPoolMinSize(getConnectionPoolMinSize()) + .setConnectionPoolMaxSize(getConnectionPoolMaxSize()) + .setConnectionPoolMaxLifeTime(getConnectionPoolMaxLifeTime()) + .setConnectionPoolMaxWaitTime(getConnectionPoolMaxWaitTime()) + .setConnectionPoolKeepAlive(isConnectionPoolKeepAlive()); jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig); } @@ -219,27 +236,11 @@ public void setDefaultPropsWhenCreating(boolean isReplay) throws DdlException { properties.put(JdbcResource.CHECK_SUM, JdbcResource.computeObjectChecksum(properties.get(JdbcResource.DRIVER_URL))); } - String onlySpecifiedDatabase = getOnlySpecifiedDatabase(); - if (!onlySpecifiedDatabase.equalsIgnoreCase("true") && !onlySpecifiedDatabase.equalsIgnoreCase("false")) { - throw new DdlException("only_specified_database must be true or false"); - } - String lowerCaseTableNames = getLowerCaseTableNames(); - if (!lowerCaseTableNames.equalsIgnoreCase("true") && !lowerCaseTableNames.equalsIgnoreCase("false")) { - throw new DdlException("lower_case_table_names must be true or false"); - } - if (!onlySpecifiedDatabase.equalsIgnoreCase("true")) { - Map includeDatabaseList = getIncludeDatabaseMap(); - Map excludeDatabaseList = getExcludeDatabaseMap(); - if ((includeDatabaseList != null && !includeDatabaseList.isEmpty()) - || (excludeDatabaseList != null && !excludeDatabaseList.isEmpty())) { - throw new DdlException("include_database_list and exclude_database_list can not be set when " - + "only_specified_database is false"); - } - } } /** * Execute stmt direct via jdbc + * * @param stmt, the raw stmt string */ public void executeStmt(String stmt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index 3e1f5a73f69cf3..7851b708d36f1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -135,24 +135,23 @@ private void initializeDataSource(JdbcClientConfig config) { dataSource.setUrl(config.getJdbcUrl()); dataSource.setUsername(config.getUser()); dataSource.setPassword(config.getPassword()); - dataSource.setMinIdle(config.getMinIdleSize()); - dataSource.setInitialSize(config.getMinPoolSize()); - dataSource.setMaxActive(config.getMaxPoolSize()); - dataSource.setTimeBetweenEvictionRunsMillis(config.getMaxIdleTime() * 2L); - dataSource.setMinEvictableIdleTimeMillis(config.getMaxIdleTime()); + dataSource.setMinIdle(config.getConnectionPoolMinSize()); // default 1 + dataSource.setInitialSize(config.getConnectionPoolMinSize()); // default 1 + dataSource.setMaxActive(config.getConnectionPoolMaxSize()); // default 10 // set connection timeout to 5s. // The default is 30s, which is too long. // Because when querying information_schema db, BE will call thrift rpc(default timeout is 30s) // to FE to get schema info, and may create connection here, if we set it too long and the url is invalid, // it may cause the thrift rpc timeout. - dataSource.setMaxWait(config.getMaxWaitTime()); - dataSource.setKeepAlive(config.isKeepAlive()); - LOG.info("JdbcExecutor set minPoolSize = " + config.getMinPoolSize() - + ", maxPoolSize = " + config.getMaxPoolSize() - + ", maxIdleTime = " + config.getMaxIdleTime() - + ", maxWaitTime = " + config.getMaxWaitTime() - + ", minIdleSize = " + config.getMinIdleSize() - + ", keepAlive = " + config.isKeepAlive()); + dataSource.setMaxWait(config.getConnectionPoolMaxWaitTime()); // default 5000 + dataSource.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime() / 10L); // default 3 min + dataSource.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime() / 2L); // default 15 min + dataSource.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime()); // default 30 min + LOG.info("JdbcClient set" + + " ConnectionPoolMinSize = " + config.getConnectionPoolMinSize() + + ", ConnectionPoolMaxSize = " + config.getConnectionPoolMaxSize() + + ", ConnectionPoolMaxWaitTime = " + config.getConnectionPoolMaxWaitTime() + + ", ConnectionPoolMaxLifeTime = " + config.getConnectionPoolMaxLifeTime()); } catch (MalformedURLException e) { throw new JdbcClientException("MalformedURLException to load class about " + config.getDriverUrl(), e); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java index ff080f8c87557d..41fac872e46a57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java @@ -18,6 +18,8 @@ package org.apache.doris.datasource.jdbc.client; +import org.apache.doris.catalog.JdbcResource; + import com.google.common.collect.Maps; import java.util.Map; @@ -31,22 +33,44 @@ public class JdbcClientConfig implements Cloneable { private String driverClass; private String onlySpecifiedDatabase; private String isLowerCaseTableNames; - private int minPoolSize = 1; - private int maxPoolSize = 100; - private int minIdleSize = 1; - private int maxIdleTime = 300000; - private int maxWaitTime = 5000; - private boolean keepAlive = false; - - private Map includeDatabaseMap = Maps.newHashMap(); - private Map excludeDatabaseMap = Maps.newHashMap(); - private Map customizedProperties = Maps.newHashMap(); + private int connectionPoolMinSize; + private int connectionPoolMaxSize; + private int connectionPoolMaxWaitTime; + private int connectionPoolMaxLifeTime; + private boolean connectionPoolKeepAlive; + + private Map includeDatabaseMap; + private Map excludeDatabaseMap; + private Map customizedProperties; + + public JdbcClientConfig() { + this.onlySpecifiedDatabase = JdbcResource.getDefaultPropertyValue(JdbcResource.ONLY_SPECIFIED_DATABASE); + this.isLowerCaseTableNames = JdbcResource.getDefaultPropertyValue(JdbcResource.LOWER_CASE_TABLE_NAMES); + this.connectionPoolMinSize = Integer.parseInt( + JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MIN_SIZE)); + this.connectionPoolMaxSize = Integer.parseInt( + JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_SIZE)); + this.connectionPoolMaxWaitTime = Integer.parseInt( + JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME)); + this.connectionPoolMaxLifeTime = Integer.parseInt( + JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME)); + this.connectionPoolKeepAlive = Boolean.parseBoolean( + JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE)); + this.includeDatabaseMap = Maps.newHashMap(); + this.excludeDatabaseMap = Maps.newHashMap(); + this.customizedProperties = Maps.newHashMap(); + } @Override public JdbcClientConfig clone() { try { JdbcClientConfig cloned = (JdbcClientConfig) super.clone(); + cloned.connectionPoolMinSize = connectionPoolMinSize; + cloned.connectionPoolMaxSize = connectionPoolMaxSize; + cloned.connectionPoolMaxLifeTime = connectionPoolMaxLifeTime; + cloned.connectionPoolMaxWaitTime = connectionPoolMaxWaitTime; + cloned.connectionPoolKeepAlive = connectionPoolKeepAlive; cloned.includeDatabaseMap = Maps.newHashMap(includeDatabaseMap); cloned.excludeDatabaseMap = Maps.newHashMap(excludeDatabaseMap); cloned.customizedProperties = Maps.newHashMap(customizedProperties); @@ -128,57 +152,48 @@ public JdbcClientConfig setIsLowerCaseTableNames(String isLowerCaseTableNames) { return this; } - public int getMinPoolSize() { - return minPoolSize; - } - - public JdbcClientConfig setMinPoolSize(int minPoolSize) { - this.minPoolSize = minPoolSize; - return this; - } - - public int getMaxPoolSize() { - return maxPoolSize; + public int getConnectionPoolMinSize() { + return connectionPoolMinSize; } - public JdbcClientConfig setMaxPoolSize(int maxPoolSize) { - this.maxPoolSize = maxPoolSize; + public JdbcClientConfig setConnectionPoolMinSize(int connectionPoolMinSize) { + this.connectionPoolMinSize = connectionPoolMinSize; return this; } - public int getMinIdleSize() { - return minIdleSize; + public int getConnectionPoolMaxSize() { + return connectionPoolMaxSize; } - public JdbcClientConfig setMinIdleSize(int minIdleSize) { - this.minIdleSize = minIdleSize; + public JdbcClientConfig setConnectionPoolMaxSize(int connectionPoolMaxSize) { + this.connectionPoolMaxSize = connectionPoolMaxSize; return this; } - public int getMaxIdleTime() { - return maxIdleTime; + public int getConnectionPoolMaxLifeTime() { + return connectionPoolMaxLifeTime; } - public JdbcClientConfig setMaxIdleTime(int maxIdleTime) { - this.maxIdleTime = maxIdleTime; + public JdbcClientConfig setConnectionPoolMaxLifeTime(int connectionPoolMaxLifeTime) { + this.connectionPoolMaxLifeTime = connectionPoolMaxLifeTime; return this; } - public int getMaxWaitTime() { - return maxWaitTime; + public int getConnectionPoolMaxWaitTime() { + return connectionPoolMaxWaitTime; } - public JdbcClientConfig setMaxWaitTime(int maxWaitTime) { - this.maxWaitTime = maxWaitTime; + public JdbcClientConfig setConnectionPoolMaxWaitTime(int connectionPoolMaxWaitTime) { + this.connectionPoolMaxWaitTime = connectionPoolMaxWaitTime; return this; } - public boolean isKeepAlive() { - return keepAlive; + public boolean isConnectionPoolKeepAlive() { + return connectionPoolKeepAlive; } - public JdbcClientConfig setKeepAlive(boolean keepAlive) { - this.keepAlive = keepAlive; + public JdbcClientConfig setConnectionPoolKeepAlive(boolean connectionPoolKeepAlive) { + this.connectionPoolKeepAlive = connectionPoolKeepAlive; return this; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java index e91898fb82da78..ee0d1949be2812 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java @@ -17,13 +17,154 @@ package org.apache.doris.catalog; +import org.apache.doris.analysis.AccessTestUtil; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.CreateResourceStmt; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.AccessControllerManager; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import java.util.Map; + public class JdbcResourceTest { + private final ResourceMgr resourceMgr = new ResourceMgr(); + + private Map jdbcProperties; + + private Analyzer analyzer; + + @Before + public void setUp() { + FeConstants.runningUnitTest = true; + analyzer = AccessTestUtil.fetchAdminAnalyzer(true); + jdbcProperties = Maps.newHashMap(); + jdbcProperties.put("type", "jdbc"); + jdbcProperties.put("user", "postgres"); + jdbcProperties.put("password", ""); + jdbcProperties.put("jdbc_url", "jdbc:postgresql://127.0.0.1:5432/postgres?currentSchema=doris_test"); + jdbcProperties.put("driver_url", "postgresql-42.5.0.jar"); + jdbcProperties.put("driver_class", "org.postgresql.Driver"); + jdbcProperties.put("checksum", "20c8228267b6c9ce620fddb39467d3eb"); + } + + @Test + public void testJdbcResourceCreateWithDefaultProperties(@Mocked Env env, + @Injectable AccessControllerManager accessManager) + throws UserException { + new Expectations() { + { + env.getAccessManager(); + result = accessManager; + accessManager.checkGlobalPriv((ConnectContext) any, PrivPredicate.ADMIN); + result = true; + } + }; + + jdbcProperties.remove("checksum"); + + CreateResourceStmt stmt = new CreateResourceStmt(true, false, "jdbc_resource_pg_14", jdbcProperties); + + stmt.analyze(analyzer); + + resourceMgr.createResource(stmt); + + JdbcResource jdbcResource = (JdbcResource) resourceMgr.getResource("jdbc_resource_pg_14"); + + + // Verify the default properties were applied during the replay + Map properties = jdbcResource.getCopiedProperties(); + Assert.assertEquals("1", properties.get("connection_pool_min_size")); + Assert.assertEquals("10", properties.get("connection_pool_max_size")); + Assert.assertEquals("1800000", properties.get("connection_pool_max_life_time")); + Assert.assertEquals("5000", properties.get("connection_pool_max_wait_time")); + Assert.assertEquals("false", properties.get("connection_pool_keep_alive")); + } + + @Test + public void testJdbcResourceReplayWithDefaultProperties() { + + JdbcResource jdbcResource = new JdbcResource("jdbc_resource_pg_14", jdbcProperties); + + // Replay the resource creation to simulate the edit log replay + resourceMgr.replayCreateResource(jdbcResource); + + // Retrieve the replayed resource + Resource replayedResource = resourceMgr.getResource("jdbc_resource_pg_14"); + + Assert.assertNotNull(replayedResource); + Assert.assertTrue(replayedResource instanceof JdbcResource); + + // Verify the default properties were applied during the replay + Map properties = replayedResource.getCopiedProperties(); + Assert.assertEquals("1", properties.get("connection_pool_min_size")); + Assert.assertEquals("10", properties.get("connection_pool_max_size")); + Assert.assertEquals("1800000", properties.get("connection_pool_max_life_time")); + Assert.assertEquals("5000", properties.get("connection_pool_max_wait_time")); + Assert.assertEquals("false", properties.get("connection_pool_keep_alive")); + } + + @Test + public void testJdbcResourceReplayWithSetProperties() { + + // Add some properties to the JDBC properties + jdbcProperties.put("connection_pool_min_size", "2"); + jdbcProperties.put("connection_pool_max_size", "20"); + jdbcProperties.put("connection_pool_max_life_time", "3600000"); + jdbcProperties.put("connection_pool_max_wait_time", "10000"); + jdbcProperties.put("connection_pool_keep_alive", "true"); + + JdbcResource jdbcResource = new JdbcResource("jdbc_resource_pg_14", jdbcProperties); + + // Replay the resource creation to simulate the edit log replay + resourceMgr.replayCreateResource(jdbcResource); + + // Retrieve the replayed resource + Resource replayedResource = resourceMgr.getResource("jdbc_resource_pg_14"); + + Assert.assertNotNull(replayedResource); + Assert.assertTrue(replayedResource instanceof JdbcResource); + + // Verify the modified properties were applied during the replay + Map properties = replayedResource.getCopiedProperties(); + Assert.assertEquals("2", properties.get("connection_pool_min_size")); + Assert.assertEquals("20", properties.get("connection_pool_max_size")); + Assert.assertEquals("3600000", properties.get("connection_pool_max_life_time")); + Assert.assertEquals("10000", properties.get("connection_pool_max_wait_time")); + Assert.assertEquals("true", properties.get("connection_pool_keep_alive")); + } + + @Test + public void testJdbcResourceReplayWithModifiedAfterSetDefaultProperties() throws DdlException { + JdbcResource jdbcResource = new JdbcResource("jdbc_resource_pg_14", jdbcProperties); + + // Replay the resource creation to simulate the edit log replay + resourceMgr.replayCreateResource(jdbcResource); + + // Retrieve the replayed resource + Resource replayedResource = resourceMgr.getResource("jdbc_resource_pg_14"); + Map newProperties = Maps.newHashMap(); + newProperties.put(JdbcResource.CONNECTION_POOL_MIN_SIZE, "2"); + replayedResource.modifyProperties(newProperties); + Map properties = replayedResource.getCopiedProperties(); + Assert.assertEquals("2", properties.get("connection_pool_min_size")); + resourceMgr.replayCreateResource(replayedResource); + Resource replayedResource2 = resourceMgr.getResource("jdbc_resource_pg_14"); + Map properties2 = replayedResource2.getCopiedProperties(); + Assert.assertEquals("2", properties2.get("connection_pool_min_size")); + } + @Test public void testHandleJdbcUrlForMySql() throws DdlException { String inputUrl = "jdbc:mysql://127.0.0.1:3306/test"; @@ -36,7 +177,7 @@ public void testHandleJdbcUrlForMySql() throws DdlException { @Test public void testHandleJdbcUrlForSqlServerWithoutParams() throws DdlException { - String inputUrl = "jdbc:sqlserver://43.129.237.12:1433;databaseName=doris_test"; + String inputUrl = "jdbc:sqlserver://127.0.0.1:1433;databaseName=doris_test"; String resultUrl = JdbcResource.handleJdbcUrl(inputUrl); // Ensure that the result URL for SQL Server doesn't have '?' or '&' @@ -49,7 +190,8 @@ public void testHandleJdbcUrlForSqlServerWithoutParams() throws DdlException { @Test public void testHandleJdbcUrlForSqlServerWithParams() throws DdlException { - String inputUrl = "jdbc:sqlserver://43.129.237.12:1433;encrypt=false;databaseName=doris_test;trustServerCertificate=false"; + String inputUrl + = "jdbc:sqlserver://127.0.0.1:1433;encrypt=false;databaseName=doris_test;trustServerCertificate=false"; String resultUrl = JdbcResource.handleJdbcUrl(inputUrl); // Ensure that the result URL for SQL Server doesn't have '?' or '&' diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java index 0f2977a9886afb..7bc268b64211bb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java @@ -20,10 +20,12 @@ import org.apache.doris.catalog.JdbcResource; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.datasource.CatalogFactory; +import com.google.common.collect.Maps; import org.junit.Assert; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.Before; +import org.junit.Test; import java.util.HashMap; import java.util.Map; @@ -31,32 +33,53 @@ public class JdbcExternalCatalogTest { private JdbcExternalCatalog jdbcExternalCatalog; - @BeforeEach + @Before public void setUp() throws DdlException { FeConstants.runningUnitTest = true; Map properties = new HashMap<>(); + properties.put("type", "jdbc"); properties.put(JdbcResource.DRIVER_URL, "ojdbc8.jar"); properties.put(JdbcResource.JDBC_URL, "jdbc:oracle:thin:@127.0.0.1:1521:XE"); properties.put(JdbcResource.DRIVER_CLASS, "oracle.jdbc.driver.OracleDriver"); - jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog", "testResource", properties, "testComment"); + jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog", null, properties, "testComment"); } @Test - public void setDefaultPropsWhenCreatingTest() { + public void replayJdbcCatalogTest() throws DdlException { + jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.CONNECTION_POOL_MIN_SIZE, "1"); + JdbcExternalCatalog replayJdbcCatalog = (JdbcExternalCatalog) CatalogFactory.createFromLog( + jdbcExternalCatalog.constructEditLog()); + Map properties = replayJdbcCatalog.getProperties(); + Assert.assertEquals("1", properties.get("connection_pool_min_size")); + Map newProperties = Maps.newHashMap(); + newProperties.put(JdbcResource.CONNECTION_POOL_MIN_SIZE, "2"); + jdbcExternalCatalog.getCatalogProperty().modifyCatalogProps(newProperties); + JdbcExternalCatalog replayJdbcCatalog2 = (JdbcExternalCatalog) CatalogFactory.createFromLog( + jdbcExternalCatalog.constructEditLog()); + Map properties2 = replayJdbcCatalog2.getProperties(); + Assert.assertEquals("2", properties2.get("connection_pool_min_size")); + } + + @Test + public void checkPropertiesTest() { jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, "1"); - Exception exception1 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.setDefaultPropsWhenCreating(false)); - Assert.assertEquals("errCode = 2, detailMessage = only_specified_database must be true or false", exception1.getMessage()); + Exception exception1 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.checkProperties()); + Assert.assertEquals("errCode = 2, detailMessage = only_specified_database must be true or false", + exception1.getMessage()); jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, "true"); jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.LOWER_CASE_TABLE_NAMES, "1"); - Exception exception2 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.setDefaultPropsWhenCreating(false)); - Assert.assertEquals("errCode = 2, detailMessage = lower_case_table_names must be true or false", exception2.getMessage()); + Exception exception2 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.checkProperties()); + Assert.assertEquals("errCode = 2, detailMessage = lower_case_table_names must be true or false", + exception2.getMessage()); jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, "false"); jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.LOWER_CASE_TABLE_NAMES, "false"); jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.INCLUDE_DATABASE_LIST, "db1,db2"); - DdlException exceptione3 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.setDefaultPropsWhenCreating(false)); - Assert.assertEquals("errCode = 2, detailMessage = include_database_list and exclude_database_list can not be set when only_specified_database is false", exceptione3.getMessage()); + DdlException exceptione3 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.checkProperties()); + Assert.assertEquals( + "errCode = 2, detailMessage = include_database_list and exclude_database_list cannot be set when only_specified_database is false", + exceptione3.getMessage()); } } diff --git a/fe/pom.xml b/fe/pom.xml index 51157e3008284c..8fb8fd0a37f57b 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -251,7 +251,7 @@ under the License. 2.7 1.1.1 5.8.2 - 1.2.5 + 1.2.20 0.4.6 0.16.0 8.5.86 diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 16a37d1393a780..c720a402dd4127 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -324,11 +324,12 @@ struct TJdbcTable { 6: optional string jdbc_resource_name 7: optional string jdbc_driver_class 8: optional string jdbc_driver_checksum - 9: optional i32 jdbc_min_pool_size - 10: optional i32 jdbc_max_pool_size - 11: optional i32 jdbc_max_idle_time - 12: optional i32 jdbc_max_wait_time - 13: optional bool jdbc_keep_alive + 9: optional i32 connection_pool_min_size + 10: optional i32 connection_pool_max_size + 11: optional i32 connection_pool_max_wait_time + 12: optional i32 connection_pool_max_life_time + 13: optional bool connection_pool_keep_alive + 14: optional i64 catalog_id } struct TMCTable { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 5ca7328a728f60..cde75a2601296d 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -428,11 +428,13 @@ struct TJdbcExecutorCtorParams { 9: optional TOdbcTableType table_type - 10: optional i32 min_pool_size - 11: optional i32 max_pool_size - 12: optional i32 max_idle_time - 13: optional i32 max_wait_time - 14: optional bool keep_alive + 10: optional i32 connection_pool_min_size + 11: optional i32 connection_pool_max_size + 12: optional i32 connection_pool_max_wait_time + 13: optional i32 connection_pool_max_life_time + 14: optional i32 connection_pool_cache_clear_time + 15: optional bool connection_pool_keep_alive + 16: optional i64 catalog_id } struct TJavaUdfExecutorCtorParams {