Bootstrap
Committed fc2c7d
index 0000000..4711d41
--- /dev/null
+poetry.lock
+.coverage
+.test_coverage.xml
+.test_results.xml
index 0000000..ca7fa57
--- /dev/null
+[tool.poetry]
+name = "webagt"
+version = "0.1.2"
+description = "a web agent"
+homepage = "https://ragt.ag/code/webagt"
+authors = ["Angelo Gladding <angelo@ragt.ag>"]
+license = "BSD-2-Clause"
+
+[tool.poetry.scripts]
+webagt = "web.__main__:main"
+
+[tool.poetry.dependencies]
+python = ">=3.10,<3.11"
+txtint = ">=0.0.68"
+sqlyte = ">=0.0.50"
+easyuri = ">=0.0.12"
+microformats = ">=0.0.24"
+pendulum = "^2.1.2"
+certifi = "^2022.12.7"
+requests = {extras=["socks"], version="^2.28.2"}
+httpagentparser = "^1.9.5"
+mimeparse = "^0.1.3"
+lxml = "^4.9.2"
+cssselect = "^1.2.0"
+selenium = "^4.8.0"
+webdriver-manager = "^3.8.5"
+pyvirtualdisplay = "^3.0"
+pyscreenshot = "^3.0"
+
+[tool.poetry.group.dev.dependencies]
+pillow = "^9.2.0"
+gmpg = {path="../gmpg", develop=true}
+txtint = {path="../txtint", develop=true}
+sqlyte = {path="../sqlyte", develop=true}
+easyuri = {path="../easyuri", develop=true}
+microformats = {path="../python-microformats", develop=true}
+
+# [[tool.poetry.source]]
+# name = "main"
+# url = "https://ragt.ag/code/pypi"
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
index 0000000..195765d
--- /dev/null
+"""A web agent."""
+
+import json
+import pathlib
+import re
+import time
+from pprint import pprint
+
+import lxml.etree
+import lxml.html
+
+try:
+ import pyscreenshot
+except ImportError: # legacy OSX 10.x
+ pass
+import mf
+import pyvirtualdisplay
+import requests
+import selenium
+import sqlyte
+import txt
+from easyuri import URI
+from easyuri import parse as uri
+from requests.exceptions import ConnectionError, SSLError
+from selenium import webdriver
+from selenium.webdriver.common.action_chains import ActionChains
+from selenium.webdriver.common.by import By
+from selenium.webdriver.firefox.service import Service as FirefoxService
+from selenium.webdriver.support import expected_conditions
+from selenium.webdriver.support.ui import WebDriverWait
+from webdriver_manager.firefox import GeckoDriverManager
+
# Public API of the module.
__all__ = [
    "post",
    "get",
    "put",
    "delete",
    "parse",
    "agent",
    "Agent",
    "firefox",
    "cache",
    "download",
    "ConnectionError",
    "SSLError",
    "uri",
    "URI",
]

# Command-line application entry point; commands register via @main.register().
main = txt.application("webagt", __doc__)
# Module-level registries shared by all Firefox instances: one virtual
# display is reused, and browsers are tiled side by side on it.
displays = []
browsers = []

# SOCKS proxy settings for routing .onion requests through a local Tor client.
tor_proxies = {"http": "socks5h://localhost:9150", "https": "socks5h://localhost:9150"}
+
+
def post(url, **kwargs):
    """Issue a POST request to `url` and return the resulting Transaction."""
    return Transaction(url, "post", **kwargs)
+
+
def get(url, **kwargs):
    """Issue a GET request to `url` and return the resulting Transaction."""
    return Transaction(url, "get", **kwargs)
+
+
def put(url, **kwargs):
    """Issue a PUT request to `url` and return the resulting Transaction."""
    return Transaction(url, "put", **kwargs)
+
+
def delete(url, **kwargs):
    """Issue a DELETE request to `url` and return the resulting Transaction."""
    return Transaction(url, "delete", **kwargs)
