Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read from Storage in file source #3869

Merged
merged 9 commits into from
Oct 5, 2023
Merged

Read from Storage in file source #3869

merged 9 commits into from
Oct 5, 2023

Conversation

rdettai
Copy link
Contributor

@rdettai rdettai commented Sep 22, 2023

Description

Closes #3868

How was this PR tested?

Running full test suite:

Code coverage

@rdettai rdettai self-assigned this Sep 22, 2023
@rdettai rdettai added the enhancement New feature or request label Sep 22, 2023
@rdettai
Copy link
Contributor Author

rdettai commented Sep 22, 2023

@fmassot (cc @trinity-1686a) commit 9a74f5a contains the scaffolding for the streaming storage. The central point is that StorageResolver now has a resolve_streamable() method. I left the existing resolve() method (reasons explained in the docstring). Before proceeding to the implementation for each and every store, I would like your opinion on the general setup:

  • are you okay with the overhead it adds? Should we re-consider adding the method to the StorageResolverStorage directly?
  • I'm not sure what the debouncer is used for. Is it okay to bypass it as I did for the streaming mode?

@guilload
Copy link
Member

I read the comment for the resolve_streamable method but don't understand it. Why can't the Storage trait have a get_stream method directly?

@rdettai
Copy link
Contributor Author

rdettai commented Sep 23, 2023

I read the comment for the resolve_streamable method but don't understand it.

Noted, I'll rephrase it if we move forward in this direction

Why can't the Storage trait have a get_stream method directly?

That's one of the questions (I actually made a typo in my previous comment, sorry!).
Pros: The idea behind composing traits instead of having one big Storage trait that does everything is to avoid leaving methods unimplemented. I believe the Storage trait should already be split into a ReadableStorage and a WriteableStorage, thus avoiding unimplemented methods in BundledStorage and StorageWIthCache. I didn't want to add yet another method that doesn't make sense in all cases. You will usually want to avoid get_stream when the actual data is already backed by a byte array (like the cached storage) as it implies an extra unnecessary copy (I know that's what the RamStorage does, but that's mostly for tests). It's implementation might also be a bit tricky on the BundledStorage, which is a shame because it will actually never be used 😅 .
Cons: the machinery of the supertrait is bit obscure (especially the upcasting but also some generics introduced by it), and most Storage implementations are streamable anyway.

@guilload
Copy link
Member

guilload commented Sep 25, 2023

The trade-off is adding a method that will remain unimplemented for some storage implementations vs adding two new traits, a new method to StorageResolver, and some "obscure machinery" for the supertrait/upcasting. I vote for 1.

@fulmicoton, you want to weigh in?

@fulmicoton
Copy link
Contributor

fulmicoton commented Sep 26, 2023

We can add a get_stream method implementation that has a default impl that calls get_all and avoid any unimplemented!().

Pros: The idea behind composing traits instead of having one big Storage trait that does everything is to avoid leaving methods unimplemented.

We can have a "best-effort" default impl that calls get_all and returns a stream that does nto really stream.

I believe the Storage trait should already be split into a ReadableStorage and a WriteableStorage, thus avoiding unimplemented methods in BundledStorage and StorageWIthCache. I didn't want to add yet another method that doesn't make sense in all cases. You will usually want to avoid get_stream when the actual data is already backed by a byte array (like the cached storage) as it implies an extra unnecessary copy (I know that's what the RamStorage does, but that's mostly for tests). It's implementation might also be a bit tricky on the BundledStorage, which is a shame because it will actually never be used 😅 .

