Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](jdbc catalog) Optimize connection pool parameter settings #30588

Merged
merged 2 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,9 @@ DEFINE_mInt32(segment_compression_threshold_kb, "256");
// The connection timeout when connecting to external table such as odbc table.
DEFINE_mInt32(external_table_connect_timeout_sec, "30");

// Time to clean up useless JDBC connection pool cache
DEFINE_mInt32(jdbc_connection_pool_cache_clear_time_sec, "28800");

// Global bitmap cache capacity for aggregation cache, size in bytes
DEFINE_Int64(delete_bitmap_agg_cache_capacity, "104857600");
DEFINE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec, "1800");
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,9 @@ DECLARE_mInt32(segment_compression_threshold_kb);
// The connection timeout when connecting to external table such as odbc table.
DECLARE_mInt32(external_table_connect_timeout_sec);

// Time to clean up useless JDBC connection pool cache
DECLARE_mInt32(jdbc_connection_pool_cache_clear_time_sec);

// Global bitmap cache capacity for aggregation cache, size in bytes
DECLARE_Int64(delete_bitmap_agg_cache_capacity);
DECLARE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec);
Expand Down
33 changes: 18 additions & 15 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ std::string ODBCTableDescriptor::debug_string() const {

JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc)
: TableDescriptor(tdesc),
_jdbc_catalog_id(tdesc.jdbcTable.catalog_id),
_jdbc_resource_name(tdesc.jdbcTable.jdbc_resource_name),
_jdbc_driver_url(tdesc.jdbcTable.jdbc_driver_url),
_jdbc_driver_class(tdesc.jdbcTable.jdbc_driver_class),
Expand All @@ -266,24 +267,26 @@ JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc)
_jdbc_table_name(tdesc.jdbcTable.jdbc_table_name),
_jdbc_user(tdesc.jdbcTable.jdbc_user),
_jdbc_passwd(tdesc.jdbcTable.jdbc_password),
_jdbc_min_pool_size(tdesc.jdbcTable.jdbc_min_pool_size),
_jdbc_max_pool_size(tdesc.jdbcTable.jdbc_max_pool_size),
_jdbc_max_idle_time(tdesc.jdbcTable.jdbc_max_idle_time),
_jdbc_max_wait_time(tdesc.jdbcTable.jdbc_max_wait_time),
_jdbc_keep_alive(tdesc.jdbcTable.jdbc_keep_alive) {}
_connection_pool_min_size(tdesc.jdbcTable.connection_pool_min_size),
_connection_pool_max_size(tdesc.jdbcTable.connection_pool_max_size),
_connection_pool_max_wait_time(tdesc.jdbcTable.connection_pool_max_wait_time),
_connection_pool_max_life_time(tdesc.jdbcTable.connection_pool_max_life_time),
_connection_pool_keep_alive(tdesc.jdbcTable.connection_pool_keep_alive) {}

std::string JdbcTableDescriptor::debug_string() const {
fmt::memory_buffer buf;
fmt::format_to(buf,
"JDBCTable({} ,_jdbc_resource_name={} ,_jdbc_driver_url={} "
",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} "
",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={} ,_jdbc_min_pool_size={} "
",_jdbc_max_pool_size={} ,_jdbc_max_idle_time={} ,_jdbc_max_wait_time={} "
",_jdbc_keep_alive={})",
TableDescriptor::debug_string(), _jdbc_resource_name, _jdbc_driver_url,
_jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url, _jdbc_table_name,
_jdbc_user, _jdbc_passwd, _jdbc_min_pool_size, _jdbc_max_pool_size,
_jdbc_max_idle_time, _jdbc_max_wait_time, _jdbc_keep_alive);
fmt::format_to(
buf,
"JDBCTable({} ,_jdbc_catalog_id = {}, _jdbc_resource_name={} ,_jdbc_driver_url={} "
",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} "
",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={} ,_connection_pool_min_size={} "
",_connection_pool_max_size={} ,_connection_pool_max_wait_time={} "
",_connection_pool_max_life_time={} ,_connection_pool_keep_alive={})",
TableDescriptor::debug_string(), _jdbc_catalog_id, _jdbc_resource_name,
_jdbc_driver_url, _jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url,
_jdbc_table_name, _jdbc_user, _jdbc_passwd, _connection_pool_min_size,
_connection_pool_max_size, _connection_pool_max_wait_time,
_connection_pool_max_life_time, _connection_pool_keep_alive);
return fmt::to_string(buf);
}