+
+
def download(url, filepath, chunk_size=1024):
    """Stream `url` into `filepath` and return the Transaction."""
    transaction = get(url, stream=True)
    if not transaction.ok:
        # Nothing to save; hand the failed transaction back to the caller.
        return transaction
    with pathlib.Path(filepath).open("wb") as fp:
        for chunk in transaction.response.iter_content(chunk_size=chunk_size):
            if chunk:  # skip keep-alive chunks
                fp.write(chunk)
    return transaction
+
+
def request(method, url, session=None, **kwargs):
    """
    Return the response to dereferencing given `url` using given `method`.

    Attempts to use HTTPS first, falling back to plain HTTP when the TLS
    handshake or connection fails. Proxies through Tor when accessing
    onion services. Optionally pass typical `requests.Request` arguments
    as `kwargs`.

    Returns a `(final_url, response)` pair, where `final_url` reflects
    any redirects followed.

    """
    url = uri(url)
    if url.suffix == "onion":
        # Onion services are only reachable through the local Tor SOCKS proxy.
        kwargs["proxies"] = tor_proxies
    context = session if session else requests
    # FIX: use setdefault so an explicit caller-supplied timeout is honored
    # instead of being silently overwritten with 15 seconds.
    kwargs.setdefault("timeout", 15)
    kwargs["headers"] = kwargs.get("headers", {})
    # Strip any scheme the caller provided; we decide https vs http below.
    normal_url = url.normalized.partition("://")[2]
    try:
        response = context.request(method, "https://" + normal_url, **kwargs)
    except (requests.exceptions.SSLError, requests.exceptions.ConnectionError):
        # HTTPS unavailable or broken: retry once over plain HTTP.
        response = context.request(method, "http://" + normal_url, **kwargs)
    return uri(response.url), response
