Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DX-24413, DX-24414 : Gandiva cache performance improvements #15

Open
wants to merge 1 commit into
base: rel-460
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/cmake_modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ else()
BOOST_SOURCE_URL
# These are trimmed boost bundles we maintain.
# See cpp/build_support/trim-boost.sh
"https://dl.bintray.com/ursalabs/arrow-boost/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz"
"https://dl.bintray.com/boostorg/release/${ARROW_BOOST_BUILD_VERSION}/source/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz"
"https://dl.bintray.com/ursalabs/arrow-boost/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz"
"https://github.com/boostorg/boost/archive/boost-${ARROW_BOOST_BUILD_VERSION}.tar.gz"
# FIXME(ARROW-6407) automate uploading this archive to ensure it reflects
# our currently used packages and doesn't fall out of sync with
Expand Down
27 changes: 25 additions & 2 deletions cpp/src/gandiva/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <cerrno>
#include <climits>
#include <cstdlib>
#include <iostream>
#include <mutex>

#include "gandiva/lru_cache.h"
Expand All @@ -26,7 +28,12 @@ namespace gandiva {
template <class KeyType, typename ValueType>
class Cache {
public:
explicit Cache(size_t capacity = CACHE_SIZE) : cache_(capacity) {}
/// Creates a cache whose underlying LRU store is constructed with `capacity`,
/// and announces the chosen capacity on standard output.
explicit Cache(size_t capacity) : cache_(capacity) {
  // Newline followed by an explicit flush so the message appears immediately.
  std::cout << "Creating gandiva cache with capacity: " << capacity << '\n'
            << std::flush;
}

/// Default constructor: delegates to the sizing constructor using the
/// capacity reported by GetCapacity().
Cache()
    : Cache(static_cast<size_t>(GetCapacity())) {}

ValueType GetModule(KeyType cache_key) {
arrow::util::optional<ValueType> result;
mtx_.lock();
Expand All @@ -42,8 +49,24 @@ class Cache {
}

private:
/// Resolves the capacity used by the default constructor.
///
/// Reads the GANDIVA_CACHE_SIZE environment variable. When it is unset the
/// function returns DEFAULT_CACHE_SIZE. When it is set but does not start with
/// a positive decimal integer that fits in an int, a notice is printed to
/// standard output and DEFAULT_CACHE_SIZE is returned.
///
/// NOTE(review): a reviewer asked whether the size could be passed through the
/// gandiva Configuration (fed from a support option) instead of an environment
/// variable; that change is not made here.
///
/// @return the cache capacity, always greater than zero.
static int GetCapacity() {
  const char* env_cache_size = std::getenv("GANDIVA_CACHE_SIZE");
  if (env_cache_size == nullptr) {
    return DEFAULT_CACHE_SIZE;
  }

  // std::atoi has undefined behaviour when the value does not fit in an int;
  // std::strtol reports overflow through errno, so it can be rejected cleanly.
  errno = 0;
  char* parse_end = nullptr;
  const long parsed = std::strtol(env_cache_size, &parse_end, 10);

  const bool has_digits = parse_end != env_cache_size;
  const bool in_range = errno != ERANGE && parsed > 0 && parsed <= INT_MAX;
  if (!has_digits || !in_range) {
    std::cout << "Invalid cache size provided. Using default cache size."
              << std::endl;
    return DEFAULT_CACHE_SIZE;
  }

  return static_cast<int>(parsed);
}

LruCache<KeyType, ValueType> cache_;
static const int CACHE_SIZE = 250;
static const int DEFAULT_CACHE_SIZE = 500;
std::mutex mtx_;
};
} // namespace gandiva
2 changes: 1 addition & 1 deletion cpp/src/gandiva/jni/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ set(PROTO_HDRS "${PROTO_OUTPUT_DIR}/Types.pb.h")
set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/java")
add_subdirectory(../../../../java/gandiva ./java/gandiva)

set(GANDIVA_LINK_LIBS ${ARROW_PROTOBUF_LIBPROTOBUF})
set(GANDIVA_LINK_LIBS ${ARROW_PROTOBUF_LIBPROTOBUF} ${BOOST_FILESYSTEM_LIBRARY})
if(ARROW_BUILD_STATIC)
list(APPEND GANDIVA_LINK_LIBS gandiva_static)
else()
Expand Down
218 changes: 213 additions & 5 deletions cpp/src/gandiva/jni/jni_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,17 @@
// specific language governing permissions and limitations
// under the License.

#include <arrow/builder.h>
#include <arrow/record_batch.h>
#include <arrow/type.h>
#include <google/protobuf/io/coded_stream.h>

#include <boost/filesystem.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <fstream>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
Expand All @@ -25,10 +34,6 @@
#include <utility>
#include <vector>

#include <arrow/builder.h>
#include <arrow/record_batch.h>
#include <arrow/type.h>

#include "Types.pb.h"
#include "gandiva/configuration.h"
#include "gandiva/filter.h"
Expand Down Expand Up @@ -62,11 +67,15 @@ using gandiva::ConfigurationBuilder;
using gandiva::FilterHolder;
using gandiva::ProjectorHolder;

namespace fs = boost::filesystem;

// forward declarations
NodePtr ProtoTypeToNode(const types::TreeNode& node);

static jint JNI_VERSION = JNI_VERSION_1_6;

// Name of the environment variable that holds the prewarm cache directory.
// It is read by PrewarmCache() and CacheExpressionAndSchemaForPrewarmOnStartup().
static const char* kEnvPrewarmCacheDir = "GANDIVA_PREWARM_CACHE_DIR";

// extern refs - initialized for other modules.
jclass configuration_builder_class_;

Expand All @@ -82,6 +91,8 @@ static jfieldID vector_expander_ret_capacity_;
gandiva::IdToModuleMap<std::shared_ptr<ProjectorHolder>> projector_modules_;
gandiva::IdToModuleMap<std::shared_ptr<FilterHolder>> filter_modules_;

void PrewarmCache();

jint JNI_OnLoad(JavaVM* vm, void* reserved) {
JNIEnv* env;
if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
Expand Down Expand Up @@ -117,6 +128,9 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env->GetFieldID(vector_expander_ret_class_, "address", "J");
vector_expander_ret_capacity_ =
env->GetFieldID(vector_expander_ret_class_, "capacity", "J");

PrewarmCache();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to block all queries until the cache is warmed? Should we do this in the background instead? For example, if we have a list of 100 expressions, this is going to take up to a minute to prepare the cache, during which no query can progress.


return JNI_VERSION;
}

Expand Down Expand Up @@ -572,6 +586,192 @@ void releaseProjectorInput(jbyteArray schema_arr, jbyte* schema_bytes,
env->ReleaseByteArrayElements(exprs_arr, exprs_bytes, JNI_ABORT);
}

void PrewarmCache() {
try {
auto prewarm_cache_dir = std::getenv(kEnvPrewarmCacheDir);
if (prewarm_cache_dir == nullptr) {
std::cout << "[Gandiva Cache Prewarm] No cache directory env variable is set. "
"Skipping prewarming"
<< std::endl;
return;
}

fs::path path(prewarm_cache_dir);
std::cout << path.string() << "\n";
if (!fs::is_directory(path)) {
std::cerr << "[Gandiva Cache Prewarm] Prewarm cache dir env variable set is not a "
"directory"
<< std::endl;
return;
}

fs::directory_iterator end_iter;
for (fs::directory_iterator dir_iter(path); dir_iter != end_iter; ++dir_iter) {
try {
if (fs::is_regular_file(dir_iter->status())) {
std::ifstream fin;
fin.open(dir_iter->path().string(), std::ios::binary);
if (!fin.is_open()) {
std::cerr << "[Gandiva Cache Prewarm] Failure opening warmup cache file"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe also mention which file..

<< std::endl;
continue;
}

fin.seekg(0, std::ios::end);
size_t remaining = fin.tellg();
fin.seekg(0, std::ios::beg);

int32_t schema_len;
if (remaining < sizeof(schema_len)) {
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

across the board - file name along with errors.

continue;
}
fin.read(reinterpret_cast<char*>(&schema_len), sizeof schema_len);
remaining -= sizeof(schema_len);

if (remaining < (size_t)schema_len) {
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;
continue;
}
std::vector<char> schema_vec(schema_len);
fin.read(schema_vec.data(), schema_len);
remaining -= (size_t)schema_len;

int32_t exprs_len;
if (remaining < sizeof(exprs_len)) {
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;
continue;
}
fin.read(reinterpret_cast<char*>(&exprs_len), sizeof exprs_len);
remaining -= sizeof(exprs_len);

if (remaining != (size_t)exprs_len) {
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;
continue;
}
std::vector<char> exprs_vec(exprs_len);
fin.read(exprs_vec.data(), exprs_len);

fin.close();

uint8_t* schema_bytes = reinterpret_cast<uint8_t*>(schema_vec.data());
uint8_t* exprs_bytes = reinterpret_cast<uint8_t*>(exprs_vec.data());

std::shared_ptr<Projector> projector;
types::Schema schema;
types::ExpressionList exprs;
ExpressionVector expr_vector;
SchemaPtr schema_ptr;
FieldVector ret_types;
gandiva::Status status;
auto mode = gandiva::SelectionVector::MODE_NONE;

ConfigurationBuilder configuration_builder;
std::shared_ptr<Configuration> config = configuration_builder.build();

if (!ParseProtobuf(schema_bytes, schema_len, &schema)) {
std::cerr << "[Gandiva Cache Prewarm] Failed to parse protobuf for schema"
<< std::endl;
continue;
}

if (!ParseProtobuf(exprs_bytes, exprs_len, &exprs)) {
std::cerr << "[Gandiva Cache Prewarm] Failed to parse protobuf for expr list"
<< std::endl;
continue;
}

// convert types::Schema to arrow::Schema
schema_ptr = ProtoTypeToSchema(schema);
if (schema_ptr == nullptr) {
std::cerr << "[Gandiva Cache Prewarm] Failed to convert protobuf schema to "
"arrow type"
<< std::endl;
continue;
}

// create Expression out of the list of exprs
for (int i = 0; i < exprs.exprs_size(); i++) {
ExpressionPtr root = ProtoTypeToExpression(exprs.exprs(i));

if (root == nullptr) {
std::cerr << "[Gandiva Cache Prewarm] Failed to convert protobuf expr to "
"arrow type"
<< std::endl;
continue;
}

expr_vector.push_back(root);
ret_types.push_back(root->result());
}

status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector);

if (!status.ok()) {
std::cerr << "[Gandiva Cache Prewarm] Failed to create a projector module";
continue;
}
std::cout << "[Gandiva Cache Prewarm] Built and cached a projector from the "
"expression and schema in the file"
<< std::endl;
}
} catch (const std::exception& ex) {
std::cerr << "[Gandiva Cache Prewarm] " << dir_iter->path().filename() << " "
<< ex.what() << std::endl;
}
}

} catch (const std::exception& ex) {
std::cerr << "[Gandiva Cache Prewarm] Failed prewarming the cache: " << ex.what()
<< std::endl;
}
}

