Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement info() and ls() using trustless gateway spec #33

Merged
merged 10 commits into from
Sep 19, 2024
2 changes: 1 addition & 1 deletion .github/workflows/local_gateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
max-parallel: 4
matrix:
python-version: ["3.8", "3.9", "3.10"]
ipfs-version: ["0.27.0"] # this is the latest IPFS version supporting /api/v0, see issue #28
ipfs-version: ["0.30.0"]
steps:
- uses: actions/checkout@v1
- name: Set up Python ${{ matrix.python-version }}
Expand Down
191 changes: 98 additions & 93 deletions ipfsspec/async_ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,104 +6,34 @@
from pathlib import Path
import warnings

import asyncio
import aiohttp

from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper
from fsspec.exceptions import FSTimeoutError

from multiformats import CID, multicodec
from . import unixfsv1

import logging

logger = logging.getLogger("ipfsspec")

DagPbCodec = multicodec.get("dag-pb")
RawCodec = multicodec.get("raw")

class RequestsTooQuick(OSError):
def __init__(self, retry_after=None):
self.retry_after = retry_after


class AsyncIPFSGatewayBase:
async def stat(self, path, session):
res = await self.api_get("files/stat", session, arg=path)
self._raise_not_found_for_status(res, path)
return await res.json()

async def file_info(self, path, session):
info = {"name": path}

headers = {"Accept-Encoding": "identity"} # this ensures correct file size
res = await self.cid_head(path, session, headers=headers, allow_redirects=True)

async with res:
self._raise_not_found_for_status(res, path)
if res.status != 200:
# TODO: maybe handle 301 here
raise FileNotFoundError(path)
if "Content-Length" in res.headers:
info["size"] = int(res.headers["Content-Length"])
elif "Content-Range" in res.headers:
info["size"] = int(res.headers["Content-Range"].split("/")[1])

if "ETag" in res.headers:
etag = res.headers["ETag"].strip("\"")
info["ETag"] = etag
if etag.startswith("DirIndex"):
info["type"] = "directory"
info["CID"] = etag.split("-")[-1]
else:
info["type"] = "file"
info["CID"] = etag

return info

async def cat(self, path, session):
res = await self.cid_get(path, session)
async with res:
self._raise_not_found_for_status(res, path)
if res.status != 200:
raise FileNotFoundError(path)
return await res.read()

async def ls(self, path, session):
res = await self.api_get("ls", session, arg=path)
self._raise_not_found_for_status(res, path)
resdata = await res.json()
types = {1: "directory", 2: "file"}
return [{
"name": path + "/" + link["Name"],
"CID": link["Hash"],
"type": types[link["Type"]],
"size": link["Size"],
}
for link in resdata["Objects"][0]["Links"]]

def _raise_not_found_for_status(self, response, url):
"""
Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
"""
if response.status == 404:
raise FileNotFoundError(url)
elif response.status == 400:
raise FileNotFoundError(url)
response.raise_for_status()


class AsyncIPFSGateway(AsyncIPFSGatewayBase):
class AsyncIPFSGateway:
resolution = "path"

def __init__(self, url, protocol="ipfs"):
self.url = url
self.protocol = protocol

async def api_get(self, endpoint, session, **kwargs):
res = await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url})
self._raise_requests_too_quick(res)
return res

async def api_post(self, endpoint, session, **kwargs):
res = await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url})
self._raise_requests_too_quick(res)
return res

async def _cid_req(self, method, path, headers=None, **kwargs):
headers = headers or {}
if self.resolution == "path":
Expand All @@ -116,17 +46,12 @@ async def _cid_req(self, method, path, headers=None, **kwargs):
self._raise_requests_too_quick(res)
return res

async def cid_head(self, path, session, headers=None, **kwargs):
async def head(self, path, session, headers=None, **kwargs):
return await self._cid_req(session.head, path, headers=headers, **kwargs)

async def cid_get(self, path, session, headers=None, **kwargs):
async def get(self, path, session, headers=None, **kwargs):
return await self._cid_req(session.get, path, headers=headers, **kwargs)

async def version(self, session):
res = await self.api_get("version", session)
res.raise_for_status()
return await res.json()

