
Commit

Refactor get_adapter to accept optional data_source_id.
danielballan committed Feb 23, 2024
1 parent 4809867 commit dbec6f7
Showing 1 changed file with 65 additions and 53 deletions.
118 changes: 65 additions & 53 deletions tiled/catalog/adapter.py
@@ -409,63 +409,69 @@ async def lookup_adapter(
             self.context, node, access_policy=self.access_policy
         )
 
-    async def get_adapter(self):
+    async def get_adapter(self, data_source_id=None):
         num_data_sources = len(self.data_sources)
-        if num_data_sources > 1:
-            raise NotImplementedError
-        if num_data_sources == 1:
-            (data_source,) = self.data_sources
-            try:
-                adapter_factory = self.context.adapters_by_mimetype[
-                    data_source.mimetype
-                ]
-            except KeyError:
-                raise RuntimeError(
-                    f"Server configuration has no adapter for mimetype {data_source.mimetype!r}"
-                )
-            parameters = collections.defaultdict(list)
-            for asset in data_source.assets:
-                if asset.parameter is None:
-                    continue
-                scheme = urlparse(asset.data_uri).scheme
-                if scheme != "file":
-                    raise NotImplementedError(
-                        f"Only 'file://...' scheme URLs are currently supported, not {asset.data_uri}"
-                    )
-                if scheme == "file":
-                    # Protect against misbehaving clients reading from unintended
-                    # parts of the filesystem.
-                    asset_path = path_from_uri(asset.data_uri)
-                    for readable_storage in self.context.readable_storage:
-                        if Path(
-                            os.path.commonpath(
-                                [path_from_uri(readable_storage), asset_path]
-                            )
-                        ) == path_from_uri(readable_storage):
-                            break
-                    else:
-                        raise RuntimeError(
-                            f"Refusing to serve {asset.data_uri} because it is outside "
-                            "the readable storage area for this server."
-                        )
-                if asset.num is None:
-                    parameters[asset.parameter] = asset.data_uri
-                else:
-                    parameters[asset.parameter].append(asset.data_uri)
-            adapter_kwargs = dict(parameters)
-            adapter_kwargs.update(data_source.parameters)
-            adapter_kwargs["specs"] = self.node.specs
-            adapter_kwargs["metadata"] = self.node.metadata_
-            adapter_kwargs["structure"] = data_source.structure
-            adapter_kwargs["access_policy"] = self.access_policy
-            adapter = await anyio.to_thread.run_sync(
-                partial(adapter_factory, **adapter_kwargs)
-            )
-            for query in self.queries:
-                adapter = adapter.search(query)
-            return adapter
-        else:  # num_data_sources == 0
-            assert False
+        if data_source_id is not None:
+            for data_source in self.data_sources:
+                if data_source_id == data_source.id:
+                    break
+            else:
+                raise ValueError(
+                    f"No such data_source_id {data_source_id} on this node"
+                )
+        elif num_data_sources > 1:
+            raise ValueError(
+                "A data_source_id is required because this node "
+                f"has {num_data_sources} data sources"
+            )
+        (data_source,) = self.data_sources
+        try:
+            adapter_factory = self.context.adapters_by_mimetype[data_source.mimetype]
+        except KeyError:
+            raise RuntimeError(
+                f"Server configuration has no adapter for mimetype {data_source.mimetype!r}"
+            )
+        parameters = collections.defaultdict(list)
+        for asset in data_source.assets:
+            if asset.parameter is None:
+                continue
+            scheme = urlparse(asset.data_uri).scheme
+            if scheme != "file":
+                raise NotImplementedError(
+                    f"Only 'file://...' scheme URLs are currently supported, not {asset.data_uri}"
+                )
+            if scheme == "file":
+                # Protect against misbehaving clients reading from unintended
+                # parts of the filesystem.
+                asset_path = path_from_uri(asset.data_uri)
+                for readable_storage in self.context.readable_storage:
+                    if Path(
+                        os.path.commonpath(
+                            [path_from_uri(readable_storage), asset_path]
+                        )
+                    ) == path_from_uri(readable_storage):
+                        break
+                else:
+                    raise RuntimeError(
+                        f"Refusing to serve {asset.data_uri} because it is outside "
+                        "the readable storage area for this server."
+                    )
+            if asset.num is None:
+                parameters[asset.parameter] = asset.data_uri
+            else:
+                parameters[asset.parameter].append(asset.data_uri)
+        adapter_kwargs = dict(parameters)
+        adapter_kwargs.update(data_source.parameters)
+        adapter_kwargs["specs"] = self.node.specs
+        adapter_kwargs["metadata"] = self.node.metadata_
+        adapter_kwargs["structure"] = data_source.structure
+        adapter_kwargs["access_policy"] = self.access_policy
+        adapter = await anyio.to_thread.run_sync(
+            partial(adapter_factory, **adapter_kwargs)
+        )
+        for query in self.queries:
+            adapter = adapter.search(query)
+        return adapter
 
     def new_variation(
         self,
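To make the new selection logic concrete, here is a minimal, self-contained sketch of the same pick-by-id pattern: select by data_source_id when one is given, otherwise require that the node has exactly one data source. The DataSource dataclass and pick_data_source function below are hypothetical illustrations for this page, not tiled's ORM objects or API.

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class DataSource:
    # Hypothetical stand-in for the ORM object; only the fields used here.
    id: int
    mimetype: str


def pick_data_source(
    data_sources: List[DataSource], data_source_id: Optional[int] = None
) -> DataSource:
    # Mirror of the selection logic in the hunk above.
    if data_source_id is not None:
        for data_source in data_sources:
            if data_source.id == data_source_id:
                return data_source
        # The loop finished without finding a match.
        raise ValueError(f"No such data_source_id {data_source_id} on this node")
    if len(data_sources) > 1:
        raise ValueError(
            "A data_source_id is required because this node "
            f"has {len(data_sources)} data sources"
        )
    (data_source,) = data_sources  # exactly one data source expected here
    return data_source


sources = [DataSource(id=1, mimetype="text/csv"), DataSource(id=2, mimetype="image/tiff")]
print(pick_data_source(sources, data_source_id=2).mimetype)  # image/tiff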
@@ -958,6 +964,11 @@ async def write_partition(self, *args, **kwargs):
         )
 
 
+class CatalogUnionAdapter(CatalogNodeAdapter):
+    # This does not support direct reading or writing.
+    pass
+
+
 def delete_asset(data_uri, is_directory):
     url = urlparse(data_uri)
     if url.scheme == "file":
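The comment on the new class says a union node does not support direct reading or writing. A rough, generic sketch of that marker-subclass idea follows; the class names and read method here are invented for illustration and are not taken from this diff.

class NodeAdapter:
    # Generic base: a node backed by a single data source can be read directly.
    def read(self):
        return "data from the node's single data source"


class UnionNodeAdapter(NodeAdapter):
    # Marker subclass: direct reads are refused; callers are expected to
    # target one data source instead (e.g. via get_adapter(data_source_id=...)).
    def read(self):
        raise NotImplementedError(
            "This node has multiple data sources; read one of them by id."
        )


print(NodeAdapter().read())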
@@ -1301,4 +1312,5 @@ def specs_array_to_json(specs):
     StructureFamily.container: CatalogContainerAdapter,
     StructureFamily.sparse: CatalogSparseAdapter,
     StructureFamily.table: CatalogTableAdapter,
+    StructureFamily.union: CatalogUnionAdapter,
 }
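For context, the mapping extended in this last hunk dispatches a node's structure family to a catalog adapter class. A toy version of that dict-based dispatch is sketched below; the enum members and the dict name STRUCTURES are assumptions, since the hunk shows only the entries themselves.

from enum import Enum


class StructureFamily(str, Enum):
    # Abbreviated stand-in for tiled's enum; only the members used below.
    container = "container"
    sparse = "sparse"
    table = "table"
    union = "union"


class CatalogContainerAdapter: ...
class CatalogSparseAdapter: ...
class CatalogTableAdapter: ...
class CatalogUnionAdapter: ...


# Hypothetical name for the dispatch table; maps structure family -> adapter class.
STRUCTURES = {
    StructureFamily.container: CatalogContainerAdapter,
    StructureFamily.sparse: CatalogSparseAdapter,
    StructureFamily.table: CatalogTableAdapter,
    StructureFamily.union: CatalogUnionAdapter,
}

adapter_class = STRUCTURES[StructureFamily.union]
print(adapter_class.__name__)  # CatalogUnionAdapter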
