Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C++ Client: Arrow deprecation; more style guide #4859

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cpp-client/deephaven/dhclient/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ set(ALL_FILES
include/private/deephaven/client/subscription/subscribe_thread.h
include/private/deephaven/client/subscription/subscription_handle.h

src/utility/arrow_util.cc
src/utility/executor.cc
include/private/deephaven/client/utility/executor.h

src/utility/arrow_util.cc
src/utility/table_maker.cc

include/public/deephaven/client/utility/arrow_util.h
include/public/deephaven/client/utility/misc_types.h
include/public/deephaven/client/utility/table_maker.h
Expand Down
80 changes: 39 additions & 41 deletions cpp-client/deephaven/dhclient/src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,33 +69,33 @@ const char *const Server::kAuthorizationKey = "authorization";

namespace {
std::optional<std::chrono::milliseconds> ExtractExpirationInterval(
const ConfigurationConstantsResponse &cc_Resp);
const ConfigurationConstantsResponse &cc_resp);

const char *timeoutKey = "http.session.durationMs";
constexpr const char *kTimeoutKey = "http.session.durationMs";

// A handshake resend interval to use as a default if our normal interval calculation
// fails, e.g. due to GRPC errors.
constexpr const auto kHandshakeResendInterval = std::chrono::seconds(5);
} // namespace

