Skip to content

Commit

Permalink
[enhancement](udf) add more info when downloading the jar package fails
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Jul 3, 2023
1 parent b7d6a70 commit 1c70b06
Show file tree
Hide file tree
Showing 18 changed files with 138 additions and 33 deletions.
8 changes: 5 additions & 3 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,17 +252,19 @@ JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc)
_jdbc_url(tdesc.jdbcTable.jdbc_url),
_jdbc_table_name(tdesc.jdbcTable.jdbc_table_name),
_jdbc_user(tdesc.jdbcTable.jdbc_user),
_jdbc_passwd(tdesc.jdbcTable.jdbc_password) {}
_jdbc_passwd(tdesc.jdbcTable.jdbc_password),
_file_size(tdesc.jdbcTable.__isset.jdbc_file_size ? tdesc.jdbcTable.jdbc_file_size : -1) {
}

// Renders this descriptor as a single-line string: the base
// TableDescriptor::debug_string() output followed by every JDBC field,
// ending with the driver file size (_file_size).
// The format string holds exactly ten placeholders, one per argument below.
// NOTE(review): _jdbc_passwd is written out verbatim — confirm this string
// never ends up in shared logs.
std::string JdbcTableDescriptor::debug_string() const {
    fmt::memory_buffer buf;
    fmt::format_to(buf,
                   "JDBCTable({} ,_jdbc_resource_name={} ,_jdbc_driver_url={} "
                   ",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} "
                   ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={}, _jdbc_file_size={})",
                   TableDescriptor::debug_string(), _jdbc_resource_name, _jdbc_driver_url,
                   _jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url, _jdbc_table_name,
                   _jdbc_user, _jdbc_passwd, _file_size);
    return fmt::to_string(buf);
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <google/protobuf/stubs/port.h>
#include <stdint.h>

#include <cstdint>
#include <ostream>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -307,6 +308,7 @@ class JdbcTableDescriptor : public TableDescriptor {
const std::string& jdbc_table_name() const { return _jdbc_table_name; }
const std::string& jdbc_user() const { return _jdbc_user; }
const std::string& jdbc_passwd() const { return _jdbc_passwd; }
// Returns the stored _file_size value (read-only accessor).
int64_t file_size() const { return _file_size; }

private:
std::string _jdbc_resource_name;
Expand All @@ -317,6 +319,7 @@ class JdbcTableDescriptor : public TableDescriptor {
std::string _jdbc_table_name;
std::string _jdbc_user;
std::string _jdbc_passwd;
int64_t _file_size;
};

class TupleDescriptor {
Expand Down
74 changes: 58 additions & 16 deletions be/src/runtime/user_function_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <unistd.h>

#include <atomic>
#include <cstdint>
#include <memory>
#include <ostream>
#include <regex>
Expand All @@ -51,14 +52,20 @@ static const int kLibShardNum = 128;
struct UserFunctionCacheEntry {
ENABLE_FACTORY_CREATOR(UserFunctionCacheEntry);
UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_,
LibType type)
: function_id(fid_), checksum(checksum_), lib_file(lib_file_), type(type) {}
int64_t file_size_, LibType type)
: function_id(fid_),
checksum(checksum_),
file_size(file_size_),
lib_file(lib_file_),
type(type) {}
~UserFunctionCacheEntry();

int64_t function_id = 0;
// used to check if this library is valid.
std::string checksum;

int64_t file_size = 0;

// library file
std::string lib_file;

Expand Down Expand Up @@ -136,16 +143,34 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std
} else if (ends_with(file, ".jar")) {
lib_type = LibType::JAR;
} else {
return Status::InternalError("unknown library file format: " + file);
return Status::InternalError(
"unknown library file format. the file type is not end with jar or so.: " + file);
}

std::vector<std::string> split_parts = strings::Split(file, ".");
if (split_parts.size() != 3 && split_parts.size() != 4) {
// file names written by older versions must still be accepted, so the split may yield 3, 4 or 5 parts
if (split_parts.size() != 3 && split_parts.size() != 4 && split_parts.size() != 5) {
return Status::InternalError(
"user function's name should be function_id.checksum[.file_name].file_type");
"user function's name should be "
"function_id.checksum.file_size[.file_name].file_type and maybe you file name "
"contians .(dot) " +
file);
}
int64_t function_id = std::stol(split_parts[0]);
std::string checksum = split_parts[1];
int64_t file_size = 0;
LOG(INFO) << "dir: file: " << dir << " , " << file << " " << split_parts.size();
for (auto s : split_parts) {
LOG(INFO) << "ss: " << s;
}
if (split_parts.size() == 5) {
try {
file_size = std::stol(split_parts[2]);
} catch (const std::invalid_argument& e) {
LOG(INFO) << "dir: file: size: " << dir << " , " << file << " " << split_parts.size()
<< " " << e.what();
}
}
auto it = _entry_map.find(function_id);
if (it != _entry_map.end()) {
LOG(WARNING) << "meet a same function id user function library, function_id=" << function_id
Expand All @@ -155,7 +180,7 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std
}
// create a cache entry and put it into entry map
std::shared_ptr<UserFunctionCacheEntry> entry = UserFunctionCacheEntry::create_shared(
function_id, checksum, dir + "/" + file, lib_type);
function_id, checksum, dir + "/" + file, file_size, lib_type);
entry->is_downloaded = true;
_entry_map[function_id] = entry;