@staticmethod
def _raise_requests_too_quick(response):
if response.status == 429:
Expand All @@ -139,6 +64,90 @@ def _raise_requests_too_quick(response):
def __str__(self):
return f"GW({self.url})"

async def info(self, path, session):
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"})
self._raise_not_found_for_status(res, path)
cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1])
resdata = await res.read()

if cid.codec == RawCodec:
return {
"name": path,
"CID": str(cid),
"type": "file",
"size": len(resdata),
}
elif cid.codec == DagPbCodec:
node = unixfsv1.PBNode.loads(resdata)
data = unixfsv1.Data.loads(node.Data)
if data.Type == unixfsv1.DataType.Raw:
raise FileNotFoundError(path) # this is not a file, it's only a part of it
elif data.Type == unixfsv1.DataType.Directory:
return {
"name": path,
"CID": str(cid),
"type": "directory",
"islink": False,
}
elif data.Type == unixfsv1.DataType.File:
return {
"name": path,
"CID": str(cid),
"type": "file",
"size": data.filesize,
"islink": False,
}
elif data.Type == unixfsv1.DataType.Metadata:
raise NotImplementedError(f"The path '{path}' contains a Metadata node, this is currently not implemented")
elif data.Type == unixfsv1.DataType.Symlink:
return {
"name": path,
"CID": str(cid),
"type": "other", # TODO: maybe we should have directory or file as returning type, but that probably would require resolving at least another level of blocks
"islink": True,
}
elif data.Type == unixfsv1.DataType.HAMTShard:
raise NotImplementedError(f"The path '{path}' contains a HAMTSharded directory, this is currently not implemented")
else:
raise FileNotFoundError(path) # it exists, but is not a UNIXFSv1 object, so it's not a file

async def cat(self, path, session):
res = await self.get(path, session)
async with res:
self._raise_not_found_for_status(res, path)
return await res.read()

async def ls(self, path, session, detail=False):
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"})
self._raise_not_found_for_status(res, path)
resdata = await res.read()
cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1])
assert cid.codec == DagPbCodec, "this is not a directory"
node = unixfsv1.PBNode.loads(resdata)
data = unixfsv1.Data.loads(node.Data)
if data.Type != unixfsv1.DataType.Directory:
# TODO: we might need support for HAMTShard here (for large directories)
raise NotADirectoryError(path)

if detail:
return await asyncio.gather(*(
self.info(path + "/" + link.Name, session)
for link in node.Links))
else:
return [path + "/" + link.Name for link in node.Links]

def _raise_not_found_for_status(self, response, url):
"""
Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
"""
if response.status == 404: # returned for known missing files
raise FileNotFoundError(url)
elif response.status == 400: # return for invalid requests, so it's also certainly not there
raise FileNotFoundError(url)
response.raise_for_status()




async def get_client(**kwargs):
timeout = aiohttp.ClientTimeout(sock_connect=1, sock_read=5)
Expand Down Expand Up @@ -167,14 +176,14 @@ def get_gateway(protocol="ipfs"):
ipfs_gateway = os.environ.get("IPFS_GATEWAY", "")
if ipfs_gateway:
logger.debug("using IPFS gateway from IPFS_GATEWAY environment variable: %s", ipfs_gateway)
return AsyncGateway(ipfs_gateway, protocol)
return AsyncIPFSGateway(ipfs_gateway, protocol)

# internal configuration: accept IPFSSPEC_GATEWAYS for backwards compatibility
if ipfsspec_gateways := os.environ.get("IPFSSPEC_GATEWAYS", ""):
ipfs_gateway = ipfsspec_gateways.split()[0]
logger.debug("using IPFS gateway from IPFSSPEC_GATEWAYS environment variable: %s", ipfs_gateway)
warnings.warn("The IPFSSPEC_GATEWAYS environment variable is deprecated, please configure your IPFS Gateway according to IPIP-280, e.g. by using the IPFS_GATEWAY environment variable or using the ~/.ipfs/gateway file.", DeprecationWarning)
return AsyncGateway(ipfs_gateway, protocol)
return AsyncIPFSGateway(ipfs_gateway, protocol)

