Skip to content

Commit

Permalink
Fix recursive delete on hierarchical namespace accounts (#454)
Browse files Browse the repository at this point in the history
Fix recursive delete on hierarchical namespace accounts
  • Loading branch information
Tom-Newton authored Jan 28, 2024
1 parent 488fdf0 commit 18ea349
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 14 deletions.
65 changes: 55 additions & 10 deletions adlfs/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import logging
import os
import re
import typing
import warnings
import weakref
from collections import defaultdict
Expand Down Expand Up @@ -1124,11 +1125,11 @@ def makedir(self, path, exist_ok=False):

async def _rm(
self,
path,
recursive=False,
maxdepth=None,
delimiter="/",
expand_path=True,
path: typing.Union[str, typing.List[str]],
recursive: bool = False,
maxdepth: typing.Optional[int] = None,
delimiter: str = "/",
expand_path: bool = True,
**kwargs,
):
"""Delete files.
Expand Down Expand Up @@ -1185,29 +1186,73 @@ async def _rm(

rm = sync_wrapper(_rm)

async def _rm_files(self, container_name, files, **kwargs):
async def _rm_files(
self, container_name: str, file_paths: typing.Iterable[str], **kwargs
):
"""
Delete the given file(s)
Parameters
----------
path: str or list of str
file_paths: iterable of str
File(s) to delete.
"""
async with self.service_client.get_container_client(
container=container_name
) as cc:
exs = await asyncio.gather(
(
files,
directory_markers,
) = await self._separate_directory_markers_for_non_empty_directories(
file_paths
)

# Files and directory markers of empty directories can be deleted in any order. We delete them all
# asynchronously for performance reasons.
file_exs = await asyncio.gather(
*([cc.delete_blob(file) for file in files]), return_exceptions=True
)
for ex in exs:
for ex in file_exs:
if ex is not None:
raise ex
for file in files:

# Directory markers of non-empty directories must be deleted in reverse order to avoid deleting a directory
# marker before the directory is empty. If these are deleted out of order we will get
# `This operation is not permitted on a non-empty directory.` on hierarchical namespace storage accounts.
for directory_marker in reversed(directory_markers):
cc.delete_blob(directory_marker)

for file in file_paths:
self.invalidate_cache(self._parent(file))

sync_wrapper(_rm_files)

async def _separate_directory_markers_for_non_empty_directories(
self, file_paths: typing.Iterable[str]
) -> typing.Tuple[typing.List[str], typing.List[str]]:
"""
Distinguish directory markers of non-empty directories from files and directory markers for empty directories.
A directory marker is an empty blob who's name is the path of the directory.
"""
unique_sorted_file_paths = sorted(set(file_paths)) # Remove duplicates and sort
directory_markers = []
files = [
unique_sorted_file_paths[-1]
] # The last file lexographically cannot be a directory marker for a non-empty directory.

for file, next_file in zip(
unique_sorted_file_paths, unique_sorted_file_paths[1:]
):
# /path/to/directory -- directory marker
# /path/to/directory/file -- file in directory
# /path/to/directory2/file -- file in different directory
if next_file.startswith(file + "/"):
directory_markers.append(file)
else:
files.append(file)

return files, directory_markers

def rmdir(self, path: str, delimiter="/", **kwargs):
sync(self.loop, self._rmdir, path, delimiter=delimiter, **kwargs)

Expand Down
4 changes: 3 additions & 1 deletion adlfs/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def host(request):
return request.config.getoption("--host")


@pytest.fixture(scope="session")
@pytest.fixture(scope="function")
def storage(host):
"""
Create blob using azurite.
Expand Down Expand Up @@ -56,6 +56,8 @@ def storage(host):
container_client.upload_blob("root/e+f/file2.txt", data)
yield bbs

bbs.delete_container("data")


@pytest.fixture(scope="session")
def event_loop():
Expand Down
53 changes: 50 additions & 3 deletions adlfs/tests/test_spec.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datetime
import tempfile
from unittest import mock

import azure.storage.blob
import azure.storage.blob.aio
import dask.dataframe as dd
import numpy as np
import pandas as pd
Expand Down Expand Up @@ -401,10 +403,14 @@ def test_time_info(storage):
)

creation_time = fs.created("data/root/d/file_with_metadata.txt")
assert creation_time == storage.insert_time
assert_almost_equal(
creation_time, storage.insert_time, datetime.timedelta(seconds=1)
)

modified_time = fs.modified("data/root/d/file_with_metadata.txt")
assert modified_time == storage.insert_time
assert_almost_equal(
modified_time, storage.insert_time, datetime.timedelta(seconds=1)
)


def test_find(storage):
Expand Down Expand Up @@ -721,7 +727,13 @@ def test_rm(storage):
fs.ls("/data/root/a/file.txt", refresh=True)


def test_rm_recursive(storage):
@mock.patch.object(
azure.storage.blob.aio.ContainerClient,
"delete_blob",
side_effect=azure.storage.blob.aio.ContainerClient.delete_blob,
autospec=True,
)
def test_rm_recursive(mock_delete_blob, storage):
fs = AzureBlobFileSystem(
account_name=storage.account_name, connection_string=CONN_STR
)
Expand All @@ -738,6 +750,41 @@ def test_rm_recursive(storage):
with pytest.raises(FileNotFoundError):
fs.ls("data/root/c")

assert mock_delete_blob.mock_calls[-1] == mock.call(
mock.ANY, "root/c"
), "The directory deletion should be the last call"


@mock.patch.object(
azure.storage.blob.aio.ContainerClient,
"delete_blob",
side_effect=azure.storage.blob.aio.ContainerClient.delete_blob,
autospec=True,
)
def test_rm_recursive2(mock_delete_blob, storage):
fs = AzureBlobFileSystem(
account_name=storage.account_name, connection_string=CONN_STR
)

assert "data/root" in fs.ls("/data")

fs.rm("data/root", recursive=True)
assert "data/root" not in fs.ls("/data")

with pytest.raises(FileNotFoundError):
fs.ls("data/root")

last_deleted_paths = [call.args[1] for call in mock_delete_blob.mock_calls[-7:]]
assert last_deleted_paths == [
"root/e+f",
"root/d",
"root/c",
"root/b",
"root/a1",
"root/a",
"root",
], "The directory deletion should be in reverse lexographical order"


def test_rm_multiple_items(storage):
fs = AzureBlobFileSystem(
Expand Down

0 comments on commit 18ea349

Please sign in to comment.