my eye

Bootstrap

Committed fc2c7d

index 0000000..4711d41
--- /dev/null

+poetry.lock
+.coverage
+.test_coverage.xml
+.test_results.xml

index 0000000..ca7fa57
--- /dev/null

+[tool.poetry]
+name = "webagt"
+version = "0.1.2"
+description = "a web agent"
+homepage = "https://ragt.ag/code/webagt"
+authors = ["Angelo Gladding <angelo@ragt.ag>"]
+license = "BSD-2-Clause"
+
+[tool.poetry.scripts]
+webagt = "web.__main__:main"
+
+[tool.poetry.dependencies]
+python = ">=3.10,<3.11"
+txtint = ">=0.0.68"
+sqlyte = ">=0.0.50"
+easyuri = ">=0.0.12"
+microformats = ">=0.0.24"
+pendulum = "^2.1.2"
+certifi = "^2022.12.7"
+requests = {extras=["socks"], version="^2.28.2"}
+httpagentparser = "^1.9.5"
+mimeparse = "^0.1.3"
+lxml = "^4.9.2"
+cssselect = "^1.2.0"
+selenium = "^4.8.0"
+webdriver-manager = "^3.8.5"
+pyvirtualdisplay = "^3.0"
+pyscreenshot = "^3.0"
+
+[tool.poetry.group.dev.dependencies]
+pillow = "^9.2.0"
+gmpg = {path="../gmpg", develop=true}
+txtint = {path="../txtint", develop=true}
+sqlyte = {path="../sqlyte", develop=true}
+easyuri = {path="../easyuri", develop=true}
+microformats = {path="../python-microformats", develop=true}
+
+# [[tool.poetry.source]]
+# name = "main"
+# url = "https://ragt.ag/code/pypi"
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"

index 0000000..195765d
--- /dev/null