+
+
class Transaction:
    """An HTTP request/response pair with parsing helpers (DOM, JSON, mf2)."""

    def __init__(
        self, url, method="get", fetch=True, session=None, headers=None, **kwargs
    ):
        """Dereference `url` (unless `fetch` is false) and record the response."""
        self.url = str(url)
        if fetch:
            # XXX handler = getattr(requests, method)
            # XXX self.response = handler(apply_dns(self.url), **kwargs)

            firefox_version = "102.0"
            user_agents = {
                "wget": "Wget/1.21",
                "firefox": " ".join(
                    (
                        f"Mozilla/5.0 (X11; Linux x86_64; rv:{firefox_version})",
                        f"Gecko/20100101 Firefox/{firefox_version}",
                    )
                ),
            }
            _headers = {}
            # Only supply a User-Agent when the caller's session hasn't set
            # its own (requests' default UA begins with "python-requests/").
            if not session or (
                session and session.headers["User-Agent"].startswith("python-requests/")
            ):
                try:
                    # NOTE(review): `user_agent` is read from kwargs but not
                    # popped, so it is forwarded on to `request()` and then to
                    # requests — confirm whether it should be removed here.
                    user_agent = user_agents[kwargs["user_agent"]]
                except KeyError:
                    user_agent = "webint/0"
                _headers["user-agent"] = user_agent
            accept_header = "*/*"
            # accept_header = "text/html;q=0.9,*/*;q=0.8"
            _headers["accept"] = accept_header
            if headers:
                # Caller-supplied headers take precedence over the defaults.
                _headers.update(headers)
            self.url, self.response = request(
                method, self.url, session=session, headers=_headers, **kwargs
            )
            # Convenience mirrors of the underlying requests.Response.
            self.status = self.response.status_code
            self.ok = self.response.ok
            self.text = self.response.text
            self.headers = self.response.headers
            # self.url = str(self.url)

    def __repr__(self):
        return f"<web.agent.Transaction object for {self.url}>"

    @property
    def location(self):
        """Return the Location header, absolutized against the response URL."""
        location = self.headers["location"]
        if location.startswith("/"):
            new_url = uri(self.response.url)
            new_url.path = location
            location = str(new_url)
        return location

    @property
    def links(self):
        # Parsed HTTP Link header, as exposed by requests.
        return self.response.links

    def link(self, name):
        """
        Fetch target and discover link `name` in HTML rels or HTTP Link header.

        Skip HEAD request; use GET and attempt to cache.

        """
        try:
            link = _get_header_link(self.headers["Link"], name)[0]
        except (KeyError, IndexError):
            # No Link header (or no matching rel): fall back to mf2 rels.
            # NOTE(review): this fallback does not absolutize relative rel
            # values the way the header branch below does — confirm mf2json
            # rels are always absolute here.
            try:
                endpoint = uri(self.mf2json["rels"].get(name, [])[0])
            except IndexError:
                endpoint = None
        else:
            if link.startswith("/"):
                # Root-relative header link: resolve against this resource.
                endpoint = uri(self.url)
                endpoint.path = link
            else:
                endpoint = uri(link)
        return endpoint

    @property
    def xml(self):
        """Return the body parsed as an XML element tree."""
        try:
            return lxml.etree.fromstring(self.text)
        except ValueError:
            # lxml rejects str input carrying an encoding declaration;
            # re-parse from UTF-8 bytes instead.
            return lxml.etree.fromstring(self.text.encode("utf-8"))

    @property
    def dom(self):
        """Return the body as a queryable HTML Document."""
        return Document(self.text, self.url)

    @property
    def json(self):
        """Return the body parsed as JSON."""
        return json.loads(self.text)

    @property
    def mf2json(self):
        """Return the body parsed as Microformats2 JSON."""
        return Semantics(mf.parse(Document(self.text, self.url).html, self.url))

    @property
    def card(self):
        """Return the representative h-card for this resource."""
        return Semantics(mf.representative_card(self.mf2json.data, str(self.url)))

    @property
    def feed(self):
        """Return the representative h-feed for this resource."""
        # feed = mf.interpret_feed(self.mf2json.data, source_url=str(self.url))
        # for entry in feed["entries"]:
        #     entry["post-type"] = mf.discover_post_type(entry)
        # return Semantics(feed)
        return Semantics(
            mf.representative_feed(self.mf2json.data, str(self.url), self.dom)
        )

    @property
    def entry(self):
        """Return the interpreted h-entry for this resource."""
        return Semantics(mf.interpret_entry(self.mf2json.data, str(self.url)))

    @property
    def event(self):
        """Return the interpreted h-event for this resource."""
        return Semantics(
            mf.interpret_event(self.mf2json.data, source_url=str(self.url))
        )

    def mention(self, *target_urls):
        """Interpret this resource as a comment/mention of `target_urls`."""
        return Semantics(
            mf.interpret_comment(self.mf2json.data, str(self.url), target_urls)
        )

    # @property
    # def jf2(self):
    #     return Semantics(mf.interpret_feed(self.mf2json.data,
    #                                        source_url=self.url))
+
+
class Semantics:
    """A thin wrapper around parsed semantic (Microformats) data."""

    def __init__(self, data):
        self.data = data

    def __getitem__(self, item):
        return self.data[item]

    def __repr__(self):
        # FIX: `dump` was referenced here but is never defined or imported in
        # this module (NameError at call time); serialize with the stdlib,
        # matching the previous json.dumps form noted in the old XXX comment.
        return json.dumps(self.data, indent=2)

    def _repr_html_(self):
        # FIXME(review): `solarized` is also not imported in this module and
        # will raise NameError when rendered (e.g. in a notebook) — confirm
        # where the intended highlighter lives and import it.
        return solarized.highlight(json.dumps(self.data, indent=2), ".json")

    def __bool__(self):
        # Truthiness mirrors the wrapped data (empty parse -> False).
        return bool(self.data)
