Committed ba5ceb
index 0000000..1599c34
--- /dev/null
index 0000000..bf690e0
--- /dev/null
+`easyuri` is a dumb URL parser with a smart interface.
+ >>> import easyuri
+ >>> uri = easyuri.parse("en.wikipedia.org/wiki/Jabberwocky")
+ >>> uri.is_secure, uri.in_hsts
+ (True, True)
+ >>> uri.scheme, uri.host, uri.port
+ ('https', 'en.wikipedia.org', 443)
+ >>> uri.subdomain, uri.domain, uri.suffix, uri.path
+ ('en', 'wikipedia', 'org', 'wiki/Jabberwocky')
+ >>> str(uri)
+ 'https://en.wikipedia.org/wiki/Jabberwocky'
+ >>> uri = easyuri.parse("\\") # doctest: +IGNORE_EXCEPTION_DETAIL
+ Traceback (most recent call last):
+ ...
+ DifficultURLError
index 0000000..5f41570
--- /dev/null
+A dumb URL parser with a smart interface.
+from __future__ import annotations
+import base64
+import codecs
+import hashlib
+import inspect
+import mimetypes
+import unicodedata
+import urllib.parse
+from dataclasses import dataclass
+import hstspreload
+import pkg_resources
# Names exported by `from easyuri import *`.
__all__ = ["parse", "supported_schemes"]
class DifficultURLError(Exception):
    """Raised by `parse` for input it refuses to handle."""
def parse(uri, secure=True) -> URI:
    """
    Return a `URI` object for given `uri`.

    The text before the first ``:`` is taken as the scheme and selects a
    handler class from `supported_schemes`; unknown schemes fall back to the
    generic `URI`. Input without a scheme, and protocol-relative input
    (``//host/path``), is treated as ``https``.

    `uri` may be a string or any object whose `str()` is the URI text (for
    example another `URI`). `secure` is accepted for backward compatibility
    and is currently unused.

    Raises `DifficultURLError` when the text contains ``evil`` (placeholder
    check, see the TODO below).

    Various web-related protocols supported.

        >>> webpage = parse("en.wikipedia.org/wiki/Jabberwocky")
        >>> stylesheet = parse("data:text/css,body{font:14px/1.5 Helvetica;}")
        >>> script = parse("javascript:alert('hello world')")

    """
    # Stringify before any substring test: `in` on a URI object either raises
    # TypeError or, for classes defining `__getitem__`, never terminates.
    uri = str(uri)
    # TODO replace this placeholder with real validation
    if "evil" in uri:
        raise DifficultURLError("...")
    if uri.startswith("//"):
        uri = f"https:{uri}"
    scheme, _, identifier = uri.partition(":")
    if not identifier:
        # no ":" at all (or nothing after it): bare host/path, assume https
        uri = f"https://{scheme}"
        scheme = "https"
    if "/" in scheme:
        # the first ":" came after a "/", so it was not a scheme separator
        uri = f"https://{scheme}:{identifier}"
        scheme = "https"
    # TODO honour `secure` (force https) and downgrade `.onion` hosts to http;
    # TODO an earlier draft also considered rejecting unsupported schemes.
    handler = supported_schemes.get(scheme, URI)
    return handler(uri)
def clean(s):
    """Percent-decode `s`, compose it to NFC and return it as UTF-8 bytes."""
    decoded = urllib.parse.unquote(s)
    composed = unicodedata.normalize("NFC", decoded)
    return composed.encode("utf-8")