+"""A web agent."""
+
+import json
+import pathlib
+import re
+import time
+from pprint import pprint
+
+import lxml.etree
+import lxml.html
+
+try:
+    import pyscreenshot
+except ImportError:  # legacy OSX 10.x
+    pass
+import mf
+import pyvirtualdisplay
+import requests
+import selenium
+import sqlyte
+import txt
+from easyuri import URI
+from easyuri import parse as uri
+from requests.exceptions import ConnectionError, SSLError
+from selenium import webdriver
+from selenium.webdriver.common.action_chains import ActionChains
+from selenium.webdriver.common.by import By
+from selenium.webdriver.firefox.service import Service as FirefoxService
+from selenium.webdriver.support import expected_conditions
+from selenium.webdriver.support.ui import WebDriverWait
+from webdriver_manager.firefox import GeckoDriverManager
+
+__all__ = [
+    "post",
+    "get",
+    "put",
+    "delete",
+    "parse",
+    "agent",
+    "Agent",
+    "firefox",
+    "cache",
+    "download",
+    "ConnectionError",
+    "SSLError",
+    "uri",
+    "URI",
+]
+
+main = txt.application("webagt", __doc__)
+displays = []
+browsers = []
+
+tor_proxies = {"http": "socks5h://localhost:9150", "https": "socks5h://localhost:9150"}
+
+
+def post(url, **kwargs):
+    """Post to the web."""
+    return Transaction(url, "post", **kwargs)
+
+
+def get(url, **kwargs):
+    """Get from the web."""
+    return Transaction(url, "get", **kwargs)
+
+
+def put(url, **kwargs):
+    """Put to the web."""
+    return Transaction(url, "put", **kwargs)
+
+
+def delete(url, **kwargs):
+    """Delete from the web."""
+    return Transaction(url, "delete", **kwargs)
+
+
+def download(url, filepath, chunk_size=1024):
+    """Download url to filepath."""
+    transaction = get(url, stream=True)
+    if transaction.ok:
+        with pathlib.Path(filepath).open("wb") as fp:
+            for chunk in transaction.response.iter_content(chunk_size=chunk_size):
+                if chunk:
+                    fp.write(chunk)
+    return transaction
+
+
+def request(method, url, session=None, **kwargs):
+    """
+    Return the response to dereferencing given `url` using given `method`.
+
+    Attempts to use HTTPS when accessing non-onion domains. Proxies
+    through Tor when accessing onion services. Optionally pass typical
+    `requests.Request` arguments as `kwargs`.
+
+    """
+    url = uri(url)
+    if url.suffix == "onion":
+        kwargs["proxies"] = tor_proxies
+    context = session if session else requests
+    # try:
+    #     response = context.request(
+    #         method, f"{preferred}://{url.minimized}", verify=False, **kwargs
+    #     )
+    # except (requests.exceptions.SSLError, requests.exceptions.ConnectionError):
+    #     if url.suffix != "onion":
+    #         try:
+    kwargs["timeout"] = 15
+    kwargs["headers"] = kwargs.get("headers", {})
+    normal_url = url.normalized.partition("://")[2]
+    try:
+        response = context.request(method, "https://" + normal_url, **kwargs)
+    # except requests.Timeout:
+    #     response = context.request(method, "https://www." + normal_url, **kwargs)
+    except (requests.exceptions.SSLError, requests.exceptions.ConnectionError):
+        response = context.request(method, "http://" + normal_url, **kwargs)
+        # url.is_secure = False
+        # url.scheme = "http"
+    return uri(response.url), response
+
+
+class Transaction:
+    """."""
+
+    def __init__(
+        self, url, method="get", fetch=True, session=None, headers=None, **kwargs
+    ):
+        self.url = str(url)
+        if fetch:
+            # XXX handler = getattr(requests, method)
+            # XXX self.response = handler(apply_dns(self.url), **kwargs)
+
+            firefox_version = "102.0"
+            user_agents = {
+                "wget": "Wget/1.21",
+                "firefox": " ".join(
+                    (
+                        f"Mozilla/5.0 (X11; Linux x86_64; rv:{firefox_version})",
+                        f"Gecko/20100101 Firefox/{firefox_version}",
+                    )
+                ),
+            }
+            _headers = {}
+            if not session or (
+                session and session.headers["User-Agent"].startswith("python-requests/")
+            ):
+                try:
+                    user_agent = user_agents[kwargs["user_agent"]]
+                except KeyError:
+                    user_agent = "webint/0"
+                _headers["user-agent"] = user_agent
+            accept_header = "*/*"
+            # accept_header = "text/html;q=0.9,*/*;q=0.8"
+            _headers["accept"] = accept_header
+            if headers:
+                _headers.update(headers)
+            self.url, self.response = request(
+                method, self.url, session=session, headers=_headers, **kwargs
+            )
+            self.status = self.response.status_code
+            self.ok = self.response.ok
+            self.text = self.response.text
+            self.headers = self.response.headers
+        # self.url = str(self.url)
+
+    def __repr__(self):
+        return f"<web.agent.Transaction object for {self.url}>"
+
+    @property
+    def location(self):
+        location = self.headers["location"]
+        if location.startswith("/"):
+            new_url = uri(self.response.url)
+            new_url.path = location
+            location = str(new_url)
+        return location
+
+    @property
+    def links(self):
+        return self.response.links
+
+    def link(self, name):
+        """
+        Fetch target and discover link `name` in HTML rels or HTTP Link header.
+
+        Skip HEAD request; use GET and attempt to cache.
+
+        """
+        try:
+            link = _get_header_link(self.headers["Link"], name)[0]
+        except (KeyError, IndexError):
+            try:
+                endpoint = uri(self.mf2json["rels"].get(name, [])[0])
+            except IndexError:
+                endpoint = None
+        else:
+            if link.startswith("/"):
+                endpoint = uri(self.url)
+                endpoint.path = link
+            else:
+                endpoint = uri(link)
+        return endpoint
+
+    @property
+    def xml(self):
+        try:
+            return lxml.etree.fromstring(self.text)
+        except ValueError:
+            return lxml.etree.fromstring(self.text.encode("utf-8"))
+
+    @property
+    def dom(self):
+        return Document(self.text, self.url)
+
+    @property
+    def json(self):
+        return json.loads(self.text)
+
+    @property
+    def mf2json(self):
+        return Semantics(mf.parse(Document(self.text, self.url).html, self.url))
+
+    @property
+    def card(self):
+        return Semantics(mf.representative_card(self.mf2json.data, str(self.url)))
+
+    @property
+    def feed(self):
+        # feed = mf.interpret_feed(self.mf2json.data, source_url=str(self.url))
+        # for entry in feed["entries"]:
+        #     entry["post-type"] = mf.discover_post_type(entry)
+        # return Semantics(feed)
+        return Semantics(
+            mf.representative_feed(self.mf2json.data, str(self.url), self.dom)
+        )
+
+    @property
+    def entry(self):
+        return Semantics(mf.interpret_entry(self.mf2json.data, str(self.url)))
+
+    @property
+    def event(self):
+        return Semantics(
+            mf.interpret_event(self.mf2json.data, source_url=str(self.url))
+        )
+
+    def mention(self, *target_urls):
+        return Semantics(
+            mf.interpret_comment(self.mf2json.data, str(self.url), target_urls)
+        )
+
+    # @property
+    # def jf2(self):
+    #     return Semantics(mf.interpret_feed(self.mf2json.data,
+    #                                             source_url=self.url))
+
+
+class Semantics:
+    def __init__(self, data):
+        self.data = data
+
+    def __getitem__(self, item):
+        return self.data[item]
+
+    def __repr__(self):
+        return dump(self.data, indent=2)
+        # XXX return json.dumps(self.data, indent=2)
+
+    def _repr_html_(self):
+        return solarized.highlight(dump(self.data, indent=2), ".json")
+        # XXX return solarized.highlight(json.dumps(self.data, indent=2),
+        # ".json")
+
+    def __bool__(self):
+        return bool(self.data)
+
+
+def _get_header_link(link_header: str, search_rel: str):
+    links = []
+    for link in link_header.split(","):
+        resource, _, rel = link.partition(";")
+        match = re.match("""rel=['"](.+)['"]""", rel.strip())
+        if match and match.groups()[0] == search_rel:
+            links.append(resource.strip(" <>"))
+    return links
+
+
+class RequestFailed(Exception):
+    """A request has failed."""
+
+
+class Cache:
+    """A dictionary-like cache of the web."""
+
+    model = sqlyte.model(
+        "WebCache",
+        resources={
+            "origin": "TEXT",
+            "path": "TEXT",
+            "data": "JSON",
+            "headers": "JSON",
+            "title": "TEXT",
+            "html": "TEXT",
+            "UNIQUE": "(origin, path)",
+        },
+        search={
+            "origin": "",
+            "path": "",
+            "text": "",
+            "FTS": True,
+        },
+    )
+
+    def __init__(self, origin=None, db=None):
+        self.origin = origin
+        if not db:
+            db = sqlyte.db("cache.db", self.model)
+        self.db = db
+
+    def add(self, url):
+        url = self._make_url(url)
+        resource = get(url)
+        try:
+            title = resource.dom.select("title")[0].text
+        except IndexError:
+            title = None
+        try:
+            self.db.insert(
+                "resources",
+                origin=url.origin,
+                path=url.path,
+                data=resource.mf2json.data,
+                headers=dict(resource.headers),
+                title=title,
+                html=resource.text,
+            )
+            self.db.insert(
+                "search",
+                origin=url.origin,
+                path=url.path,
+                text=str(resource.mf2json.data),
+            )
+        except self.db.IntegrityError:
+            self.db.update(
+                "resources",
+                data=resource.mf2json.data,
+                headers=dict(resource.headers),
+                title=title,
+                html=resource.text,
+                where="origin = ? AND path = ?",
+                vals=[url.origin, url.path],
+            )
+            self.db.update(
+                "search",
+                origin=url.origin,
+                path=url.path,
+                text=str(resource.mf2json.data),
+                where="origin = ? AND path = ?",
+                vals=[url.origin, url.path],
+            )
+        return url, resource
+
+    def search(self, query):
+        return self.db.select(
+            "search AS s",
+            what="r.*, s.text",
+            where="search MATCH ?",
+            vals=[query],
+            join="resources AS r ON r.origin = s.origin AND r.path = s.path",
+            order="rank",
+        )
+
+    @property
+    def domains(self):
+        return [
+            (uri(r["origin"]), r["data"])
+            for r in self.db.select(
+                "resources", what="origin, data", order="origin ASC", group="origin"
+            )
+        ]
+
+    def forget_domain(self, domain):
+        return self.db.delete(
+            "resources",
+            where="origin = ? OR origin = ?",
+            vals=[f"https://{domain}", f"http://{domain}"],
+        )
+
+    def get_pages(self, domain):
+        return self.db.select(
+            "resources",
+            where="origin = ? OR origin = ?",
+            vals=[f"https://{domain}", f"http://{domain}"],
+            order="path ASC",
+        )
+
+    # XXX @property
+    # XXX def graph(self):
+    # XXX     network = nx.DiGraph()
+    # XXX     for url, resource in self.cache.items():  # TODO iterate over db items
+    # XXX         # print(resource.links)
+    # XXX         network.add_node(url)
+    # XXX     return nx.draw(network, with_labels=True)
+
+    def _make_url(self, url):
+        if self.origin:
+            url = f"{self.origin}/{url}"
+        return uri(url)
+
+    def __getitem__(self, resource_url):
+        try:
+            url = self._make_url(resource_url)
+            resource_data = self.db.select(
+                "resources",
+                where="origin = ? AND path = ?",
+                vals=[url.origin, url.path],
+            )[0]
+            resource = Transaction(url, fetch=False)
+            resource.headers = resource_data["headers"]  # TODO case-insen
+            resource.text = resource_data["html"]
+        except (AttributeError, IndexError):
+            url, resource = self.add(resource_url)
+        return resource
+
+
+cache = Cache
+
+
+def parse(html):
+    """Return a document object for given html."""
+    return Document(html)
+
+
+# XXX def apply_dns(url):
+# XXX     if url.startswith("/"):
+# XXX         return url
+# XXX     url = easyuri.parse(url)
+# XXX     if url.host == "alice.example":
+# XXX         url = str(url).replace("http://alice.example", "http://127.0.0.1:8080")
+# XXX     elif url.host == "bob.example":
+# XXX         url = str(url).replace("http://bob.example", "http://127.0.0.1:8081")
+# XXX     elif url.host == "hello.example":
+# XXX         url = str(url).replace("http://hello.example", "http://127.0.0.1:8082")
+# XXX     else:
+# XXX         url = str(url)
+# XXX     return url
+
+
+# XXX def unapply_dns(url):
+# XXX     url = easyuri.parse(url)
+# XXX     if url.host == "127.0.0.1":
+# XXX         if url.port == 8080:
+# XXX             url = str(url).replace("http://127.0.0.1:8080", "http://alice.example")
+# XXX         elif url.port == 8081:
+# XXX             url = str(url).replace("http://127.0.0.1:8081", "http://bob.example")
+# XXX         elif url.port == 8082:
+# XXX             url = str(url).replace("http://127.0.0.1:8082", "http://hello.example")
+# XXX     else:
+# XXX         url = str(url)
+# XXX     return url
+
+
+class Document:
+
+    # TODO with html as dom: -- manipulate dom -- on exit html is modified
+
+    def __init__(self, html, url=None):
+        self.doc = lxml.html.fromstring(str(html).strip())
+        if url:
+            self.doc.make_links_absolute(str(url))
+
+    def select(self, selector):
+        els = []
+        for el in self.doc.cssselect(selector):
+            els.append(Element(el))
+        return els
+
+    @property
+    def children(self):
+        return self.doc.getchildren()
+
+    @property
+    def html(self):
+        return lxml.html.tostring(self.doc).decode()
+
+
+class Element:
+    def __init__(self, element):
+        self.element = element
+
+    def append(self, *html):
+        for _html in html:
+            self.element.append(_make_element(_html))
+
+    def select(self, selector):
+        els = []
+        for el in self.element.cssselect(selector):
+            els.append(Element(el))
+        return els
+
+    def replace(self, html):
+        self.element.getparent().replace(self.element, _make_element(html))
+
+    @property
+    def href(self):
+        try:
+            return self.element.attrib["href"]
+        except KeyError:
+            raise AttributeError("href")
+
+    @property
+    def text(self):
+        return self.element.text_content()
+
+
+def _make_element(html):
+    el = lxml.html.fromstring(f"<DOUGIE>{html}</DOUGIE>")
+    return el.cssselect("DOUGIE")[0].getchildren()[0]
+
+
+class Agent:
+    """"""
+
+    def __init__(self, user_agent=None, apps=None):
+        self.session = requests.Session()
+        if user_agent:
+            self.session.headers["User-Agent"] = user_agent
+
+    def get(self, url, **kwargs):
+        return Transaction(url, "GET", session=self.session, **kwargs)
+
+    def post(self, url, **kwargs):
+        return Transaction(url, "POST", session=self.session, **kwargs)
+
+
+agent = Agent
+
+
+class Firefox:
+    """Firefox via Selenium."""
+
+    By = By
+    EC = expected_conditions
+
+    def __init__(self, name=None, width=1024, height=768):
+        if not len(displays):
+            display = pyvirtualdisplay.Display(visible=False, size=(2048, 768))
+            display.start()
+            displays.append(display)
+        # profile = webdriver.FirefoxProfile()
+        # # profile.add_extension(
+        # #     extension="/home/gaea/canopy/var/identities/"
+        # #     "6c189616-4fe1-4f3f-84dc-c4a13ee9b155/"
+        # #     "asteria/asteria-dev.xpi"
+        # # )
+        # binary = "/home/gaea/firefox/firefox-bin"
+        # self.browser = webdriver.Firefox(
+        #     firefox_profile=profile,
+        #     firefox_binary=binary,
+        # )
+        self.browser = webdriver.Firefox(
+            service=FirefoxService(GeckoDriverManager().install())
+        )
+        count = len(browsers)
+        browsers.append(self)
+        self._top = 0
+        self._left = count * 1024
+        self._width = width
+        self._height = height
+        self._update_window()
+        self.name = name
+        self.shot_id = 0
+        self.locked = False
+
+    @property
+    def width(self):
+        return self._width
+
+    @width.setter
+    def width(self, value):
+        self._width = value
+        self._update_window()
+
+    @property
+    def height(self):
+        return self._height
+
+    @height.setter
+    def height(self, value):
+        self._height = value
+        self._update_window()
+
+    def _update_window(self):
+        self.browser.set_window_rect(self._left, self._top, self._width, self._height)
+
+    def go(self, *args, wait=0):
+        if len(args) == 1:
+            url = args[0]
+        elif len(args) == 2:
+            url = "/".join(args)
+        # XXX url = apply_dns(url)
+        self.browser.get(url)
+        if wait:
+            time.sleep(wait)
+        return self
+        # XXX self.browser.get(str(easyuri.parse(url)))
+
+    def wait(self, *conditions, duration=20):
+        for condition in conditions:
+            time.sleep(0.1)
+            wait = WebDriverWait(self.browser, duration)
+            wait.until(condition)
+
+    def wait_until_url_contains(self, url):
+        # XXX self.wait(self.EC.url_contains(apply_dns(url)))
+        self.wait(self.EC.url_contains(url))
+
+    def select(self, selector):
+        return self.browser.find_element(By.CSS_SELECTOR, selector)
+
+    def select_first(self, selector):
+        return self.browser.find_elements(By.CSS_SELECTOR, selector)
+
+    def action(self):
+        return ActionChains(self.browser)
+
+    def shot(self, path):
+        # TODO take in pieces & stitch together -- using way too much memory
+        # self._height = self.browser.execute_script("return document.body."
+        #                                            "scrollHeight;") + 100
+        # self._update_window()
+        self.browser.get_screenshot_as_file(f"{path}.png")
+
+    # XXX def shot_url(self):
+    # XXX     base64 = self.browser.get_screenshot_as_base64()
+    # XXX     return f"data:image/png;BASE64,{base64}"
+
+    def shot_url(self, filename=None):
+        # XXX grab = pyscreenshot.grab(bbox=(0, 0, 920, 920)).tobytes()
+        # XXX base64png = b"".join(base64.encodebytes(grab).splitlines())
+        self.shot_id += 1
+        if not filename:
+            filename = f"{self.name}-{self.shot_id}.png"
+        placement = browsers.index(self)
+        coords = (1024 * placement, 0, 1024 * (placement + 1), 768)
+        # import sh
+        # sh.Command("import")("-screen", "-window", "root", filename)
+        # time.sleep(2)
+        # sh.Command("import")("-window", "root", filename)
+        # sh.convert(sh.xwd("-root", "-screen"), "xwd:-", f"png:{filename}")
+        pyscreenshot.grab(bbox=coords).save(filename)
+        return filename
+
+    def quit(self):
+        try:
+            self.browser.quit()
+        except selenium.common.exceptions.WebDriverException:
+            pass
+        if displays:
+            try:
+                displays[0].stop()
+            except KeyError:  # raising during multi-user testing
+                pass
+
+    def __getattr__(self, attr):
+        return getattr(self.browser, attr)
+
+    def _repr_html_(self):
+        return f"<img class=screen src={self.shot_url()}>"
+        # url = unapply_dns(self.current_url)
+        # site_character = url.partition("//")[2].partition(".")[0]
+        # TODO FIXME remove hard-coded IndieWeb..
+        # return (f"<div class=screenshot>"
+        #         f"<div class=browser><small>{self.name}'s "
+        #         f"Browser</small></div>"
+        #         f"<div class=tab><img src=/IndieWeb/{site_character}16.png>"
+        #         f" {self.title}</div>"
+        #         f"<div class=address><small><code>{url}</code></small></div>"
+        #         f"<img class=screen src={self.shot_url()}></div>")
+
+
+firefox = Firefox
+
+
+# def shot(name, description):  # XXX , *browsers):
+#     test_case = inspect.stack()[1].function
+#     global shot_counter
+#     shot_id = "{:03}".format(shot_counter)
+#     dashed_name = name.replace(" ", "-")
+#     for user, browser in sorted(browsers.items()):
+#         shot_filename = "{}-{}-{}.png".format(shot_id, user, dashed_name)
+#         height = browser.execute_script("return document.body.scrollHeight;")
+#         browser.set_window_size(browser_width, height + 100)
+#         browser.get_screenshot_as_file(str(build_dir / "features" /
+#                                            shot_filename))
+#     features.append((test_case, shot_id, dashed_name, name, description))
+#     # XXX , shot_filename, [user for u in browsers.keys()]))
+#     shot_counter += 1
+
+
+@main.register()
+class Parse:
+    """Get a resource and parse it for Microformats."""
+
+    def setup(self, add_arg):
+        add_arg("uri", help="address of the resource")
+
+    def run(self, stdin, log):
+        pprint(webagt.get(self.uri).mf2json.data)
+        return 0