diff --git a/cpp-client/deephaven/dhcore/include/private/deephaven/dhcore/immerutil/immer_column_source.h b/cpp-client/deephaven/dhcore/include/private/deephaven/dhcore/immerutil/immer_column_source.h index 9c6c316a86d..b7e20876780 100644 --- a/cpp-client/deephaven/dhcore/include/private/deephaven/dhcore/immerutil/immer_column_source.h +++ b/cpp-client/deephaven/dhcore/include/private/deephaven/dhcore/immerutil/immer_column_source.h @@ -28,7 +28,6 @@ struct ImmerColumnSourceImpls { * flags. On the other hand if this pointer is null, then the caller doesn't care about null flags * and we don't have to do any special work to determine nullness. */ - template static void FillChunk(const immer::flex_vector &src_data, const immer::flex_vector *src_null_flags, @@ -41,59 +40,81 @@ struct ImmerColumnSourceImpls { constexpr bool kTypeIsNumeric = deephaven::dhcore::DeephavenTraits::kIsNumeric; + TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_data.size())); TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= typed_dest->Size())); - TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(optional_dest_null_flags == nullptr || - rows.Size() <= optional_dest_null_flags->Size())); if (!kTypeIsNumeric) { TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(src_null_flags != nullptr)); - } else { - // avoid CLion warning about unused variable. - (void)src_null_flags; + TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_null_flags->size())); } - auto *dest_datap = typed_dest->data(); - auto *dest_nullp = optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr; - - auto copy_data_inner = [&dest_datap, &dest_nullp](const T *data_begin, const T *data_end) { - for (const T *current = data_begin; current != data_end; ++current) { - auto value = *current; - *dest_datap++ = value; - if constexpr(deephaven::dhcore::DeephavenTraits::kIsNumeric) { - if (dest_nullp != nullptr) { - *dest_nullp++ = value == deephaven::dhcore::DeephavenTraits::kNullValue; - } - } else { - // avoid clang complaining about unused variables - (void)dest_nullp; - } - } - }; + if (optional_dest_null_flags != nullptr) { + TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= optional_dest_null_flags->Size())); + } + (void) src_null_flags; // avoid CLion warning about unused variable. - auto copy_nulls_inner = [&dest_nullp](const bool *null_begin, const bool *null_end) { - for (const bool *current = null_begin; current != null_end; ++current) { - *dest_nullp++ = *current; + auto *dest_datap = typed_dest->data(); + // We have a nested loop here, represented by two lambdas. This code invokes + // RowSequence::ForEachInterval which takes contiguous ranges from 'rows' and feeds them + // to 'copy_data_outer'. Then 'copy_data_outer' turns that contiguous range into a + // pair of [begin, end) Immer iterators. But then, rather than store into that iterator range + // directly, those Immer iterators are passed to immer::for_each_chunk. This breaks down the + // Immer range into subranges of plain data, and invokes the copy_data_inner lambda. Then, + // 'copy_data_inner' just copies data in the normal C++ way. + auto copy_data_inner = [&dest_datap](const T *src_beginp, const T *src_endp) { + for (const T *current = src_beginp; current != src_endp; ++current) { + *dest_datap++ = *current; } }; - auto copy_outer = [&src_data, src_null_flags, dest_nullp, ©_data_inner, - ©_nulls_inner](uint64_t src_begin, uint64_t src_end) { + auto copy_data_outer = [&src_data, ©_data_inner](uint64_t src_begin, uint64_t src_end) { auto src_beginp = src_data.begin() + src_begin; auto src_endp = src_data.begin() + src_end; immer::for_each_chunk(src_beginp, src_endp, copy_data_inner); + }; + + rows.ForEachInterval(copy_data_outer); + + // If the caller has opted out of getting null flags, we are done. + if (optional_dest_null_flags == nullptr) { + return; + } - if constexpr(!deephaven::dhcore::DeephavenTraits::kIsNumeric) { - if (dest_nullp != nullptr) { - auto nulls_begin = src_null_flags->begin() + src_begin; - auto nulls_end = src_null_flags->begin() + src_end; - immer::for_each_chunk(nulls_begin, nulls_end, copy_nulls_inner); + // Otherwise (if the caller wants null flags), we do a similar algorithm to copy null flags. + // The one complication is that the column source only stores null flags explicitly for + // non-numeric types. For numeric types, the column source uses the Deephaven convention + // for nullness. To handle this, we have two different forms of the operation, + // one which supports the numeric convention and one which supports the non-numeric convention. + auto *dest_nullp = optional_dest_null_flags->data(); + + if constexpr (kTypeIsNumeric) { + auto copy_nulls_inner = [&dest_nullp](const T *data_begin, const T *data_end) { + for (const T *current = data_begin; current != data_end; ++current) { + auto is_null = *current == deephaven::dhcore::DeephavenTraits::kNullValue; + *dest_nullp++ = is_null; } - } else { - // avoid clang complaining about unused variables. - (void)src_null_flags; - (void)dest_nullp; - (void)copy_nulls_inner; - } - }; - rows.ForEachInterval(copy_outer); + }; + + auto copy_nulls_outer = [&src_data, src_null_flags, ©_nulls_inner](uint64_t src_begin, + uint64_t src_end) { + auto src_beginp = src_data.begin() + src_begin; + auto src_endp = src_data.begin() + src_end; + immer::for_each_chunk(src_beginp, src_endp, copy_nulls_inner); + }; + rows.ForEachInterval(copy_nulls_outer); + } else { + auto copy_nulls_inner = [&dest_nullp](const bool *null_begin, const bool *null_end) { + for (const bool *current = null_begin; current != null_end; ++current) { + *dest_nullp++ = *current; + } + }; + + auto copy_nulls_outer = [&src_data, src_null_flags, ©_nulls_inner](uint64_t src_begin, + uint64_t src_end) { + auto nulls_begin = src_null_flags->begin() + src_begin; + auto nulls_end = src_null_flags->begin() + src_end; + immer::for_each_chunk(nulls_begin, nulls_end, copy_nulls_inner); + }; + rows.ForEachInterval(copy_nulls_outer); + } } template @@ -109,12 +130,17 @@ struct ImmerColumnSourceImpls { constexpr bool kTypeIsNumeric = deephaven::dhcore::DeephavenTraits::kIsNumeric; auto *typed_dest = VerboseCast(DEEPHAVEN_LOCATION_EXPR(dest_data)); + TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_data.size())); TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= typed_dest->Size())); - TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(optional_dest_null_flags == nullptr || - rows.Size() <= optional_dest_null_flags->Size())); if (!kTypeIsNumeric) { TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(src_null_flags != nullptr)); + TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_null_flags->size())); } + if (optional_dest_null_flags != nullptr) { + TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= optional_dest_null_flags->Size())); + } + (void) src_null_flags; // avoid CLion warning about unused variable. + auto *destp = typed_dest->data(); auto *dest_nullp = optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr; diff --git a/cpp-client/deephaven/dhcore/src/container/row_sequence.cc b/cpp-client/deephaven/dhcore/src/container/row_sequence.cc index ba0a35a061a..798fea74016 100644 --- a/cpp-client/deephaven/dhcore/src/container/row_sequence.cc +++ b/cpp-client/deephaven/dhcore/src/container/row_sequence.cc @@ -48,13 +48,12 @@ RowSequenceIterator RowSequence::GetRowSequenceIterator() const { std::ostream &operator<<(std::ostream &s, const RowSequence &o) { s << '['; - auto iter = o.GetRowSequenceIterator(); const char *sep = ""; - uint64_t item; - while (iter.TryGetNext(&item)) { - s << sep << item; + o.ForEachInterval([&](uint64_t start, uint64_t end) { + s << sep; sep = ", "; - } + s << '[' << start << ',' << end << ')'; + }); s << ']'; return s; } diff --git a/cpp-client/deephaven/dhcore/src/ticking/space_mapper.cc b/cpp-client/deephaven/dhcore/src/ticking/space_mapper.cc index dca6078a05d..1e67ee78fad 100644 --- a/cpp-client/deephaven/dhcore/src/ticking/space_mapper.cc +++ b/cpp-client/deephaven/dhcore/src/ticking/space_mapper.cc @@ -8,6 +8,7 @@ using deephaven::dhcore::container::RowSequence; using deephaven::dhcore::container::RowSequenceBuilder; +using deephaven::dhcore::utility::MakeReservedVector; using deephaven::dhcore::utility::separatedList; namespace deephaven::dhcore::ticking { @@ -34,9 +35,47 @@ uint64_t SpaceMapper::EraseRange(uint64_t begin_key, uint64_t end_key) { } void SpaceMapper::ApplyShift(uint64_t begin_key, uint64_t end_key, uint64_t dest_key) { - auto size = end_key - begin_key; + // Shifts do not change the size of the set. So, note the original size as a sanity check. + auto original_size = set_.cardinality(); + + // Note that [begin_key, end_key) is potentially a superset of the keys we have. + // We need to remove all our keys in the range [begin_key, end_key), + // and then, for each key k that we removed, add a new key (k - begin_key + dest_key). + + // As we scan the keys in our set, we build this vector which contains contiguous ranges. + std::vector> new_ranges; + auto it = set_.begin(); + if (!it.move(begin_key)) { + // begin_key is bigger than any key in our set, so the shift request has no effect. + return; + } + + while (it != set_.end() && *it < end_key) { + auto offset = *it - begin_key; + auto new_key = dest_key + offset; + if (!new_ranges.empty() && new_ranges.back().second == new_key) { + // This key is contiguous with the last range, so extend it by one. + ++new_ranges.back().second; + } else { + // This key is not contiguous with the last range (or there is no last range), so + // start a new range here having size 1. + new_ranges.emplace_back(new_key, new_key + 1); + } + ++it; + } + set_.removeRange(begin_key, end_key); - set_.addRange(dest_key, dest_key + size); + for (const auto &range : new_ranges) { + set_.addRange(range.first, range.second); + } + + // Sanity check. + auto final_size = set_.cardinality(); + if (original_size != final_size) { + auto message = fmt::format("Unexpected rowkey size change: from {} to {}", original_size, + final_size); + throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message)); + } } std::shared_ptr SpaceMapper::AddKeys(const RowSequence &keys) { @@ -59,7 +98,7 @@ std::shared_ptr SpaceMapper::ConvertKeysToIndices(const RowSequence auto convert_interval = [this, &builder](uint64_t begin_key, uint64_t end_key) { auto beginp = set_.begin(); if (!beginp.move(begin_key)) { - auto message = fmt::format("begin key {} is not in the src map", begin_key); + auto message = fmt::format("begin key {} is too large for the src map", begin_key); throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message)); } auto next_rank = ZeroBasedRank(begin_key); @@ -72,6 +111,8 @@ std::shared_ptr SpaceMapper::ConvertKeysToIndices(const RowSequence } ++currentp; } + // It is ok to add a chunk like this because rowkeys [begin_key, end_key) are contiguous; + // therefore their corresponding index space indices are also contiguous. auto size = end_key - begin_key; builder.AddInterval(next_rank, next_rank + size); };