-
Notifications
You must be signed in to change notification settings - Fork 145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Full support for decompressive transcoding (e.g. Content-Encoding: gzip ) during reading #635
base: main
Are you sure you want to change the base?
Conversation
This fixes fsspec#461 and fsspec#233 without needing users to change their existing code. The bugs were caused by assumptions in fsspec about the veracity and non-ambiguity of 'size' information returned by AbstractBufferedFile subclasses like GCSFile and AbstractFileSystem subclasses like GCSFileSystem (e.g. `self.size = self.details["size"]` in `AbstractBufferedFile`, which is used by all base caches to truncate requests and responses). Since in GCS if compression-at-rest/compression transcoding is used there's no way to retrieve the real size of the object's *content* without decompressing the whole thing either server or client side, fixing these issues required overriding some behaviors in the underlying base classes. Care was taken to preserve behavior for storage objects not using compression at rest, however. This commit: 1) adds a read() implementation in GCSFile which allows calls to succeed even when size isn't well-defined. It's 2) adds a TranscodingReadAheadCache, which is mostly identical to the readahead cache that GCSFile already uses but allows end = None to read until the end of the file, while still handling cached data prefixes. 3) changes FileSystem _info() to set size = None if contentEncoding is gzip. 4) changes _cat_file() to fetch information on the object we want to cat, and if it uses compressive transcoding then the resulting GCSFile uses the GCSTranscodingReadAhead cache instead of the incompatible ReadAhead cache. We could probably use the new cache for everything since it should function equivalently for files which have a well-defined size, but this lowers the risk of having missed an edge case. The fix keeps the data handling for GCS files which do not use compression at rest/compressive trnscoding identical, while adding new control flow to detect when transcoding is done and adding some logic for handling those edge-cases. This did unfortunately mean implementing implementing variant methods with only minor changes to how they perform underlying operations (e.g. read() in GCSFile) which were previously just inherited from AbstractBufferedFile. It does introduce one new semantic to GCSFs. [In line with fsspec's ArchiveFileSystem](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.archive.AbstractArchiveFileSystem.info) semantics, GCSFs will return size = None when the file can not be determined fully in advance. This allows us to distinguish known zero size andunknown size, which was a major issue. The only new performance overhead seen by non-users of compressive decoding is a single info() call resulting in a HEAD request done before the point where we create the GCSFile object in GCSFilesystem, because we need to swap out the cache to one compatible with the lack of concrete file size but do not yet have the information to make that control flow decision. This means we make two requests instead of one. We can probably switch to the new transcoding cache wholesale in the future when we have faith it holds up to eliminate this call though, but I made it work this way to keep the data and control flow the same for the base case where users are not using compressive transcoding. Since the compression-at-rest case was completely broken, doing it this way means that even if these changes end up disasterous (it shouldn't though) it'll only break something which is already broken.
As an aside, this is my first pull request on GitHub to a broader community project, so let me know if there's anything I can do to polish this up. |
Before we merge I think we should add some unit tests for the transcoding case, the new cache and maybe beef up some test cases in the non-transcoding case (e.g. normally uploaded gzip files) to ensure we don't introduce some new issue or cause regressions in the future, but I could use some help in understanding how to develop for the test suite here, test them locally, test them against whatever dev buckets there are, etc. I'm familiar with pytest though. |
if self.closed: | ||
raise ValueError("I/O operation on closed file.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be at the top? If the file is closed, nothing else should be done
if end is not None: | ||
if start is None: | ||
start = 0 | ||
if end is None or end > self.size: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if end is None
will ever be true since in the outer block we mentioned if end is not None
|
||
name = "gcstranscodingreadahead" | ||
|
||
def __init__(self, blocksize: int, fetcher: fsspec.caching.Fetcher, size: int) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide some examples of how I can pass fetcher
? I see it is of type.
Fetcher = Callable[[int, int], bytes] # Maps (start, end) to bytes
what implementation can one pass? I agree that having unit tests would be good.
if start >= self.start and end <= self.end: | ||
# cache hit | ||
self.hit_count += 1 | ||
return self.cache[start - self.start: end - self.start] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, this took me some time to understand. So the cache only contains one contiguous segment and cannot hold more than one segment, as evidenced by self.start
and self.end
. Maybe it should be named self.cache_start
and self.cache_end
?
The cache holds a subset of data as a byte array and stores the cursor location within the actual content. Will it make sense to have multi-segment cache?
""" | ||
length = -1 if length is None else int(length) | ||
if self.mode != "rb": | ||
raise ValueError("File not in read mode") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea to provide more helpful error message
raise ValueError(f"File '{self.path}' is not in read mode")
There is quite a lot of code here, and I haven't yet had a chance to look through it, sorry.
Welcome, and thanks for getting involved. One comment on caching: only readahead and none caching make sense, since we can only really stream from the start when using gzip. Question:
|
I gotta look back into this a bit more extensively. I've noticed many subtle and not so subtle issues with my attempt at solving this and have to go back to the drawing board. Thanks for your patience |
This fixes #461 and #233 without needing users to change their existing code.
The bugs were caused by assumptions in fsspec about the veracity and non-ambiguity of 'size' information returned by AbstractBufferedFile subclasses like GCSFile and AbstractFileSystem subclasses like GCSFileSystem (e.g.
self.size = self.details["size"]
inAbstractBufferedFile
, which is used by all base caches to truncate requests and responses). Since in GCS if compression-at-rest/compression transcoding is used there's no way to retrieve the real size of the object's content without decompressing the whole thing either server or client side, fixing these issues required overriding some behaviors in the underlying base classes. Care was taken to preserve behavior for storage objects not using compression at rest, however.The fix keeps the data handling for GCS files which do not use compression at rest/compressive transcoding mostly identical by adding new control flow to detect when transcoding is done and adding some branch logic for handling those edge-cases. This did unfortunately mean implementing implementing variant methods to base classes with only minor changes to how they perform underlying operations (e.g. read() in GCSFile) which were previously just inherited from AbstractBufferedFile.
It does introduce one new semantic to GCSFs. In line with fsspec's ArchiveFileSystem semantics, GCSFile will return size = None when the file can not be determined fully in advance. This allows us to distinguish known zero size and unknown size, but on the plus side this also means end-users have greater guarantees that sizes they get back are meaningful.
The only new performance overhead seen by non-users of compressive decoding is a single info() call resulting in a HEAD request done before the point where we create the GCSFile object in GCSFilesystem on file open()s, because we need to swap out the cache to one compatible with the lack of concrete file size but do not yet have the information to make that control flow decision. We can probably switch to the new transcoding cache wholesale in the future when we have faith it holds up to eliminate this call though, but I made it work this way to keep the data and control flow the same for the base case where users are not using compressive transcoding.
Since the compression-at-rest case was completely broken, doing it this way means that even if these changes end up totally disastrous (they shouldn't though) they should only break things which were already broken and therefore this pull request should be very low risk.