Skip to content

Commit

Permalink
feat(cpp-client): Add support for LocalDate and LocalTime
Browse files Browse the repository at this point in the history
  • Loading branch information
kosak committed Sep 20, 2024
1 parent aff19e7 commit 771db37
Show file tree
Hide file tree
Showing 17 changed files with 466 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ namespace internal {
// null-ness by determining whether the optional has a value.
// kTimestamp is its own special case, where nullness is determined by the underlying nanos
// being equal to Deephaven's NULL_LONG.
enum class ArrowProcessingStyle { kNormal, kBooleanOrString, kTimestamp };
// kLocalDate and kLocalTime are like kTimestamp except they resolve to different data types.
enum class ArrowProcessingStyle { kNormal, kBooleanOrString, kTimestamp, kLocalDate, kLocalTime };

template<ArrowProcessingStyle Style, typename TColumnSourceBase, typename TArrowArray, typename TChunk>
class GenericArrowColumnSource final : public TColumnSourceBase {
using BooleanChunk = deephaven::dhcore::chunk::BooleanChunk;
using Chunk = deephaven::dhcore::chunk::Chunk;
using ColumnSourceVisitor = deephaven::dhcore::column::ColumnSourceVisitor;
using DateTime = deephaven::dhcore::DateTime;
using LocalDate = deephaven::dhcore::LocalDate;
using LocalTime = deephaven::dhcore::LocalTime;
using RowSequence = deephaven::dhcore::container::RowSequence;
using UInt64Chunk = deephaven::dhcore::chunk::UInt64Chunk;

Expand Down Expand Up @@ -98,6 +101,14 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
auto relative_end = min_end - src_segment_begin;
const auto &innerp = *outerp;

static_assert(
Style == ArrowProcessingStyle::kNormal ||
Style == ArrowProcessingStyle::kBooleanOrString ||
Style == ArrowProcessingStyle::kTimestamp ||
Style == ArrowProcessingStyle::kLocalDate ||
Style == ArrowProcessingStyle::kLocalTime,
"Unexpected ArrowProcessingStyle");

if constexpr (Style == ArrowProcessingStyle::kNormal) {
// Process these types using pointer operations and the Deephaven Null convention
const auto *src_beginp = innerp->raw_values() + relative_begin;
Expand Down Expand Up @@ -139,6 +150,34 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
*destp = DateTime::FromNanos(*ip);
++destp;

if (null_destp != nullptr) {
*null_destp = *ip == DeephavenTraits<int64_t>::kNullValue;
++null_destp;
}
}
} else if constexpr (Style == ArrowProcessingStyle::kLocalDate) {
// Process these types using pointer operations and the Deephaven Null convention
const auto *src_beginp = innerp->raw_values() + relative_begin;
const auto *src_endp = innerp->raw_values() + relative_end;

for (const auto *ip = src_beginp; ip != src_endp; ++ip) {
*destp = LocalDate::FromMillis(*ip);
++destp;

if (null_destp != nullptr) {
*null_destp = *ip == DeephavenTraits<int64_t>::kNullValue;
++null_destp;
}
}
} else if constexpr (Style == ArrowProcessingStyle::kLocalTime) {
// Process these types using pointer operations and the Deephaven Null convention
const auto *src_beginp = innerp->raw_values() + relative_begin;
const auto *src_endp = innerp->raw_values() + relative_end;

for (const auto *ip = src_beginp; ip != src_endp; ++ip) {
*destp = LocalTime::FromNanos(*ip);
++destp;

if (null_destp != nullptr) {
*null_destp = *ip == DeephavenTraits<int64_t>::kNullValue;
++null_destp;
Expand Down Expand Up @@ -223,4 +262,16 @@ using DateTimeArrowColumnSource = internal::GenericArrowColumnSource<
deephaven::dhcore::column::DateTimeColumnSource,
arrow::TimestampArray,
deephaven::dhcore::chunk::DateTimeChunk>;

using LocalDateArrowColumnSource = internal::GenericArrowColumnSource<
internal::ArrowProcessingStyle::kLocalDate,
deephaven::dhcore::column::LocalDateColumnSource,
arrow::Date64Array,
deephaven::dhcore::chunk::LocalDateChunk>;

using LocalTimeArrowColumnSource = internal::GenericArrowColumnSource<
internal::ArrowProcessingStyle::kLocalTime,
deephaven::dhcore::column::LocalTimeColumnSource,
arrow::Time64Array,
deephaven::dhcore::chunk::LocalTimeChunk>;
} // namespace deephaven::client::arrowutil
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ template<typename T>
struct TypeConverterTraits {
// The below assert fires when this class is instantiated; i.e. when none of the specializations
// match. It needs to be written this way (with "is_same<T,T>") because for technical reasons it
// needso be dependent on T, even if degenerately so.
// needs to be dependent on T, even if degenerately so.
static_assert(!std::is_same_v<T, T>, "TableMaker doesn't know how to work with this type");
};

Expand Down Expand Up @@ -307,6 +307,38 @@ struct TypeConverterTraits<deephaven::dhcore::DateTime> {
}
};

template<>
struct TypeConverterTraits<deephaven::dhcore::LocalDate> {
static std::shared_ptr<arrow::DataType> GetDataType() {
return arrow::date64();
}
static arrow::Date64Builder GetBuilder() {
return arrow::Date64Builder();
}
static int64_t Reinterpret(const deephaven::dhcore::LocalDate &o) {
return o.Millis();
}
static std::string_view GetDeephavenTypeName() {
return "java.time.LocalDate";
}
};

template<>
struct TypeConverterTraits<deephaven::dhcore::LocalTime> {
static std::shared_ptr<arrow::DataType> GetDataType() {
return arrow::time64(arrow::TimeUnit::NANO);
}
static arrow::Time64Builder GetBuilder() {
return arrow::Time64Builder(GetDataType(), arrow::default_memory_pool());
}
static int64_t Reinterpret(const deephaven::dhcore::LocalTime &o) {
return o.Nanos();
}
static std::string_view GetDeephavenTypeName() {
return "java.time.LocalTime";
}
};

template<typename T>
struct TypeConverterTraits<std::optional<T>> {
using inner_t = TypeConverterTraits<T>;
Expand Down
12 changes: 12 additions & 0 deletions cpp-client/deephaven/dhclient/src/arrowutil/arrow_client_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ struct Visitor final : public arrow::TypeVisitor {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Date64Type &/*type*/) final {
auto arrays = DowncastChunks<arrow::Date64Array>(chunked_array_);
result_ = LocalDateArrowColumnSource::OfArrowArrayVec(std::move(arrays));
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Time64Type &/*type*/) final {
auto arrays = DowncastChunks<arrow::Time64Array>(chunked_array_);
result_ = LocalTimeArrowColumnSource::OfArrowArrayVec(std::move(arrays));
return arrow::Status::OK();
}

const arrow::ChunkedArray &chunked_array_;
std::shared_ptr<ColumnSource> result_;
};
Expand Down
14 changes: 14 additions & 0 deletions cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ using deephaven::client::arrowutil::Int8ArrowColumnSource;
using deephaven::client::arrowutil::Int16ArrowColumnSource;
using deephaven::client::arrowutil::Int32ArrowColumnSource;
using deephaven::client::arrowutil::Int64ArrowColumnSource;
using deephaven::client::arrowutil::LocalDateArrowColumnSource;
using deephaven::client::arrowutil::LocalTimeArrowColumnSource;
using deephaven::client::arrowutil::StringArrowColumnSource;
using deephaven::client::utility::Executor;
using deephaven::client::utility::OkOrThrow;
Expand Down Expand Up @@ -323,6 +325,18 @@ struct ArrayToColumnSourceVisitor final : public arrow::ArrayVisitor {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Date64Array &/*array*/) final {
auto typed_array = std::dynamic_pointer_cast<arrow::Date64Array>(array_);
result_ = LocalDateArrowColumnSource::OfArrowArray(std::move(typed_array));
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Time64Array &/*array*/) final {
auto typed_array = std::dynamic_pointer_cast<arrow::Time64Array>(array_);
result_ = LocalTimeArrowColumnSource::OfArrowArray(std::move(typed_array));
return arrow::Status::OK();
}

const std::shared_ptr<arrow::Array> &array_;
std::shared_ptr<ColumnSource> result_;
};
Expand Down
27 changes: 26 additions & 1 deletion cpp-client/deephaven/dhclient/src/utility/arrow_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,12 @@ struct ArrowToElementTypeId final : public arrow::TypeVisitor {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::TimestampType &/*type*/) final {
arrow::Status Visit(const arrow::TimestampType &type) final {
if (type.unit() != arrow::TimeUnit::NANO) {
auto message = fmt::format("Expected TimestampType with nano units, got {}",
type.ToString());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
type_id_ = ElementTypeId::kTimestamp;
return arrow::Status::OK();
}
Expand All @@ -91,6 +96,26 @@ struct ArrowToElementTypeId final : public arrow::TypeVisitor {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Time64Type &type) final {
if (type.unit() != arrow::TimeUnit::NANO) {
auto message = fmt::format("Expected Time64Type with nano units, got {}",
type.ToString());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
type_id_ = ElementTypeId::kLocalTime;
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Date64Type &type) final {
if (type.unit() != arrow::DateUnit::MILLI) {
auto message = fmt::format("Expected Date64Type with milli units, got {}",
type.ToString());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
type_id_ = ElementTypeId::kLocalDate;
return arrow::Status::OK();
}

ElementTypeId::Enum type_id_ = ElementTypeId::kInt8; // arbitrary initializer
};
} // namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,61 +160,70 @@ class GenericChunk final : public Chunk {
};

/**
* For convenience.
* Convenience using.
*/
using CharChunk = GenericChunk<char16_t>;
/**
* Convenience typedef.
* Convenience using.
*/
using Int8Chunk = GenericChunk<int8_t>;
/**
* Convenience typedef.
* Convenience using.
*/
using UInt8Chunk = GenericChunk<uint8_t>;
/**
* Convenience typedef.
* Convenience using.
*/
using Int16Chunk = GenericChunk<int16_t>;
/**
* Convenience typedef.
* Convenience using.
*/
using UInt16Chunk = GenericChunk<uint16_t>;
/**
* Convenience typedef.
* Convenience using.
*/
using Int32Chunk = GenericChunk<int32_t>;
/**
* Convenience typedef.
* Convenience using.
*/
using UInt32Chunk = GenericChunk<uint32_t>;
/**
* Convenience typedef.
* Convenience using.
*/
using Int64Chunk = GenericChunk<int64_t>;
/**
* Convenience typedef.
* Convenience using.
*/
using UInt64Chunk = GenericChunk<uint64_t>;
/**
* Convenience typedef.
* Convenience using.
*/
using FloatChunk = GenericChunk<float>;
/**
* Convenience typedef.
* Convenience using.
*/
using DoubleChunk = GenericChunk<double>;
/**
* Convenience typedef.
* Convenience using.
*/
using BooleanChunk = GenericChunk<bool>;
/**
* Convenience typedef.
* Convenience using.
*/
using StringChunk = GenericChunk<std::string>;
/**
* Convenience typedef.
* Convenience using.
*/
using DateTimeChunk = GenericChunk<deephaven::dhcore::DateTime>;
/**
* Convenience using.
*/
using LocalDateChunk = GenericChunk<deephaven::dhcore::LocalDate>;
/**
* Convenience using.
*/
using LocalTimeChunk = GenericChunk<deephaven::dhcore::LocalTime>;


/**
* Abstract base class that implements the visitor pattern for Chunk.
Expand Down Expand Up @@ -272,6 +281,11 @@ class ChunkVisitor {
/**
* Implements the visitor pattern.
*/
virtual void Visit(const LocalDateChunk &) = 0;
/**
* Implements the visitor pattern.
*/
virtual void Visit(const LocalTimeChunk &) = 0;
};

template<typename T>
Expand All @@ -286,7 +300,8 @@ void GenericChunk<T>::AcceptVisitor(ChunkVisitor *visitor) const {
*/
class AnyChunk {
using variant_t = std::variant<CharChunk, Int8Chunk, Int16Chunk, Int32Chunk, Int64Chunk,
UInt64Chunk, FloatChunk, DoubleChunk, BooleanChunk, StringChunk, DateTimeChunk>;
UInt64Chunk, FloatChunk, DoubleChunk, BooleanChunk, StringChunk, DateTimeChunk,
LocalDateChunk, LocalTimeChunk>;

public:
/**
Expand Down Expand Up @@ -351,8 +366,8 @@ GenericChunk<T> GenericChunk<T>::Create(size_t size) {

template<typename T>
GenericChunk<T> GenericChunk<T>::CreateView(T *data, size_t size) {
// GenericChunks allocated by create() point to an underlying heap-allocated buffer. On the other
// hand, GenericChunks created by createView() point to the caller's buffer. In the former case
// GenericChunks allocated by Create() point to an underlying heap-allocated buffer. On the other
// hand, GenericChunks created by CreateView() point to the caller's buffer. In the former case
// we own the buffer and need to delete it when there are no more shared_ptrs pointing to it. In
// the latter case the caller owns the buffer, and we should not try to deallocate it.
// One might think we have to use two different data structures to handle these two different
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,14 @@ template<>
struct TypeToChunk<deephaven::dhcore::DateTime> {
using type_t = deephaven::dhcore::chunk::DateTimeChunk;
};

template<>
struct TypeToChunk<deephaven::dhcore::LocalDate> {
using type_t = deephaven::dhcore::chunk::LocalDateChunk;
};

template<>
struct TypeToChunk<deephaven::dhcore::LocalTime> {
using type_t = deephaven::dhcore::chunk::LocalTimeChunk;
};
} // namespace deephaven::client::chunk
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ using StringColumnSource = GenericColumnSource<std::string>;
* Convenience using.
*/
using DateTimeColumnSource = GenericColumnSource<deephaven::dhcore::DateTime>;
/**
* Convenience using.
*/
using LocalDateColumnSource = GenericColumnSource<deephaven::dhcore::LocalDate>;
/**
* Convenience using.
*/
using LocalTimeColumnSource = GenericColumnSource<deephaven::dhcore::LocalTime>;

// the mutable per-type interfaces
template<typename T>
Expand Down Expand Up @@ -229,5 +237,13 @@ class ColumnSourceVisitor {
* Implements the visitor pattern.
*/
virtual void Visit(const DateTimeColumnSource &) = 0;
/**
* Implements the visitor pattern.
*/
virtual void Visit(const LocalDateColumnSource &) = 0;
/**
* Implements the visitor pattern.
*/
virtual void Visit(const LocalTimeColumnSource &) = 0;
};
} // namespace deephaven::dhcore::column
Loading

0 comments on commit 771db37

Please sign in to comment.