class URI:
    """
    A Uniform Resource Identifier.

    The base class stores the text it was given and reports it unchanged as
    both `normalized` and `minimized`; subclasses override those properties.
    String conversion, length, hashing and comparison all go through
    `normalized`.

    """

    def __init__(self, uri):
        self.given = uri

    def __eq__(self, other):
        # `other` may be a string or another URI; hand `parse` plain text so
        # the comparison never depends on how `parse` treats non-strings.
        return str(self) == str(parse(str(other)))

    def __ne__(self, other):
        return not self.__eq__(other)

    def __len__(self):
        return len(self.normalized)

    def __str__(self):
        return self.normalized

    def __bytes__(self):
        return bytes(self.normalized, "utf-8")

    def __add__(self, suffix):
        """Return a new URI of the same class with `suffix` appended."""
        new = "".join((self.given, str(suffix)))
        return self.__class__(new)

    def __truediv__(self, path):
        """Return a new URI of the same class with ``/`` + `path` appended."""
        new = "/".join((self.given, str(path)))
        return self.__class__(new)

    def __hash__(self):
        # Computed once from the SHA-1 of the normalized form, then cached on
        # the instance.
        try:
            return self.__hash
        except AttributeError:
            pass
        digest = hashlib.sha1(bytes(self.normalized, "utf-8")).hexdigest()
        self.__hash = int(digest, 16)
        return self.__hash

    def __repr__(self):
        return self.normalized

    @property
    def normalized(self):
        return self.given

    @property
    def minimized(self):
        return self.given
class HTTPURI(URI):
    """
    Non-secure web address.

    The given text is split and normalized once at construction. The pieces
    are exposed as attributes (`scheme`, `username`, `password`, `host`,
    `port`, `netloc`, `subdomain`, `domain`, `suffix`, `origin`,
    `suffixed_domain`, `path`, `raw_query`, `query`, `fragment`). Query
    parameters can be read and written with item access.

        >>> HTTPURI("example.org/foo")
        http://example.org/foo

    """

    is_secure = False

    def __init__(self, identifier):
        super().__init__(identifier)
        self._normalize()

    @property
    def in_hsts(self):
        """Whether the host is on the HSTS preload list."""
        # NOTE(review): the argument was missing from this copy of the file;
        # `self.host` is assumed -- confirm against upstream.
        return hstspreload.in_hsts_preload(self.host)

    @classmethod
    def from_parts(cls, netloc, path="/", query=None, fragment=""):
        """
        instantiate a URI from parts

        `query` is a mapping of parameter names to a value or list of values.

        # TODO add a doctest

        """
        if query is None:
            query = {}
        query_string = urllib.parse.urlencode(query, doseq=True)
        # The scheme is the class name minus its "URI" suffix (HTTPURI ->
        # "http"), the same rule used to build `supported_schemes`.
        scheme = cls.__name__.lower()[:-3]
        return cls(
            urllib.parse.urlunsplit((scheme, netloc, path, query_string, fragment))
        )

    @property
    def dict_items(self):
        # NOTE(review): the middle keyword was missing from this copy of the
        # file; `host=self.host` is assumed -- confirm against upstream.
        return dict(scheme=self.scheme, host=self.host, path=self.path)

    @property
    def labels(self):
        return (self.subdomain, self.domain, self.suffix)

    @property
    def minimized(self):
        """Normalized form without scheme, fragment or trailing slash."""
        # Cut at "://" rather than by len(self.scheme): `normalized` picks its
        # scheme from `is_secure`, which can differ from the parsed scheme.
        uri = self.normalized.partition("://")[2]
        # XXX TODO strip a leading "www."
        # Drop the fragment first so the trailing slash is removed from the
        # path, not from the fragment.
        return uri.partition("#")[0].rstrip("/")

    @property
    def normalized(self):
        query = urllib.parse.urlencode(self.query, doseq=True)
        normalized_parts = (
            "https" if self.is_secure else "http",
            self.netloc,
            self.path,
            query,
            self.fragment,
        )
        return urllib.parse.urlunsplit(normalized_parts)

    def _normalize(self):
        """Split `self.given` and populate the component attributes."""
        uri = self.given
        if uri == "":
            raise ValueError("`uri` must not be blank")
        if uri.startswith("//"):
            self.is_absolute = True
            uri = uri[2:]
        if not uri.startswith(("/", "http://", "https://")):
            uri = "http://" + uri
        uri = uri.replace("#!", "?_escaped_fragment_=", 1)
        parts = urllib.parse.urlsplit(uri.strip())

        # NOTE(review): indentation was lost in this copy of the file; the
        # extent of the `if parts.scheme:` body is reconstructed -- confirm.
        self.netloc = ""
        if parts.scheme:
            self.scheme = self._normalize_scheme(parts.scheme)
            self.username = self._normalize_username(parts.username)
            self.password = self._normalize_password(parts.password)
            self.host = self._normalize_host(parts.hostname)
            self.port = self._normalize_port(parts.port)

            if self.username:
                auth = self.username
                if self.password:
                    auth += ":" + self.password
                self.netloc = auth + "@"
            self.netloc += self.host
            if self.port not in (80, 443):  # TODO make sure no http on 443, ..
                self.netloc += ":" + str(self.port)

        domain_parts = split_suffix(parts.hostname)
        self.subdomain = domain_parts.subdomain
        self.domain = domain_parts.domain
        self.suffix = domain_parts.suffix
        self.origin = f"{self.scheme}://{self.netloc}"
        self.suffixed_domain = f"{self.domain}.{self.suffix}"

        self.path = self._normalize_path(parts.path).lstrip("/")
        self.raw_query = self._normalize_query(parts.query)
        self.query = urllib.parse.parse_qs(self.raw_query)
        self.fragment = self._normalize_fragment(parts.fragment)

    def _normalize_scheme(self, scheme):
        # Lowercase before validating so the check is case-insensitive.
        scheme = scheme.lower()
        if scheme not in ("http", "https"):
            error_msg = f"`{scheme}` scheme not supported"
            raise ValueError(error_msg)
        return scheme

    def _normalize_username(self, username):
        if username is None:
            username = ""
        return username

    def _normalize_password(self, password):
        if password is None:
            password = ""
        return password

    def _normalize_host(self, host):
        if host is None:
            raise ValueError(f"absolute uri `{self.given}` requires a host")
        if " " in host:
            raise ValueError("spaces not allowed in host")
        # TODO IDNA-encode the host
        return host.lower().strip(".")

    def _normalize_port(self, port):
        # TODO limit to the valid port range
        if port is None:
            port = 80 if self.scheme == "http" else 443
        return port

    def _normalize_path(self, path):
        """Re-quote `path` and resolve empty, ``.`` and ``..`` segments."""
        if path == "":
            path = "/"
        path = urllib.parse.unquote(path)
        path = urllib.parse.quote(path, "~:/?#[]@!$&'()*+,;=")
        output = []
        part = None
        for part in path.split("/"):
            if part == "":
                # keep only the leading empty segment (the root slash)
                if not output:
                    output.append(part)
            elif part == ".":
                pass
            elif part == "..":
                # never pop the root segment
                if len(output) > 1:
                    output.pop()
            else:
                output.append(part)
        # a path ending in "/", "." or ".." keeps a trailing slash
        if part in ["", ".", ".."]:
            output.append("")
        return "/".join(output)

    def _normalize_query(self, query):
        # TODO %3a to %3A
        # TODO %7E to ~
        args = [
            "=".join(
                urllib.parse.quote(clean(t), "~:/?#[]@!$'()*+,;=")
                for t in q.split("=", 1)
            )
            for q in query.split("&")
        ]
        return "&".join(args)

    def _normalize_fragment(self, fragment):
        fragment = urllib.parse.unquote(fragment)
        return urllib.parse.quote(fragment, "~")

    def __getitem__(self, key):
        """get a query parameter, inserting an empty list when absent"""
        return self.query.setdefault(key, [])

    def __setitem__(self, key, value):
        """set a query parameter"""
        if isinstance(value, list):
            self.query[key] = value
        else:
            self.query[key] = [value]

    def update(self, **args):
        """Update query parameters from keyword arguments."""
        self.query.update(**args)
