Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43949: [C++] io::BufferedInput: Fix invalid state after SetBufferSize #44387

Merged
merged 5 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class BufferedBase {
return !is_open_;
}

// Allocate buffer_ if needed, and resize it to buffer_size_ if required.
Status ResetBuffer() {
if (!buffer_) {
// On first invocation, or if the buffer has been released, we allocate a
Expand Down Expand Up @@ -283,18 +284,32 @@ class BufferedInputStream::Impl : public BufferedBase {
}

// Resize internal read buffer. Note that the internal buffer-size
// should be not larger than the raw_read_bound_.
// should not be larger than the raw_read_bound_.
// It might change the buffer_size_, but will not change buffer states
// buffer_pos_ and bytes_buffered_.
Status SetBufferSize(int64_t new_buffer_size) {
if (new_buffer_size <= 0) {
return Status::Invalid("Buffer size should be positive");
}
if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) {
return Status::Invalid("Cannot shrink read buffer if buffered data remains");
return Status::Invalid(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this actually occur using the public APIs? If yes, can we add a test?

Copy link
Member Author

@mapleFU mapleFU Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes actually Peek test tests this. Previously, PeekPastBufferedBytes will raise error here.

"Cannot shrink read buffer if buffered data remains, new_buffer_size: ",
new_buffer_size, ", buffer_pos: ", buffer_pos_,
", bytes_buffered: ", bytes_buffered_, ", buffer_size: ", buffer_size_);
}
if (raw_read_bound_ >= 0) {
// No need to reserve space for more than the total remaining number of bytes.
new_buffer_size = std::min(new_buffer_size,
bytes_buffered_ + (raw_read_bound_ - raw_read_total_));
if (bytes_buffered_ == 0) {
// Special case: we can not keep the current buffer because it does not
// contain any required data.
new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_);
} else {
// We should keep the current buffer because it contains data that
// can be read.
new_buffer_size =
std::min(new_buffer_size,
buffer_pos_ + bytes_buffered_ + (raw_read_bound_ - raw_read_total_));
}
}
return ResizeBuffer(new_buffer_size);
}
Expand Down Expand Up @@ -350,7 +365,7 @@ class BufferedInputStream::Impl : public BufferedBase {
}

Status DoBuffer() {
// Fill buffer
// Fill the buffer from the raw stream with at most `buffer_size_` bytes.
if (!buffer_) {
RETURN_NOT_OK(ResetBuffer());
}
Expand Down Expand Up @@ -444,8 +459,8 @@ class BufferedInputStream::Impl : public BufferedBase {
// The default -1 indicates that it is unbounded
int64_t raw_read_bound_;

// Number of remaining bytes in the buffer, to be reduced on each read from
// the buffer
// Number of remaining valid bytes in the buffer, to be reduced on each read
// from the buffer.
int64_t bytes_buffered_;
};

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/io/buffered.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class ARROW_EXPORT BufferedInputStream
int64_t raw_read_bound = -1);

/// \brief Resize internal read buffer; calls to Read(...) will read at least
/// this many bytes from the raw InputStream if possible.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand what's the intent of at least in the previous comment since it is incomplete. I suppose it should be will read at most new_buffer_size?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both ok to me

/// \param[in] new_buffer_size the new read buffer size
/// \return Status
Status SetBufferSize(int64_t new_buffer_size);
Expand Down
23 changes: 23 additions & 0 deletions cpp/src/arrow/io/buffered_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,29 @@ TEST_F(TestBufferedInputStream, BufferSizeLimit) {
}
}

TEST_F(TestBufferedInputStream, PeekPastBufferedBytes) {
// GH-43949: Peek and SetBufferSize should not affect the
// buffered bytes.
MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/15);
ASSERT_OK_AND_ASSIGN(auto bytes, buffered_->Read(9));
EXPECT_EQ(std::string_view(*bytes), kExample1.substr(0, 9));
ASSERT_EQ(1, buffered_->bytes_buffered());
ASSERT_EQ(10, buffered_->buffer_size());
ASSERT_OK_AND_ASSIGN(auto view, buffered_->Peek(3));
EXPECT_EQ(view, kExample1.substr(9, 3));
ASSERT_EQ(3, buffered_->bytes_buffered());
ASSERT_EQ(12, buffered_->buffer_size());
ASSERT_OK_AND_ASSIGN(view, buffered_->Peek(10));
// Peek() cannot go past the `raw_read_bound`
EXPECT_EQ(view, kExample1.substr(9, 6));
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_EQ(6, buffered_->bytes_buffered());
ASSERT_EQ(15, buffered_->buffer_size());
// Do read
ASSERT_OK_AND_ASSIGN(bytes, buffered_->Read(6));
EXPECT_EQ(std::string_view(*bytes), kExample1.substr(9, 6));
ASSERT_EQ(0, buffered_->bytes_buffered());
}

class TestBufferedInputStreamBound : public ::testing::Test {
public:
void SetUp() { CreateExample(/*bounded=*/true); }
Expand Down
Loading