Skip to content

Commit

Permalink
Merge pull request #30 from fsspec/next
Browse files Browse the repository at this point in the history
getting rid of some old mess
  • Loading branch information
d70-t authored Sep 1, 2024
2 parents 412c542 + 100b14f commit 2abf718
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 690 deletions.
34 changes: 0 additions & 34 deletions .github/workflows/default_gateways.yml

This file was deleted.

7 changes: 1 addition & 6 deletions .github/workflows/local_gateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@ jobs:
max-parallel: 4
matrix:
python-version: ["3.8", "3.9", "3.10"]
ipfs-version: ["0.12.0"]
include:
- python-version: "3.10"
ipfs-version: "0.9.1"
env:
IPFSSPEC_GATEWAYS: "http://127.0.0.1:8080" # use only localhost as gateway
ipfs-version: ["0.27.0"] # this is the latest IPFS version supporting /api/v0, see issue #28
steps:
- uses: actions/checkout@v1
- name: Set up Python ${{ matrix.python-version }}
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ with fsspec.open("ipfs://QmZ4tDuvesekSs4qM5ZBKpXiZGun7S2CYtEZRB3DYXkjGx", "r") a
print(f.read())
```

The current implementation uses a HTTP gateway to access the data. It tries to use a local one (which is expected to be found at `http://127.0.0.1:8080`) and falls back to `ipfs.io` if the local gateway is not available.
The current implementation uses an HTTP gateway to access the data. It uses [IPIP-280](https://github.com/ipfs/specs/pull/280) to determine which gateway to use. If you have a current installation of an IPFS node (e.g. kubo, IPFS Desktop, etc.), you should be fine. In case you want to use a different gateway, you can use any of the methods specified in IPIP-280, e.g.:

You can modify the list of gateways using the space separated environment variable `IPFSSPEC_GATEWAYS`.
* create the file `~/.ipfs/gateway` with the gateway address as first line
* set the environment variable `IPFS_GATEWAY` to the gateway address
* create the file `/etc/ipfs/gateway` with the gateway address as first line

No matter which option you use, the gateway has to be specified as an HTTP(S) url, e.g.: `http://127.0.0.1:8080`.
3 changes: 1 addition & 2 deletions ipfsspec/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from .core import IPFSFileSystem
from .async_ipfs import AsyncIPFSFileSystem

# Read the package version from the _version helper, then remove the helper
# name from the package namespace so only __version__ remains.
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions

# Names exported by the package.
# NOTE(review): two __all__ assignments appear back to back here (this looks
# like the old and new line of a diff); at runtime the second one wins.
__all__ = ["__version__", "IPFSFileSystem", "AsyncIPFSFileSystem"]
__all__ = ["__version__", "AsyncIPFSFileSystem"]
206 changes: 88 additions & 118 deletions ipfsspec/async_ipfs.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import io
import time
import os
import platform
import weakref
from functools import lru_cache
from pathlib import Path
import warnings

import asyncio
import aiohttp

from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper
from fsspec.exceptions import FSTimeoutError

from .utils import get_default_gateways

import logging

logger = logging.getLogger("ipfsspec")
Expand Down Expand Up @@ -138,129 +139,98 @@ def __str__(self):
return f"GW({self.url})"


class GatewayState:
    """Bookkeeping for pacing requests to a single gateway.

    The state tracks whether the gateway is considered reachable, the
    current delay between requests (``backoff_time``, in seconds) and the
    earliest ``time.monotonic()`` value at which the next request should be
    issued (``next_request_time``).

    Parameters
    ----------
    start_backoff:
        Delay in seconds applied by the first call to :meth:`backoff`.
    max_backoff:
        Upper bound in seconds for ``backoff_time``. It is also the delay
        assigned when the gateway is marked as broken.
    """

    def __init__(self, start_backoff=1e-5, max_backoff=5):
        self.reachable = True
        self.next_request_time = 0
        self.backoff_time = 0
        self.start_backoff = start_backoff
        self.max_backoff = max_backoff

    def schedule_next(self):
        """Set the next allowed request time to now plus the current delay."""
        self.next_request_time = time.monotonic() + self.backoff_time

    def backoff(self):
        """Slow down: start at ``start_backoff``, then double up to ``max_backoff``."""
        if self.backoff_time < self.start_backoff:
            self.backoff_time = self.start_backoff
        else:
            # Cap the doubling; without the cap repeated failures would grow
            # the delay far beyond the value used for a broken gateway.
            self.backoff_time = min(self.backoff_time * 2, self.max_backoff)
        self.reachable = True
        self.schedule_next()

    def speedup(self, not_below=0):
        """Shrink the delay by 10% unless it is already at or below ``not_below``.

        Returns ``True`` if the delay was reduced, ``False`` otherwise.
        """
        did_speed_up = False
        if self.backoff_time > not_below:
            self.backoff_time *= 0.9
            did_speed_up = True
        self.reachable = True
        self.schedule_next()
        return did_speed_up

    def broken(self):
        """Mark the gateway unreachable and apply the maximum delay."""
        self.backoff_time = self.max_backoff
        self.reachable = False
        self.schedule_next()

    def trying_to_reach(self):
        """Push the next request one second out while a probe is in flight."""
        self.next_request_time = time.monotonic() + 1

class MultiGateway(AsyncIPFSGatewayBase):
    """Gateway facade that spreads operations over several gateways.

    Each wrapped gateway is paired with a ``GatewayState`` that is used to
    order the gateways and to pace requests to them.

    Parameters
    ----------
    gws:
        Iterable of gateway objects to delegate to.
    max_backoff_rounds:
        Number of rounds over the gateway list before giving up with
        ``RequestsTooQuick``.
    """

    def __init__(self, gws, max_backoff_rounds=50):
        self.gws = [(GatewayState(), gw) for gw in gws]
        self.max_backoff_rounds = max_backoff_rounds

    @property
    def _gws_in_priority_order(self):
        """Gateways sorted by the earliest time they may be used again."""
        now = time.monotonic()
        return sorted(self.gws, key=lambda x: max(now, x[0].next_request_time))

    async def _gw_op(self, op):
        """Run ``op(gateway)`` against the gateways until one succeeds.

        Raises
        ------
        FileNotFoundError
            Propagated immediately from the first gateway that reports it.
        IOError
            If no gateway is configured, or if every gateway in a round
            failed with an ``IOError`` (the last one is re-raised).
        RequestsTooQuick
            If ``max_backoff_rounds`` rounds ended without a result.
        """
        if not self.gws:
            # Previously this case fell through to `raise exception` with the
            # local never assigned, producing an UnboundLocalError.
            raise IOError("no IPFS gateways configured")

        last_error = None
        for _ in range(self.max_backoff_rounds):
            for state, gw in self._gws_in_priority_order:
                not_before = state.next_request_time
                if not state.reachable:
                    state.trying_to_reach()
                else:
                    state.schedule_next()
                now = time.monotonic()
                if not_before > now:
                    await asyncio.sleep(not_before - now)
                logger.debug("trying %s", gw)
                try:
                    res = await op(gw)
                    # Only shorten the delay while it is still longer than
                    # the time this request took.
                    if state.speedup(time.monotonic() - now):
                        logger.debug("%s speedup", gw)
                    return res
                except FileNotFoundError:  # early exit if object doesn't exist
                    raise
                except (RequestsTooQuick, aiohttp.ClientResponseError, asyncio.TimeoutError) as e:
                    state.backoff()
                    logger.debug("%s backoff %s", gw, e)
                    # Leave this round so the next one re-sorts the gateways.
                    break
                except IOError as e:
                    last_error = e
                    state.broken()
                    logger.debug("%s broken", gw)
                    continue
            else:
                # The round finished without a `break`: every gateway raised
                # IOError, so report the most recent one.
                raise last_error
        raise RequestsTooQuick()

    async def api_get(self, endpoint, session, **kwargs):
        """Delegate ``api_get`` to the gateways via ``_gw_op``."""
        return await self._gw_op(lambda gw: gw.api_get(endpoint, session, **kwargs))

    async def api_post(self, endpoint, session, **kwargs):
        """Delegate ``api_post`` to the gateways via ``_gw_op``."""
        return await self._gw_op(lambda gw: gw.api_post(endpoint, session, **kwargs))

    async def cid_head(self, path, session, headers=None, **kwargs):
        """Delegate ``cid_head`` to the gateways via ``_gw_op``."""
        return await self._gw_op(lambda gw: gw.cid_head(path, session, headers=headers, **kwargs))

    async def cid_get(self, path, session, headers=None, **kwargs):
        """Delegate ``cid_get`` to the gateways via ``_gw_op``."""
        return await self._gw_op(lambda gw: gw.cid_get(path, session, headers=headers, **kwargs))

    async def cat(self, path, session):
        """Delegate ``cat`` to the gateways via ``_gw_op``."""
        return await self._gw_op(lambda gw: gw.cat(path, session))

    async def ls(self, path, session):
        """Delegate ``ls`` to the gateways via ``_gw_op``."""
        return await self._gw_op(lambda gw: gw.ls(path, session))

    def state_report(self):
        """Return one ``"<next_request_time>, <gateway>"`` line per gateway."""
        return "\n".join(f"{s.next_request_time}, {gw}" for s, gw in self.gws)

    def __str__(self):
        return "Multi-GW(" + ", ".join(str(gw) for _, gw in self.gws) + ")"


async def get_client(**kwargs):
    """Create an ``aiohttp.ClientSession`` from the given keyword arguments.

    A timeout of 1 second for connecting and 5 seconds for reading is used
    unless the caller passes its own ``timeout``.
    """
    default_timeout = aiohttp.ClientTimeout(sock_connect=1, sock_read=5)
    session_options = dict(kwargs)
    session_options.setdefault("timeout", default_timeout)
    return aiohttp.ClientSession(**session_options)


DEFAULT_GATEWAY = None
def gateway_from_file(gateway_path):
    """Build a gateway from the first line of a gateway configuration file.

    Parameters
    ----------
    gateway_path:
        ``pathlib.Path`` of the file whose first line holds the gateway URL.

    Returns
    -------
    An ``AsyncIPFSGateway`` for the URL found in the file, or ``None`` if
    the path is not a regular file or its first line is blank.
    """
    # is_file() also rejects directories, which open() would fail on.
    if not gateway_path.is_file():
        return None
    with open(gateway_path) as gw_file:
        ipfs_gateway = gw_file.readline().strip()
    if not ipfs_gateway:
        # A blank first line configures nothing; let the caller move on to
        # the next source instead of building a gateway with an empty URL.
        return None
    logger.debug("using IPFS gateway from %s: %s", gateway_path, ipfs_gateway)
    return AsyncIPFSGateway(ipfs_gateway)


def _gateway_file_candidates(environ, system):
    """Return the gateway configuration files to probe, in priority order.

    ``environ`` is a mapping of environment variables and ``system`` is the
    value of ``platform.system()``. Locations that depend on an unset or
    empty environment variable are left out.
    """
    def env_path(name, *parts):
        base = environ.get(name, "")
        return [Path(base).joinpath(*parts)] if base else []

    candidates = [
        *env_path("IPFS_PATH", "gateway"),
        *env_path("HOME", ".ipfs", "gateway"),
        *env_path("XDG_CONFIG_HOME", "ipfs", "gateway"),
        Path("/etc") / "ipfs" / "gateway",
    ]

    if system == "Windows":
        for name in ("LOCALAPPDATA", "APPDATA", "PROGRAMDATA"):
            candidates += env_path(name, "ipfs", "gateway")
    elif system == "Darwin":
        candidates += env_path("HOME", "Library", "Application Support", "ipfs", "gateway")
        candidates.append(Path("/Library") / "Application Support" / "ipfs" / "gateway")
    elif system == "Linux":
        # /etc/ipfs/gateway is already part of the common list above.
        candidates += env_path("HOME", ".config", "ipfs", "gateway")

    return candidates


@lru_cache
def get_gateway():
    """
    Get IPFS gateway according to IPIP-280
    see: https://github.com/ipfs/specs/pull/280

    The sources are consulted in this order: the ``IPFS_GATEWAY`` environment
    variable, the deprecated ``IPFSSPEC_GATEWAYS`` environment variable (first
    entry only), then a list of well-known gateway files. The result is
    cached by ``lru_cache``, so the lookup runs once per process unless the
    cache is cleared.

    Raises
    ------
    RuntimeError
        If none of the sources yields a gateway.
    """

    # IPFS_GATEWAY environment variable should override everything
    if ipfs_gateway := os.environ.get("IPFS_GATEWAY", ""):
        logger.debug("using IPFS gateway from IPFS_GATEWAY environment variable: %s", ipfs_gateway)
        return AsyncIPFSGateway(ipfs_gateway)

    # internal configuration: accept IPFSSPEC_GATEWAYS for backwards compatibility
    # (whitespace-only values are ignored rather than indexed into)
    if ipfsspec_gateways := os.environ.get("IPFSSPEC_GATEWAYS", "").split():
        ipfs_gateway = ipfsspec_gateways[0]
        logger.debug("using IPFS gateway from IPFSSPEC_GATEWAYS environment variable: %s", ipfs_gateway)
        warnings.warn("The IPFSSPEC_GATEWAYS environment variable is deprecated, please configure your IPFS Gateway according to IPIP-280, e.g. by using the IPFS_GATEWAY environment variable or using the ~/.ipfs/gateway file.", DeprecationWarning)
        return AsyncIPFSGateway(ipfs_gateway)

    # check various well-known files for possible gateway configurations
    for candidate in _gateway_file_candidates(os.environ, platform.system()):
        if ipfs_gateway := gateway_from_file(candidate):
            return ipfs_gateway

    # if we reach this point, no gateway is configured
    raise RuntimeError("IPFS Gateway could not be found!\n"
                       "In order to access IPFS, you must configure an "
                       "IPFS Gateway using a IPIP-280 configuration method. "
                       "Possible options are: \n"
                       "  * set the environment variable IPFS_GATEWAY\n"
                       "  * write a gateway in the first line of the file ~/.ipfs/gateway\n"
                       "\n"
                       "It's always best to run your own IPFS gateway, e.g. by using "
                       "IPFS Desktop (https://docs.ipfs.tech/install/ipfs-desktop/) or "
                       "the command line version Kubo (https://docs.ipfs.tech/install/command-line/). "
                       "If you can't run your own gateway, you may also try using the "
                       "public IPFS gateway at https://ipfs.io or https://dweb.link . "
                       "However, this is not recommended for productive use and you may experience "
                       "severe performance issues.")


class AsyncIPFSFileSystem(AsyncFileSystem):
Expand Down
Loading

0 comments on commit 2abf718

Please sign in to comment.