# check various well-known files for possible gateway configurations
if ipfs_path := os.environ.get("IPFS_PATH", ""):
Expand Down Expand Up @@ -274,11 +283,7 @@ async def set_session(self):
async def _ls(self, path, detail=True, **kwargs):
path = self._strip_protocol(path)
session = await self.set_session()
res = await self.gateway.ls(path, session)
if detail:
return res
else:
return [r["name"] for r in res]
return await self.gateway.ls(path, session, detail=detail)

ls = sync_wrapper(_ls)

Expand All @@ -290,11 +295,11 @@ async def _cat_file(self, path, start=None, end=None, **kwargs):
async def _info(self, path, **kwargs):
path = self._strip_protocol(path)
session = await self.set_session()
return await self.gateway.file_info(path, session)
return await self.gateway.info(path, session)

def open(self, path, mode="rb", block_size=None, cache_options=None, **kwargs):
if mode != "rb":
raise NotImplementedError
raise NotImplementedError("opening modes other than read binary are not implemented")
data = self.cat_file(path) # load whole chunk into memory
return io.BytesIO(data)

Expand Down
109 changes: 109 additions & 0 deletions ipfsspec/unixfsv1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
from UNIXFS spec (https://github.com/ipfs/specs/blob/master/UNIXFS.md):

message Data {
enum DataType {
Raw = 0;
Directory = 1;
File = 2;
Metadata = 3;
Symlink = 4;
HAMTShard = 5;
}

required DataType Type = 1;
optional bytes Data = 2;
optional uint64 filesize = 3;
repeated uint64 blocksizes = 4;
optional uint64 hashType = 5;
optional uint64 fanout = 6;
optional uint32 mode = 7;
optional UnixTime mtime = 8;
}

message Metadata {
optional string MimeType = 1;
}

message UnixTime {
required int64 Seconds = 1;
optional fixed32 FractionalNanoseconds = 2;
}



from DAG-PB spec (https://ipld.io/specs/codecs/dag-pb/spec/):

message PBLink {
// binary CID (with no multibase prefix) of the target object
optional bytes Hash = 1;

// UTF-8 string name
optional string Name = 2;

// cumulative size of target object
optional uint64 Tsize = 3;
}

message PBNode {
// refs to other objects
repeated PBLink Links = 2;

// opaque user data
optional bytes Data = 1;
}
"""

from dataclasses import dataclass
from enum import IntEnum
from typing import List, Optional

from pure_protobuf.dataclasses_ import field, message # type: ignore
from pure_protobuf.types import uint32, uint64, int64, fixed32 # type: ignore

class DataType(IntEnum):
Raw = 0
Directory = 1
File = 2
Metadata = 3
Symlink = 4
HAMTShard = 5

@message
@dataclass
class UnixTime:
Seconds: int64 = field(1)
FractionalNanoseconds: Optional[fixed32] = field(2)

@message
@dataclass
class Data:
# pylint: disable=too-many-instance-attributes
Type: DataType = field(1)
Data: Optional[bytes] = field(2, default=None)
filesize: Optional[uint64] = field(3, default=None)
blocksizes: List[uint64] = field(4, default_factory=list, packed=False)
hashType: Optional[uint64] = field(5, default=None)
fanout: Optional[uint64] = field(6, default=None)
mode: Optional[uint32] = field(7, default=None)
mtime: Optional[UnixTime] = field(8, default=None)

@message
@dataclass
class Metadata:
MimeType: Optional[str] = field(1, default=None)


@message
@dataclass
class PBLink:
Hash: Optional[bytes] = field(1, default=None)
Name: Optional[str] = field(2, default=None)
Tsize: Optional[uint64] = field(3, default=None)

Data_ = Data
@message
@dataclass
class PBNode:
Links: List[PBLink] = field(2, default_factory=list)
Data: Optional[bytes] = field(1, default=None)
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
"fsspec>=0.9.0",
"requests",
"aiohttp",
"multiformats",
"dag-cbor >= 0.2.2",
"pure-protobuf >= 2.1.0, <3",
],
entry_points={
'fsspec.specs': [
Expand Down
Loading
Loading