Skip to content

Commit

Permalink
C++ client: process Barrage shifts properly (#5285)
Browse files Browse the repository at this point in the history
  • Loading branch information
kosak committed Mar 22, 2024
1 parent 1b728f0 commit dcfc794
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ struct ImmerColumnSourceImpls {
* flags. On the other hand if this pointer is null, then the caller doesn't care about null flags
* and we don't have to do any special work to determine nullness.
*/

template<typename T>
static void FillChunk(const immer::flex_vector<T> &src_data,
const immer::flex_vector<bool> *src_null_flags,
Expand All @@ -41,59 +40,81 @@ struct ImmerColumnSourceImpls {

constexpr bool kTypeIsNumeric = deephaven::dhcore::DeephavenTraits<T>::kIsNumeric;

TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_data.size()));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= typed_dest->Size()));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(optional_dest_null_flags == nullptr ||
rows.Size() <= optional_dest_null_flags->Size()));
if (!kTypeIsNumeric) {
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(src_null_flags != nullptr));
} else {
// avoid CLion warning about unused variable.
(void)src_null_flags;
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_null_flags->size()));
}
auto *dest_datap = typed_dest->data();
auto *dest_nullp = optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr;

auto copy_data_inner = [&dest_datap, &dest_nullp](const T *data_begin, const T *data_end) {
for (const T *current = data_begin; current != data_end; ++current) {
auto value = *current;
*dest_datap++ = value;
if constexpr(deephaven::dhcore::DeephavenTraits<T>::kIsNumeric) {
if (dest_nullp != nullptr) {
*dest_nullp++ = value == deephaven::dhcore::DeephavenTraits<T>::kNullValue;
}
} else {
// avoid clang complaining about unused variables
(void)dest_nullp;
}
}
};
if (optional_dest_null_flags != nullptr) {
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= optional_dest_null_flags->Size()));
}
(void) src_null_flags; // avoid CLion warning about unused variable.

auto copy_nulls_inner = [&dest_nullp](const bool *null_begin, const bool *null_end) {
for (const bool *current = null_begin; current != null_end; ++current) {
*dest_nullp++ = *current;
auto *dest_datap = typed_dest->data();
// We have a nested loop here, represented by two lambdas. This code invokes
// RowSequence::ForEachInterval which takes contiguous ranges from 'rows' and feeds them
// to 'copy_data_outer'. Then 'copy_data_outer' turns that contiguous range into a
// pair of [begin, end) Immer iterators. But then, rather than store into that iterator range
// directly, those Immer iterators are passed to immer::for_each_chunk. This breaks down the
// Immer range into subranges of plain data, and invokes the copy_data_inner lambda. Then,
// 'copy_data_inner' just copies data in the normal C++ way.
auto copy_data_inner = [&dest_datap](const T *src_beginp, const T *src_endp) {
for (const T *current = src_beginp; current != src_endp; ++current) {
*dest_datap++ = *current;
}
};

auto copy_outer = [&src_data, src_null_flags, dest_nullp, &copy_data_inner,
&copy_nulls_inner](uint64_t src_begin, uint64_t src_end) {
auto copy_data_outer = [&src_data, &copy_data_inner](uint64_t src_begin, uint64_t src_end) {
auto src_beginp = src_data.begin() + src_begin;
auto src_endp = src_data.begin() + src_end;
immer::for_each_chunk(src_beginp, src_endp, copy_data_inner);
};

rows.ForEachInterval(copy_data_outer);

// If the caller has opted out of getting null flags, we are done.
if (optional_dest_null_flags == nullptr) {
return;
}

if constexpr(!deephaven::dhcore::DeephavenTraits<T>::kIsNumeric) {
if (dest_nullp != nullptr) {
auto nulls_begin = src_null_flags->begin() + src_begin;
auto nulls_end = src_null_flags->begin() + src_end;
immer::for_each_chunk(nulls_begin, nulls_end, copy_nulls_inner);
// Otherwise (if the caller wants null flags), we do a similar algorithm to copy null flags.
// The one complication is that the column source only stores null flags explicitly for
// non-numeric types. For numeric types, the column source uses the Deephaven convention
// for nullness. To handle this, we have two different forms of the operation,
// one which supports the numeric convention and one which supports the non-numeric convention.
auto *dest_nullp = optional_dest_null_flags->data();

if constexpr (kTypeIsNumeric) {
auto copy_nulls_inner = [&dest_nullp](const T *data_begin, const T *data_end) {
for (const T *current = data_begin; current != data_end; ++current) {
auto is_null = *current == deephaven::dhcore::DeephavenTraits<T>::kNullValue;
*dest_nullp++ = is_null;
}
} else {
// avoid clang complaining about unused variables.
(void)src_null_flags;
(void)dest_nullp;
(void)copy_nulls_inner;
}
};
rows.ForEachInterval(copy_outer);
};

auto copy_nulls_outer = [&src_data, src_null_flags, &copy_nulls_inner](uint64_t src_begin,
uint64_t src_end) {
auto src_beginp = src_data.begin() + src_begin;
auto src_endp = src_data.begin() + src_end;
immer::for_each_chunk(src_beginp, src_endp, copy_nulls_inner);
};
rows.ForEachInterval(copy_nulls_outer);
} else {
auto copy_nulls_inner = [&dest_nullp](const bool *null_begin, const bool *null_end) {
for (const bool *current = null_begin; current != null_end; ++current) {
*dest_nullp++ = *current;
}
};

auto copy_nulls_outer = [&src_data, src_null_flags, &copy_nulls_inner](uint64_t src_begin,
uint64_t src_end) {
auto nulls_begin = src_null_flags->begin() + src_begin;
auto nulls_end = src_null_flags->begin() + src_end;
immer::for_each_chunk(nulls_begin, nulls_end, copy_nulls_inner);
};
rows.ForEachInterval(copy_nulls_outer);
}
}

template<typename T>
Expand All @@ -109,12 +130,17 @@ struct ImmerColumnSourceImpls {
constexpr bool kTypeIsNumeric = deephaven::dhcore::DeephavenTraits<T>::kIsNumeric;

auto *typed_dest = VerboseCast<chunkType_t *>(DEEPHAVEN_LOCATION_EXPR(dest_data));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_data.size()));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= typed_dest->Size()));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(optional_dest_null_flags == nullptr ||
rows.Size() <= optional_dest_null_flags->Size()));
if (!kTypeIsNumeric) {
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(src_null_flags != nullptr));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_null_flags->size()));
}
if (optional_dest_null_flags != nullptr) {
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= optional_dest_null_flags->Size()));
}
(void) src_null_flags; // avoid CLion warning about unused variable.