void CacheExpressionAndSchemaForPrewarmOnStartup(char* schema_bytes, int32_t schema_len,
char* exprs_bytes, int32_t exprs_len) {
try {
auto env_path = std::getenv(kEnvPrewarmCacheDir);
if (env_path == nullptr) {
return;
}

fs::path path(env_path);
if (!fs::is_directory(path) && !fs::create_directories(path)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this thread safe? two threads trying to create the directory at the same time

std::cerr << "[CacheExpression] Failed to create directory to save the schema and "
"expressions"
<< std::endl;
return;
}

boost::uuids::uuid uuid = boost::uuids::random_generator()();

path /= boost::uuids::to_string(uuid);

std::ofstream fout;
fout.open(path.string(), std::ios::binary | std::ios::out);
if (!fout.is_open()) {
std::cerr
<< "[CacheExpression] Failed to open file to write the schema and expression"
<< std::endl;
return;
}

fout.write(reinterpret_cast<char*>(&schema_len), sizeof(schema_len));
fout.write(schema_bytes, schema_len);

fout.write(reinterpret_cast<char*>(&exprs_len), sizeof(exprs_len));
fout.write(exprs_bytes, exprs_len);

fout.close();

std::cout << "[CacheExpression] Cached schema and expression bytes to a file"
<< std::endl;
} catch (const std::exception& ex) {
std::cerr << "[CacheExpression] Failed to cache the expression to file " << ex.what()
<< std::endl;
}
}

JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_buildProjector(
JNIEnv* env, jobject obj, jbyteArray schema_arr, jbyteArray exprs_arr,
jint selection_vector_type, jlong configuration_id) {
Expand Down Expand Up @@ -648,7 +848,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_build
break;
}
// good to invoke the evaluator now
status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector);
bool cache_hit;
status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector, &cache_hit);

if (!status.ok()) {
ss << "Failed to make LLVM module due to " << status.message() << "\n";
Expand All @@ -660,6 +861,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_build
holder = std::shared_ptr<ProjectorHolder>(
new ProjectorHolder(schema_ptr, ret_types, std::move(projector)));
module_id = projector_modules_.Insert(holder);

if (!cache_hit) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is the expectation that apple will get files from all executors? Won't there be duplicates in that case?

Also, is the expectation that the cache size be at least big enough to accommodate all the expressions needed?

Are new queries expected in this cluster? If so, a new query that comes in now might evict one of these expressions. Is that OK?

CacheExpressionAndSchemaForPrewarmOnStartup(
reinterpret_cast<char*>(schema_bytes), schema_len,
reinterpret_cast<char*>(exprs_bytes), exprs_len);
}

releaseProjectorInput(schema_arr, schema_bytes, exprs_arr, exprs_bytes, env);
return module_id;

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/gandiva/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <iostream>
#include <list>
#include <unordered_map>
#include <utility>
Expand Down Expand Up @@ -107,6 +108,7 @@ class LruCache {

private:
void evict() {
std::cout << "Evicted a cache item from the lru cache" << std::endl;
// evict item from the end of most recently used list
typename list_type::iterator i = --lru_list_.end();
map_.erase(*i);
Expand Down
Loading