# NOTE(review): the class header was missing from this copy of the file; the
# name comes from the docstring and the HTTPURI base is assumed -- confirm.
class HTTPSURI(HTTPURI):
    """
    secure web address

        >>> HTTPSURI("//example.org/foo")
        https://example.org/foo

    """

    is_secure = True

    def __init__(self, identifier):
        super().__init__(identifier)
class WSURI(URI):
    """
    WebSocket service endpoint

    Adds only the `is_secure` flag to the generic `URI` behaviour.

    # TODO add a doctest for a ws:// address

    """

    is_secure = False
# NOTE(review): the class header was missing from this copy of the file; the
# name comes from the docstring and the WSURI base is assumed -- confirm.
class WSSURI(WSURI):
    """
    secure WebSocket service endpoint

    # TODO add a doctest for a wss:// address

    """

    is_secure = True

    def __init__(self, identifier):
        super().__init__(identifier)
class DataURI(URI):
    """
    data objects

    Holds a payload and the metadata (MIME type, charset, base64 flag) needed
    to render it as a ``data:`` URI.

        >>> DataURI("foo bar")
        data:,foo bar
        >>> data = DataURI("aGVsbG8=", encoded=True, mime_type="image/png")
        >>> data
        data:image/png;base64,aGVsbG8=
        >>> data.mime_type
        'image/png'

    """

    def __init__(self, data, encoded=False, mime_type="text/plain", charset="US-ASCII"):
        self.given = data
        self.data = data
        self.encoded = encoded
        self.mime_type = mime_type
        self.charset = charset

    @classmethod
    def from_identifier(cls, identifier):
        r"""
        return a data URI for given parsed identifier

        `identifier` is the part after ``data:``, i.e.
        ``[<mime type>][;charset=<charset>][;base64],<data>``.

        Raises `ValueError` when there is no data after the comma.

            >>> DataURI.from_identifier("text/html,<!doctype html>foo bar")
            data:text/html,<!doctype html>foo bar
            >>> DataURI.from_identifier("text/html;charset=utf-8"
            ...                         ",<!doctype html>fnṏrd")
            data:text/html;charset=utf-8,<!doctype html>fnṏrd

        # TODO >>> DataURI.from_identifier("charset=utf-8,\xe2\x81\x82")

        """
        metadata, _, data = identifier.partition(",")
        if not data:
            raise ValueError("unable to parse data URI; bad syntax")
        # Metadata fields are separated by ";"; skip empty fields so a bare
        # ",data" keeps the defaults.
        fields = [field.strip() for field in metadata.lower().split(";")]
        fields = [field for field in fields if field]
        encoded = "base64" in fields
        charset = "US-ASCII"
        mime_type = "text/plain"
        for meta in fields:
            if meta.startswith("charset="):
                charset = meta.partition("=")[2]
            elif meta != "base64":
                mime_type = meta
        return cls(data, encoded, mime_type, charset)

    @classmethod
    def from_file(cls, path, mime_type=None):
        """
        return a data URI for contents of file at given path

        MIME type will be inferred from the file extension if possible. You
        may override this by providing your own with `mime_type`. When no
        type can be determined, ``application/octet-stream`` is used.

        Image, audio and video content, and any content that is not valid
        UTF-8, is base64-encoded; other content is embedded as text.

            >>> DataURI.from_file("glider.png")  # doctest: +SKIP
            data:image/png;base64,...

        """
        with open(path, "rb") as f:
            raw = f.read()
        if mime_type is None:
            # guess_type returns (None, None) for unknown extensions
            mime_type = mimetypes.guess_type(path)[0] or "application/octet-stream"
        charset = "US-ASCII"
        text = None
        if not mime_type.startswith(("image", "audio", "video")):
            try:
                text = raw.decode("utf-8")
            except UnicodeDecodeError:
                text = None  # binary content: fall through to base64
        if text is None:
            # decode to str so `normalized` embeds the payload, not a bytes repr
            data = base64.b64encode(raw).decode("ascii")
            encoded = True
        else:
            data = text
            encoded = False
            if not text.isascii():
                charset = "utf-8"
        return cls(data, encoded=encoded, mime_type=mime_type, charset=charset)

    @property
    def normalized(self):
        # Defaults (text/plain, US-ASCII, not base64) are left out of the
        # rendered metadata.
        metadata = []
        if self.mime_type != "text/plain":
            metadata.append(self.mime_type)
        if self.charset != "US-ASCII":
            metadata.append(f"charset={self.charset}")
        if self.encoded:
            metadata.append("base64")
        return f"data:{';'.join(metadata)},{self.data}"
