Skip to content

Commit

Permalink
Fix small transaction (#586)
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant authored Oct 6, 2023
1 parent f83ce71 commit dd3c7b5
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 6 deletions.
9 changes: 4 additions & 5 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,7 @@ def _upload_chunk(self, final=False):
head = {}
l = len(data)

if (l < GCS_MIN_BLOCK_SIZE) and not final:
if (l < GCS_MIN_BLOCK_SIZE) and (not final or not self.autocommit):
# either flush() was called, but we don't have enough to
# push, or we split a big upload, and have less left than one
# block. If this is the final part, OK to violate those
Expand Down Expand Up @@ -1818,13 +1818,12 @@ def discard(self):
"""
if self.location is None:
return
uid = re.findall("upload_id=([^&=?]+)", self.location)
self.gcsfs.call(
"DELETE",
f"{self.fs._location}/upload/storage/v1/b/{quote(self.bucket)}/o",
params={"uploadType": "resumable", "upload_id": uid},
json_out=True,
self.location,
)
self.location = None
self.closed = True

def _simple_upload(self):
"""One-shot upload, less than 5MB"""
Expand Down
3 changes: 2 additions & 1 deletion gcsfs/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ def validate_response(status, content, path, args=None):
r: requests response object
path: associated URL path, for error messages
"""
if status >= 400:
if status >= 400 and status != 499:
# 499 is special "upload was cancelled" status
if args:
from .core import quote

Expand Down
23 changes: 23 additions & 0 deletions gcsfs/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,29 @@ def test_copy_cache_invalidated(gcs):
assert gcs.isfile(target_file2)


def test_transaction(gcs):
# https://github.com/fsspec/gcsfs/issues/389
if not gcs.on_google:
pytest.skip()
try:
with gcs.transaction:
with gcs.open(f"{TEST_BUCKET}/foo", "wb") as f:
f.write(b"This is a test string")
f.discard()
assert not gcs.exists(f"{TEST_BUCKET}/foo")
raise ZeroDivisionError
except ZeroDivisionError:
...
assert not gcs.exists(f"{TEST_BUCKET}/foo")

with gcs.transaction:
with gcs.open(f"{TEST_BUCKET}/foo", "wb") as f:
f.write(b"This is a test string")
assert not gcs.exists(f"{TEST_BUCKET}/foo")

assert gcs.cat(f"{TEST_BUCKET}/foo") == b"This is a test string"


def test_find_maxdepth(gcs):
assert gcs.find(f"{TEST_BUCKET}/nested", maxdepth=None) == [
f"{TEST_BUCKET}/nested/file1",
Expand Down

0 comments on commit dd3c7b5

Please sign in to comment.