Expand Down Expand Up @@ -195,19 +220,21 @@ std::string get_real_symbol(const std::string& symbol) {
}

Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
const std::string& checksum,
const std::string& checksum, int64_t file_size,
std::shared_ptr<UserFunctionCacheEntry>& output_entry,
LibType type) {
std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
std::string file_name = _get_file_name_from_url(url);
LOG(INFO) << "_get_cache_entry: " << url << " " << file_name;
{
std::lock_guard<std::mutex> l(_cache_lock);
auto it = _entry_map.find(fid);
if (it != _entry_map.end()) {
entry = it->second;
} else {
entry = UserFunctionCacheEntry::create_shared(
fid, checksum, _make_lib_file(fid, checksum, type, file_name), type);
fid, checksum, _make_lib_file(fid, checksum, file_size, type, file_name),
file_size, type);
_entry_map.emplace(fid, entry);
}
}
Expand Down Expand Up @@ -271,10 +298,13 @@ Status UserFunctionCache::_download_lib(const std::string& url,

Md5Digest digest;
HttpClient client;
int64_t file_size = 0;
RETURN_IF_ERROR(client.init(real_url));
Status status;
auto download_cb = [&status, &tmp_file, &fp, &digest](const void* data, size_t length) {
auto download_cb = [&status, &tmp_file, &fp, &digest, &file_size](const void* data,
size_t length) {
digest.update(data, length);
file_size = file_size + length;
auto res = fwrite(data, length, 1, fp.get());
if (res != 1) {
LOG(WARNING) << "fail to write data to file, file=" << tmp_file
Expand All @@ -288,9 +318,18 @@ Status UserFunctionCache::_download_lib(const std::string& url,
RETURN_IF_ERROR(status);
digest.digest();
if (!iequal(digest.hex(), entry->checksum)) {
LOG(WARNING) << "UDF's checksum is not equal, one=" << digest.hex()
<< ", other=" << entry->checksum;
return Status::InternalError("UDF's library checksum is not match");
fmt::memory_buffer error_msg;
fmt::format_to(
error_msg,
" The checksum is not equal of {} ({}). The init info of first create is "
"check_sum: "
"{}, file_size: {}, lib_file: {}. But download is check_sum is: {}, file_size "
"is: {}",
url, real_url, entry->checksum, entry->file_size, entry->lib_file, digest.hex(),
file_size);

LOG(WARNING) << fmt::to_string(error_msg);
return Status::InternalError(fmt::to_string(error_msg));
}
// close this file
fp.reset();
Expand Down Expand Up @@ -336,22 +375,25 @@ Status UserFunctionCache::_load_cache_entry_internal(
}

std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum,
LibType type, const std::string& file_name) {
int64_t file_size, LibType type,
const std::string& file_name) {
int shard = function_id % kLibShardNum;
std::stringstream ss;
ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum;
ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum << '.' << file_size;
if (type == LibType::JAR) {
ss << '.' << file_name;
} else {
ss << ".so";
}
LOG(INFO) << "_make_lib_file: " << ss.str();
return ss.str();
}

// Resolves the local path of the JAR that backs function `fid`.
//
// Obtains the cache entry through _get_cache_entry (passing `url`, `checksum`
// and `file_size` along with LibType::JAR) and copies the entry's lib_file
// into `*libpath`.
//
// Returns Status::OK() on success; any error reported by _get_cache_entry is
// returned unchanged and `*libpath` is left untouched in that case.
Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url,
                                      const std::string& checksum, int64_t file_size,
                                      std::string* libpath) {
    std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
    RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, file_size, entry, LibType::JAR));
    *libpath = entry->lib_file;
    return Status::OK();
}
Expand Down
7 changes: 4 additions & 3 deletions be/src/runtime/user_function_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,20 @@ class UserFunctionCache {
static UserFunctionCache* instance();

// Stores in `*libpath` the local path of the JAR for function `fid`.
// `file_size` is forwarded to the cache entry together with `url` and `checksum`.
Status get_jarpath(int64_t fid, const std::string& url, const std::string& checksum,
                   int64_t file_size, std::string* libpath);

private:
Status _load_cached_lib();
Status _load_entry_from_lib(const std::string& dir, const std::string& file);
Status _get_cache_entry(int64_t fid, const std::string& url, const std::string& checksum,
int64_t file_size,
std::shared_ptr<UserFunctionCacheEntry>& output_entry, LibType type);
Status _load_cache_entry(const std::string& url, std::shared_ptr<UserFunctionCacheEntry> entry);
Status _download_lib(const std::string& url, std::shared_ptr<UserFunctionCacheEntry> entry);
Status _load_cache_entry_internal(std::shared_ptr<UserFunctionCacheEntry> entry);

// Builds the local cache path for a library from its function id, checksum,
// file size, library type and (for JARs) original file name.
std::string _make_lib_file(int64_t function_id, const std::string& checksum, int64_t file_size,
                           LibType type, const std::string& file_name);
void _destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry);

std::string _get_real_url(const std::string& url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,9 @@ class AggregateJavaUdaf final
//So need to check as soon as possible, before call Data function
// Resolves the local JAR path for `fn` through UserFunctionCache and stores it
// in _local_location. When the thrift field `file_size` is not set, -1 is
// passed instead. Returns whatever status get_jarpath reports.
Status check_udaf(const TFunction& fn) {
    auto function_cache = UserFunctionCache::instance();
    int64_t file_size = fn.__isset.file_size ? fn.file_size : -1;
    return function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum, file_size,
                                       &_local_location);
}

void create(AggregateDataPtr __restrict place) const override {
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/new_jdbc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con
_jdbc_param.tuple_desc = _tuple_desc;
_jdbc_param.query_string = std::move(_query_string);
_jdbc_param.table_type = _table_type;
_jdbc_param.file_size = jdbc_table->file_size();

get_parent()->_scanner_profile->add_info_string("JdbcDriverClass", _jdbc_param.driver_class);
get_parent()->_scanner_profile->add_info_string("JdbcDriverUrl", _jdbc_param.driver_path);
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/vjdbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer);
RETURN_IF_ERROR(function_cache->get_jarpath(
std::abs((int64_t)hash_str(_conn_param.driver_path)), _conn_param.driver_path,
_conn_param.driver_checksum, &local_location));
_conn_param.driver_checksum, _conn_param.file_size, &local_location));
} else {
SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer);
RETURN_IF_ERROR(function_cache->get_jarpath(
std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path,
_conn_param.driver_checksum, &local_location));
_conn_param.driver_checksum, _conn_param.file_size, &local_location));
}
VLOG_QUERY << "driver local path = " << local_location;

Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/vjdbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <jni.h>
#include <stdint.h>