class JavascriptURI(URI):
    """
    JavaScript code

    javascript:<javascript to execute>

        >>> JavascriptURI("alert('example');")
        javascript:alert('example');

    """

    @classmethod
    def from_identifier(cls, identifier):
        """Return a JavaScript URI for the given code (text after the scheme)."""
        return cls(identifier)

    @property
    def normalized(self):
        return f"javascript:{self.given}"
class MagnetURI(URI):
    """
    address to a specific piece of content

    magnet:<content-parameters>

    The identifier is percent-decoded once at construction and rendered
    behind the ``magnet:`` prefix.

        >>> MagnetURI("alert('example');")
        magnet:alert('example');

    """

    def __init__(self, identifier):
        super().__init__(identifier)
        decoded = urllib.parse.unquote(identifier)
        self._normalized = decoded

    @property
    def normalized(self):
        return "magnet:{}".format(self._normalized)
class TelURI(URI):
    """Telephone URI; the given text is reported unchanged."""

    def __init__(self, identifier):
        super().__init__(identifier)

    @property
    def normalized(self):
        return self.given
        # XXX return f"tel:{self._normalized}"
class FaxURI(URI):
    """Fax URI, rendered as ``fax:<identifier>``."""

    def __init__(self, identifier):
        super().__init__(identifier)
        # `normalized` reads `_normalized`, so it must be set here. Accept the
        # identifier with or without its scheme so the prefix is not doubled.
        text = str(identifier)
        if text.lower().startswith("fax:"):
            text = text[len("fax:") :]
        self._normalized = text

    @property
    def normalized(self):
        return f"fax:{self._normalized}"