namespace {
std::shared_ptr<grpc::ChannelCredentials> getCredentials(
const bool useTls,
const std::string &tlsRootCerts,
const std::string &clientCertChain,
const std::string &clientPrivateKey) {
if (!useTls) {
std::shared_ptr<grpc::ChannelCredentials> GetCredentials(
const bool use_tls,
const std::string &tls_root_certs,
const std::string &client_root_chain,
const std::string &client_private_key) {
if (!use_tls) {
return grpc::InsecureChannelCredentials();
}
grpc::SslCredentialsOptions options;
if (!tlsRootCerts.empty()) {
options.pem_root_certs = tlsRootCerts;
if (!tls_root_certs.empty()) {
options.pem_root_certs = tls_root_certs;
}
if (!clientCertChain.empty()) {
options.pem_cert_chain = clientCertChain;
if (!client_root_chain.empty()) {
options.pem_cert_chain = client_root_chain;
}
if (!clientPrivateKey.empty()) {
options.pem_private_key = clientPrivateKey;
if (!client_private_key.empty()) {
options.pem_private_key = client_private_key;
}
return grpc::SslCredentials(options);
}
Expand All @@ -120,7 +120,7 @@ std::shared_ptr<Server> Server::CreateFromTarget(
options.generic_options.emplace_back(opt.first, opt.second);
}

auto credentials = getCredentials(
auto credentials = GetCredentials(
client_options.UseTls(),
client_options.TlsRootCerts(),
client_options.ClientCertChain(),
Expand All @@ -145,13 +145,12 @@ std::shared_ptr<Server> Server::CreateFromTarget(
auto its = InputTableService::NewStub(channel);

// TODO(kosak): Warn about this string conversion or do something more general.
auto flightTarget = ((client_options.UseTls()) ? "grpc+tls://" : "grpc://") + target;
arrow::flight::Location location;
auto flight_target = ((client_options.UseTls()) ? "grpc+tls://" : "grpc://") + target;

auto rc1 = arrow::flight::Location::Parse(flightTarget, &location);
if (!rc1.ok()) {
auto location_res = arrow::flight::Location::Parse(flight_target);
if (!location_res.ok()) {
auto message = Stringf("Location::Parse(%o) failed, error = %o",
flightTarget, rc1.ToString());
flight_target, location_res.status());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}

Expand All @@ -165,33 +164,32 @@ std::shared_ptr<Server> Server::CreateFromTarget(
options.private_key = client_options.ClientPrivateKey();
}

std::unique_ptr<arrow::flight::FlightClient> fc;
auto rc2 = arrow::flight::FlightClient::Connect(location, options, &fc);
if (!rc2.ok()) {
auto message = Stringf("FlightClient::Connect() failed, error = %o", rc2.ToString());
auto client_res = arrow::flight::FlightClient::Connect(*location_res, options);
if (!client_res.ok()) {
auto message = Stringf("FlightClient::Connect() failed, error = %o", client_res.status());
throw std::runtime_error(message);
}
gpr_log(GPR_DEBUG,
"%s: "
"FlightClient(%p) created, "
"target=%s",
"Server::CreateFromTarget",
static_cast<void*>(fc.get()),
static_cast<void*>(client_res->get()),
target.c_str());

std::string sessionToken;
std::chrono::milliseconds expirationInterval;
auto sendTime = std::chrono::system_clock::now();
std::string session_token;
std::chrono::milliseconds expiration_interval;
auto send_time = std::chrono::system_clock::now();
{
ConfigurationConstantsRequest ccReq;
ConfigurationConstantsResponse ccResp;
ConfigurationConstantsRequest cc_req;
ConfigurationConstantsResponse cc_resp;
grpc::ClientContext ctx;
ctx.AddMetadata(kAuthorizationKey, client_options.AuthorizationValue());
for (const auto &header : client_options.ExtraHeaders()) {
ctx.AddMetadata(header.first, header.second);
}

auto result = cfs->GetConfigurationConstants(&ctx, ccReq, &ccResp);
auto result = cfs->GetConfigurationConstants(&ctx, cc_req, &cc_resp);

if (!result.ok()) {
auto message = Stringf("Can't get configuration constants. Error %o: %o",
Expand All @@ -205,29 +203,29 @@ std::shared_ptr<Server> Server::CreateFromTarget(
throw std::runtime_error(
DEEPHAVEN_LOCATION_STR("Configuration response didn't contain authorization token"));
}
sessionToken.assign(ip->second.begin(), ip->second.end());
session_token.assign(ip->second.begin(), ip->second.end());

// Get expiration interval.
auto expInt = ExtractExpirationInterval(ccResp);
if (expInt.has_value()) {
expirationInterval = *expInt;
auto exp_int = ExtractExpirationInterval(cc_resp);
if (exp_int.has_value()) {
expiration_interval = *exp_int;
} else {
expirationInterval = std::chrono::seconds(10);
expiration_interval = std::chrono::seconds(10);
}
}

auto nextHandshakeTime = sendTime + expirationInterval;
auto next_handshake_time = send_time + expiration_interval;

auto result = std::make_shared<Server>(Private(), std::move(as), std::move(cs),
std::move(ss), std::move(ts), std::move(cfs), std::move(its), std::move(fc),
client_options.ExtraHeaders(), std::move(sessionToken), expirationInterval, nextHandshakeTime);
std::move(ss), std::move(ts), std::move(cfs), std::move(its), std::move(*client_res),
client_options.ExtraHeaders(), std::move(session_token), expiration_interval, next_handshake_time);
result->keepAliveThread_ = std::thread(&SendKeepaliveMessages, result);
gpr_log(GPR_DEBUG,
"%s: "
"Server(%p) created, "
"target=%s",
"Server::CreateFromTarget",
(void*) result.get(),
static_cast<void*>(result.get()),
target.c_str());
return result;
}
Expand Down Expand Up @@ -461,7 +459,7 @@ void Server::ForEachHeaderNameAndValue(
namespace {
std::optional<std::chrono::milliseconds> ExtractExpirationInterval(
const ConfigurationConstantsResponse &cc_resp) {
auto ip2 = cc_resp.config_values().find(timeoutKey);
auto ip2 = cc_resp.config_values().find(kTimeoutKey);
if (ip2 == cc_resp.config_values().end() || !ip2->second.has_string_value()) {
return {};
}
Expand Down
5 changes: 4 additions & 1 deletion cpp-client/deephaven/tests/cython_support_test.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
#include <array>
#include <type_traits>
#include "deephaven/dhcore/chunk/chunk.h"
#include "deephaven/dhcore/column/array_column_source.h"
Expand Down Expand Up @@ -28,7 +29,9 @@ TEST_CASE("CreateStringColumnsSource", "[cython]") {
static_assert((kNumElements + 7) / 8 == kValidity.size());

auto result = CythonSupport::CreateStringColumnSource(kText, kText + kTextSize,
kOffsets.begin(), kOffsets.end(), kValidity.begin(), kValidity.end(), kNumElements);
kOffsets.data(), kOffsets.data() + kOffsets.size(),
kValidity.data(), kValidity.data() + kValidity.size(),
kNumElements);

auto rs = RowSequence::CreateSequential(0, kNumElements);
auto data = dhcore::chunk::StringChunk::Create(kNumElements);
Expand Down
11 changes: 6 additions & 5 deletions cpp-client/deephaven/tests/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ Client TableMakerForTests::CreateClient(const ClientOptions &options) {
}

TableMakerForTests::TableMakerForTests(TableMakerForTests::ClientType &&client,
TableHandle &&test_table, ColumnNamesForTests &&column_names, ColumnDataForTests &&columnData) :
TableHandle &&test_table, ColumnNamesForTests &&column_names, ColumnDataForTests &&column_data) :
client_(std::move(client)),
testTable_(std::move(test_table)), columnNames_(std::move(column_names)),
columnData_(std::move(columnData)) {}
columnData_(std::move(column_data)) {}

TableMakerForTests::TableMakerForTests(TableMakerForTests &&) noexcept = default;
TableMakerForTests &TableMakerForTests::operator=(TableMakerForTests &&) noexcept = default;
Expand Down Expand Up @@ -208,16 +208,17 @@ void CompareTableHelper(int depth, const std::shared_ptr<arrow::Table> &table,

std::shared_ptr<arrow::Table> BasicValidate(const deephaven::client::TableHandle &table, int expected_columns) {
auto fsr = table.GetFlightStreamReader();
std::shared_ptr<arrow::Table> arrow_table;
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsr->ReadAll(&arrow_table)));
auto table_res = fsr->ToTable();
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(table_res));

auto &arrow_table = *table_res;
if (expected_columns != arrow_table->num_columns()) {
auto message = Stringf("Expected %o columns, but Table actually has %o columns",
expected_columns, arrow_table->num_columns());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}

return arrow_table;
return std::move(arrow_table);
}
} // namespace internal
} // namespace deephaven::client::tests
Loading