Skip to content

Commit

Permalink
Feature : AlephDNS (#47)
Browse files Browse the repository at this point in the history
* Feature : AlephDNS
add instances support
add ipfs support
add program support

---------

Co-authored-by: aliel <[email protected]>
  • Loading branch information
1yam and aliel authored Aug 25, 2023
1 parent 6705f95 commit 86df08f
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 0 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ testing =
black
isort
flake8
aiodns
mqtt =
aiomqtt<=0.1.3
certifi
Expand Down
7 changes: 7 additions & 0 deletions src/aleph/sdk/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class Settings(BaseSettings):

CODE_USES_SQUASHFS: bool = which("mksquashfs") is not None # True if command exists

# Dns resolver
DNS_IPFS_DOMAIN = "ipfs.public.aleph.sh"
DNS_PROGRAM_DOMAIN = "program.public.aleph.sh"
DNS_INSTANCE_DOMAIN = "instance.public.aleph.sh"
DNS_ROOT_DOMAIN = "static.public.aleph.sh"
DNS_RESOLVERS = ["1.1.1.1", "1.0.0.1"]

class Config:
env_prefix = "ALEPH_"
case_sensitive = False
Expand Down
135 changes: 135 additions & 0 deletions src/aleph/sdk/domain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import aiodns
import re
from .conf import settings
from typing import Optional
from aleph.sdk.exceptions import DomainConfigurationError


class AlephDNS:
def __init__(self):
self.resolver = aiodns.DNSResolver(servers=settings.DNS_RESOLVERS)
self.fqdn_matcher = re.compile(r"https?://?")

async def query(self, name: str, query_type: str):
try:
return await self.resolver.query(name, query_type)
except Exception as e:
print(e)
return None

def url_to_domain(self, url):
return self.fqdn_matcher.sub("", url).strip().strip("/")

async def get_ipv6_address(self, url: str):
domain = self.url_to_domain(url)
ipv6 = []
query = await self.query(domain, "AAAA")
if query:
for entry in query:
ipv6.append(entry.host)
return ipv6

async def get_dnslink(self, url: str):
domain = self.url_to_domain(url)
query = await self.query(f"_dnslink.{domain}", "TXT")
if query is not None and len(query) > 0:
return query[0].text

async def check_domain_configured(self, domain, target, owner):
try:
print("Check...", target)
return await self.check_domain(domain, target, owner)
except Exception as error:
raise DomainConfigurationError(error)

async def check_domain(
self, url: str, target: str, owner: Optional[str] = None
):
status = {"cname": False, "owner_proof": False}

target = target.lower()
domain = self.url_to_domain(url)

dns_rules = self.get_required_dns_rules(url, target, owner)

for dns_rule in dns_rules:
status[dns_rule["rule_name"]] = False

record_name = dns_rule["dns"]["name"]
record_type = dns_rule["dns"]["type"]
record_value = dns_rule["dns"]["value"]

res = await self.query(record_name, record_type.upper())

if record_type == "txt":
found = False

for _res in res:
if hasattr(_res, "text") and _res.text == record_value:
found = True

if found == False:
raise DomainConfigurationError(
(dns_rule["info"], dns_rule["on_error"], status)
)

elif res is None or not hasattr(res, record_type) or getattr(res, record_type) != record_value:
raise DomainConfigurationError(
(dns_rule["info"], dns_rule["on_error"], status)
)

status[dns_rule["rule_name"]] = True

return status

def get_required_dns_rules(self, url, target, owner: Optional[str] = None):
domain = self.url_to_domain(url)
target = target.lower()
dns_rules = []

if target == "ipfs":
cname_value = settings.DNS_IPFS_DOMAIN
elif target == "program":
cname_value = settings.DNS_PROGRAM_DOMAIN
elif target == "instance":
cname_value = f"{domain}.{settings.DNS_INSTANCE_DOMAIN}"

# cname rule
dns_rules.append({
"rule_name": "cname",
"dns": {
"type": "cname",
"name": domain,
"value": cname_value
},
"info": f"Create a CNAME record for {domain} with value {cname_value}",
"on_error": f"CNAME record not found: {domain}"
})

if target == "ipfs":
# ipfs rule
dns_rules.append({
"rule_name": "delegation",
"dns": {
"type": "cname",
"name": f"_dnslink.{domain}",
"value": f"_dnslink.{domain}.{settings.DNS_ROOT_DOMAIN}"
},
"info": f"Create a CNAME record for _dnslink.{domain} with value _dnslink.{domain}.{settings.DNS_ROOT_DOMAIN}",
"on_error": f"CNAME record not found: _dnslink.{domain}"
})

if owner:
# ownership rule
dns_rules.append({
"rule_name": "owner_proof",
"dns": {
"type": "txt",
"name": f"_control.{domain}",
"value": owner
},
"info": f"Create a TXT record for _control.{domain} with value = owner address",
"on_error": f"Owner address mismatch"
})

return dns_rules
5 changes: 5 additions & 0 deletions src/aleph/sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ class FileTooLarge(Exception):
"""

pass


class DomainConfigurationError(Exception):
"Raised when the domain checks are not satisfied"
pass
50 changes: 50 additions & 0 deletions tests/unit/test_domains.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import pytest
import asyncio

from aleph.sdk.domain import AlephDNS
from aleph.sdk.exceptions import DomainConfigurationError


@pytest.mark.asyncio
async def test_url_to_domain():
alephdns = AlephDNS()
domain = alephdns.url_to_domain("https://aleph.im")
query = await alephdns.query(domain, "A")
assert query is not None
assert len(query) > 0
assert hasattr(query[0], "host")


@pytest.mark.asyncio
async def test_get_ipv6_address():
alephdns = AlephDNS()
url = "https://aleph.im"
ipv6_address = await alephdns.get_ipv6_address(url)
assert ipv6_address is not None
assert len(ipv6_address) > 0
assert ":" in ipv6_address[0]


@pytest.mark.asyncio
async def test_dnslink():
alephdns = AlephDNS()
url = "https://aleph.im"
dnslink = await alephdns.get_dnslink(url)
assert dnslink is not None


@pytest.mark.asyncio
async def test_configured_domain():
alephdns = AlephDNS()
url = 'https://custom-domain-unit-test.aleph.sh'
status = await alephdns.check_domain(url, "ipfs", "0xfakeaddress")
assert type(status) is dict


@pytest.mark.asyncio
async def test_not_configured_domain():
alephdns = AlephDNS()
url = 'https://not-configured-domain.aleph.sh'
with pytest.raises(DomainConfigurationError):
status = await alephdns.check_domain(url, "ipfs", "0xfakeaddress")

0 comments on commit 86df08f

Please sign in to comment.