class SMSURI(URI):
    """
    SMS URI: comma-separated recipient numbers and an optional message body.

    `numbers` holds the recipients and `body` the decoded ``body`` query
    parameter (``None`` when absent).

    """

    def __init__(self, identifier):
        super().__init__(identifier)
        self._normalize()

    def _normalize(self):
        parts = urllib.parse.urlsplit(self.given.strip())
        self.numbers = parts.path.split(",")
        self.body = urllib.parse.parse_qs(parts.query).get("body", [None])[0]
        self._normalized = ",".join(self.numbers)
        if self.body:
            # Re-encode the decoded body; appending the raw query string would
            # produce "?body=body=...".
            self._normalized += f"?body={urllib.parse.quote(self.body)}"

    @property
    def normalized(self):
        return f"sms:{self._normalized}"
class MailtoURI(URI):
    """Mail address URI; the given text is reported unchanged."""

    def __init__(self, identifier):
        super().__init__(identifier)
        self._normalize()

    def _normalize(self):
        # no transformation yet: the normalized form is the given text
        given = self.given
        self._normalized = given

    @property
    def normalized(self):
        return format(self._normalized)
# NOTE(review): the class header was missing from this copy of the file. The
# name is inferred from the scheme registry (lowercased class name minus
# "URI", "_" becoming "+" for names starting with "web") and the
# "web+action://" prefix below -- confirm against upstream.
class Web_ActionURI(URI):
    """
    Web action URI (``web+action://<action>?<query>``).

    `action` is the network-location part and `query` the parsed query
    parameters.

    """

    def __init__(self, identifier):
        super().__init__(identifier)
        self._normalize()

    def _normalize(self):
        uri = self.given
        parts = urllib.parse.urlsplit(uri.strip())
        self.action = parts.netloc
        self.query = urllib.parse.parse_qs(parts.query)
        self._normalized = f"{parts.netloc}?{parts.query}"

    @property
    def normalized(self):
        return f"web+action://{self._normalized}"

    def __getitem__(self, key):
        """get a query parameter"""
        return self.query[key]
# NOTE(review): the class header was missing from this copy of the file. The
# name is inferred from the scheme registry (lowercased class name minus
# "URI", "_" becoming "-") and the "moz-extension://" prefix below -- confirm
# against upstream.
class Moz_ExtensionURI(URI):
    """
    Firefox extension resource (``moz-extension://<host><path>[?<query>]``).

    Query parameters can be read and written with item access.

    """

    def __init__(self, identifier):
        super().__init__(identifier)
        self._normalize()

    def _normalize(self):
        uri = self.given
        parts = urllib.parse.urlsplit(uri.strip())
        # NOTE(review): both assignment targets below were missing from this
        # copy of the file; `parts` and `host` are assumed names -- confirm.
        self.parts = parts
        self.host = parts.netloc
        self.query = urllib.parse.parse_qs(parts.query)
        self._normalized = f"{parts.netloc}{parts.path}"
        if self.query:
            self._normalized += f"?{parts.query}"

    @property
    def normalized(self):
        return f"moz-extension://{self._normalized}"

    def __getitem__(self, key):
        """get a query parameter"""
        return self.query[key]

    def __setitem__(self, key, value):
        """set a query parameter"""
        if isinstance(value, list):
            self.query[key] = value
        else:
            self.query[key] = [value]