Expand Down
22 changes: 12 additions & 10 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ class JdbcTableDescriptor : public TableDescriptor {
public:
JdbcTableDescriptor(const TTableDescriptor& tdesc);
std::string debug_string() const override;
int64_t jdbc_catalog_id() const { return _jdbc_catalog_id; }
const std::string& jdbc_resource_name() const { return _jdbc_resource_name; }
const std::string& jdbc_driver_url() const { return _jdbc_driver_url; }
const std::string& jdbc_driver_class() const { return _jdbc_driver_class; }
Expand All @@ -318,13 +319,14 @@ class JdbcTableDescriptor : public TableDescriptor {
const std::string& jdbc_table_name() const { return _jdbc_table_name; }
const std::string& jdbc_user() const { return _jdbc_user; }
const std::string& jdbc_passwd() const { return _jdbc_passwd; }
int32_t jdbc_min_pool_size() const { return _jdbc_min_pool_size; }
int32_t jdbc_max_pool_size() const { return _jdbc_max_pool_size; }
int32_t jdbc_max_idle_time() const { return _jdbc_max_idle_time; }
int32_t jdbc_max_wait_time() const { return _jdbc_max_wait_time; }
bool jdbc_keep_alive() const { return _jdbc_keep_alive; }
int32_t connection_pool_min_size() const { return _connection_pool_min_size; }
int32_t connection_pool_max_size() const { return _connection_pool_max_size; }
int32_t connection_pool_max_wait_time() const { return _connection_pool_max_wait_time; }
int32_t connection_pool_max_life_time() const { return _connection_pool_max_life_time; }
bool connection_pool_keep_alive() const { return _connection_pool_keep_alive; }

private:
int64_t _jdbc_catalog_id;
std::string _jdbc_resource_name;
std::string _jdbc_driver_url;
std::string _jdbc_driver_class;
Expand All @@ -333,11 +335,11 @@ class JdbcTableDescriptor : public TableDescriptor {
std::string _jdbc_table_name;
std::string _jdbc_user;
std::string _jdbc_passwd;
int32_t _jdbc_min_pool_size;
int32_t _jdbc_max_pool_size;
int32_t _jdbc_max_idle_time;
int32_t _jdbc_max_wait_time;
bool _jdbc_keep_alive;
int32_t _connection_pool_min_size;
int32_t _connection_pool_max_size;
int32_t _connection_pool_max_wait_time;
int32_t _connection_pool_max_life_time;
bool _connection_pool_keep_alive;
};

class TupleDescriptor {
Expand Down
11 changes: 6 additions & 5 deletions be/src/vec/exec/scan/new_jdbc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con
if (jdbc_table == nullptr) {
return Status::InternalError("jdbc table pointer is NULL of VJdbcScanNode::prepare.");
}
_jdbc_param.catalog_id = jdbc_table->jdbc_catalog_id();
_jdbc_param.driver_class = jdbc_table->jdbc_driver_class();
_jdbc_param.driver_path = jdbc_table->jdbc_driver_url();
_jdbc_param.resource_name = jdbc_table->jdbc_resource_name();
Expand All @@ -95,11 +96,11 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con
_jdbc_param.tuple_desc = _tuple_desc;
_jdbc_param.query_string = std::move(_query_string);
_jdbc_param.table_type = _table_type;
_jdbc_param.min_pool_size = jdbc_table->jdbc_min_pool_size();
_jdbc_param.max_pool_size = jdbc_table->jdbc_max_pool_size();
_jdbc_param.max_idle_time = jdbc_table->jdbc_max_idle_time();
_jdbc_param.max_wait_time = jdbc_table->jdbc_max_wait_time();
_jdbc_param.keep_alive = jdbc_table->jdbc_keep_alive();
_jdbc_param.connection_pool_min_size = jdbc_table->connection_pool_min_size();
_jdbc_param.connection_pool_max_size = jdbc_table->connection_pool_max_size();
_jdbc_param.connection_pool_max_life_time = jdbc_table->connection_pool_max_life_time();
_jdbc_param.connection_pool_max_wait_time = jdbc_table->connection_pool_max_wait_time();
_jdbc_param.connection_pool_keep_alive = jdbc_table->connection_pool_keep_alive();

if (get_parent() != nullptr) {
get_parent()->_scanner_profile->add_info_string("JdbcDriverClass",
Expand Down
13 changes: 8 additions & 5 deletions be/src/vec/exec/vjdbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {

TJdbcExecutorCtorParams ctor_params;
ctor_params.__set_statement(_sql_str);
ctor_params.__set_catalog_id(_conn_param.catalog_id);
ctor_params.__set_jdbc_url(_conn_param.jdbc_url);
ctor_params.__set_jdbc_user(_conn_param.user);
ctor_params.__set_jdbc_password(_conn_param.passwd);
Expand All @@ -143,11 +144,13 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
ctor_params.__set_batch_size(read ? state->batch_size() : 0);
ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE);
ctor_params.__set_table_type(_conn_param.table_type);
ctor_params.__set_min_pool_size(_conn_param.min_pool_size);
ctor_params.__set_max_pool_size(_conn_param.max_pool_size);
ctor_params.__set_max_idle_time(_conn_param.max_idle_time);
ctor_params.__set_max_wait_time(_conn_param.max_wait_time);
ctor_params.__set_keep_alive(_conn_param.keep_alive);
ctor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size);
ctor_params.__set_connection_pool_max_size(_conn_param.connection_pool_max_size);
ctor_params.__set_connection_pool_max_wait_time(_conn_param.connection_pool_max_wait_time);
ctor_params.__set_connection_pool_max_life_time(_conn_param.connection_pool_max_life_time);
ctor_params.__set_connection_pool_cache_clear_time(
config::jdbc_connection_pool_cache_clear_time_sec);
ctor_params.__set_connection_pool_keep_alive(_conn_param.connection_pool_keep_alive);

jbyteArray ctor_params_bytes;
// Pushed frame will be popped when jni_frame goes out-of-scope.
Expand Down
11 changes: 6 additions & 5 deletions be/src/vec/exec/vjdbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class IColumn;
class VExprContext;

struct JdbcConnectorParam {
int64_t catalog_id;
std::string driver_path;
std::string driver_class;
std::string resource_name;
Expand All @@ -54,11 +55,11 @@ struct JdbcConnectorParam {
std::string table_name;
bool use_transaction;
TOdbcTableType::type table_type;
int32_t min_pool_size;
int32_t max_pool_size;
int32_t max_idle_time;
int32_t max_wait_time;
bool keep_alive;
int32_t connection_pool_min_size;
int32_t connection_pool_max_size;
int32_t connection_pool_max_wait_time;
int32_t connection_pool_max_life_time;
bool connection_pool_keep_alive;

const TupleDescriptor* tuple_desc = nullptr;
};
Expand Down
11 changes: 6 additions & 5 deletions be/src/vec/sink/writer/vjdbc_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ JdbcConnectorParam VJdbcTableWriter::create_connect_param(const doris::TDataSink

JdbcConnectorParam jdbc_param;

jdbc_param.catalog_id = t_jdbc_sink.jdbc_table.catalog_id;
jdbc_param.jdbc_url = t_jdbc_sink.jdbc_table.jdbc_url;
jdbc_param.user = t_jdbc_sink.jdbc_table.jdbc_user;
jdbc_param.passwd = t_jdbc_sink.jdbc_table.jdbc_password;
Expand All @@ -46,11 +47,11 @@ JdbcConnectorParam VJdbcTableWriter::create_connect_param(const doris::TDataSink
jdbc_param.query_string = t_jdbc_sink.insert_sql;
jdbc_param.table_name = t_jdbc_sink.jdbc_table.jdbc_table_name;
jdbc_param.use_transaction = t_jdbc_sink.use_transaction;
jdbc_param.min_pool_size = t_jdbc_sink.jdbc_table.jdbc_min_pool_size;
jdbc_param.max_pool_size = t_jdbc_sink.jdbc_table.jdbc_max_pool_size;
jdbc_param.max_idle_time = t_jdbc_sink.jdbc_table.jdbc_max_idle_time;
jdbc_param.max_wait_time = t_jdbc_sink.jdbc_table.jdbc_max_wait_time;
jdbc_param.keep_alive = t_jdbc_sink.jdbc_table.jdbc_keep_alive;
jdbc_param.connection_pool_min_size = t_jdbc_sink.jdbc_table.connection_pool_min_size;
jdbc_param.connection_pool_max_size = t_jdbc_sink.jdbc_table.connection_pool_max_size;
jdbc_param.connection_pool_max_wait_time = t_jdbc_sink.jdbc_table.connection_pool_max_wait_time;
jdbc_param.connection_pool_max_life_time = t_jdbc_sink.jdbc_table.connection_pool_max_life_time;
jdbc_param.connection_pool_keep_alive = t_jdbc_sink.jdbc_table.connection_pool_keep_alive;

return jdbc_param;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,77 @@
package org.apache.doris.jdbc;

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.log4j.Logger;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class JdbcDataSource {
private static final Logger LOG = Logger.getLogger(JdbcDataSource.class);
private static final JdbcDataSource jdbcDataSource = new JdbcDataSource();
private final Map<String, DruidDataSource> sourcesMap = new ConcurrentHashMap<>();
private final Map<String, Long> lastAccessTimeMap = new ConcurrentHashMap<>();
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private long cleanupInterval = 8 * 60 * 60 * 1000; // 8 hours
private ScheduledFuture<?> cleanupTask = null;

private JdbcDataSource() {
startCleanupTask();
}

public static JdbcDataSource getDataSource() {
return jdbcDataSource;
}

public DruidDataSource getSource(String cacheKey) {
lastAccessTimeMap.put(cacheKey, System.currentTimeMillis());
return sourcesMap.get(cacheKey);
}

public void putSource(String cacheKey, DruidDataSource ds) {
sourcesMap.put(cacheKey, ds);
lastAccessTimeMap.put(cacheKey, System.currentTimeMillis());
}

public Map<String, DruidDataSource> getSourcesMap() {
return sourcesMap;
}

public void setCleanupInterval(long interval) {
this.cleanupInterval = interval * 1000L;
restartCleanupTask();
}

private synchronized void restartCleanupTask() {
if (cleanupTask != null && !cleanupTask.isCancelled()) {
cleanupTask.cancel(false);
}
cleanupTask = executor.scheduleAtFixedRate(() -> {
try {
long now = System.currentTimeMillis();
lastAccessTimeMap.forEach((key, lastAccessTime) -> {
if (now - lastAccessTime > cleanupInterval) {
DruidDataSource ds = sourcesMap.remove(key);
if (ds != null) {
ds.close();
}
lastAccessTimeMap.remove(key);
LOG.info("remove jdbc data source: " + key.split("jdbc")[0]);
}
});
} catch (Exception e) {
LOG.error("failed to cleanup jdbc data source", e);
}
}, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
}

private void startCleanupTask() {
if (cleanupTask == null || cleanupTask.isCancelled()) {
restartCleanupTask();
}
}
}
Loading
Loading