+
+
+def _get_header_link(link_header: str, search_rel: str):
+ links = []
+ for link in link_header.split(","):
+ resource, _, rel = link.partition(";")
+ match = re.match("""rel=['"](.+)['"]""", rel.strip())
+ if match and match.groups()[0] == search_rel:
+ links.append(resource.strip(" <>"))
+ return links
+
+
class RequestFailed(Exception):
    """A request has failed."""

    # NOTE(review): never raised within this module; presumably part of the
    # public error surface for importers — confirm against callers.
+
+
class Cache:
    """A dictionary-like cache of the web."""

    # Schema: one row per cached resource, unique on (origin, path), plus a
    # full-text-search table over the stringified mf2 data.
    model = sqlyte.model(
        "WebCache",
        resources={
            "origin": "TEXT",
            "path": "TEXT",
            "data": "JSON",
            "headers": "JSON",
            "title": "TEXT",
            "html": "TEXT",
            "UNIQUE": "(origin, path)",
        },
        search={
            "origin": "",
            "path": "",
            "text": "",
            "FTS": True,
        },
    )

    def __init__(self, origin=None, db=None):
        """Optionally scope lookups to `origin`; default store is `cache.db`."""
        self.origin = origin
        if not db:
            db = sqlyte.db("cache.db", self.model)
        self.db = db

    def add(self, url):
        """Fetch `url` and insert (or refresh) its cached representation."""
        url = self._make_url(url)
        resource = get(url)
        try:
            title = resource.dom.select("title")[0].text
        except IndexError:
            # Page has no <title> element.
            title = None
        try:
            self.db.insert(
                "resources",
                origin=url.origin,
                path=url.path,
                data=resource.mf2json.data,
                headers=dict(resource.headers),
                title=title,
                html=resource.text,
            )
            self.db.insert(
                "search",
                origin=url.origin,
                path=url.path,
                text=str(resource.mf2json.data),
            )
        except self.db.IntegrityError:
            # Row already exists (UNIQUE origin+path): update both tables.
            self.db.update(
                "resources",
                data=resource.mf2json.data,
                headers=dict(resource.headers),
                title=title,
                html=resource.text,
                where="origin = ? AND path = ?",
                vals=[url.origin, url.path],
            )
            self.db.update(
                "search",
                origin=url.origin,
                path=url.path,
                text=str(resource.mf2json.data),
                where="origin = ? AND path = ?",
                vals=[url.origin, url.path],
            )
        return url, resource

    def search(self, query):
        """Full-text search cached resources, best matches first."""
        return self.db.select(
            "search AS s",
            what="r.*, s.text",
            where="search MATCH ?",
            vals=[query],
            join="resources AS r ON r.origin = s.origin AND r.path = s.path",
            order="rank",
        )

    @property
    def domains(self):
        """Return one (origin URI, data) pair per cached origin."""
        return [
            (uri(r["origin"]), r["data"])
            for r in self.db.select(
                "resources", what="origin, data", order="origin ASC", group="origin"
            )
        ]

    def forget_domain(self, domain):
        """Delete all cached resources for `domain` (both schemes)."""
        return self.db.delete(
            "resources",
            where="origin = ? OR origin = ?",
            vals=[f"https://{domain}", f"http://{domain}"],
        )

    def get_pages(self, domain):
        """Return cached resources for `domain` (both schemes), ordered by path."""
        return self.db.select(
            "resources",
            where="origin = ? OR origin = ?",
            vals=[f"https://{domain}", f"http://{domain}"],
            order="path ASC",
        )

    # XXX @property
    # XXX def graph(self):
    # XXX     network = nx.DiGraph()
    # XXX     for url, resource in self.cache.items():  # TODO iterate over db items
    # XXX         # print(resource.links)
    # XXX         network.add_node(url)
    # XXX     return nx.draw(network, with_labels=True)

    def _make_url(self, url):
        # Prefix with the configured origin when this cache is site-scoped.
        if self.origin:
            url = f"{self.origin}/{url}"
        return uri(url)

    def __getitem__(self, resource_url):
        """Return the cached resource, fetching and caching it on a miss."""
        try:
            url = self._make_url(resource_url)
            resource_data = self.db.select(
                "resources",
                where="origin = ? AND path = ?",
                vals=[url.origin, url.path],
            )[0]
            # Rebuild an unfetched Transaction from the stored row.
            resource = Transaction(url, fetch=False)
            resource.headers = resource_data["headers"]  # TODO case-insen
            resource.text = resource_data["html"]
        except (AttributeError, IndexError):
            # IndexError: not cached yet. AttributeError: presumably a URL
            # missing origin/path attributes — TODO confirm which paths hit it.
            url, resource = self.add(resource_url)
        return resource