# Registry mapping a scheme name to the class that handles it, built from
# every `URI` subclass defined above (and `URI` itself, under the key "").
# The key is the lowercased class name without its last three characters
# ("URI"); an underscore in the name becomes "+" when the name starts with
# "web" and "-" otherwise.
supported_schemes = {}
# Iterate over a copy: the loop variables are themselves module globals, so
# iterating `globals()` directly would change the dict while iterating it.
for scheme, obj in dict(globals()).items():
    if inspect.isclass(obj) and issubclass(obj, URI):
        scheme = scheme.lower()[:-3]
        separator = "-"
        if scheme.startswith("web"):
            separator = "+"
        supported_schemes[scheme.replace("_", separator)] = obj
+[Public Suffix List][1] support.
+ >>> split_suffix("www.example.org")
+ DomainParts(subdomain='www', domain='example', suffix='org')
+Based upon the original implementation [`publicsuffix`][2] copyright
+@[Tomaž Solc][3] and released under an MIT license.
# Shared `PublicSuffixList` instance, created on the first `split_suffix` call.
_suffixes = None
# `@dataclass` supplies the keyword constructor used by `split_suffix` and the
# `DomainParts(subdomain=..., domain=..., suffix=...)` repr shown in the docs.
@dataclass
class DomainParts:
    """A domain's parts."""

    subdomain: str
    domain: str
    suffix: str
class SuffixNotFoundError(Exception):
    """Suffix not found."""

    def __init__(self, url):
        """Build the error for `url`; the text is also kept on `message`."""
        text = f"No suffix found in `{url}`"
        super().__init__(text)
        self.message = text
def split_suffix(hostname) -> DomainParts:
    """
    Return the subdomain, domain and suffix of given `hostname`.

    The hostname is lowercased, stripped of leading dots and split on ``.``.
    The first label flagged ``0`` by the public suffix lookup becomes the
    domain; labels before it form the subdomain and labels after it the
    suffix. Raises `SuffixNotFoundError` when no label is flagged ``0``.

        >>> split_suffix("www.example.org")
        DomainParts(subdomain='www', domain='example', suffix='org')
        >>> split_suffix("www.example")
        DomainParts(subdomain='www', domain='example', suffix='')

    """
    # TODO handle Punycode decoding
    global _suffixes
    if _suffixes is None:
        # the suffix list is loaded on first use and then reused
        _suffixes = PublicSuffixList()
    labels = hostname.lower().lstrip(".").split(".")
    flags = [None] * len(labels)
    _suffixes.lookup(flags, 1, labels)
    for index, flag in enumerate(flags):
        if flag is None or flag != 0:
            continue
        return DomainParts(
            subdomain=".".join(labels[:index]),
            domain=labels[index],
            suffix=".".join(labels[index + 1 :]),
        )
    raise SuffixNotFoundError(hostname)