#include <cstdint>
#include <map>
#include <string>
#include <vector>
Expand Down Expand Up @@ -52,6 +53,7 @@ struct JdbcConnectorParam {
std::string passwd;
std::string query_string;
TOdbcTableType::type table_type;
int64_t file_size;

const TupleDescriptor* tuple_desc;
};
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/functions/function_java_udf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
{
std::string local_location;
auto function_cache = UserFunctionCache::instance();
int64_t file_size = fn_.__isset.file_size ? fn_.file_size : -1;
RETURN_IF_ERROR(function_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum,
&local_location));
file_size, &local_location));
TJavaUdfExecutorCtorParams ctor_params;
ctor_params.__set_fn(fn_);
ctor_params.__set_location(local_location);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ public final class FeMetaVersion {
public static final int VERSION_122 = 122;
// For AnalysisInfo
public static final int VERSION_123 = 123;

// For udf Function, add file size field
public static final int VERSION_124 = 124;
// note: when incrementing the meta version, assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_123;
public static final int VERSION_CURRENT = VERSION_124;

// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.proto.Types;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFunctionBinaryType;
import org.apache.doris.thrift.TPaloBrokerService.AsyncProcessor.fileSize;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -112,6 +113,7 @@ public class CreateFunctionStmt extends DdlStmt {
private String userFile;
private Function function;
private String checksum = "";
private long fileSize = 0;
// The default NullableMode for a UDF is currently ALWAYS_NULLABLE.
// Otherwise it will core dump when the input is a non-nullable column but null has to be returned,
// see https://github.com/apache/doris/pull/14002/files
Expand Down Expand Up @@ -290,7 +292,8 @@ private void computeObjectChecksum() throws IOException, NoSuchAlgorithmExceptio
}
digest.update(buf, 0, bytesRead);
} while (true);

fileSize = bytesRead;
LOG.info("void computeObjectChecksum() ------------------------------------------------- " + fileSize);
checksum = Hex.encodeHexString(digest.digest());
}
}
Expand Down Expand Up @@ -353,6 +356,7 @@ private void analyzeUda() throws AnalysisException {
function.setBinaryType(binaryType);
function.setChecksum(checksum);
function.setNullableMode(returnNullMode);
// function.setFileSize(fileSize);
}

private void analyzeUdf() throws AnalysisException {
Expand All @@ -379,6 +383,7 @@ private void analyzeUdf() throws AnalysisException {
location, symbol, prepareFnSymbol, closeFnSymbol);
function.setChecksum(checksum);
function.setNullableMode(returnNullMode);
// function.setFileSize(fileSize);
}

private void analyzeJavaUdaf(String clazz) throws AnalysisException {
Expand Down
Loading

0 comments on commit 1c70b06

Please sign in to comment.