ReadableStorage/ WritableStorage is an interesting idea. The same idea exists on the Directory side. ({Read/Write}Directory.
I would wield the word "should" with more care however. I personally consider this change as a mildly on the overengineering side.

@rdettai
Copy link
Contributor Author

rdettai commented Sep 27, 2023

Ok, thanks for your insight! I'll go with a get_stream method directly on the Storage abstraction then.

@rdettai
Copy link
Contributor Author

rdettai commented Sep 28, 2023

This is taking shape. Before moving this PR out of draft, I have a few questions:

  1. Cache implementation: I added a naive implementation that uses get_slice and wraps it into a cursor. This implementation is actually never used. Should we rather leave it un-implemented and let it to the person who will first use it to come up with the implementation they want? (if we leave this dummy implementation, I'll obviously add a unit test for it)
  2. Retry on Azure: the Azure SDK opens a fully lazy stream, so nothing happens until the stream is first polled. This means that we cannot use the usual mechanism here. One workaround to enable the retry mechanism would be to peak into the stream and wrap that into a retry. Do you think it is worth it?
  3. Buffered reader: The S3 SDK documentation seems to suggest that the AsyncRead should be wrapped into a BufReader. The existing file source implementation was also performing this wrapping around the tokio::fs::File. Currently, it is still the file source that performs the wrapping, but my feeling is that it should be the Storage implementation that takes care of this when it is relevant. Any opinion on this?
  4. Debouncer: I'm not sure I fully understand the role of the ins an outs of the debouncer. I added a dummy implementation of get_slice_stream that bypasses it. Does that seem sound?

I also still need to look into the tests with external systems (Azure and S3)

@rdettai rdettai force-pushed the object-source branch 2 times, most recently from ee47ce8 to ae417ff Compare September 28, 2023 19:50
@fulmicoton
Copy link
Contributor

Cache implementation: I added a naive implementation that uses get_slice and wraps it into a cursor. This implementation is actually never used. Should we rather leave it un-implemented and let it to the person who will first use it to come up with the implementation they want? (if we leave this dummy implementation, I'll obviously add a unit test for it)

Yes unimplemented!() is ok here.

Retry on Azure: the Azure SDK opens a fully lazy stream, so nothing happens until the stream is first polled. This means that we cannot use the usual mechanism here. One workaround to enable the retry mechanism would be to peak into the stream and wrap that into a retry. Do you think it is worth it?

This means that we cannot use the usual mechanism here.
I'm not entirely sure what is not working in this case. If you want to make sure we poll the stream within the body of the get_stream method, a simple workaround could be to wrap the async reader in a tokio's async BufReader. It has a fill_buf method. Make sure you comment why you do that for future generations though.

Buffered reader: The S3 SDK documentation seems to suggest that the AsyncRead should be wrapped into a BufReader. The existing file source implementation was also performing this wrapping around the tokio::fs::File. Currently, it is still the file source that performs the wrapping, but my feeling is that it should be the Storage implementation that takes care of this
when it is relevant. Any opinion on this?

Which documentation?
I wonder if this is relevant here. Under the hood the s3 client wraps a stream of large Bytes of object, which size are independent from the way we use the AsyncRead. Does it prevent polling of the next block or something?

Debouncer: I'm not sure I fully understand the role of the ins an outs of the debouncer. I added a dummy implementation of get_slice_stream that bypasses it. Does that seem sound?

Yes that's ok. The debouncer detects that several queries currently in flight are actually fetching the same piece of data, and makes sure we only emit one GET request.

@rdettai
Copy link
Contributor Author

rdettai commented Sep 29, 2023

Cache implementation

ok, switching to unimplemented

Buffered reader

Which documentation?

Very sorry for this, I had the link ready but forgot to paste it in:

But now that I think about it, it's probably just for the convenience of having .lines() which comes from [AsyncBufReadExt].(https://docs.rs/tokio/latest/tokio/io/trait.AsyncBufReadExt.html).

I took a deeper look at the code, and the BufReader is actually required in the FileSource for the same reason as in the docs mentioned above: to go through the file line by line (here). So my proposed approach for now is to leave things as they are, i.e return the finer grained AsyncRead from the Storage (exactly as it is returned by the SDKs/tokio::fs::File) and wrap it with BufReader in the file source, if not for performance reasons at least to get the utility of the line by line iterator.

Retry on Azure

If you want to make sure we poll the stream within the body of the get_stream method, a simple workaround could be to wrap the async reader in a tokio's async BufReader. It has a fill_buf method

I feel it would be relevant to ensure that the stream is polled at least once in the body, as a sort of equivalent to "opening the file". This should rarely be useful in practice as connectivity checks seem to be usually performed beforehand, but it would make the behavior more consistent with with the other Storage implementations. To be consistent with the approach I proposed above regarding buffered readers and avoid wrapping the stream in multiple layers of BufReader, I'd rather not solve this with a BufReader here. If that's fine, I'll stick to my "peek into the stream" idea as it's pretty simple an zero cost.

Debouncer

👍

@rdettai rdettai marked this pull request as ready for review September 29, 2023 19:35
Copy link
Contributor

@fulmicoton fulmicoton left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a bug in the local file storage. See comment inline. The other comments are nitpicks.

@codecov-commenter
Copy link

Codecov Report

Attention: 35 lines in your changes are missing coverage. Please review.

Comparison is base (213b3bf) 81.83% compared to head (a2bc35d) 81.83%.
Report is 21 commits behind head on main.

❗ Your organization needs to install the Codecov GitHub app to enable full functionality.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3869      +/-   ##
==========================================
- Coverage   81.83%   81.83%   -0.01%     
==========================================
  Files         401      401              
  Lines      100563   100736     +173     
==========================================
+ Hits        82300    82433     +133     
- Misses      18263    18303      +40     
Files Coverage Δ
quickwit/quickwit-config/src/source_config/mod.rs 96.86% <100.00%> (ø)
quickwit/quickwit-index-management/src/index.rs 81.51% <100.00%> (ø)
.../quickwit-indexing/src/actors/indexing_pipeline.rs 92.23% <100.00%> (+0.05%) ⬆️
...t/quickwit-indexing/src/actors/indexing_service.rs 89.40% <100.00%> (+<0.01%) ⬆️
...uickwit/quickwit-indexing/src/source/ingest/mod.rs 87.83% <100.00%> (+0.06%) ⬆️
quickwit/quickwit-indexing/src/test_utils.rs 98.94% <100.00%> (ø)
quickwit/quickwit-storage/src/debouncer.rs 96.72% <100.00%> (+0.09%) ⬆️
quickwit/quickwit-storage/src/lib.rs 100.00% <100.00%> (ø)
...torage/src/object_storage/s3_compatible_storage.rs 91.23% <100.00%> (+0.20%) ⬆️
quickwit/quickwit-storage/src/ram_storage.rs 92.66% <100.00%> (+0.41%) ⬆️
... and 10 more

... and 14 files with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@rdettai rdettai enabled auto-merge (squash) October 5, 2023 07:50
@rdettai rdettai merged commit 4fc31e5 into main Oct 5, 2023
4 checks passed
@rdettai rdettai deleted the object-source branch October 5, 2023 13:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support Object Storage in the file source
4 participants