class PublicSuffixList:
    """
    Reads and parses the public suffix list.

    The rules are stored as a tree keyed by label, read right to left.
    After simplification a node is either a bare flag (``0`` for a normal
    rule, ``1`` for a ``!`` exception rule) or a ``(flag, children)`` tuple.

    """

    def __init__(self):
        input_path = pkg_resources.resource_filename(
            "easyuri", "public_suffix_list.dat"
        )
        # TODO fall back to downloading the list when the bundled file is
        # missing (an earlier draft did this with `requests`)
        with open(input_path, "r", encoding="utf8") as fp:
            self._build_structure(fp)

    def lookup(self, matches, depth, parts, parent=None):
        """
        Record in `matches` the flag of every rule node matching `parts`.

        `matches` must have the same length as `parts`. `depth` counts
        labels from the right, starting at 1, and `matches[-depth]` receives
        the flag of the node visited at that depth. Wildcard (``*``) children
        are tried before the exact label, so an exact match wins.

        """
        if parent is None:
            parent = self.root
        if parent in (0, 1):
            # leaf: just a flag, no children
            negate = parent
            children = None
        else:
            negate, children = parent
        matches[-depth] = negate
        if depth < len(parts) and children:
            for name in ("*", parts[-depth]):
                child = children.get(name, None)
                if child is not None:
                    self.lookup(matches, depth + 1, parts, child)

    def _build_structure(self, fp):
        """Build `self.root` from an iterable of rule lines."""
        root = [0]
        for line in fp:
            line = line.strip()
            # skip comments and blank lines
            if line.startswith("//") or not line:
                continue
            # only the first whitespace-separated token is the rule
            self._add_rule(root, line.split()[0].lstrip("."))
        self.root = self._simplify(root)

    def _add_rule(self, root, rule):
        if rule.startswith("!"):
            negate = 1
            rule = rule[1:]
        else:
            negate = 0
        parts = rule.split(".")
        self._find_node(root, parts)[0] = negate

    def _find_node(self, parent, parts):
        """Return the node for `parts` below `parent`, creating it if needed.

        Consumes `parts` from the right.
        """
        if not parts:
            return parent
        if len(parent) == 1:
            parent.append({})
        assert len(parent) == 2
        negate, children = parent
        child = parts.pop()
        child_node = children.get(child, None)
        if not child_node:
            children[child] = child_node = [0]
        return self._find_node(child_node, parts)

    def _simplify(self, node):
        """Turn the mutable list tree into flags and ``(flag, dict)`` tuples."""
        if len(node) == 1:
            return node[0]
        return (node[0], {k: self._simplify(v) for k, v in node[1].items()})
index 0000000..170e764
--- /dev/null
+name = "easyuri"
+version = "0.1.2"
+description = "a dumb URL parser with a smart interface"
+keywords = ["web", "www"]
+readme = ""
+homepage = ""
+repository = ""
+documentation = ""
+authors = ["Angelo Gladding <>"]
+license = "BSD-2-Clause"
+python = ">=3.8,<3.11"
+hstspreload = "^2023.1.1"
+requests = "^2.28.2"
+gmpg = {path="../gmpg", develop=true}
+txtint = {path="../txtint", develop=true}
+# [[tool.poetry.source]]
+# name = "main"
+# url = ""
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
index 0000000..ed8fdde
--- /dev/null
+ "reportGeneralTypeIssues": false
index 0000000..a3db8a6
--- /dev/null
+"""Test easyuri."""
+import easyuri
def test_correct_public_suffix():
    """Assert FQDNs have public suffixes."""
    # NOTE(review): the FQDN keys and two of the expected suffixes are blank
    # in this copy of the file, so the dict collapses to a single "" entry --
    # restore the fixtures from upstream.
    for fqdn, suffix in {
        "": "com",
        "": "net",
        "": "",
        "": "",
    }.items():
        assert easyuri.split_suffix(fqdn).suffix == suffix
def test_incorrect_public_suffix():
    """Assert FQDNs have no suffix."""
    # NOTE(review): the third fixture is blank in this copy of the file --
    # restore it from upstream.
    for fqdn in (
        "foo.cob",
        "foo.nx",
        "",
        "foo.po",
    ):
        assert easyuri.split_suffix(fqdn).suffix == ""
def test_split_fqdn():
    """Assert FQDNs split correctly."""
    # NOTE(review): the FQDN keys and two of the expected suffixes are blank
    # in this copy of the file, so the dict collapses to a single "" entry --
    # restore the fixtures from upstream.
    for fqdn, labels in {
        "": ("abba", "foo", "com"),
        "": ("abba", "foo", "net"),
        "": ("abba", "foo", ""),
        "": ("abba", "foo", ""),
    }.items():
        assert easyuri.parse(fqdn).labels == labels
def test_split_parts():
    """Assert URLs are split correctly."""
    # NOTE(review): the URL keys and expected hosts are blank in this copy of
    # the file, so the dict collapses to a single "" entry -- restore the
    # fixtures from upstream.
    for url, (host, path) in {
        "": ("", ""),
        "": ("", "foo"),
        "": ("", "foo/bar"),
        "": ("", "foo/bar.xml"),
    }.items():
        parsed = easyuri.parse(url)
        # NOTE(review): the left-hand side of this assertion was missing;
        # `.host` is assumed from the `host` loop variable -- confirm.
        assert parsed.host == host
        assert parsed.path == path