From 1c70b06d87ab0576eb265863144140934bdb42e6 Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Mon, 3 Jul 2023 15:29:23 +0800 Subject: [PATCH] [enchanment](udf) add more info when download package of jar failed --- be/src/runtime/descriptors.cpp | 8 +- be/src/runtime/descriptors.h | 3 + be/src/runtime/user_function_cache.cpp | 74 +++++++++++++++---- be/src/runtime/user_function_cache.h | 7 +- .../aggregate_function_java_udaf.h | 4 +- be/src/vec/exec/scan/new_jdbc_scanner.cpp | 1 + be/src/vec/exec/vjdbc_connector.cpp | 4 +- be/src/vec/exec/vjdbc_connector.h | 2 + be/src/vec/functions/function_java_udf.cpp | 3 +- .../apache/doris/common/FeMetaVersion.java | 5 +- .../doris/analysis/CreateFunctionStmt.java | 7 +- .../org/apache/doris/catalog/Function.java | 18 +++++ .../apache/doris/catalog/JdbcResource.java | 6 ++ .../org/apache/doris/catalog/JdbcTable.java | 19 ++++- .../catalog/external/JdbcExternalTable.java | 1 + .../doris/datasource/JdbcExternalCatalog.java | 5 ++ gensrc/thrift/Descriptors.thrift | 2 +- gensrc/thrift/Types.thrift | 2 + 18 files changed, 138 insertions(+), 33 deletions(-) diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 1855f5d58d79ce1..72dd7c055c037ba 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -252,17 +252,19 @@ JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc) _jdbc_url(tdesc.jdbcTable.jdbc_url), _jdbc_table_name(tdesc.jdbcTable.jdbc_table_name), _jdbc_user(tdesc.jdbcTable.jdbc_user), - _jdbc_passwd(tdesc.jdbcTable.jdbc_password) {} + _jdbc_passwd(tdesc.jdbcTable.jdbc_password), + _file_size(tdesc.jdbcTable.__isset.jdbc_file_size ? tdesc.jdbcTable.jdbc_file_size : -1) { +} std::string JdbcTableDescriptor::debug_string() const { fmt::memory_buffer buf; fmt::format_to(buf, "JDBCTable({} ,_jdbc_resource_name={} ,_jdbc_driver_url={} " ",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} " - ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={})", + ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={}, _jdbc_file_size={})", TableDescriptor::debug_string(), _jdbc_resource_name, _jdbc_driver_url, _jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url, _jdbc_table_name, - _jdbc_user, _jdbc_passwd); + _jdbc_user, _jdbc_passwd, _file_size); return fmt::to_string(buf); } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index aff3b03a0f7011b..506dac254b7baa2 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -307,6 +308,7 @@ class JdbcTableDescriptor : public TableDescriptor { const std::string& jdbc_table_name() const { return _jdbc_table_name; } const std::string& jdbc_user() const { return _jdbc_user; } const std::string& jdbc_passwd() const { return _jdbc_passwd; } + int64_t file_size() const { return _file_size; } private: std::string _jdbc_resource_name; @@ -317,6 +319,7 @@ class JdbcTableDescriptor : public TableDescriptor { std::string _jdbc_table_name; std::string _jdbc_user; std::string _jdbc_passwd; + int64_t _file_size; }; class TupleDescriptor { diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp index f7ec0890a6427e3..839c2cb0ce94caf 100644 --- a/be/src/runtime/user_function_cache.cpp +++ b/be/src/runtime/user_function_cache.cpp @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -51,14 +52,20 @@ static const int kLibShardNum = 128; struct UserFunctionCacheEntry { ENABLE_FACTORY_CREATOR(UserFunctionCacheEntry); UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_, - LibType type) - : function_id(fid_), checksum(checksum_), lib_file(lib_file_), type(type) {} + int64_t file_size_, LibType type) + : function_id(fid_), + checksum(checksum_), + file_size(file_size_), + lib_file(lib_file_), + type(type) {} ~UserFunctionCacheEntry(); int64_t function_id = 0; // used to check if this library is valid. std::string checksum; + int64_t file_size = 0; + // library file std::string lib_file; @@ -136,16 +143,34 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std } else if (ends_with(file, ".jar")) { lib_type = LibType::JAR; } else { - return Status::InternalError("unknown library file format: " + file); + return Status::InternalError( + "unknown library file format. the file type is not end with jar or so.: " + file); } std::vector split_parts = strings::Split(file, "."); - if (split_parts.size() != 3 && split_parts.size() != 4) { + // have to consider old version after spilt size + if (split_parts.size() != 3 && split_parts.size() != 4 && split_parts.size() != 5) { return Status::InternalError( - "user function's name should be function_id.checksum[.file_name].file_type"); + "user function's name should be " + "function_id.checksum.file_size[.file_name].file_type and maybe you file name " + "contians .(dot) " + + file); } int64_t function_id = std::stol(split_parts[0]); std::string checksum = split_parts[1]; + int64_t file_size = 0; + LOG(INFO) << "dir: file: " << dir << " , " << file << " " << split_parts.size(); + for (auto s : split_parts) { + LOG(INFO) << "ss: " << s; + } + if (split_parts.size() == 5) { + try { + file_size = std::stol(split_parts[2]); + } catch (const std::invalid_argument& e) { + LOG(INFO) << "dir: file: size: " << dir << " , " << file << " " << split_parts.size() + << " " << e.what(); + } + } auto it = _entry_map.find(function_id); if (it != _entry_map.end()) { LOG(WARNING) << "meet a same function id user function library, function_id=" << function_id @@ -155,7 +180,7 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std } // create a cache entry and put it into entry map std::shared_ptr entry = UserFunctionCacheEntry::create_shared( - function_id, checksum, dir + "/" + file, lib_type); + function_id, checksum, dir + "/" + file, file_size, lib_type); entry->is_downloaded = true; _entry_map[function_id] = entry; @@ -195,11 +220,12 @@ std::string get_real_symbol(const std::string& symbol) { } Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url, - const std::string& checksum, + const std::string& checksum, int64_t file_size, std::shared_ptr& output_entry, LibType type) { std::shared_ptr entry = nullptr; std::string file_name = _get_file_name_from_url(url); + LOG(INFO) << "_get_cache_entry: " << url << " " << file_name; { std::lock_guard l(_cache_lock); auto it = _entry_map.find(fid); @@ -207,7 +233,8 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url, entry = it->second; } else { entry = UserFunctionCacheEntry::create_shared( - fid, checksum, _make_lib_file(fid, checksum, type, file_name), type); + fid, checksum, _make_lib_file(fid, checksum, file_size, type, file_name), + file_size, type); _entry_map.emplace(fid, entry); } } @@ -271,10 +298,13 @@ Status UserFunctionCache::_download_lib(const std::string& url, Md5Digest digest; HttpClient client; + int64_t file_size = 0; RETURN_IF_ERROR(client.init(real_url)); Status status; - auto download_cb = [&status, &tmp_file, &fp, &digest](const void* data, size_t length) { + auto download_cb = [&status, &tmp_file, &fp, &digest, &file_size](const void* data, + size_t length) { digest.update(data, length); + file_size = file_size + length; auto res = fwrite(data, length, 1, fp.get()); if (res != 1) { LOG(WARNING) << "fail to write data to file, file=" << tmp_file @@ -288,9 +318,18 @@ Status UserFunctionCache::_download_lib(const std::string& url, RETURN_IF_ERROR(status); digest.digest(); if (!iequal(digest.hex(), entry->checksum)) { - LOG(WARNING) << "UDF's checksum is not equal, one=" << digest.hex() - << ", other=" << entry->checksum; - return Status::InternalError("UDF's library checksum is not match"); + fmt::memory_buffer error_msg; + fmt::format_to( + error_msg, + " The checksum is not equal of {} ({}). The init info of first create is " + "check_sum: " + "{}, file_size: {}, lib_file: {}. But download is check_sum is: {}, file_size " + "is: {}", + url, real_url, entry->checksum, entry->file_size, entry->lib_file, digest.hex(), + file_size); + + LOG(WARNING) << fmt::to_string(error_msg); + return Status::InternalError(fmt::to_string(error_msg)); } // close this file fp.reset(); @@ -336,22 +375,25 @@ Status UserFunctionCache::_load_cache_entry_internal( } std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum, - LibType type, const std::string& file_name) { + int64_t file_size, LibType type, + const std::string& file_name) { int shard = function_id % kLibShardNum; std::stringstream ss; - ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum; + ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum << '.' << file_size; if (type == LibType::JAR) { ss << '.' << file_name; } else { ss << ".so"; } + LOG(INFO) << "_make_lib_file: " << ss.str(); return ss.str(); } Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url, - const std::string& checksum, std::string* libpath) { + const std::string& checksum, int64_t file_size, + std::string* libpath) { std::shared_ptr entry = nullptr; - RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::JAR)); + RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, file_size, entry, LibType::JAR)); *libpath = entry->lib_file; return Status::OK(); } diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h index e58f02294b438ca..b3e891a47b55845 100644 --- a/be/src/runtime/user_function_cache.h +++ b/be/src/runtime/user_function_cache.h @@ -57,19 +57,20 @@ class UserFunctionCache { static UserFunctionCache* instance(); Status get_jarpath(int64_t fid, const std::string& url, const std::string& checksum, - std::string* libpath); + int64_t file_size, std::string* libpath); private: Status _load_cached_lib(); Status _load_entry_from_lib(const std::string& dir, const std::string& file); Status _get_cache_entry(int64_t fid, const std::string& url, const std::string& checksum, + int64_t file_size, std::shared_ptr& output_entry, LibType type); Status _load_cache_entry(const std::string& url, std::shared_ptr entry); Status _download_lib(const std::string& url, std::shared_ptr entry); Status _load_cache_entry_internal(std::shared_ptr entry); - std::string _make_lib_file(int64_t function_id, const std::string& checksum, LibType type, - const std::string& file_name); + std::string _make_lib_file(int64_t function_id, const std::string& checksum, int64_t file_size, + LibType type, const std::string& file_name); void _destroy_cache_entry(std::shared_ptr entry); std::string _get_real_url(const std::string& url); diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h index fa0c4efd9d72942..ca96e35eeb0a30e 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h +++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h @@ -448,7 +448,9 @@ class AggregateJavaUdaf final //So need to check as soon as possible, before call Data function Status check_udaf(const TFunction& fn) { auto function_cache = UserFunctionCache::instance(); - return function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum, &_local_location); + int64_t file_size = fn.__isset.file_size ? fn.file_size : -1; + return function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum, file_size, + &_local_location); } void create(AggregateDataPtr __restrict place) const override { diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index bc8bf6e044b3d4e..bb5fc0a59e22dd0 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -88,6 +88,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con _jdbc_param.tuple_desc = _tuple_desc; _jdbc_param.query_string = std::move(_query_string); _jdbc_param.table_type = _table_type; + _jdbc_param.file_size = jdbc_table->file_size(); get_parent()->_scanner_profile->add_info_string("JdbcDriverClass", _jdbc_param.driver_class); get_parent()->_scanner_profile->add_info_string("JdbcDriverUrl", _jdbc_param.driver_path); diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 018606ea87fed0d..7335345ed21e31c 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -140,12 +140,12 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer); RETURN_IF_ERROR(function_cache->get_jarpath( std::abs((int64_t)hash_str(_conn_param.driver_path)), _conn_param.driver_path, - _conn_param.driver_checksum, &local_location)); + _conn_param.driver_checksum, _conn_param.file_size, &local_location)); } else { SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer); RETURN_IF_ERROR(function_cache->get_jarpath( std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path, - _conn_param.driver_checksum, &local_location)); + _conn_param.driver_checksum, _conn_param.file_size, &local_location)); } VLOG_QUERY << "driver local path = " << local_location; diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index 2e25eb3353eedc9..f2bcbbd48a90dae 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -52,6 +53,7 @@ struct JdbcConnectorParam { std::string passwd; std::string query_string; TOdbcTableType::type table_type; + int64_t file_size; const TupleDescriptor* tuple_desc; }; diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp index 9305bae94960d9d..964c1b6290ca107 100644 --- a/be/src/vec/functions/function_java_udf.cpp +++ b/be/src/vec/functions/function_java_udf.cpp @@ -79,8 +79,9 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio { std::string local_location; auto function_cache = UserFunctionCache::instance(); + int64_t file_size = fn_.__isset.file_size ? fn_.file_size : -1; RETURN_IF_ERROR(function_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum, - &local_location)); + file_size, &local_location)); TJavaUdfExecutorCtorParams ctor_params; ctor_params.__set_fn(fn_); ctor_params.__set_location(local_location); diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index b1e42d343adf0e9..646b719d486c2e1 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -66,9 +66,10 @@ public final class FeMetaVersion { public static final int VERSION_122 = 122; // For AnalysisInfo public static final int VERSION_123 = 123; - + // For udf Function, add file size field + public static final int VERSION_124 = 124; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_123; + public static final int VERSION_CURRENT = VERSION_124; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java index 06422e9b357e0c3..a5b31d4b0459b56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java @@ -40,6 +40,7 @@ import org.apache.doris.proto.Types; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TFunctionBinaryType; +import org.apache.doris.thrift.TPaloBrokerService.AsyncProcessor.fileSize; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -112,6 +113,7 @@ public class CreateFunctionStmt extends DdlStmt { private String userFile; private Function function; private String checksum = ""; + private long fileSize = 0; // now set udf default NullableMode is ALWAYS_NULLABLE // if not, will core dump when input is not null column, but need return null // like https://github.com/apache/doris/pull/14002/files @@ -290,7 +292,8 @@ private void computeObjectChecksum() throws IOException, NoSuchAlgorithmExceptio } digest.update(buf, 0, bytesRead); } while (true); - + fileSize = bytesRead; + LOG.info("void computeObjectChecksum() ------------------------------------------------- " + fileSize); checksum = Hex.encodeHexString(digest.digest()); } } @@ -353,6 +356,7 @@ private void analyzeUda() throws AnalysisException { function.setBinaryType(binaryType); function.setChecksum(checksum); function.setNullableMode(returnNullMode); + // function.setFileSize(fileSize); } private void analyzeUdf() throws AnalysisException { @@ -379,6 +383,7 @@ private void analyzeUdf() throws AnalysisException { location, symbol, prepareFnSymbol, closeFnSymbol); function.setChecksum(checksum); function.setNullableMode(returnNullMode); + // function.setFileSize(fileSize); } private void analyzeJavaUdaf(String clazz) throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java index 4664b996467746e..558153d2d8421ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.FunctionName; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.UserException; import org.apache.doris.common.io.IOUtils; import org.apache.doris.common.io.Text; @@ -132,6 +133,8 @@ public enum NullableMode { // library's checksum to make sure all backends use one library to serve user's request protected String checksum = ""; + protected long fileSize = 0; + // If true, this function is global function protected boolean isGlobal = false; @@ -192,6 +195,7 @@ public Function(Function other) { System.arraycopy(other.argTypes, 0, this.argTypes, 0, other.argTypes.length); } this.checksum = other.checksum; + this.fileSize = other.fileSize; } public void setNestedFunction(Function nestedFunction) { @@ -299,6 +303,14 @@ public String getChecksum() { return checksum; } + public void setFileSize(long fileSize) { + this.fileSize = fileSize; + } + + public long getFileSize() { + return fileSize; + } + public boolean isGlobal() { return isGlobal; } @@ -557,6 +569,7 @@ public TFunction toThrift(Type realReturnType, Type[] realArgTypes, Boolean[] re fn.setChecksum(checksum); } fn.setVectorized(vectorized); + fn.setFileSize(fileSize); return fn; } @@ -664,6 +677,7 @@ protected void writeFields(DataOutput output) throws IOException { } IOUtils.writeOptionString(output, libUrl); IOUtils.writeOptionString(output, checksum); + // output.writeLong(fileSize); } @Override @@ -698,6 +712,10 @@ public void readFields(DataInput input) throws IOException { if (hasChecksum) { checksum = Text.readString(input); } + // if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_123) { + // return; //old version have not write file size field + // } + // fileSize = input.readLong(); } public static Function read(DataInput input) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index 2e8a71f92aab18a..9321e90633bd133 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -95,6 +95,8 @@ public class JdbcResource extends Resource { public static final String ONLY_SPECIFIED_DATABASE = "only_specified_database"; public static final String LOWER_CASE_TABLE_NAMES = "lower_case_table_names"; public static final String CHECK_SUM = "checksum"; + public static final String FILE_SIZE = "filesize"; + public static long fileSize = 0; private static final ImmutableList ALL_PROPERTIES = new ImmutableList.Builder().add( JDBC_URL, USER, @@ -184,6 +186,8 @@ protected void setProperties(Map properties) throws DdlException } this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL))); configs.put(CHECK_SUM, computeObjectChecksum(getProperty(DRIVER_URL))); + configs.put(FILE_SIZE, String.valueOf(fileSize)); + LOG.info("configs.put(FILE_SIZE, String.valueOf(fileSize)) " + fileSize); } /** @@ -241,6 +245,8 @@ public static String computeObjectChecksum(String driverPath) throws DdlExceptio } digest.update(buf, 0, bytesRead); } while (true); + fileSize = bytesRead; + LOG.info("static String computeObjectChecksum: " + fileSize); return Hex.encodeHexString(digest.digest()); } catch (IOException e) { throw new DdlException("compute driver checksum from url: " + driverPath diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java index e2fac4ee53a1a17..0b422ca5efcbc71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java @@ -55,6 +55,7 @@ public class JdbcTable extends Table { private static final String DRIVER_CLASS = "driver_class"; private static final String DRIVER_URL = "driver_url"; private static final String CHECK_SUM = "checksum"; + private static final String FILE_SIZE = "filesize"; private static Map TABLE_TYPE_MAP; private String resourceName; private String externalTableName; @@ -66,6 +67,7 @@ public class JdbcTable extends Table { private String driverClass; private String driverUrl; private String checkSum; + private String fileSize; static { Map tempMap = new CaseInsensitiveMap(); @@ -119,6 +121,10 @@ public String getCheckSum() { return checkSum; } + public String getFileSize() { + return fileSize; + } + public String getExternalTableName() { return externalTableName; } @@ -169,6 +175,8 @@ public TTableDescriptor toThrift() { tJdbcTable.setJdbcDriverUrl(getDriverUrl()); tJdbcTable.setJdbcResourceName(resourceName); tJdbcTable.setJdbcDriverChecksum(checkSum); + tJdbcTable.setJdbcFileSize(Long.parseLong(fileSize)); + LOG.info("public TTableDescriptor toThrift(): " + Long.parseLong(fileSize)); TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.JDBC_TABLE, fullSchema.size(), 0, getName(), ""); tTableDescriptor.setJdbcTable(tJdbcTable); @@ -188,7 +196,8 @@ public void write(DataOutput out) throws IOException { serializeMap.put(DRIVER_CLASS, driverClass); serializeMap.put(DRIVER_URL, driverUrl); serializeMap.put(CHECK_SUM, checkSum); - + serializeMap.put(FILE_SIZE, fileSize); + LOG.info("write(DataInput in): " + fileSize); int size = (int) serializeMap.values().stream().filter(v -> { return v != null; }).count(); @@ -222,6 +231,8 @@ public void readFields(DataInput in) throws IOException { driverClass = serializeMap.get(DRIVER_CLASS); driverUrl = serializeMap.get(DRIVER_URL); checkSum = serializeMap.get(CHECK_SUM); + fileSize = serializeMap.get(FILE_SIZE); + LOG.info("readFields(DataInput in): " + fileSize); } public String getResourceName() { @@ -253,7 +264,8 @@ public String getSignature(int signatureVersion) { sb.append(driverClass); sb.append(driverUrl); sb.append(checkSum); - + sb.append(fileSize); + LOG.info("getSignature(int signatureVersion): " + fileSize); String md5 = DigestUtils.md5Hex(sb.toString()); LOG.debug("get signature of odbc table {}: {}. signature string: {}", name, md5, sb.toString()); return md5; @@ -309,7 +321,8 @@ private void validate(Map properties) throws DdlException { driverClass = jdbcResource.getProperty(DRIVER_CLASS); driverUrl = jdbcResource.getProperty(DRIVER_URL); checkSum = jdbcResource.getProperty(CHECK_SUM); - + fileSize = jdbcResource.getProperty(FILE_SIZE); + LOG.info("validate(Map properties): " + fileSize); String urlType = jdbcUrl.split(":")[1]; if (!jdbcTypeName.equalsIgnoreCase(urlType)) { if (!(jdbcTypeName.equalsIgnoreCase("oceanbase_oracle") && urlType.equalsIgnoreCase("oceanbase"))) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java index 1f1fd8dcb08b50c..98ca30b7fd68c31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java @@ -91,6 +91,7 @@ private JdbcTable toJdbcTable() { jdbcTable.setDriverUrl(jdbcCatalog.getDriverUrl()); jdbcTable.setResourceName(jdbcCatalog.getResource()); jdbcTable.setCheckSum(jdbcCatalog.getCheckSum()); + jdbcTable.setFileSize(jdbcCatalog.getFileSize()); return jdbcTable; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java index 583cceddd68701b..2728f55f43cad3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java @@ -83,6 +83,7 @@ private Map processCompatibleProperties(Map prop if (properties.containsKey(JdbcResource.DRIVER_URL) && !properties.containsKey(JdbcResource.CHECK_SUM)) { properties.put(JdbcResource.CHECK_SUM, JdbcResource.computeObjectChecksum(properties.get(JdbcResource.DRIVER_URL))); + properties.put(JdbcResource.FILE_SIZE, String.valueOf(JdbcResource.fileSize)); } return properties; } @@ -115,6 +116,10 @@ public String getCheckSum() { return catalogProperty.getOrDefault(JdbcResource.CHECK_SUM, ""); } + public String getFileSize() { + return catalogProperty.getOrDefault(JdbcResource.FILE_SIZE, ""); + } + public String getOnlySpecifiedDatabase() { return catalogProperty.getOrDefault(JdbcResource.ONLY_SPECIFIED_DATABASE, "false"); } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index d8fd59c101655c3..c651721db8614c1 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -308,7 +308,7 @@ struct TJdbcTable { 6: optional string jdbc_resource_name 7: optional string jdbc_driver_class 8: optional string jdbc_driver_checksum - + 9: optional i64 jdbc_file_size } struct TMCTable { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 9dec29db22ca209..f5d9c06d1614053 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -375,6 +375,8 @@ struct TFunction { 11: optional i64 id 12: optional string checksum 13: optional bool vectorized = false + //like checksum, record the file info of size + 14: optional i64 file_size } enum TJdbcOperation {