Update vendored DuckDB sources to 8626621
duckdblabs-bot committed Sep 20, 2024
1 parent 8626621 commit 6b9bd55
Showing 24 changed files with 257 additions and 138 deletions.
14 changes: 11 additions & 3 deletions src/duckdb/extension/json/json_functions/json_extract.cpp
@@ -6,9 +6,17 @@ static inline string_t ExtractFromVal(yyjson_val *val, yyjson_alc *alc, Vector &
 	return JSONCommon::WriteVal<yyjson_val>(val, alc);
 }
 
-static inline string_t ExtractStringFromVal(yyjson_val *val, yyjson_alc *alc, Vector &, ValidityMask &, idx_t) {
-	return yyjson_is_str(val) ? string_t(unsafe_yyjson_get_str(val), unsafe_yyjson_get_len(val))
-	                          : JSONCommon::WriteVal<yyjson_val>(val, alc);
+static inline string_t ExtractStringFromVal(yyjson_val *val, yyjson_alc *alc, Vector &, ValidityMask &mask, idx_t idx) {
+	switch (yyjson_get_tag(val)) {
+	case YYJSON_TYPE_NULL | YYJSON_SUBTYPE_NONE:
+		mask.SetInvalid(idx);
+		return string_t {};
+	case YYJSON_TYPE_STR | YYJSON_SUBTYPE_NOESC:
+	case YYJSON_TYPE_STR | YYJSON_SUBTYPE_NONE:
+		return string_t(unsafe_yyjson_get_str(val), unsafe_yyjson_get_len(val));
+	default:
+		return JSONCommon::WriteVal<yyjson_val>(val, alc);
+	}
 }
 
 static void ExtractFunction(DataChunk &args, ExpressionState &state, Vector &result) {
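
The practical effect of the new switch in ExtractStringFromVal: a JSON null now yields SQL NULL instead of being serialized to the string 'null', and both string subtypes (including YYJSON_SUBTYPE_NOESC, strings with no escape sequences) take the direct-copy path. A minimal sketch of the behavior through DuckDB's C++ client API; the query literals are illustrative assumptions, not part of the commit:

#include "duckdb.hpp"

int main() {
	duckdb::DuckDB db(nullptr);
	duckdb::Connection con(db);
	// A JSON null now produces SQL NULL rather than the literal string 'null'.
	con.Query("SELECT json_extract_string('{\"a\": null}', '$.a') IS NULL AS a_is_null")->Print();
	// Plain strings are still returned unquoted, without re-serialization.
	con.Query("SELECT json_extract_string('{\"a\": \"hi\"}', '$.a') AS a")->Print();
	return 0;
}
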
7 changes: 4 additions & 3 deletions src/duckdb/extension/json/json_functions/json_value.cpp
@@ -4,6 +4,7 @@ namespace duckdb {
 
 static inline string_t ValueFromVal(yyjson_val *val, yyjson_alc *alc, Vector &, ValidityMask &mask, idx_t idx) {
 	switch (yyjson_get_tag(val)) {
+	case YYJSON_TYPE_NULL | YYJSON_SUBTYPE_NONE:
 	case YYJSON_TYPE_ARR | YYJSON_SUBTYPE_NONE:
 	case YYJSON_TYPE_OBJ | YYJSON_SUBTYPE_NONE:
 		mask.SetInvalid(idx);
@@ -22,12 +23,12 @@ static void ValueManyFunction(DataChunk &args, ExpressionState &state, Vector &r
 }
 
 static void GetValueFunctionsInternal(ScalarFunctionSet &set, const LogicalType &input_type) {
-	set.AddFunction(ScalarFunction({input_type, LogicalType::BIGINT}, LogicalType::JSON(), ValueFunction,
+	set.AddFunction(ScalarFunction({input_type, LogicalType::BIGINT}, LogicalType::VARCHAR, ValueFunction,
 	                               JSONReadFunctionData::Bind, nullptr, nullptr, JSONFunctionLocalState::Init));
-	set.AddFunction(ScalarFunction({input_type, LogicalType::VARCHAR}, LogicalType::JSON(), ValueFunction,
+	set.AddFunction(ScalarFunction({input_type, LogicalType::VARCHAR}, LogicalType::VARCHAR, ValueFunction,
 	                               JSONReadFunctionData::Bind, nullptr, nullptr, JSONFunctionLocalState::Init));
 	set.AddFunction(ScalarFunction({input_type, LogicalType::LIST(LogicalType::VARCHAR)},
-	                               LogicalType::LIST(LogicalType::JSON()), ValueManyFunction,
+	                               LogicalType::LIST(LogicalType::VARCHAR), ValueManyFunction,
 	                               JSONReadManyFunctionData::Bind, nullptr, nullptr, JSONFunctionLocalState::Init));
 }
 
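
Taken together, these two hunks change json_value to be registered with a VARCHAR return type instead of JSON, and JSON null now joins arrays and objects in mapping to SQL NULL. A hedged sketch of the resulting behavior via the C++ client API (the input literals are assumptions, not from the commit):

#include "duckdb.hpp"

int main() {
	duckdb::DuckDB db(nullptr);
	duckdb::Connection con(db);
	con.Query("SELECT json_value('{\"a\": 42}', '$.a')")->Print();     // 42, now typed VARCHAR
	con.Query("SELECT json_value('{\"a\": [1, 2]}', '$.a')")->Print(); // NULL: arrays already mapped to NULL
	con.Query("SELECT json_value('{\"a\": null}', '$.a')")->Print();   // NULL: new with this commit
	return 0;
}
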
1 change: 1 addition & 0 deletions src/duckdb/src/common/arrow/arrow_merge_event.cpp
@@ -1,4 +1,5 @@
 #include "duckdb/common/arrow/arrow_merge_event.hpp"
+#include "duckdb/common/arrow/arrow_util.hpp"
 #include "duckdb/storage/storage_info.hpp"
 
 namespace duckdb {
60 changes: 60 additions & 0 deletions src/duckdb/src/common/arrow/arrow_util.cpp
@@ -0,0 +1,60 @@
+#include "duckdb/common/arrow/arrow_util.hpp"
+#include "duckdb/common/arrow/arrow_appender.hpp"
+#include "duckdb/common/types/data_chunk.hpp"
+
+namespace duckdb {
+
+bool ArrowUtil::TryFetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t batch_size, ArrowArray *out,
+                              idx_t &count, ErrorData &error) {
+	count = 0;
+	ArrowAppender appender(scan_state.Types(), batch_size, std::move(options));
+	auto remaining_tuples_in_chunk = scan_state.RemainingInChunk();
+	if (remaining_tuples_in_chunk) {
+		// We start by scanning the non-finished current chunk
+		idx_t cur_consumption = MinValue(remaining_tuples_in_chunk, batch_size);
+		count += cur_consumption;
+		auto &current_chunk = scan_state.CurrentChunk();
+		appender.Append(current_chunk, scan_state.CurrentOffset(), scan_state.CurrentOffset() + cur_consumption,
+		                current_chunk.size());
+		scan_state.IncreaseOffset(cur_consumption);
+	}
+	while (count < batch_size) {
+		if (!scan_state.LoadNextChunk(error)) {
+			if (scan_state.HasError()) {
+				error = scan_state.GetError();
+			}
+			return false;
+		}
+		if (scan_state.ChunkIsEmpty()) {
+			// The scan was successful, but an empty chunk was returned
+			break;
+		}
+		auto &current_chunk = scan_state.CurrentChunk();
+		if (scan_state.Finished() || current_chunk.size() == 0) {
+			break;
+		}
+		// The amount we still need to append into this chunk
+		auto remaining = batch_size - count;
+
+		// The amount remaining, capped by the amount left in the current chunk
+		auto to_append_to_batch = MinValue(remaining, scan_state.RemainingInChunk());
+		appender.Append(current_chunk, 0, to_append_to_batch, current_chunk.size());
+		count += to_append_to_batch;
+		scan_state.IncreaseOffset(to_append_to_batch);
+	}
+	if (count > 0) {
+		*out = appender.Finalize();
+	}
+	return true;
+}
+
+idx_t ArrowUtil::FetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t chunk_size, ArrowArray *out) {
+	ErrorData error;
+	idx_t result_count;
+	if (!TryFetchChunk(scan_state, std::move(options), chunk_size, out, result_count, error)) {
+		error.Throw();
+	}
+	return result_count;
+}
+
+} // namespace duckdb
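
TryFetchChunk (moved here from arrow_wrapper.cpp, see below) implements a general batching loop: drain whatever is left of the partially consumed current chunk, then keep loading chunks and appending whole or partial ones until batch_size tuples are collected or the source is exhausted. A standalone sketch of that loop over plain vectors, not DuckDB code, just the algorithm under simplified types and hypothetical names:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Fill 'batch' with up to 'batch_size' values, resuming from (chunk_idx, offset).
// Returns the number of values copied; fewer than batch_size means the source drained.
size_t FetchBatch(const std::vector<std::vector<int>> &chunks, size_t &chunk_idx, size_t &offset,
                  std::vector<int> &batch, size_t batch_size) {
	size_t count = 0;
	while (count < batch_size && chunk_idx < chunks.size()) {
		auto &chunk = chunks[chunk_idx];
		size_t remaining_in_chunk = chunk.size() - offset;
		if (remaining_in_chunk == 0) { // current chunk exhausted: "load" the next one
			chunk_idx++;
			offset = 0;
			continue;
		}
		// Cap what we take by both the batch budget and what the chunk still holds,
		// mirroring MinValue(remaining, scan_state.RemainingInChunk()) above.
		size_t to_append = std::min(batch_size - count, remaining_in_chunk);
		batch.insert(batch.end(), chunk.begin() + offset, chunk.begin() + offset + to_append);
		offset += to_append;
		count += to_append;
	}
	return count;
}

int main() {
	std::vector<std::vector<int>> chunks {{1, 2, 3}, {4, 5}, {6, 7, 8, 9}};
	size_t chunk_idx = 0, offset = 0;
	std::vector<int> batch;
	// Requesting batches of 4 yields {1,2,3,4}, then {5,6,7,8}, then {9}.
	while (true) {
		batch.clear();
		if (FetchBatch(chunks, chunk_idx, offset, batch, 4) == 0) {
			break;
		}
		for (int v : batch) {
			std::cout << v << ' ';
		}
		std::cout << '\n';
	}
	return 0;
}
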
54 changes: 1 addition & 53 deletions src/duckdb/src/common/arrow/arrow_wrapper.cpp
@@ -1,4 +1,5 @@
 #include "duckdb/common/arrow/arrow_wrapper.hpp"
+#include "duckdb/common/arrow/arrow_util.hpp"
 #include "duckdb/common/arrow/arrow_converter.hpp"
 
 #include "duckdb/common/assert.hpp"
@@ -176,57 +177,4 @@ ResultArrowArrayStreamWrapper::ResultArrowArrayStreamWrapper(unique_ptr<QueryRes
 	stream.get_last_error = ResultArrowArrayStreamWrapper::MyStreamGetLastError;
 }
 
-bool ArrowUtil::TryFetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t batch_size, ArrowArray *out,
-                              idx_t &count, ErrorData &error) {
-	count = 0;
-	ArrowAppender appender(scan_state.Types(), batch_size, std::move(options));
-	auto remaining_tuples_in_chunk = scan_state.RemainingInChunk();
-	if (remaining_tuples_in_chunk) {
-		// We start by scanning the non-finished current chunk
-		idx_t cur_consumption = MinValue(remaining_tuples_in_chunk, batch_size);
-		count += cur_consumption;
-		auto &current_chunk = scan_state.CurrentChunk();
-		appender.Append(current_chunk, scan_state.CurrentOffset(), scan_state.CurrentOffset() + cur_consumption,
-		                current_chunk.size());
-		scan_state.IncreaseOffset(cur_consumption);
-	}
-	while (count < batch_size) {
-		if (!scan_state.LoadNextChunk(error)) {
-			if (scan_state.HasError()) {
-				error = scan_state.GetError();
-			}
-			return false;
-		}
-		if (scan_state.ChunkIsEmpty()) {
-			// The scan was successful, but an empty chunk was returned
-			break;
-		}
-		auto &current_chunk = scan_state.CurrentChunk();
-		if (scan_state.Finished() || current_chunk.size() == 0) {
-			break;
-		}
-		// The amount we still need to append into this chunk
-		auto remaining = batch_size - count;
-
-		// The amount remaining, capped by the amount left in the current chunk
-		auto to_append_to_batch = MinValue(remaining, scan_state.RemainingInChunk());
-		appender.Append(current_chunk, 0, to_append_to_batch, current_chunk.size());
-		count += to_append_to_batch;
-		scan_state.IncreaseOffset(to_append_to_batch);
-	}
-	if (count > 0) {
-		*out = appender.Finalize();
-	}
-	return true;
-}
-
-idx_t ArrowUtil::FetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t chunk_size, ArrowArray *out) {
-	ErrorData error;
-	idx_t result_count;
-	if (!TryFetchChunk(scan_state, std::move(options), chunk_size, out, result_count, error)) {
-		error.Throw();
-	}
-	return result_count;
-}
-
 } // namespace duckdb
@@ -12,10 +12,32 @@ void ColumnCountResult::AddValue(ColumnCountResult &result, idx_t buffer_pos) {
 }
 
 inline void ColumnCountResult::InternalAddRow() {
-	column_counts[result_position].number_of_columns = current_column_count + 1;
+	const idx_t column_count = current_column_count + 1;
+	column_counts[result_position].number_of_columns = column_count;
+	rows_per_column_count[column_count]++;
 	current_column_count = 0;
 }
 
+idx_t ColumnCountResult::GetMostFrequentColumnCount() const {
+	if (rows_per_column_count.empty()) {
+		return 1;
+	}
+	idx_t column_count = 0;
+	idx_t current_max = 0;
+	for (auto &rpc : rows_per_column_count) {
+		if (rpc.second > current_max) {
+			current_max = rpc.second;
+			column_count = rpc.first;
+		} else if (rpc.second == current_max) {
+			// We pick the largest to untie
+			if (rpc.first > column_count) {
+				column_count = rpc.first;
+			}
+		}
+	}
+	return column_count;
+}
+
 bool ColumnCountResult::AddRow(ColumnCountResult &result, idx_t buffer_pos) {
 	result.InternalAddRow();
 	if (!result.states.EmptyLastValue()) {
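
GetMostFrequentColumnCount computes a mode with a deterministic tie-break: among column counts observed on the same number of rows, the larger count wins. A self-contained sketch of the same rule over standard-library types (the names and the sample data are illustrative, not DuckDB's):

#include <cstdint>
#include <iostream>
#include <map>

uint64_t MostFrequentColumnCount(const std::map<uint64_t, uint64_t> &rows_per_column_count) {
	if (rows_per_column_count.empty()) {
		return 1; // mirrors the sniffer's default when no rows were counted
	}
	uint64_t column_count = 0, current_max = 0;
	for (auto &rpc : rows_per_column_count) {
		// Strictly more rows wins; on equal row counts, prefer the larger column count.
		if (rpc.second > current_max || (rpc.second == current_max && rpc.first > column_count)) {
			current_max = rpc.second;
			column_count = rpc.first;
		}
	}
	return column_count;
}

int main() {
	// 3 columns on 10 rows, 4 columns on 10 rows: the tie is broken toward 4.
	std::map<uint64_t, uint64_t> counts {{3, 10}, {4, 10}};
	std::cout << MostFrequentColumnCount(counts) << '\n'; // prints 4
	return 0;
}
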
@@ -134,7 +134,11 @@ SnifferResult CSVSniffer::MinimalSniff() {
 		for (idx_t col_idx = 0; col_idx < data_chunk.ColumnCount(); col_idx++) {
 			auto &cur_vector = data_chunk.data[col_idx];
 			auto vector_data = FlatVector::GetData<string_t>(cur_vector);
-			HeaderValue val(vector_data[0]);
+			auto &validity = FlatVector::Validity(cur_vector);
+			HeaderValue val;
+			if (validity.RowIsValid(0)) {
+				val = HeaderValue(vector_data[0]);
+			}
 			potential_header.emplace_back(val);
 		}
 	}
@@ -221,29 +225,35 @@ SnifferResult CSVSniffer::SniffCSV(bool force_match) {
 		// If the header exists it should match
 		string header_error = "The Column names set by the user do not match the ones found by the sniffer. \n";
 		auto &set_names = *set_columns.names;
-		for (idx_t i = 0; i < set_columns.Size(); i++) {
-			if (set_names[i] != names[i]) {
-				header_error += "Column at position: " + to_string(i) + " Set name: " + set_names[i] +
-				                " Sniffed Name: " + names[i] + "\n";
-				match = false;
+		if (set_names.size() == names.size()) {
+			for (idx_t i = 0; i < set_columns.Size(); i++) {
+				if (set_names[i] != names[i]) {
+					header_error += "Column at position: " + to_string(i) + " Set name: " + set_names[i] +
+					                " Sniffed Name: " + names[i] + "\n";
+					match = false;
+				}
 			}
 		}
+
 		if (!match) {
 			error += header_error;
 		}
 	}
 	match = true;
 	string type_error = "The Column types set by the user do not match the ones found by the sniffer. \n";
 	auto &set_types = *set_columns.types;
-	for (idx_t i = 0; i < set_columns.Size(); i++) {
-		if (set_types[i] != detected_types[i]) {
-			type_error += "Column at position: " + to_string(i) + " Set type: " + set_types[i].ToString() +
-			              " Sniffed type: " + detected_types[i].ToString() + "\n";
-			detected_types[i] = set_types[i];
-			manually_set[i] = true;
-			match = false;
+	if (detected_types.size() == set_columns.Size()) {
+		for (idx_t i = 0; i < set_columns.Size(); i++) {
+			if (set_types[i] != detected_types[i]) {
+				type_error += "Column at position: " + to_string(i) + " Set type: " + set_types[i].ToString() +
+				              " Sniffed type: " + detected_types[i].ToString() + "\n";
+				detected_types[i] = set_types[i];
+				manually_set[i] = true;
+				match = false;
+			}
		}
 	}
+
 	if (!match) {
 		error += type_error;
 	}
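
Both new guards protect the element-wise comparison between user-supplied column names/types and the sniffer's findings when the two vectors differ in length. A hypothetical reproduction through the C++ client API (the file name and schema are assumptions, not from the commit):

#include "duckdb.hpp"

int main() {
	duckdb::DuckDB db(nullptr);
	duckdb::Connection con(db);
	// If 'data.csv' sniffs to a different column count than the two names given
	// here, the comparison loops are now skipped rather than indexing past the
	// end of the shorter vector; the mismatch surfaces as a regular error.
	auto result = con.Query("SELECT * FROM read_csv('data.csv', names = ['a', 'b'])");
	if (result->HasError()) {
		result->Print();
	}
	return 0;
}
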