+
+
+cache = Cache
+
+
def parse(html):
    """Return a document object for given html."""
    document = Document(html)
    return document
+
+
+# XXX def apply_dns(url):
+# XXX if url.startswith("/"):
+# XXX return url
+# XXX url = easyuri.parse(url)
+# XXX if url.host == "alice.example":
+# XXX url = str(url).replace("http://alice.example", "http://127.0.0.1:8080")
+# XXX elif url.host == "bob.example":
+# XXX url = str(url).replace("http://bob.example", "http://127.0.0.1:8081")
+# XXX elif url.host == "hello.example":
+# XXX url = str(url).replace("http://hello.example", "http://127.0.0.1:8082")
+# XXX else:
+# XXX url = str(url)
+# XXX return url
+
+
+# XXX def unapply_dns(url):
+# XXX url = easyuri.parse(url)
+# XXX if url.host == "127.0.0.1":
+# XXX if url.port == 8080:
+# XXX url = str(url).replace("http://127.0.0.1:8080", "http://alice.example")
+# XXX elif url.port == 8081:
+# XXX url = str(url).replace("http://127.0.0.1:8081", "http://bob.example")
+# XXX elif url.port == 8082:
+# XXX url = str(url).replace("http://127.0.0.1:8082", "http://hello.example")
+# XXX else:
+# XXX url = str(url)
+# XXX return url
+
+
class Document:
    """A parsed HTML document with CSS-selector access to its elements."""

    # TODO with html as dom: -- manipulate dom -- on exit html is modified

    def __init__(self, html, url=None):
        self.doc = lxml.html.fromstring(str(html).strip())
        if url:
            # Resolve relative links against the document's own URL.
            self.doc.make_links_absolute(str(url))

    def select(self, selector):
        """Return wrapped elements matching the CSS `selector`."""
        return [Element(node) for node in self.doc.cssselect(selector)]

    @property
    def children(self):
        """Return the root element's direct children."""
        return self.doc.getchildren()

    @property
    def html(self):
        """Return the document serialized back to an HTML string."""
        return lxml.html.tostring(self.doc).decode()
+
+
class Element:
    """A single element within a Document."""

    def __init__(self, element):
        self.element = element

    def append(self, *html):
        """Append each given HTML snippet as a child of this element."""
        for snippet in html:
            self.element.append(_make_element(snippet))

    def select(self, selector):
        """Return wrapped descendants matching the CSS `selector`."""
        return [Element(node) for node in self.element.cssselect(selector)]

    def replace(self, html):
        """Swap this element for the given HTML snippet in its parent."""
        self.element.getparent().replace(self.element, _make_element(html))

    @property
    def href(self):
        """Return the `href` attribute; raise AttributeError if absent."""
        try:
            return self.element.attrib["href"]
        except KeyError:
            raise AttributeError("href")

    @property
    def text(self):
        """Return the element's full text content."""
        return self.element.text_content()
