Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge from main: s3 listing strategy has been fixed (#9499) #9821

Open
wants to merge 1 commit into
base: q-stable-2024-07-08
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ IOutputStream& operator<<(IOutputStream& stream, const TS3ListingOptions& option

namespace {

// Computes the base path for `path`: the text that TStringBuf::RBefore('/')
// produces for it. If that text is identical to `path` and does not end with
// '/', an empty string is returned instead of the unchanged input.
TString ParseBasePath(const TString& path) {
    TString parent{TStringBuf{path}.RBefore('/')};
    const bool unchanged = parent == path;
    if (unchanged && !parent.EndsWith('/')) {
        return TString{};
    }
    return parent;
}

using namespace NThreading;
using namespace NS3Lister;

Expand Down Expand Up @@ -492,15 +497,10 @@ class TBFSDirectoryResolverIterator : public IS3Lister {
return NextDirectoryListeningChunk;
}

// Computes the base path for `path` from TStringBuf::RBefore('/').
// Returns an empty string when the RBefore result equals `path` and does not
// end with '/'; otherwise returns the RBefore result.
// NOTE(review): an identical free function with the same name appears earlier in
// this view; this static member looks like the pre-change side of a diff — confirm
// whether it should still exist.
static TString ParseBasePath(const TString& path) {
TString basePath = TString{TStringBuf{path}.RBefore('/')};
return basePath == path && !basePath.EndsWith('/') ? TString{} : basePath;
}

// Moves the not-yet-processed prefixes into `result.Directories`: first the base
// path of `sourcePrefix`, then entries built from every element of
// DirectoryPrefixQueue, after which the queue is cleared.
// NOTE(review): the two push_back statements inside the loop look like the
// before/after sides of a diff rendered without +/- markers (raw prefix vs.
// ParseBasePath(prefix)) — confirm which single statement belongs here.
void PerformEarlyStop(TListEntries& result, const TString& sourcePrefix) {
result.Directories.push_back({.Path = ParseBasePath(sourcePrefix)});
for (auto& directoryPrefix : DirectoryPrefixQueue) {
result.Directories.push_back({.Path = directoryPrefix});
result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)});
}
DirectoryPrefixQueue.clear();
}
Expand All @@ -518,10 +518,10 @@ class TBFSDirectoryResolverIterator : public IS3Lister {
}
} else {
for (auto& directoryPrefix : listingResult.Directories) {
result.Directories.push_back({.Path = directoryPrefix.Path});
result.Directories.push_back({.Path = ParseBasePath(directoryPrefix.Path)});
}
for (auto& directoryPrefix : DirectoryPrefixQueue) {
result.Directories.push_back({.Path = directoryPrefix});
result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)});
}
DirectoryPrefixQueue.clear();
}
Expand Down Expand Up @@ -766,10 +766,10 @@ class TConcurrentBFSDirectoryResolverIterator : public IS3Lister {
// TODO: add verification
auto result = TListEntries{.Objects = Objects};
for (auto& directoryPrefix : DirectoryPrefixQueue) {
result.Directories.push_back({.Path = directoryPrefix});
result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)});
}
for (auto& directoryPrefix: InProgressPaths) {
result.Directories.push_back({.Path = directoryPrefix});
result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)});
}
for (auto& directoryEntry : Directories) {
result.Directories.push_back(directoryEntry);
Expand Down
4 changes: 3 additions & 1 deletion ydb/tests/fq/s3/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@

from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
from ydb.tests.tools.fq_runner.custom_hooks import * # noqa: F401,F403 Adding custom hooks for YQv2 support
from ydb.tests.tools.fq_runner.kikimr_utils import AddInflightExtension
from ydb.tests.tools.fq_runner.kikimr_utils import AddAllowConcurrentListingsExtension
from ydb.tests.tools.fq_runner.kikimr_utils import AddDataInflightExtension
from ydb.tests.tools.fq_runner.kikimr_utils import AddFormatSizeLimitExtension
from ydb.tests.tools.fq_runner.kikimr_utils import AddInflightExtension
from ydb.tests.tools.fq_runner.kikimr_utils import DefaultConfigExtension
from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension
from ydb.tests.tools.fq_runner.kikimr_utils import ComputeExtension
Expand Down Expand Up @@ -88,6 +89,7 @@ def kikimr_params(request: pytest.FixtureRequest):
def get_kikimr_extensions(s3: S3, yq_version: str, kikimr_settings, mvp_external_ydb_endpoint):
return [
AddInflightExtension(),
AddAllowConcurrentListingsExtension(),
AddDataInflightExtension(),
AddFormatSizeLimitExtension(),
DefaultConfigExtension(s3.s3_url),
Expand Down
78 changes: 78 additions & 0 deletions ydb/tests/fq/s3/test_s3_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,3 +557,81 @@ def test_top_level_listing(self, kikimr, s3, client, runtime_listing, unique_pre
assert result_set.rows[5].items[1].int32_value == 15
assert result_set.rows[5].items[2].int32_value == 33
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize("runtime_listing", ["false", "true"])
@pytest.mark.parametrize("kikimr_params", [{"allow_concurrent_listings": True}], indirect=True)
def test_top_level_listing_2(self, kikimr, s3, client, runtime_listing, unique_prefix):
    """Query a top-level wildcard (`/2024-08-*`) with concurrent listings enabled.

    Five CSV objects are uploaded; the assertions below expect six rows in the
    result, i.e. the contents of two of the uploaded files.
    """
    s3_auth = dict(endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key")

    # Start from a freshly created, emptied bucket.
    bucket = boto3.resource("s3", **s3_auth).Bucket("fbucket")
    bucket.create(ACL='public-read')
    bucket.objects.all().delete()

    s3_client = boto3.client("s3", **s3_auth)

    fruits = '''Fruit,Price,Weight
Banana,3,100
Apple,2,22
Pear,15,33'''
    # Upload order is kept exactly as listed.
    object_keys = (
        '2024-08-09.csv',
        '2024-09-08.csv',
        '2024-08-08.csv',
        '/a/2024-08-08.csv',
        '/b/test.csv',
    )
    for object_key in object_keys:
        s3_client.put_object(Body=fruits, Bucket='fbucket', Key=object_key, ContentType='text/plain')

    kikimr.control_plane.wait_bootstrap(1)
    storage_connection_name = unique_prefix + "test_top_level_listing_2"
    client.create_storage_connection(storage_connection_name, "fbucket")

    sql = f'''
pragma s3.UseRuntimeListing="{runtime_listing}";

SELECT *
FROM `{storage_connection_name}`.`/2024-08-*`
WITH (format=csv_with_names, SCHEMA (
Fruit String NOT NULL,
Price Int NOT NULL,
Weight Int NOT NULL
)
);
'''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

    result_set = client.get_result_data(query_id).result.result_set
    logging.debug(str(result_set))

    # Schema: three columns with the declared names and types.
    expected_columns = (
        ("Fruit", ydb.Type.STRING),
        ("Price", ydb.Type.INT32),
        ("Weight", ydb.Type.INT32),
    )
    assert len(result_set.columns) == 3
    for column, (column_name, column_type) in zip(result_set.columns, expected_columns):
        assert column.name == column_name
        assert column.type.type_id == column_type

    # Data: the three fruit rows, repeated twice.
    expected_rows = (
        (b"Banana", 3, 100),
        (b"Apple", 2, 22),
        (b"Pear", 15, 33),
    ) * 2
    assert len(result_set.rows) == 6
    for row, (fruit, price, weight) in zip(result_set.rows, expected_rows):
        assert row.items[0].bytes_value == fruit
        assert row.items[1].int32_value == price
        assert row.items[2].int32_value == weight

    assert sum(kikimr.control_plane.get_metering(1)) == 10
12 changes: 12 additions & 0 deletions ydb/tests/tools/fq_runner/kikimr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ def apply_to_kikimr(self, request, kikimr):
del request.param["inflight"]


class AddAllowConcurrentListingsExtension(ExtensionPoint):
    """Copies the `allow_concurrent_listings` fixture parameter into the S3 gateway config."""

    def is_applicable(self, request):
        # Applies only when the request carries a dict parameter containing the key.
        params = getattr(request, 'param', None)
        return isinstance(params, dict) and "allow_concurrent_listings" in params

    def apply_to_kikimr(self, request, kikimr):
        # Store the flag on the kikimr object, mirror it into the compute-plane
        # S3 gateway config, then remove the key from the request parameters.
        option = "allow_concurrent_listings"
        kikimr.allow_concurrent_listings = request.param[option]
        s3_gateway = kikimr.compute_plane.fq_config['gateways']['s3']
        s3_gateway[option] = kikimr.allow_concurrent_listings
        del request.param[option]


class AddDataInflightExtension(ExtensionPoint):
def is_applicable(self, request):
return (hasattr(request, 'param')
Expand Down
Loading