diff --git a/iyp/__init__.py b/iyp/__init__.py index 2194c73..f9fccef 100644 --- a/iyp/__init__.py +++ b/iyp/__init__.py @@ -8,6 +8,7 @@ from typing import Optional import requests +from github import Github from neo4j import GraphDatabase BATCH_SIZE = 50000 @@ -79,6 +80,34 @@ def dict2str(d, eq=':', pfx=''): return '{' + ','.join(data) + '}' +def get_commit_datetime(repo, file_path): + """Get the datetime of the latest commit modifying a file in a GitHub repository. + + repo: The name of the repository in org/repo format, e.g., + "InternetHealthReport/internet-yellow-pages" + file_path: The path to the file relative to the repository root, e.g., + "iyp/__init__.py" + """ + return Github().get_repo(repo).get_commits(path=file_path)[0].commit.committer.date + + +def set_modification_time_from_last_modified_header(reference, response): + """Set the reference_time_modification field of the specified reference dict to the + datetime parsed from the Last-Modified header of the specified response if + possible.""" + try: + last_modified_str = response.headers['Last-Modified'] + # All HTTP dates are in UTC: + # https://www.rfc-editor.org/rfc/rfc2616#section-3.3.1 + last_modified = datetime.strptime(last_modified_str, + '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc) + reference['reference_time_modification'] = last_modified + except KeyError: + logging.warning('No Last-Modified header; will not set modification time.') + except ValueError as e: + logging.error(f'Failed to parse Last-Modified header "{last_modified_str}": {e}') + + class RequestStatusError(requests.HTTPError): def __init__(self, message): self.message = message @@ -109,6 +138,12 @@ def __init__(self, message): super().__init__(self.message) +class DataNotAvailableError(Exception): + def __init__(self, message): + self.message = message + super().__init__(self.message) + + class IYP(object): def __init__(self): @@ -548,9 +583,9 @@ def add_links(self, src_node, links): for i, (type, dst_node, prop) in enumerate(links): assert 'reference_org' in prop - assert 'reference_url' in prop + assert 'reference_url_data' in prop assert 'reference_name' in prop - assert 'reference_time' in prop + assert 'reference_time_fetch' in prop prop = format_properties(prop) @@ -589,10 +624,12 @@ def __init__(self): """IYP and references initialization.""" self.reference = { - 'reference_org': 'Internet Yellow Pages', - 'reference_url': 'https://iyp.iijlab.net', 'reference_name': 'iyp', - 'reference_time': datetime.combine(datetime.utcnow(), time.min, timezone.utc) + 'reference_org': 'Internet Yellow Pages', + 'reference_url_data': 'https://iyp.iijlab.net', + 'reference_url_info': str(), + 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc), + 'reference_time_modification': None } # connection to IYP database @@ -617,8 +654,10 @@ def __init__(self, organization, url, name): self.reference = { 'reference_name': name, 'reference_org': organization, - 'reference_url': url, - 'reference_time': datetime.combine(datetime.utcnow(), time.min, timezone.utc) + 'reference_url_data': url, + 'reference_url_info': str(), + 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc), + 'reference_time_modification': None } # connection to IYP database diff --git a/iyp/crawlers/alice_lg/__init__.py b/iyp/crawlers/alice_lg/__init__.py index 1f88a24..d7bbdfe 100644 --- a/iyp/crawlers/alice_lg/__init__.py +++ b/iyp/crawlers/alice_lg/__init__.py @@ -1,7 +1,6 @@ import ipaddress import logging import os -import 
sys from collections import defaultdict from concurrent.futures import as_completed from datetime import datetime @@ -84,6 +83,10 @@ def __init__(self, # URLs to the API url = url.rstrip('/') + if url.endswith('/api/v1'): + self.reference['reference_url_info'] = url[:-len('/api/v1')] + else: + logging.warning(f'Data URL does not end with "/api/v1", will not set info URL: {url}') self.urls = { 'routeservers': f'{url}/routeservers', 'neighbors': url + '/routeservers/{rs}/neighbors', @@ -97,6 +100,8 @@ def __init__(self, # List of neighbor dicts. Each dict contains information about the route server, # so we do not keep track of that separately. self.neighbors = list() + # Dict mapping routeserver_id to the cache time of that server. + self.routeserver_cached_at = dict() # Dict mapping (routeserver_id, neighbor_id) tuple to a list of route dicts. self.routes = dict() # If routes should be fetched or not. @@ -123,8 +128,6 @@ def decode_json(resp: Response, *args, **kwargs) -> None: try: resp.data = resp.json() except JSONDecodeError as e: - print(f'Failed to retrieve data for {resp.url}', file=sys.stderr) - print(f'Error while reading json data: {e}', file=sys.stderr) logging.error(f'Error while reading json data: {e}') logging.error(resp.status_code) logging.error(resp.headers) @@ -160,8 +163,6 @@ def fetch_urls(self, urls: list, additional_data=list()) -> Iterable: except Exception as e: logging.error(f'Failed to retrieve data for {future}') logging.error(e) - print(f'Failed to retrieve data for {future}', file=sys.stderr) - print(e, file=sys.stderr) return False, dict(), None def fetch_url(self, url: str) -> Tuple[bool, dict]: @@ -177,7 +178,6 @@ def __fetch_routeservers(self) -> None: logging.info('Using cached route server information.') self.routeservers = self.cache_handler.load_cached_object(routeserver_object_name) else: - print(f'Fetching route servers from {self.urls["routeservers"]}') logging.info(f'Fetching route servers from {self.urls["routeservers"]}') is_ok, routeservers_root = self.fetch_url(self.urls['routeservers']) if not is_ok: @@ -190,17 +190,49 @@ def __fetch_neighbors(self) -> None: neighbor_object_name = 'neighbors' if self.cache_handler.cached_object_exists(neighbor_object_name): logging.info('Using cached neighbor information.') - self.neighbors = self.cache_handler.load_cached_object(neighbor_object_name) + neighbor_object = self.cache_handler.load_cached_object(neighbor_object_name) + self.routeserver_cached_at = neighbor_object['routeserver_cached_at'] + self.neighbors = neighbor_object['neighbors'] else: - print(f'Fetching neighbor information from {len(self.routeservers)} route servers.') logging.info(f'Fetching neighbor information from {len(self.routeservers)} route servers.') neighbor_urls = [self.urls['neighbors'].format(rs=rs['id']) for rs in self.routeservers] failed_routeservers = list() - for is_ok, neighbor_list_root, routeserver_id in self.fetch_urls(neighbor_urls, - additional_data=self.routeservers): + for is_ok, neighbor_list_root, routeserver in self.fetch_urls(neighbor_urls, + additional_data=self.routeservers): + routeserver_id = routeserver['id'] if not is_ok: failed_routeservers.append(routeserver_id) continue + try: + cached_at_str = neighbor_list_root['api']['cache_status']['cached_at'] + except KeyError: + cached_at_str = str() + if cached_at_str: + cached_at = None + # Alice-LG uses nanosecond-granularity timestamps, which are not + # valid ISO format... 
+ try: + pre, suf = cached_at_str.rsplit('.', maxsplit=1) + if suf.endswith('Z'): + # UTC + frac_seconds = suf[:-1] + tz_suffix = '+00:00' + elif '+' in suf: + # Hopefully a timezone identifier of form +HH:MM + frac_seconds, tz_suffix = suf.split('+') + tz_suffix = '+' + tz_suffix + else: + raise ValueError(f'Failed to get timezone from timestamp: {cached_at_str}') + if not frac_seconds.isdigit(): + raise ValueError(f'Fractional seconds are not digits: {cached_at_str}') + # Reduce to six digits (ms). + frac_seconds = frac_seconds[:6] + cached_at_str = f'{pre}.{frac_seconds}{tz_suffix}' + cached_at = datetime.fromisoformat(cached_at_str) + except ValueError as e: + logging.warning(f'Failed to get cached_at timestamp for routeserver "{routeserver_id}": {e}') + if cached_at: + self.routeserver_cached_at[routeserver_id] = cached_at # Spelling of neighbors/neighbours field is not consistent... if 'neighbors' in neighbor_list_root: neighbor_list = neighbor_list_root['neighbors'] @@ -208,10 +240,11 @@ def __fetch_neighbors(self) -> None: neighbor_list = neighbor_list_root['neighbours'] else: logging.error(f'Missing "neighbors"/"neighbours" field in reply: {neighbor_list_root}') - print(f'Missing "neighbors"/"neighbours" field in reply: {neighbor_list_root}', file=sys.stderr) continue self.neighbors += neighbor_list - self.cache_handler.save_cached_object(neighbor_object_name, self.neighbors) + neighbor_object = {'routeserver_cached_at': self.routeserver_cached_at, + 'neighbors': self.neighbors} + self.cache_handler.save_cached_object(neighbor_object_name, neighbor_object) if failed_routeservers: logging.warning(f'Failed to get neighbor information for {len(failed_routeservers)} routeservers: ' f'{failed_routeservers}') @@ -343,7 +376,15 @@ def run(self) -> None: if ('details:route_changes' in flattened_neighbor and isinstance(flattened_neighbor['details:route_changes'], flatdict.FlatDict)): flattened_neighbor.pop('details:route_changes') - self.reference['reference_url'] = self.urls['neighbors'].format(rs=neighbor['routeserver_id']) + routeserver_id = neighbor['routeserver_id'] + self.reference['reference_url_data'] = self.urls['neighbors'].format(rs=routeserver_id) + if routeserver_id in self.routeserver_cached_at: + self.reference['reference_time_modification'] = self.routeserver_cached_at[routeserver_id] + else: + logging.info(f'No modification time for routeserver: {routeserver_id}') + # Set to None to not reuse value of previous loop iteration. + self.reference['reference_time_modification'] = None + member_of_rels.append({'src_id': member_asn, # Translate to QID later.
'dst_id': n.data['ixp_qid'], 'props': [flattened_neighbor, self.reference.copy()]}) @@ -354,7 +395,8 @@ def run(self) -> None: if self.fetch_routes: logging.info('Iterating routes.') for (routeserver_id, neighbor_id), routes in self.routes.items(): - self.reference['reference_url'] = self.urls['routes'].format(rs=routeserver_id, neighbor=neighbor_id) + self.reference['reference_url_data'] = self.urls['routes'].format(rs=routeserver_id, + neighbor=neighbor_id) for route in routes: prefix = ipaddress.ip_network(route['network']).compressed origin_asn = route['bgp']['as_path'][-1] diff --git a/iyp/crawlers/bgpkit/__init__.py b/iyp/crawlers/bgpkit/__init__.py index 1894490..e5d8e3b 100644 --- a/iyp/crawlers/bgpkit/__init__.py +++ b/iyp/crawlers/bgpkit/__init__.py @@ -3,15 +3,16 @@ import requests -from iyp import BaseCrawler, RequestStatusError +from iyp import (BaseCrawler, RequestStatusError, + set_modification_time_from_last_modified_header) class AS2RelCrawler(BaseCrawler): def __init__(self, organization, url, name, af): """Initialization: set the address family attribute (af)""" - - self.af = af super().__init__(organization, url, name) + self.af = af + self.reference['reference_url_info'] = 'https://data.bgpkit.com/as2rel/README.txt' def run(self): """Fetch the AS relationship file from BGPKIT website and process lines one by @@ -19,7 +20,9 @@ def run(self): req = requests.get(self.url, stream=True) if req.status_code != 200: - raise RequestStatusError('Error while fetching AS relationships') + raise RequestStatusError(f'Error while fetching AS relationships: {req.status_code}') + + set_modification_time_from_last_modified_header(self.reference, req) rels = [] asns = set() diff --git a/iyp/crawlers/bgpkit/peerstats.py b/iyp/crawlers/bgpkit/peerstats.py index 6f0b54f..58cf8cb 100644 --- a/iyp/crawlers/bgpkit/peerstats.py +++ b/iyp/crawlers/bgpkit/peerstats.py @@ -17,6 +17,9 @@ class Crawler(BaseCrawler): + def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://data.bgpkit.com/peer-stats/README.md' def run(self): """Fetch peer stats for each collector.""" @@ -49,6 +52,7 @@ def run(self): prev_day -= timedelta(days=1) logging.warning("Today's data not yet available!") + self.reference['reference_time_modification'] = self.now for collector in collectors: url = URL.format(collector=collector, year=self.now.year, month=self.now.month, day=self.now.day, @@ -65,7 +69,7 @@ def run(self): 'BGPCollector', {'name': stats['collector'], 'project': stats['project']} ) - self.reference['reference_url'] = url + self.reference['reference_url_data'] = url asns = set() diff --git a/iyp/crawlers/bgpkit/pfx2asn.py b/iyp/crawlers/bgpkit/pfx2asn.py index 89f5a93..76d0b69 100644 --- a/iyp/crawlers/bgpkit/pfx2asn.py +++ b/iyp/crawlers/bgpkit/pfx2asn.py @@ -7,7 +7,8 @@ import requests -from iyp import BaseCrawler, RequestStatusError +from iyp import (BaseCrawler, RequestStatusError, + set_modification_time_from_last_modified_header) URL = 'https://data.bgpkit.com/pfx2as/pfx2as-latest.json.bz2' ORG = 'BGPKIT' @@ -22,7 +23,9 @@ def run(self): req = requests.get(URL, stream=True) if req.status_code != 200: - raise RequestStatusError('Error while fetching pfx2as relationships') + raise RequestStatusError(f'Error while fetching pfx2as relationships: {req.status_code}') + + set_modification_time_from_last_modified_header(self.reference, req) entries = [] asns = set() @@ -35,7 +38,7 @@ def run(self): req.close() - logging.info('Pushing 
nodes to neo4j...\n') + logging.info('Pushing nodes to neo4j...') # get ASNs and prefixes IDs self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns) self.prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes) @@ -48,7 +51,7 @@ def run(self): links.append({'src_id': asn_qid, 'dst_id': prefix_qid, 'props': [self.reference, entry]}) # Set AS name - logging.info('Pushing links to neo4j...\n') + logging.info('Pushing links to neo4j...') # Push all links to IYP self.iyp.batch_add_links('ORIGINATE', links) diff --git a/iyp/crawlers/bgptools/anycast_prefixes.py b/iyp/crawlers/bgptools/anycast_prefixes.py index cbf13b2..e096262 100644 --- a/iyp/crawlers/bgptools/anycast_prefixes.py +++ b/iyp/crawlers/bgptools/anycast_prefixes.py @@ -6,7 +6,8 @@ import requests -from iyp import BaseCrawler, ConnectionError, RequestStatusError +from iyp import (BaseCrawler, ConnectionError, RequestStatusError, + get_commit_datetime) # Organization name and URL to data ORG = 'BGP.Tools' @@ -38,6 +39,12 @@ def fetch_dataset(url: str): class Crawler(BaseCrawler): # Base Crawler provides access to IYP via self.iyp # and setup a dictionary with the org/url/today's date in self.reference + def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.repo = 'bgptools/anycast-prefixes' + self.v4_file = 'anycatch-v4-prefixes.txt' + self.v6_file = 'anycatch-v6-prefixes.txt' + self.reference['reference_url_info'] = 'https://bgp.tools/kb/anycatch' def run(self): ipv4_prefixes_url = get_dataset_url(URL, 4) @@ -51,13 +58,16 @@ def run(self): ipv6_prefixes_filename = os.path.join(tmpdir, 'anycast_ipv6_prefixes.txt') # Fetch data and push to IYP. - self.reference['reference_url'] = ipv4_prefixes_url # Overriding the reference_url according to prefixes + # Overriding the reference_url_data according to prefixes + self.reference['reference_url_data'] = ipv4_prefixes_url + self.reference['reference_time_modification'] = get_commit_datetime(self.repo, self.v4_file) ipv4_prefixes_response = fetch_dataset(ipv4_prefixes_url) logging.info('IPv4 prefixes fetched successfully.') self.update(ipv4_prefixes_response, ipv4_prefixes_filename) logging.info('IPv4 prefixes pushed to IYP.') - self.reference['reference_url'] = ipv6_prefixes_url + self.reference['reference_url_data'] = ipv6_prefixes_url + self.reference['reference_time_modification'] = get_commit_datetime(self.repo, self.v6_file) ipv6_prefixes_response = fetch_dataset(ipv6_prefixes_url) logging.info('IPv6 prefixes fetched successfully.') self.update(ipv6_prefixes_response, ipv6_prefixes_filename) diff --git a/iyp/crawlers/bgptools/as_names.py b/iyp/crawlers/bgptools/as_names.py index e14adc3..53854a2 100644 --- a/iyp/crawlers/bgptools/as_names.py +++ b/iyp/crawlers/bgptools/as_names.py @@ -15,13 +15,13 @@ class Crawler(BaseCrawler): def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://bgp.tools/kb/api' self.headers = { 'user-agent': 'IIJ/Internet Health Report - admin@ihr.live' } - super().__init__(organization, url, name) - def run(self): """Fetch the AS name file from BGP.Tools website and push it to IYP.""" diff --git a/iyp/crawlers/bgptools/tags.py b/iyp/crawlers/bgptools/tags.py index e4e5b26..3a20fa1 100644 --- a/iyp/crawlers/bgptools/tags.py +++ b/iyp/crawlers/bgptools/tags.py @@ -37,13 +37,13 @@ class Crawler(BaseCrawler): def __init__(self, organization, url, name): + super().__init__(organization, url, name) + 
self.reference['reference_url_info'] = 'https://bgp.tools/kb/api' self.headers = { 'user-agent': 'IIJ/Internet Health Report - admin@ihr.live' } - super().__init__(organization, url, name) - def run(self): """Fetch the AS name file from BGP.Tools website and process lines one by one.""" @@ -53,9 +53,9 @@ def run(self): # Reference information for data pushed to the wikibase self.reference = { 'reference_org': ORG, - 'reference_url': url, + 'reference_url_data': url, 'reference_name': NAME, - 'reference_time': datetime.combine(datetime.utcnow(), time.min, timezone.utc) + 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc) } req = requests.get(url, headers=self.headers) diff --git a/iyp/crawlers/caida/asrank.py b/iyp/crawlers/caida/asrank.py index c565182..3cc8d9c 100644 --- a/iyp/crawlers/caida/asrank.py +++ b/iyp/crawlers/caida/asrank.py @@ -16,6 +16,9 @@ class Crawler(BaseCrawler): + def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://asrank.caida.org/' def run(self): """Fetch networks information from ASRank and push to IYP.""" diff --git a/iyp/crawlers/caida/ix_asns.py b/iyp/crawlers/caida/ix_asns.py index a84ff02..6fbb86e 100644 --- a/iyp/crawlers/caida/ix_asns.py +++ b/iyp/crawlers/caida/ix_asns.py @@ -3,6 +3,7 @@ import logging import os import sys +from datetime import datetime, timezone import arrow import flatdict @@ -35,9 +36,22 @@ def __init__(self, organization, url, name): else: # for loop was not 'broken', no file available raise Exception('No recent CAIDA ix-asns file available') + date = date.datetime.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc) logging.info('going to use this URL: ' + url) super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://publicdata.caida.org/datasets/ixps/README.txt' + self.reference['reference_time_modification'] = date + + def __set_modification_time_from_metadata_line(self, line): + try: + date_str = json.loads(line.lstrip('#'))['date'] + date = datetime.strptime(date_str, '%Y.%m.%d %H:%M:%S').replace(tzinfo=timezone.utc) + self.reference['reference_time_modification'] = date + except (json.JSONDecodeError, KeyError, ValueError) as e: + logging.warning(f'Failed to get modification date from metadata line: {line.strip()}') + logging.warning(e) + logging.warning('Using date from filename.') def run(self): """Fetch the latest file and process lines one by one.""" @@ -52,6 +66,7 @@ def run(self): # Find all possible values and create corresponding nodes for line in req.text.splitlines(): if line.startswith('#'): + self.__set_modification_time_from_metadata_line(line) continue ix = json.loads(line) diff --git a/iyp/crawlers/caida/ixs.py b/iyp/crawlers/caida/ixs.py index fdc3f31..2301c09 100644 --- a/iyp/crawlers/caida/ixs.py +++ b/iyp/crawlers/caida/ixs.py @@ -4,6 +4,7 @@ import logging import os import sys +from datetime import datetime, timezone import arrow import requests @@ -36,9 +37,22 @@ def __init__(self, organization, url, name): else: # for loop was not 'broken', no file available raise Exception('No recent CAIDA ix-asns file available') + date = date.datetime.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc) logging.info('going to use this URL: ' + url) super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://publicdata.caida.org/datasets/ixps/README.txt' + 
self.reference['reference_time_modification'] = date + + def __set_modification_time_from_metadata_line(self, line): + try: + date_str = json.loads(line.lstrip('#'))['date'] + date = datetime.strptime(date_str, '%Y.%m.%d %H:%M:%S').replace(tzinfo=timezone.utc) + self.reference['reference_time_modification'] = date + except (json.JSONDecodeError, KeyError, ValueError) as e: + logging.warning(f'Failed to get modification date from metadata line: {line.strip()}') + logging.warning(e) + logging.warning('Using date from filename.') def run(self): """Fetch the latest file and process lines one by one.""" @@ -58,6 +72,7 @@ def run(self): # Find all possible values and create corresponding nodes for line in req.text.splitlines(): if line.startswith('#'): + self.__set_modification_time_from_metadata_line(line) continue ix = json.loads(line) diff --git a/iyp/crawlers/cisco/umbrella_top1M.py b/iyp/crawlers/cisco/umbrella_top1M.py index 714681b..5b607f5 100644 --- a/iyp/crawlers/cisco/umbrella_top1M.py +++ b/iyp/crawlers/cisco/umbrella_top1M.py @@ -3,6 +3,7 @@ import logging import os import sys +from datetime import datetime, timedelta, timezone from zipfile import ZipFile import requests @@ -17,6 +18,33 @@ class Crawler(BaseCrawler): + def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://s3-us-west-1.amazonaws.com/umbrella-static/index.html' + + def __set_modification_time(self): + """Set the modification time by looking for the last available historical file. + The current (non-historical) file is created on the next day. + + For example, if a file for 2024-02-13 is available, it means the current file + was created on 2024-02-14. + """ + hist_url = 'http://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-%Y-%m-%d.csv.zip' + date = datetime.now(tz=timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) + for attempt in range(7): + r = requests.head(date.strftime(hist_url)) + if r.ok: + break + date -= timedelta(days=1) + else: + logging.warning(f'Failed to find historical list within search interval (>{date}); ' + 'Will not set modification time.') + return + + # date now points to the last available historical file, which means the + # current file is the day after this date.
+ self.reference['reference_time_modification'] = date + timedelta(days=1) + logging.info(self.reference) def run(self): """Fetch Umbrella top 1M and push to IYP.""" @@ -26,7 +54,9 @@ def run(self): logging.info('Downloading latest list...') req = requests.get(URL) if req.status_code != 200: - raise RequestStatusError('Error while fetching Cisco Umbrella Top 1M csv file') + raise RequestStatusError(f'Error while fetching Cisco Umbrella Top 1M csv file: {req.status_code}') + + self.__set_modification_time() links = [] # open zip file and read top list diff --git a/iyp/crawlers/citizenlab/urldb.py b/iyp/crawlers/citizenlab/urldb.py index 109b3aa..ebd1837 100644 --- a/iyp/crawlers/citizenlab/urldb.py +++ b/iyp/crawlers/citizenlab/urldb.py @@ -23,6 +23,9 @@ def generate_url(suffix): class Crawler(BaseCrawler): # Base Crawler provides access to IYP via self.iyp # and set up a dictionary with the org/url/today's date in self.reference + def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://github.com/citizenlab/test-lists' def run(self): # Fetch country codes to generate urls diff --git a/iyp/crawlers/cloudflare/dns_top_locations.py b/iyp/crawlers/cloudflare/dns_top_locations.py index 46e46a0..8f40586 100644 --- a/iyp/crawlers/cloudflare/dns_top_locations.py +++ b/iyp/crawlers/cloudflare/dns_top_locations.py @@ -9,6 +9,7 @@ import logging import os import sys +from datetime import datetime, timezone import flatdict import requests @@ -42,6 +43,12 @@ def __init__(self, organization, url, name): # Initialize IYP connection super().__init__(organization, url, name) + # Not super elegant. + if name == 'cloudflare.dns_top_ases': + self.reference['reference_url_info'] = 'https://developers.cloudflare.com/api/operations/radar-get-dns-top-ases' # noqa: E501 + elif name == 'cloudflare.dns_top_locations': + self.reference['reference_url_info'] = 'https://developers.cloudflare.com/radar/investigate/dns/#top-locations' # noqa: E501 + # Fetch domain names registered in IYP existing_dn = self.iyp.tx.run( f"""MATCH (dn:DomainName)-[r:RANK]-(:Ranking) @@ -117,7 +124,17 @@ def run(self): for i, file in enumerate(files): with open(file, 'rb') as fp: # Process line one after the other - for domain_top in json.load(fp)['result'].items(): + results = json.load(fp)['result'] + if not self.reference['reference_time_modification']: + # Get the reference time from the first file. 
+ try: + date_str = results['meta']['dateRange'][0]['endTime'] + date = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc) + self.reference['reference_time_modification'] = date + except (KeyError, ValueError, TypeError) as e: + logging.warning(f'Failed to get modification time: {e}') + + for domain_top in results.items(): self.compute_link(domain_top) if i % 100 == 0: diff --git a/iyp/crawlers/cloudflare/ranking_bucket.py b/iyp/crawlers/cloudflare/ranking_bucket.py index fbea3f0..6d9a02e 100644 --- a/iyp/crawlers/cloudflare/ranking_bucket.py +++ b/iyp/crawlers/cloudflare/ranking_bucket.py @@ -3,6 +3,7 @@ import logging import os import sys +from datetime import datetime, timezone import requests from requests.adapters import HTTPAdapter, Retry @@ -12,7 +13,7 @@ # Organization name and URL to data ORG = 'Cloudflare' URL_DATASETS = 'https://api.cloudflare.com/client/v4/radar/datasets?limit=10&offset=0&datasetType=RANKING_BUCKET&format=json' # noqa: E501 -URL = '' +URL = 'https://api.cloudflare.com/client/v4/radar/datasets' URL_DL = 'https://api.cloudflare.com/client/v4/radar/datasets/download' NAME = 'cloudflare.ranking_bucket' @@ -27,6 +28,9 @@ class Crawler(BaseCrawler): # # Cloudflare ranks second and third level domain names (not host names). # See https://blog.cloudflare.com/radar-domain-rankings/ + def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://developers.cloudflare.com/radar/investigate/domain-ranking-datasets/' # noqa: E501 def run(self): """Fetch data and push to IYP.""" @@ -61,6 +65,16 @@ def run(self): datasets = list() all_domains = set() for dataset in datasets_json['result']['datasets']: + if not self.reference['reference_time_modification']: + # Get modification time from first dataset. Should be the same for all + # datasets. + try: + date_str = dataset['meta']['targetDateEnd'] + date = datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc) + self.reference['reference_time_modification'] = date + except (KeyError, ValueError, TypeError) as e: + logging.warning(f'Failed to get modification time: {e}') + # Get the dataset URL req = req_session.post(URL_DL, json={'datasetId': dataset['id']}) if req.status_code != 200: @@ -92,7 +106,6 @@ def run(self): dataset_title = f'Cloudflare {dataset["title"]}' logging.info(f'Processing dataset: {dataset_title}') print(f'Processing dataset: {dataset_title}') - self.reference['reference_url'] = dataset['url'] ranking_id = self.iyp.get_node('Ranking', { 'name': dataset_title, diff --git a/iyp/crawlers/cloudflare/top100.py b/iyp/crawlers/cloudflare/top100.py index d189da8..2a6b63d 100644 --- a/iyp/crawlers/cloudflare/top100.py +++ b/iyp/crawlers/cloudflare/top100.py @@ -3,6 +3,7 @@ import logging import os import sys +from datetime import datetime, timezone import requests @@ -24,6 +25,9 @@ class Crawler(BaseCrawler): # # Cloudflare ranks second and third level domain names (not host names). 
# See https://blog.cloudflare.com/radar-domain-rankings/ + def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://developers.cloudflare.com/radar/investigate/domain-ranking-datasets/' # noqa: E501 def run(self): """Fetch data and push to IYP.""" @@ -37,13 +41,22 @@ def run(self): 'Content-Type': 'application/json' } - req = requests.get(self.reference['reference_url'], headers=headers) + req = requests.get(self.reference['reference_url_data'], headers=headers) if req.status_code != 200: print(f'Cannot download data {req.status_code}: {req.text}') - raise RequestStatusError('Error while fetching data file') + raise RequestStatusError(f'Error while fetching data file: {req.status_code}') + + results = req.json()['result'] + + try: + date_str = results['meta']['dateRange'][0]['endTime'] + date = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc) + self.reference['reference_time_modification'] = date + except (KeyError, ValueError, TypeError) as e: + logging.warning(f'Failed to get modification time: {e}') # Process line one after the other - for i, _ in enumerate(map(self.update, req.json()['result']['top'])): + for i, _ in enumerate(map(self.update, results['top'])): sys.stderr.write(f'\rProcessed {i} lines') sys.stderr.write('\n') diff --git a/iyp/crawlers/emileaben/as_names.py b/iyp/crawlers/emileaben/as_names.py index cebe71c..66d4b10 100644 --- a/iyp/crawlers/emileaben/as_names.py +++ b/iyp/crawlers/emileaben/as_names.py @@ -6,7 +6,7 @@ import requests -from iyp import BaseCrawler, RequestStatusError +from iyp import BaseCrawler, RequestStatusError, get_commit_datetime # Organization name and URL to data ORG = 'emileaben' @@ -17,6 +17,10 @@ class Crawler(BaseCrawler): # Base Crawler provides access to IYP via self.iyp # and setup a dictionary with the org/url/today's date in self.reference + def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://github.com/emileaben/asnames' + self.reference['reference_time_modification'] = get_commit_datetime('emileaben/asnames', 'asnames.csv') def run(self): # Create a temporary directory diff --git a/iyp/crawlers/example/crawler.py b/iyp/crawlers/example/crawler.py index a329eec..63c94ed 100644 --- a/iyp/crawlers/example/crawler.py +++ b/iyp/crawlers/example/crawler.py @@ -21,7 +21,7 @@ def run(self): """Fetch data and push to IYP.""" # Fetch data - req = requests.get(self.reference['reference_url']) + req = requests.get(self.reference['reference_url_data']) if req.status_code != 200: logging.error('Cannot download data {req.status_code}: {req.text}') raise RequestStatusError('Error while fetching data file') diff --git a/iyp/crawlers/ihr/__init__.py b/iyp/crawlers/ihr/__init__.py index c8fa727..9ff8517 100644 --- a/iyp/crawlers/ihr/__init__.py +++ b/iyp/crawlers/ihr/__init__.py @@ -1,6 +1,6 @@ import csv import os -from datetime import datetime, time, timezone +from datetime import timezone import arrow import lz4.frame @@ -34,6 +34,7 @@ class HegemonyCrawler(BaseCrawler): def __init__(self, organization, url, name, af): self.af = af super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://ihr.iijlab.net/ihr/en-us/documentation#AS_dependency' def run(self): """Fetch data from file and push to IYP.""" @@ -50,12 +51,12 @@ def run(self): url = self.url.format(year=today.year, month=today.month, day=today.day) req = 
requests.head(url) - self.reference = { - 'reference_url': url, - 'reference_org': self.organization, - 'reference_name': self.name, - 'reference_time': datetime.combine(today.date(), time.min, timezone.utc) - } + self.reference['reference_url_data'] = url + self.reference['reference_time_modification'] = today.datetime.replace(hour=0, + minute=0, + second=0, + microsecond=0, + tzinfo=timezone.utc) os.makedirs('tmp/', exist_ok=True) os.system(f'wget {url} -P tmp/') diff --git a/iyp/crawlers/ihr/country_dependency.py b/iyp/crawlers/ihr/country_dependency.py index 3ff4125..1a8d4af 100644 --- a/iyp/crawlers/ihr/country_dependency.py +++ b/iyp/crawlers/ihr/country_dependency.py @@ -3,7 +3,7 @@ import logging import os import sys -from datetime import datetime, time, timezone +from datetime import datetime, timezone import arrow import iso3166 @@ -37,6 +37,7 @@ def __init__(self, organization, url, name): self.http_session.mount('https://', HTTPAdapter(max_retries=retries)) super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://ihr.iijlab.net/ihr/en-us/documentation#Country_s_network_dependency' # noqa: E501 def run(self): """Fetch data from API and push to IYP.""" @@ -49,14 +50,8 @@ def run(self): raise RequestStatusError('Error while fetching data for ' + cc) data = json.loads(req.text) ranking = data['results'] - - # Setup references - self.reference = { - 'reference_org': ORG, - 'reference_url': URL, - 'reference_name': NAME, - 'reference_time': datetime.combine(datetime.utcnow(), time.min, timezone.utc) - } + if not ranking: + continue # Setup rankings' node country_qid = self.iyp.get_node('Country', @@ -65,15 +60,22 @@ def run(self): } ) - countryrank_statements = [] - if country_qid is not None: - countryrank_statements = [('COUNTRY', country_qid, self.reference)] - # Find the latest timebin in the data last_timebin = '1970-01-01' for r in ranking: if arrow.get(r['timebin']) > arrow.get(last_timebin): last_timebin = r['timebin'] + self.reference['reference_url_data'] = self.url + f'&timebin={last_timebin}' + self.reference['reference_time_modification'] = None + try: + date = datetime.strptime(last_timebin, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc) + self.reference['reference_time_modification'] = date + except ValueError as e: + logging.warning(f'Failed to get modification time: {e}') + + countryrank_statements = [] + if country_qid is not None: + countryrank_statements = [('COUNTRY', country_qid, self.reference.copy())] # Make ranking and push data links = [] @@ -106,7 +108,7 @@ def run(self): links.append({ 'src_id': self.asn_id[asn['asn']], 'dst_id': self.countryrank_qid, - 'props': [self.reference, asn] + 'props': [self.reference.copy(), asn] }) # Push links to IYP diff --git a/iyp/crawlers/ihr/rov.py b/iyp/crawlers/ihr/rov.py index 76c2857..4d5daf8 100644 --- a/iyp/crawlers/ihr/rov.py +++ b/iyp/crawlers/ihr/rov.py @@ -3,7 +3,7 @@ import logging import os import sys -from datetime import datetime, time, timezone +from datetime import timezone import arrow import lz4.frame @@ -45,6 +45,9 @@ def close(self): class Crawler(BaseCrawler): + def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://ihr-archive.iijlab.net/ihr/rov/README.txt' def run(self): """Fetch data from file and push to IYP.""" @@ -60,12 +63,12 @@ def run(self): today = today.shift(days=-1) url = URL.format(year=today.year, month=today.month, day=today.day) - self.reference = { - 
'reference_org': ORG, - 'reference_url': url, - 'reference_name': NAME, - 'reference_time': datetime.combine(today.date(), time.min, timezone.utc) - } + self.reference['reference_url_data'] = url + self.reference['reference_time_modification'] = today.datetime.replace(hour=0, + minute=0, + second=0, + microsecond=0, + tzinfo=timezone.utc) os.makedirs('tmp/', exist_ok=True) os.system(f'wget {url} -P tmp/') @@ -73,7 +76,7 @@ def run(self): local_filename = 'tmp/' + url.rpartition('/')[2] self.csv = lz4Csv(local_filename) - logging.warning('Getting node IDs from neo4j...\n') + logging.info('Getting node IDs from neo4j...') asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn') prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix') tag_id = self.iyp.batch_get_nodes_by_single_prop('Tag', 'label') @@ -84,7 +87,7 @@ def run(self): dep_links = [] country_links = [] - logging.warning('Computing links...\n') + logging.info('Computing links...') for line in csv.reader(self.csv, quotechar='"', delimiter=',', skipinitialspace=True): # header # id, timebin, prefix, hege, af, visibility, rpki_status, irr_status, @@ -158,7 +161,7 @@ def run(self): self.csv.close() # Push links to IYP - logging.warning('Pushing links to neo4j...\n') + logging.info('Pushing links to neo4j...') self.iyp.batch_add_links('ORIGINATE', orig_links) self.iyp.batch_add_links('CATEGORIZED', tag_links) self.iyp.batch_add_links('DEPENDS_ON', dep_links) diff --git a/iyp/crawlers/inetintel/as_org.py b/iyp/crawlers/inetintel/as_org.py index 6faa657..71676d9 100644 --- a/iyp/crawlers/inetintel/as_org.py +++ b/iyp/crawlers/inetintel/as_org.py @@ -4,6 +4,7 @@ import sys import tempfile from collections import defaultdict +from datetime import datetime, timezone import pandas as pd import requests @@ -40,6 +41,21 @@ def get_latest_dataset_url(github_repo: str, data_dir: str, file_extension: str) class Crawler(BaseCrawler): # Base Crawler provides access to IYP via self.iyp # and set up a dictionary with the org/url/today's date in self.reference + def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://github.com/InetIntel/Dataset-AS-to-Organization-Mapping' + self.__get_modification_time_from_url() + + def __get_modification_time_from_url(self): + expected_suffix = '.json' + try: + if not URL.endswith(expected_suffix): + raise ValueError(f'Expected "{expected_suffix}" file for data URL') + _, date_str = URL[:-len(expected_suffix)].rsplit('.', maxsplit=1) + date = datetime.strptime(date_str, '%Y-%m').replace(tzinfo=timezone.utc) + self.reference['reference_time_modification'] = date + except ValueError as e: + logging.warning(f'Failed to set modification time: {e}') def run(self): """Fetch data and push to IYP.""" diff --git a/iyp/crawlers/manrs/members.py b/iyp/crawlers/manrs/members.py index b652008..22384bf 100644 --- a/iyp/crawlers/manrs/members.py +++ b/iyp/crawlers/manrs/members.py @@ -61,8 +61,8 @@ def __init__(self, organization, url, name): self.reference = { 'reference_name': NAME, 'reference_org': ORG, - 'reference_url': URL, - 'reference_time': datetime.combine(datetime.utcnow(), time.min, timezone.utc) + 'reference_url_data': URL, + 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc) } def run(self): diff --git a/iyp/crawlers/nro/delegated_stats.py b/iyp/crawlers/nro/delegated_stats.py index ded0bbe..ea96368 100644 --- a/iyp/crawlers/nro/delegated_stats.py +++ 
b/iyp/crawlers/nro/delegated_stats.py @@ -4,6 +4,7 @@ import os import sys from collections import defaultdict +from datetime import datetime, timezone import requests @@ -18,6 +19,9 @@ class Crawler(BaseCrawler): + def __init__(self, organization, url, name): + super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://www.nro.net/wp-content/uploads/nro-extended-stats-readme5.txt' def run(self): """Fetch the delegated stat file from RIPE website and process lines one by @@ -43,8 +47,15 @@ def run(self): if line.strip().startswith('#'): continue - # skip version and summary lines fields_value = line.split('|') + # get modification time from version line + if len(fields_value) == 7 and fields_value[0].isdigit(): + try: + date = datetime.strptime(fields_value[5], '%Y%m%d').replace(tzinfo=timezone.utc) + self.reference['reference_time_modification'] = date + except ValueError as e: + logging.warning(f'Failed to set modification time: {e}') + # skip summary lines if len(fields_value) < 8: continue @@ -65,7 +76,7 @@ def run(self): prefixes.add(prefix) # Create all nodes - logging.warning('Pushing nodes to neo4j...\n') + logging.warning('Pushing nodes to neo4j...') opaqueid_id = self.iyp.batch_get_nodes_by_single_prop('OpaqueID', 'id', opaqueids) prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes) country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries) @@ -120,7 +131,7 @@ def run(self): status_links[rec['status'].upper()].append( {'src_id': prefix_qid, 'dst_id': opaqueid_qid, 'props': [reference]}) - logging.warning('Pusing links to neo4j...\n') + logging.warning('Pushing links to neo4j...') # Push all links to IYP self.iyp.batch_add_links('COUNTRY', country_links) for label, links in status_links.items(): diff --git a/iyp/crawlers/openintel/__init__.py b/iyp/crawlers/openintel/__init__.py index 01c2ff4..ddb5415 100644 --- a/iyp/crawlers/openintel/__init__.py +++ b/iyp/crawlers/openintel/__init__.py @@ -1,7 +1,6 @@ # Simple Python script to fetch domain name to IP address mappings from OpenINTEL data # OpenIntelCrawler is based on code from Mattijs Jonker -import argparse import json import logging import os @@ -15,7 +14,7 @@ import pandas as pd import requests -from iyp import BaseCrawler, RequestStatusError +from iyp import BaseCrawler, DataNotAvailableError TMP_DIR = './tmp' os.makedirs(TMP_DIR, exist_ok=True) @@ -29,13 +28,11 @@ OPENINTEL_ACCESS_KEY = config['openintel']['access_key'] OPENINTEL_SECRET_KEY = config['openintel']['secret_key'] - -def valid_date(s): - try: - return datetime.strptime(s, '%Y-%m-%d') - except ValueError: - msg = 'not a valid ISO 8601 date: {0!r}'.format(s) - raise argparse.ArgumentTypeError(msg) +# We use the AWS interface to get data, but can not provide AWS URLs as data source, so +# at least for the Tranco and Umbrella datasets we can point to the publicly available +# archives. 
+TRANCO_REFERENCE_URL_DATA_FMT = 'https://data.openintel.nl/data/tranco1m/%Y/openintel-tranco1m-%Y%m%d.tar' +UMBRELLA_REFERENCE_URL_DATA_FMT = 'https://data.openintel.nl/data/umbrella1m/%Y/openintel-umbrella1m-%Y%m%d.tar' class OpenIntelCrawler(BaseCrawler): @@ -45,6 +42,11 @@ def __init__(self, organization, url, name, dataset): self.dataset = dataset super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://www.openintel.nl/' + if dataset == 'tranco': + self.reference['reference_url_info'] = 'https://data.openintel.nl/data/tranco1m' + elif dataset == 'umbrella': + self.reference['reference_url_info'] = 'https://data.openintel.nl/data/umbrella1m' def get_parquet(self): """Fetch the forward DNS data, populate a data frame, and process lines one by @@ -72,48 +74,40 @@ def get_parquet(self): # OpenINTEL measurement data objects base prefix FDNS_WAREHOUSE_S3 = 'category=fdns/type=warehouse' - # check on the website if yesterday's data is available - yesterday = arrow.utcnow().shift(days=-1) - # FIXME Check at the proper place. Remove flake8 exception afterwards. - # flake8: noqa - # url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day) - # try: - # req = requests.head(url) - - # attempt = 3 - # while req.status_code != 200 and attempt > 0: - # print(req.status_code) - # attempt -= 1 - # yesterday = yesterday.shift(days=-1) - # url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day) - # req = requests.head(url) - - # except requests.exceptions.ConnectionError: - # logging.warning("Cannot reach OpenINTEL website, try yesterday's data") - # yesterday = arrow.utcnow().shift(days=-1) - # url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day) - - logging.warning(f'Fetching data for {yesterday}') + # Get latest available data. + date = arrow.utcnow() + for lookback_days in range(6): + objects = list(WAREHOUSE_BUCKET.objects.filter( + # Build a partition path for the given source and date + Prefix=os.path.join( + FDNS_WAREHOUSE_S3, + 'source={}'.format(self.dataset), + 'year={}'.format(date.year), + 'month={:02d}'.format(date.month), + 'day={:02d}'.format(date.day) + )).all()) + if len(objects) > 0: + break + date = date.shift(days=-1) + else: + logging.error('Failed to find data within the specified lookback interval.') + raise DataNotAvailableError('Failed to find data within the specified lookback interval.') + self.reference['reference_time_modification'] = \ + date.datetime.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc) + if self.dataset == 'tranco': + self.reference['reference_url_data'] = date.strftime(TRANCO_REFERENCE_URL_DATA_FMT) + elif self.dataset == 'umbrella': + self.reference['reference_url_data'] = date.strftime(UMBRELLA_REFERENCE_URL_DATA_FMT) - # Start one day before ? # TODO remove this line? 
- yesterday = yesterday.shift(days=-1) + logging.info(f'Fetching data for {date.strftime("%Y-%m-%d")}') # Iterate objects in bucket with given (source, date)-partition prefix - for i_obj in WAREHOUSE_BUCKET.objects.filter( - # Build a partition path for the given source and date - Prefix=os.path.join( - FDNS_WAREHOUSE_S3, - 'source={}'.format(self.dataset), - 'year={}'.format(yesterday.year), - 'month={:02d}'.format(yesterday.month), - 'day={:02d}'.format(yesterday.day) - ) - ): + for i_obj in objects: # Open a temporary file to download the Parquet object into with tempfile.NamedTemporaryFile(mode='w+b', dir=TMP_DIR, - prefix='{}.'.format(yesterday.date().isoformat()), + prefix='{}.'.format(date.date().isoformat()), suffix='.parquet', delete=True) as tempFile: @@ -249,6 +243,7 @@ class DnsDependencyCrawler(BaseCrawler): def __init__(self, organization, url, name): super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://dnsgraph.dacs.utwente.nl' @staticmethod def remove_root(name): @@ -272,14 +267,21 @@ def run(self): current_date = datetime.now(tz=timezone.utc) - timedelta(weeks=lookback) year = current_date.strftime('%Y') week = current_date.strftime('%U') - base_url = f'{self.reference["reference_url"]}/year={year}/week={week}' + base_url = f'{self.reference["reference_url_data"]}/year={year}/week={week}' probe_url = f'{base_url}/connections.json.gz' if requests.head(probe_url).ok: logging.info(f'Using year={year}/week={week} ({current_date.strftime("%Y-%m-%d")})') break else: logging.error('Failed to find data within the specified lookback interval.') - raise RequestStatusError('Failed to find data within the specified lookback interval.') + raise DataNotAvailableError('Failed to find data within the specified lookback interval.') + + # Shift to Monday and set to midnight. + mod_date = (current_date - timedelta(days=current_date.weekday())).replace(hour=0, + minute=0, + second=0, + microsecond=0) + self.reference['reference_time_modification'] = mod_date logging.info('Reading connections') connections = pd.read_json(f'{base_url}/connections.json.gz', lines=True) @@ -289,7 +291,7 @@ def run(self): # Currently there are only DOMAIN and HOSTNAME entries in from_nodeType, but # maybe that changes in the future. connections.loc[connections['from_nodeType'].isin(('DOMAIN', 'HOSTNAME')), 'from_nodeKey'] = \ - connections.loc[connections['from_nodeType'].isin(('DOMAIN', 'HOSTNAME')), 'from_nodeKey'].map(self.remove_root) + connections.loc[connections['from_nodeType'].isin(('DOMAIN', 'HOSTNAME')), 'from_nodeKey'].map(self.remove_root) # noqa: E501 connections.loc[connections['to_nodeType'].isin(('DOMAIN', 'HOSTNAME')), 'to_nodeKey'] = \ connections.loc[connections['to_nodeType'].isin(('DOMAIN', 'HOSTNAME')), 'to_nodeKey'].map(self.remove_root) # Normalize IPv6 addresses. 
diff --git a/iyp/crawlers/pch/__init__.py b/iyp/crawlers/pch/__init__.py index 7e61dfb..7483ef1 100644 --- a/iyp/crawlers/pch/__init__.py +++ b/iyp/crawlers/pch/__init__.py @@ -15,7 +15,8 @@ from requests_futures.sessions import FuturesSession from urllib3.util.retry import Retry -from iyp import AddressValueError, BaseCrawler, CacheHandler +from iyp import (AddressValueError, BaseCrawler, CacheHandler, + DataNotAvailableError) from iyp.crawlers.pch.show_bgp_parser import ShowBGPParser PARALLEL_DOWNLOADS = 8 @@ -61,6 +62,7 @@ def __init__(self, organization: str, url: str, name: str, af: int): self.collector_site_url = str() self.__initialize_session() super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://www.pch.net/resources/Routing_Data/' def __initialize_session(self) -> None: self.session = FuturesSession(max_workers=PARALLEL_DOWNLOADS) @@ -120,16 +122,14 @@ def fetch_collector_site(self) -> str: lookback = today - self.MAX_LOOKBACK if lookback.month == curr_month: logging.error('Failed to find current data.') - print('Failed to find current data.', file=sys.stderr) - return str() + raise DataNotAvailableError('Failed to find current data.') self.collector_site_url = self.url + today.strftime('%Y/%m/') resp: Response = self.session.get(self.collector_site_url).result() if resp.ok: return resp.text logging.warning(f'Failed to retrieve collector site from: {self.collector_site_url}') logging.error('Failed to find current data.') - print('Failed to find current data.', file=sys.stderr) - return str() + raise DataNotAvailableError('Failed to find current data.') @staticmethod def filter_route_collector_links(links: ResultSet) -> list: @@ -175,10 +175,9 @@ def probe_latest_set(self, collector_name: str) -> datetime: return curr_date curr_date -= timedelta(days=1) logging.error('Failed to find current data.') - print('Failed to find current data.', file=sys.stderr) - return None + raise DataNotAvailableError('Failed to find current data.') - def fetch(self) -> bool: + def fetch(self) -> None: """Fetch and cache all data. First get a list of collector names and their associated files. Then fetch the @@ -202,13 +201,11 @@ def fetch(self) -> bool: self.collector_site_url, collector_names = self.cache_handler.load_cached_object(collector_names_name) else: collector_site = self.fetch_collector_site() - if not collector_site: - return True soup = BeautifulSoup(collector_site, features='html.parser') links = soup.find_all('a') collector_names = self.filter_route_collector_links(links) self.cache_handler.save_cached_object(collector_names_name, (self.collector_site_url, collector_names)) - self.reference['reference_url'] = self.collector_site_url + self.reference['reference_url_data'] = self.collector_site_url # Get the date of the latest available dataset based on the # first collector in the list. @@ -218,8 +215,10 @@ def fetch(self) -> bool: # files (one per collector) if the data for the current date # is not yet available for all collectors. 
latest_available_date = self.probe_latest_set(collector_names[0]) - if latest_available_date is None: - return True + self.reference['reference_time_modification'] = latest_available_date.replace(hour=0, + minute=0, + second=0, + microsecond=0) curr_date = datetime.now(tz=timezone.utc) max_lookback = curr_date - self.MAX_LOOKBACK @@ -270,14 +269,11 @@ def fetch(self) -> bool: print(f'Failed to find current data for {len(failed_fetches)} collectors: {failed_fetches}', file=sys.stderr) - return False - def run(self) -> None: """Fetch data from PCH, parse the files, and push nodes and relationships to the database.""" # Pre-fetch all data. - if self.fetch(): - return + self.fetch() # Parse files in parallel. logging.info(f'Parsing {len(self.collector_files)} collector files.') diff --git a/iyp/crawlers/peeringdb/fac.py b/iyp/crawlers/peeringdb/fac.py index 834e96f..d86dbd6 100644 --- a/iyp/crawlers/peeringdb/fac.py +++ b/iyp/crawlers/peeringdb/fac.py @@ -10,7 +10,8 @@ import requests_cache from iyp import BaseCrawler -from iyp.crawlers.peeringdb.ix import handle_social_media +from iyp.crawlers.peeringdb.ix import (handle_social_media, + set_reference_time_from_metadata) # NOTES This script should be executed after peeringdb.org @@ -44,6 +45,7 @@ def __init__(self, organization, url, name): self.requests = requests_cache.CachedSession(os.path.join(CACHE_DIR, ORG), expire_after=CACHE_DURATION) super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://www.peeringdb.com/apidocs/#tag/api/operation/list%20fac' def run(self): """Fetch facilities information from PeeringDB and push to IYP.""" @@ -54,7 +56,9 @@ def run(self): logging.error('Error while fetching peeringDB data') raise Exception(f'Cannot fetch peeringdb data, status code={req.status_code}\n{req.text}') - facilities = json.loads(req.text)['data'] + result = req.json() + set_reference_time_from_metadata(self.reference, result) + facilities = result['data'] # compute nodes facs = set() diff --git a/iyp/crawlers/peeringdb/ix.py b/iyp/crawlers/peeringdb/ix.py index e1e0f78..7263166 100644 --- a/iyp/crawlers/peeringdb/ix.py +++ b/iyp/crawlers/peeringdb/ix.py @@ -58,6 +58,15 @@ def handle_social_media(d: dict, website_set: set = None): d[f'social_media_{service}'] = identifier +def set_reference_time_from_metadata(reference_dict, data): + try: + generated_timestamp = data['meta']['generated'] + date = datetime.fromtimestamp(generated_timestamp, tz=timezone.utc) + reference_dict['reference_time_modification'] = date + except (KeyError, ValueError) as e: + logging.warning(f'Failed to set modification time: {e}') + + class Crawler(BaseCrawler): def __init__(self, organization, url, name): """Initialisation for pushing peeringDB IXPs to IYP.""" @@ -67,22 +76,28 @@ def __init__(self, organization, url, name): self.reference_ix = { 'reference_org': ORG, 'reference_name': NAME, - 'reference_url': URL_PDB_IXS, - 'reference_time': datetime.combine(datetime.utcnow(), time.min, timezone.utc) + 'reference_url_data': URL_PDB_IXS, + 'reference_url_info': 'https://www.peeringdb.com/apidocs/#tag/api/operation/list%20ix', + 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc), + 'reference_time_modification': None } self.reference_lan = { 'reference_org': ORG, 'reference_name': NAME, - 'reference_url': URL_PDB_LANS, - 'reference_time': datetime.combine(datetime.utcnow(), time.min, timezone.utc) + 'reference_url_data': URL_PDB_LANS, + 'reference_url_info': 
'https://www.peeringdb.com/apidocs/#tag/api/operation/list%20ixlan', + 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc), + 'reference_time_modification': None } self.reference_netfac = { 'reference_org': ORG, 'reference_name': NAME, - 'reference_url': URL_PDB_NETFAC, - 'reference_time': datetime.combine(datetime.utcnow(), time.min, timezone.utc) + 'reference_url_data': URL_PDB_NETFAC, + 'reference_url_info': 'https://www.peeringdb.com/apidocs/#tag/api/operation/list%20netfac', + 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc), + 'reference_time_modification': None } # keep track of added networks @@ -110,7 +125,9 @@ def run(self): logging.error(f'Error while fetching IXs data\n({req.status_code}) {req.text}') raise Exception(f'Cannot fetch peeringdb data, status code={req.status_code}\n{req.text}') - self.ixs = json.loads(req.text)['data'] + result = req.json() + set_reference_time_from_metadata(self.reference_ix, result) + self.ixs = result['data'] # Register IXPs logging.warning('Pushing IXP info...') @@ -122,7 +139,9 @@ def run(self): logging.error(f'Error while fetching IXLANs data\n({req.status_code}) {req.text}') raise Exception(f'Cannot fetch peeringdb data, status code={req.status_code}\n{req.text}') - ixlans = json.loads(req.text)['data'] + result = req.json() + set_reference_time_from_metadata(self.reference_lan, result) + ixlans = result['data'] # index ixlans by their id self.ixlans = {} @@ -139,7 +158,9 @@ def run(self): logging.error(f'Error while fetching IXLANs data\n({req.status_code}) {req.text}') raise Exception(f'Cannot fetch peeringdb data, status code={req.status_code}\n{req.text}') - self.netfacs = json.loads(req.text)['data'] + result = req.json() + set_reference_time_from_metadata(self.reference_netfac, result) + self.netfacs = result['data'] self.register_net_fac() def register_net_fac(self): diff --git a/iyp/crawlers/peeringdb/org.py b/iyp/crawlers/peeringdb/org.py index c1d3301..f1f1beb 100644 --- a/iyp/crawlers/peeringdb/org.py +++ b/iyp/crawlers/peeringdb/org.py @@ -10,7 +10,8 @@ import requests_cache from iyp import BaseCrawler -from iyp.crawlers.peeringdb.ix import handle_social_media +from iyp.crawlers.peeringdb.ix import (handle_social_media, + set_reference_time_from_metadata) ORG = 'PeeringDB' @@ -41,6 +42,7 @@ def __init__(self, organization, url, name): self.requests = requests_cache.CachedSession(os.path.join(CACHE_DIR, ORG), expire_after=CACHE_DURATION) super().__init__(organization, url, name) + self.reference['reference_url_info'] = 'https://www.peeringdb.com/apidocs/#tag/api/operation/list%20org' def run(self): """Fetch organizations information from PeeringDB and push to IYP.""" @@ -50,7 +52,9 @@ def run(self): logging.error('Error while fetching peeringDB data') raise Exception(f'Cannot fetch peeringdb data, status code={req.status_code}\n{req.text}') - organizations = json.loads(req.text)['data'] + result = req.json() + set_reference_time_from_metadata(self.reference, result) + organizations = result['data'] # compute nodes orgs = set() diff --git a/iyp/crawlers/ripe/as_names.py b/iyp/crawlers/ripe/as_names.py index bd4ea21..609cdca 100644 --- a/iyp/crawlers/ripe/as_names.py +++ b/iyp/crawlers/ripe/as_names.py @@ -5,7 +5,8 @@ import requests -from iyp import BaseCrawler, RequestStatusError +from iyp import (BaseCrawler, RequestStatusError, + set_modification_time_from_last_modified_header) URL = 'https://ftp.ripe.net/ripe/asnames/asn.txt' ORG = 'RIPE NCC' @@ -19,7 +20,9 @@ 
diff --git a/iyp/crawlers/peeringdb/org.py b/iyp/crawlers/peeringdb/org.py
index c1d3301..f1f1beb 100644
--- a/iyp/crawlers/peeringdb/org.py
+++ b/iyp/crawlers/peeringdb/org.py
@@ -10,7 +10,8 @@
 import requests_cache
 
 from iyp import BaseCrawler
-from iyp.crawlers.peeringdb.ix import handle_social_media
+from iyp.crawlers.peeringdb.ix import (handle_social_media,
+                                       set_reference_time_from_metadata)
 
 ORG = 'PeeringDB'
 
@@ -41,6 +42,7 @@ def __init__(self, organization, url, name):
         self.requests = requests_cache.CachedSession(os.path.join(CACHE_DIR, ORG),
                                                      expire_after=CACHE_DURATION)
         super().__init__(organization, url, name)
+        self.reference['reference_url_info'] = 'https://www.peeringdb.com/apidocs/#tag/api/operation/list%20org'
 
     def run(self):
         """Fetch organizations information from PeeringDB and push to IYP."""
@@ -50,7 +52,9 @@ def run(self):
             logging.error('Error while fetching peeringDB data')
             raise Exception(f'Cannot fetch peeringdb data, status code={req.status_code}\n{req.text}')
 
-        organizations = json.loads(req.text)['data']
+        result = req.json()
+        set_reference_time_from_metadata(self.reference, result)
+        organizations = result['data']
 
         # compute nodes
         orgs = set()
diff --git a/iyp/crawlers/ripe/as_names.py b/iyp/crawlers/ripe/as_names.py
index bd4ea21..609cdca 100644
--- a/iyp/crawlers/ripe/as_names.py
+++ b/iyp/crawlers/ripe/as_names.py
@@ -5,7 +5,8 @@
 
 import requests
 
-from iyp import BaseCrawler, RequestStatusError
+from iyp import (BaseCrawler, RequestStatusError,
+                 set_modification_time_from_last_modified_header)
 
 URL = 'https://ftp.ripe.net/ripe/asnames/asn.txt'
 ORG = 'RIPE NCC'
@@ -19,7 +20,9 @@
     def run(self):
         req = requests.get(URL)
         if req.status_code != 200:
-            raise RequestStatusError('Error while fetching AS names')
+            raise RequestStatusError(f'Error while fetching AS names: {req.status_code}')
+
+        set_modification_time_from_last_modified_header(self.reference, req)
 
         lines = []
         asns = set()
diff --git a/iyp/crawlers/ripe/atlas_measurements.py b/iyp/crawlers/ripe/atlas_measurements.py
index 79770f0..25c7b77 100644
--- a/iyp/crawlers/ripe/atlas_measurements.py
+++ b/iyp/crawlers/ripe/atlas_measurements.py
@@ -24,6 +24,9 @@ class Crawler(BaseCrawler):
     def __init__(self, organization, url, name):
         self.__initialize_session()
         super().__init__(organization, url, name)
+        self.reference['reference_url_info'] = 'https://atlas.ripe.net/docs/apis/rest-api-manual/measurements/'
+        # Atlas API is real-time, i.e., we can use the same timestamp.
+        self.reference['reference_time_modification'] = self.reference['reference_time_fetch']
 
     def __initialize_session(self) -> None:
         self.session = Session()
@@ -217,7 +220,7 @@ def run(self):
         for probe_measurement in valid_probe_measurements:
             probe_measurement_qid = probe_measurement_ids[probe_measurement['id']]
             probe_measurement_reference = self.reference.copy()
-            probe_measurement_reference['reference_url'] = probe_measurement_reference['reference_url'] + \
+            probe_measurement_reference['reference_url_data'] = probe_measurement_reference['reference_url_data'] + \
                 f'/{probe_measurement["id"]}'
 
             probe_measurement_asn = probe_measurement['target']['asn']
diff --git a/iyp/crawlers/ripe/atlas_probes.py b/iyp/crawlers/ripe/atlas_probes.py
index 03c1ddb..2a94c61 100644
--- a/iyp/crawlers/ripe/atlas_probes.py
+++ b/iyp/crawlers/ripe/atlas_probes.py
@@ -24,6 +24,9 @@ class Crawler(BaseCrawler):
     def __init__(self, organization, url, name):
         self.__initialize_session()
         super().__init__(organization, url, name)
+        self.reference['reference_url_info'] = 'https://atlas.ripe.net/docs/apis/rest-api-manual/probes/'
+        # Atlas API is real-time, i.e., we can use the same timestamp.
+        self.reference['reference_time_modification'] = self.reference['reference_time_fetch']
 
     def __initialize_session(self) -> None:
         self.session = Session()
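A small sketch of the per-relationship reference pattern used by the Atlas measurements crawler above: the base reference is copied and the measurement ID is appended to reference_url_data, so each relationship points at its exact source. The base URL and measurement ID below are placeholders, not the crawler's actual constants:

    # Illustrative only: base URL and measurement ID are made up.
    base_reference = {'reference_url_data': 'https://atlas.ripe.net/api/v2/measurements'}

    # Copy the crawler-wide reference and make it specific to one measurement.
    probe_measurement_reference = base_reference.copy()
    probe_measurement_reference['reference_url_data'] += '/12345678'
    print(probe_measurement_reference['reference_url_data'])
    # https://atlas.ripe.net/api/v2/measurements/12345678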
diff --git a/iyp/crawlers/ripe/roa.py b/iyp/crawlers/ripe/roa.py
index b4a62af..aeaae42 100644
--- a/iyp/crawlers/ripe/roa.py
+++ b/iyp/crawlers/ripe/roa.py
@@ -4,7 +4,7 @@
 import os
 import sys
 from collections import defaultdict
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from io import BytesIO
 
 import requests
@@ -23,7 +23,7 @@ class Crawler(BaseCrawler):
     def __init__(self, organization, url, name):
         """Initialize IYP and statements for pushed data."""
 
-        now = datetime.utcnow()
+        now = datetime.now(tz=timezone.utc)
         self.date_path = f'{now.year}/{now.month:02d}/{now.day:02d}'
 
         # Check if today's data is available
@@ -36,6 +36,8 @@ def __init__(self, organization, url, name):
             logging.warning("Using yesterday's data: " + self.date_path)
 
         super().__init__(organization, url, name)
+        self.reference['reference_url_info'] = 'https://rpki-study.github.io/rpki-archive/'
+        self.reference['reference_time_modification'] = now.replace(hour=0, minute=0, second=0)
 
     def run(self):
         """Fetch data from RIPE and push to IYP."""
diff --git a/iyp/crawlers/stanford/asdb.py b/iyp/crawlers/stanford/asdb.py
index 9aaecca..b32186c 100644
--- a/iyp/crawlers/stanford/asdb.py
+++ b/iyp/crawlers/stanford/asdb.py
@@ -4,7 +4,7 @@
 import os
 import re
 import sys
-from datetime import datetime
+from datetime import datetime, timezone
 
 import bs4
 import requests
@@ -32,6 +32,19 @@ def get_latest_asdb_dataset_url(asdb_stanford_data_url: str, file_name_format: s
 
 class Crawler(BaseCrawler):
 
+    def __init__(self, organization, url, name):
+        super().__init__(organization, url, name)
+        self.reference['reference_url_info'] = 'https://asdb.stanford.edu/'
+        self.__set_modification_time_from_url()
+
+    def __set_modification_time_from_url(self):
+        fmt = 'https://asdb.stanford.edu/data/%Y-%m_categorized_ases.csv'
+        try:
+            date = datetime.strptime(URL, fmt).replace(tzinfo=timezone.utc)
+            self.reference['reference_time_modification'] = date
+        except ValueError as e:
+            logging.warning(f'Failed to set modification time: {e}')
+
     def run(self):
         """Fetch the ASdb file and push it to IYP."""
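The ASdb modification time above is derived from the dataset URL itself: the format string embeds the URL pattern, so strptime() recovers the release month directly. A sketch with an illustrative URL:

    from datetime import datetime, timezone

    # The URL below is a hypothetical example following the documented pattern.
    fmt = 'https://asdb.stanford.edu/data/%Y-%m_categorized_ases.csv'
    url = 'https://asdb.stanford.edu/data/2024-01_categorized_ases.csv'

    # strptime() matches the literal parts of the URL and parses %Y-%m;
    # the day defaults to the first of the month.
    date = datetime.strptime(url, fmt).replace(tzinfo=timezone.utc)
    print(date)  # 2024-01-01 00:00:00+00:00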
diff --git a/iyp/crawlers/tranco/top1M.py b/iyp/crawlers/tranco/top1M.py
index e4b1923..6b4eab8 100644
--- a/iyp/crawlers/tranco/top1M.py
+++ b/iyp/crawlers/tranco/top1M.py
@@ -7,7 +7,8 @@
 
 import requests
 
-from iyp import BaseCrawler, RequestStatusError
+from iyp import (BaseCrawler, RequestStatusError,
+                 set_modification_time_from_last_modified_header)
 
 # URL to Tranco top 1M
 URL = 'https://tranco-list.eu/top-1m.csv.zip'
@@ -16,6 +17,19 @@
 
 
 class Crawler(BaseCrawler):
 
+    def __init__(self, organization, url, name):
+        super().__init__(organization, url, name)
+        self.reference['reference_url_info'] = 'https://tranco-list.eu/methodology'
+
+    def __set_data_url(self):
+        """Set the data URL using the permanent ID of the current list, which stays
+        valid once the permalink is updated."""
+        try:
+            res = requests.get('https://tranco-list.eu/top-1m-id')
+            res.raise_for_status()
+            self.reference['reference_url_data'] = f'https://tranco-list.eu/download_daily/{res.text}'
+        except requests.HTTPError as e:
+            logging.warning(f'Failed to update data URL: {e}')
 
     def run(self):
         """Fetch Tranco top 1M and push to IYP."""
@@ -27,6 +41,9 @@ def run(self):
         if req.status_code != 200:
             raise RequestStatusError('Error while fetching Tranco csv file')
 
+        set_modification_time_from_last_modified_header(self.reference, req)
+        self.__set_data_url()
+
         links = []
         domains = set()
         # open zip file and read top list
diff --git a/iyp/crawlers/virginiatech/rovista.py b/iyp/crawlers/virginiatech/rovista.py
index a11b4a9..b72e734 100644
--- a/iyp/crawlers/virginiatech/rovista.py
+++ b/iyp/crawlers/virginiatech/rovista.py
@@ -2,6 +2,7 @@
 import logging
 import os
 import sys
+from datetime import datetime, timezone
 
 import requests
 
@@ -13,10 +14,21 @@
 
 class Crawler(BaseCrawler):
 
+    def __init__(self, organization, url, name):
+        super().__init__(organization, url, name)
+        self.reference['reference_url_info'] = 'https://rovista.netsecurelab.org/'
+
+    def __set_modification_time(self, entry):
+        try:
+            date_str = entry['lastUpdatedDate']
+            date = datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc)
+            self.reference['reference_time_modification'] = date
+        except (KeyError, ValueError) as e:
+            logging.warning(f'Failed to set modification time: {e}')
 
     def run(self):
         """Get RoVista data from their API."""
-        batch_size = 1000  # Adjust batch size as needed
+        batch_size = 1000
         offset = 0
         entries = []
         asns = set()
 
@@ -25,26 +37,29 @@ def run(self):
             # Make a request with the current offset
             response = requests.get(URL, params={'offset': offset, 'count': batch_size})
             if response.status_code != 200:
-                raise RequestStatusError('Error while fetching RoVista data')
+                raise RequestStatusError(f'Error while fetching RoVista data: {response.status_code}')
 
             data = response.json().get('data', [])
             for entry in data:
+                if not self.reference['reference_time_modification']:
+                    self.__set_modification_time(entry)
                 asns.add(entry['asn'])
                 if entry['ratio'] > 0.5:
-                    entries.append({'asn': entry['asn'], 'ratio': entry['ratio'], 'label': 'Validating RPKI ROV'})
+                    entries.append({'asn': entry['asn'], 'ratio': entry['ratio']})
                 else:
-                    entries.append({'asn': entry['asn'], 'ratio': entry['ratio'], 'label': 'Not Validating RPKI ROV'})
+                    entries.append({'asn': entry['asn'], 'ratio': entry['ratio']})
 
             # Move to the next page
             offset += 1
             # Break the loop if there's no more data
             if len(data) < batch_size:
                 break
 
+        logging.info('Pushing nodes to neo4j...')
         # get ASNs and prefixes IDs
         self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
-        tag_id_not_valid = self.iyp.get_node('Tag', {'label': 'Not Validating RPKI ROV'}, create=True)
-        tag_id_valid = self.iyp.get_node('Tag', {'label': 'Validating RPKI ROV'}, create=True)
+        tag_id_not_valid = self.iyp.get_node('Tag', {'label': 'Not Validating RPKI ROV'})
+        tag_id_valid = self.iyp.get_node('Tag', {'label': 'Validating RPKI ROV'})
 
         # Compute links
         links = []
         for entry in entries:
diff --git a/public/www/guides/cypher_basics.html b/public/www/guides/cypher_basics.html
index 4416c1e..1dfff4b 100644
--- a/public/www/guides/cypher_basics.html
+++ b/public/www/guides/cypher_basics.html
@@ -25,26 +25,26 @@

References

The MATCH clause describes a pattern in the graph.

The pattern is given in an ASCII-art representation where nodes are depicted by a pair of
parentheses, (), and relationships are depicted with two dashes, --, sometimes including more
information in square brackets, -[]-.

Thus (iij:AS)-[:MEMBER_OF]-(ix:IXP) describes a path that starts from a node we'll call iij
and connects to another node we'll call ix. iij and ix are arbitrary identifiers that allow
us to refer to a certain node later on.

In IYP all nodes and relationships have a type (called labels for nodes) that conveys what
the nodes and relationships represent. The labels/types are given after the colon, for
example (:AS) is a node representing an AS, and -[:MEMBER_OF]- is a relationship of type
member of.

The WHERE clause describes conditions for nodes or relationships that match the pattern.
Here we specify that the node called iij should have a property asn that equals 2497.

The RETURN clause describes the nodes and links we want to display.
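Putting the three clauses together, the membership query described above can also be run programmatically with the Python Neo4j driver that IYP itself uses; the query is reconstructed from the explanation, and the connection URI and credentials are placeholders:

    from neo4j import GraphDatabase

    # Placeholder connection details; adjust to your IYP instance.
    driver = GraphDatabase.driver('neo4j://localhost:7687', auth=('neo4j', 'password'))

    # MATCH the pattern, constrain it with WHERE, and RETURN the matched nodes.
    query = """
    MATCH (iij:AS)-[:MEMBER_OF]-(ix:IXP)
    WHERE iij.asn = 2497
    RETURN iij, ix
    """

    with driver.session() as session:
        for record in session.run(query):
            print(record['iij'], record['ix'])

    driver.close()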

@@ -90,8 +90,8 @@

Node and relationship properties

All relationships in IYP have at least these properties:
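The exact property list is not reproduced here, but based on the fields introduced in this change, a relationship's reference properties look roughly like the dictionary the crawlers build (names and URLs below are illustrative):

    from datetime import datetime, time, timezone

    # Rough sketch of the reference properties attached to relationships; the
    # crawler name, organization, and URLs are placeholders.
    reference = {
        'reference_name': 'example.crawler',                   # hypothetical crawler name
        'reference_org': 'Example Org',                        # hypothetical organization
        'reference_url_data': 'https://example.org/data.csv',  # where the data was fetched from
        'reference_url_info': 'https://example.org/readme',    # documentation about the data
        'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc),
        'reference_time_modification': None,                   # set when the source exposes it
    }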

@@ -100,7 +100,7 @@

Node and relationship properties

Filter on properties

The previous example had a condition on the asn property of the AS node; you can also filter on relationship properties. For example, this query looks for IIJ memberships other than those given by PeeringDB.

 MATCH (iij:AS)-[mem:MEMBER_OF]-(something)
 WHERE iij.asn = 2497 AND mem.reference_org <> 'PeeringDB'
 RETURN iij, something
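The same kind of filter works on the other reference properties, for example mem.reference_url_data to trace the exact dataset a relationship came from, or mem.reference_time_fetch and mem.reference_time_modification to restrict results by date.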