+
+
def _make_element(html):
    """Parse an HTML snippet and return its first element."""
    # Wrap in a sentinel tag so fromstring doesn't inject html/body wrappers.
    wrapper = lxml.html.fromstring(f"<DOUGIE>{html}</DOUGIE>")
    return wrapper.cssselect("DOUGIE")[0].getchildren()[0]
+
+
class Agent:
    """A web agent that reuses one requests.Session across transactions."""

    def __init__(self, user_agent=None, apps=None):
        # NOTE: `apps` is accepted but unused here.
        self.session = requests.Session()
        if user_agent:
            self.session.headers["User-Agent"] = user_agent

    def get(self, url, **kwargs):
        """GET `url` within this agent's session."""
        return Transaction(url, "GET", session=self.session, **kwargs)

    def post(self, url, **kwargs):
        """POST to `url` within this agent's session."""
        return Transaction(url, "POST", session=self.session, **kwargs)
+
+
+agent = Agent
+
+
class Firefox:
    """Firefox via Selenium."""

    # Re-exported selenium helpers so callers can write browser.By / browser.EC.
    By = By
    EC = expected_conditions

    def __init__(self, name=None, width=1024, height=768):
        """Start (or reuse) the shared virtual display and open a browser."""
        if not len(displays):
            # One shared X virtual display backs every browser instance.
            display = pyvirtualdisplay.Display(visible=False, size=(2048, 768))
            display.start()
            displays.append(display)
        # profile = webdriver.FirefoxProfile()
        # # profile.add_extension(
        # #     extension="/home/gaea/canopy/var/identities/"
        # #     "6c189616-4fe1-4f3f-84dc-c4a13ee9b155/"
        # #     "asteria/asteria-dev.xpi"
        # # )
        # binary = "/home/gaea/firefox/firefox-bin"
        # self.browser = webdriver.Firefox(
        #     firefox_profile=profile,
        #     firefox_binary=binary,
        # )
        self.browser = webdriver.Firefox(
            service=FirefoxService(GeckoDriverManager().install())
        )
        # Tile each new browser window 1024px further right on the display.
        count = len(browsers)
        browsers.append(self)
        self._top = 0
        self._left = count * 1024
        self._width = width
        self._height = height
        self._update_window()
        self.name = name
        self.shot_id = 0  # monotonically increasing screenshot counter
        self.locked = False

    @property
    def width(self):
        return self._width

    @width.setter
    def width(self, value):
        self._width = value
        self._update_window()

    @property
    def height(self):
        return self._height

    @height.setter
    def height(self, value):
        self._height = value
        self._update_window()

    def _update_window(self):
        # Push the cached geometry to the actual browser window.
        self.browser.set_window_rect(self._left, self._top, self._width, self._height)

    def go(self, *args, wait=0):
        """Load a URL (or origin+path pair); optionally sleep `wait` seconds."""
        if len(args) == 1:
            url = args[0]
        elif len(args) == 2:
            url = "/".join(args)
        # XXX url = apply_dns(url)
        self.browser.get(url)
        if wait:
            time.sleep(wait)
        return self
        # XXX self.browser.get(str(easyuri.parse(url)))

    def wait(self, *conditions, duration=20):
        """Block until each condition holds (up to `duration` seconds each)."""
        for condition in conditions:
            time.sleep(0.1)
            wait = WebDriverWait(self.browser, duration)
            wait.until(condition)

    def wait_until_url_contains(self, url):
        """Block until the browser's current URL contains `url`."""
        # XXX self.wait(self.EC.url_contains(apply_dns(url)))
        self.wait(self.EC.url_contains(url))

    def select(self, selector):
        # First element matching the CSS selector.
        return self.browser.find_element(By.CSS_SELECTOR, selector)

    def select_first(self, selector):
        # NOTE(review): find_elements returns ALL matches (a list), so the
        # name is misleading relative to `select` above — confirm intent
        # before renaming/fixing, as callers may rely on the list.
        return self.browser.find_elements(By.CSS_SELECTOR, selector)

    def action(self):
        """Return a fresh ActionChains builder for this browser."""
        return ActionChains(self.browser)

    def shot(self, path):
        """Save a screenshot of the viewport to `path`.png."""
        # TODO take in pieces & stitch together -- using way too much memory
        # self._height = self.browser.execute_script("return document.body."
        #                                            "scrollHeight;") + 100
        # self._update_window()
        self.browser.get_screenshot_as_file(f"{path}.png")

    # XXX def shot_url(self):
    # XXX     base64 = self.browser.get_screenshot_as_base64()
    # XXX     return f"data:image/png;BASE64,{base64}"

    def shot_url(self, filename=None):
        """Capture this browser's tile of the shared display; return filename."""
        # XXX grab = pyscreenshot.grab(bbox=(0, 0, 920, 920)).tobytes()
        # XXX base64png = b"".join(base64.encodebytes(grab).splitlines())
        self.shot_id += 1
        if not filename:
            filename = f"{self.name}-{self.shot_id}.png"
        # Crop to this browser's 1024px-wide column on the virtual display.
        placement = browsers.index(self)
        coords = (1024 * placement, 0, 1024 * (placement + 1), 768)
        # import sh
        # sh.Command("import")("-screen", "-window", "root", filename)
        # time.sleep(2)
        # sh.Command("import")("-window", "root", filename)
        # sh.convert(sh.xwd("-root", "-screen"), "xwd:-", f"png:(unknown)")
        pyscreenshot.grab(bbox=coords).save(filename)
        return filename

    def quit(self):
        """Close the browser and stop the shared display, ignoring teardown errors."""
        try:
            self.browser.quit()
        except selenium.common.exceptions.WebDriverException:
            pass
        if displays:
            try:
                displays[0].stop()
            except KeyError:  # raising during multi-user testing
                pass

    def __getattr__(self, attr):
        # Delegate everything else to the underlying selenium driver.
        return getattr(self.browser, attr)

    def _repr_html_(self):
        """Render as an inline screenshot (e.g. in a notebook)."""
        return f"<img class=screen src={self.shot_url()}>"
        # url = unapply_dns(self.current_url)
        # site_character = url.partition("//")[2].partition(".")[0]
        # TODO FIXME remove hard-coded IndieWeb..
        # return (f"<div class=screenshot>"
        #         f"<div class=browser><small>{self.name}'s "
        #         f"Browser</small></div>"
        #         f"<div class=tab><img src=/IndieWeb/{site_character}16.png>"
        #         f" {self.title}</div>"
        #         f"<div class=address><small><code>{url}</code></small></div>"
        #         f"<img class=screen src={self.shot_url()}></div>")
+
+
+firefox = Firefox
+
+
+# def shot(name, description): # XXX , *browsers):
+# test_case = inspect.stack()[1].function
+# global shot_counter
+# shot_id = "{:03}".format(shot_counter)
+# dashed_name = name.replace(" ", "-")
+# for user, browser in sorted(browsers.items()):
+# shot_filename = "{}-{}-{}.png".format(shot_id, user, dashed_name)
+# height = browser.execute_script("return document.body.scrollHeight;")
+# browser.set_window_size(browser_width, height + 100)
+# browser.get_screenshot_as_file(str(build_dir / "features" /
+# shot_filename))
+# features.append((test_case, shot_id, dashed_name, name, description))
+# # XXX , shot_filename, [user for u in browsers.keys()]))
+# shot_counter += 1
+
+
@main.register()
class Parse:
    """Get a resource and parse it for Microformats."""

    def setup(self, add_arg):
        """Declare command-line arguments."""
        add_arg("uri", help="address of the resource")

    def run(self, stdin, log):
        """Fetch the URI and pretty-print its parsed mf2 data."""
        # FIX: `webagt` was an undefined name inside this module (it is never
        # imported here); call the module-level `get` directly.
        pprint(get(self.uri).mf2json.data)
        return 0