auto *destp = typed_dest->data();
auto *dest_nullp = optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr;

Expand Down
9 changes: 4 additions & 5 deletions cpp-client/deephaven/dhcore/src/container/row_sequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,12 @@ RowSequenceIterator RowSequence::GetRowSequenceIterator() const {

std::ostream &operator<<(std::ostream &s, const RowSequence &o) {
s << '[';
auto iter = o.GetRowSequenceIterator();
const char *sep = "";
uint64_t item;
while (iter.TryGetNext(&item)) {
s << sep << item;
o.ForEachInterval([&](uint64_t start, uint64_t end) {
s << sep;
sep = ", ";
}
s << '[' << start << ',' << end << ')';
});
s << ']';
return s;
}
Expand Down
47 changes: 44 additions & 3 deletions cpp-client/deephaven/dhcore/src/ticking/space_mapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

using deephaven::dhcore::container::RowSequence;
using deephaven::dhcore::container::RowSequenceBuilder;
using deephaven::dhcore::utility::MakeReservedVector;
using deephaven::dhcore::utility::separatedList;

namespace deephaven::dhcore::ticking {
Expand All @@ -34,9 +35,47 @@ uint64_t SpaceMapper::EraseRange(uint64_t begin_key, uint64_t end_key) {
}

void SpaceMapper::ApplyShift(uint64_t begin_key, uint64_t end_key, uint64_t dest_key) {
auto size = end_key - begin_key;
// Shifts do not change the size of the set. So, note the original size as a sanity check.
auto original_size = set_.cardinality();

// Note that [begin_key, end_key) is potentially a superset of the keys we have.
// We need to remove all our keys in the range [begin_key, end_key),
// and then, for each key k that we removed, add a new key (k - begin_key + dest_key).

// As we scan the keys in our set, we build this vector which contains contiguous ranges.
std::vector<std::pair<uint64_t, uint64_t>> new_ranges;
auto it = set_.begin();
if (!it.move(begin_key)) {
// begin_key is bigger than any key in our set, so the shift request has no effect.
return;
}

while (it != set_.end() && *it < end_key) {
auto offset = *it - begin_key;
auto new_key = dest_key + offset;
if (!new_ranges.empty() && new_ranges.back().second == new_key) {
// This key is contiguous with the last range, so extend it by one.
++new_ranges.back().second;
} else {
// This key is not contiguous with the last range (or there is no last range), so
// start a new range here having size 1.
new_ranges.emplace_back(new_key, new_key + 1);
}
++it;
}

set_.removeRange(begin_key, end_key);
set_.addRange(dest_key, dest_key + size);
for (const auto &range : new_ranges) {
set_.addRange(range.first, range.second);
}

// Sanity check.
auto final_size = set_.cardinality();
if (original_size != final_size) {
auto message = fmt::format("Unexpected rowkey size change: from {} to {}", original_size,
final_size);
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
}

std::shared_ptr<RowSequence> SpaceMapper::AddKeys(const RowSequence &keys) {
Expand All @@ -59,7 +98,7 @@ std::shared_ptr<RowSequence> SpaceMapper::ConvertKeysToIndices(const RowSequence
auto convert_interval = [this, &builder](uint64_t begin_key, uint64_t end_key) {
auto beginp = set_.begin();
if (!beginp.move(begin_key)) {
auto message = fmt::format("begin key {} is not in the src map", begin_key);
auto message = fmt::format("begin key {} is too large for the src map", begin_key);
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
auto next_rank = ZeroBasedRank(begin_key);
Expand All @@ -72,6 +111,8 @@ std::shared_ptr<RowSequence> SpaceMapper::ConvertKeysToIndices(const RowSequence
}
++currentp;
}
// It is ok to add a chunk like this because rowkeys [begin_key, end_key) are contiguous;
// therefore their corresponding index space indices are also contiguous.
auto size = end_key - begin_key;
builder.AddInterval(next_rank, next_rank + size);
};
Expand Down

0 comments on commit dcfc794

Please sign in to comment.