"""Analyze personal websites."""
import collections
import hashlib
import logging
import os
import pathlib
import subprocess
import time
import PIL.Image
import requests
import web
import webagt
import webint_data
import webint_jobs
import webint_owner
import webint_system
import whois
from reportlab.graphics import renderPM
from svglib.svglib import svg2rlg
from web import tx
from .utils import silos
logging.basicConfig(level=logging.DEBUG, filename="crawl.log", filemode="w", force=True)
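# `redirects` maps an incoming domain to the domain it permanently points at;
# `resources` stores one JSON blob of crawl details per URL, stamped with the
# time it was crawled.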
app = web.application(
__name__,
args={
"site": r"[a-z\d.-]+\.[a-z]+",
"page": r".*",
},
model={
"redirects": {
"incoming": "TEXT UNIQUE NOT NULL",
"outgoing": "TEXT NOT NULL",
},
"resources": {
"url": "TEXT UNIQUE NOT NULL",
"crawled": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP",
"details": "JSON NOT NULL",
},
},
mounts=[webint_data.app, webint_owner.app, webint_jobs.app, webint_system.app],
)
sites_path = pathlib.Path("sites")
sites_path.mkdir(exist_ok=True)
agent = webagt.Agent("IndieWebRocksBot")
blocklist = ["accounts.google.com"]
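# rel values that carry no useful signal for site analysis; refresh_domain()
# discards them before inspecting the remaining rels.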
ignored_rels = [
"author",
"bookmark",
"canonical",
"category",
"contents",
"home",
"nofollow",
"noreferrer",
"noopener",
"pingback",
"profile",
"shortcut",
"shortlink",
"syndication",
"tag",
"ugc",
]
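# XFN rel values that describe a person-to-person relationship.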
social_network_rels = ["acquaintance", "colleague", "friend", "met"]
def refresh_domain(domain):
"""Fetch `domain` and store site details and related media."""
if domain in blocklist or not webagt.uri(domain).suffix:
logging.debug(f"skipping {domain}")
return
# TODO logging.debug("getting previous details..") # for etag
start = time.time()
logging.debug("downloading HTML..")
try:
response = agent.get(domain)
except (requests.ConnectionError, requests.Timeout) as err:
return {"status": "not responding", "error": str(err)}
if domain != response.url.host:
try:
tx.db.insert("redirects", incoming=domain, outgoing=response.url.host)
except tx.db.IntegrityError:
tx.db.update(
"redirects",
outgoing=response.url.host,
where="incoming = ?",
vals=[domain],
)
refresh_domain(response.url.host)
return
domain_details = webagt.uri(domain)
try:
tx.db.insert(
"resources",
url=domain,
details={
"metaverse": hashlib.sha256(domain.encode("utf-8")).hexdigest().upper(),
"domain": {
"name": domain,
"suffix": domain_details.suffix,
"hsts": domain_details.in_hsts,
},
},
)
web.enqueue(query_whois, domain)
except tx.db.IntegrityError:
pass
site_path = sites_path / domain
site_path.mkdir(parents=True, exist_ok=True)
web.enqueue(run_lighthouse, domain)
web.enqueue(run_pa11y, domain)
update_details = get_updater(domain)
update_details(
accessed=web.now().to_iso8601_string(),
response={
"status": response.status,
"time": time.time() - start,
"headers": dict(response.headers),
"length": round(len(response.text) / 1000),
},
)
logging.debug("parsing Microformats..")
mf2json = response.mf2json
rels = dict(mf2json["rels"])
if authorization_endpoint := rels.pop("authorization_endpoint", None):
indieauth_details = {"authorization_endpoint": authorization_endpoint}
if token_endpoint := rels.pop("token_endpoint", None):
indieauth_details["token_endpoint"] = token_endpoint
update_details(indieauth=indieauth_details)
if indieauth_metadata_endpoint := rels.pop("indieauth-metadata", None):
web.enqueue(get_indieauth_metadata, domain, indieauth_metadata_endpoint[0])
if search := rels.pop("search", None):
web.enqueue(get_search_description, domain, search[0])
if manifest := rels.pop("manifest", None):
web.enqueue(get_manifest, domain, manifest[0])
if hub_endpoint := rels.pop("hub", None):
web.enqueue(
get_websub_hub, domain, hub_endpoint[0], rels.pop("self", [domain])[0]
)
web.enqueue(get_activitypub, domain)
card = response.card
update_details(mf2json=mf2json, card=card, rels=rels)
    photo_url = rels.pop("apple-touch-icon", [None])[0]
card_type = None
if card:
card_type = "person"
if card_org := card.get("org"):
if card["name"][0] == card_org[0]:
card_type = "organization"
if emails := card.get("email"):
gravatars = {}
for email in emails:
email = email.removeprefix("mailto:")
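                # Gravatar identifies an avatar by the MD5 hex digest of the
                # trimmed, lowercased email address.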
gravatars[email] = hashlib.md5(
email.strip().lower().encode("utf-8")
).hexdigest()
# TODO SET `gravatars`
if photo_urls := card.get("photo"): # TODO move to on-demand like icon?
if isinstance(photo_urls[0], dict):
photo_url = photo_urls[0]["value"]
else:
photo_url = photo_urls[0]
try:
icon_url = rels.pop("icon")[0]
except KeyError:
icon_url = f"{domain}/favicon.ico"
web.enqueue(get_media, domain, photo_url, icon_url)
scripts = []
for script in response.dom.select("script"):
script_details = dict(script.element.attrib)
script_details["content_length"] = len(script.text)
script_details["text"] = script.text
scripts.append(script_details)
stylesheets = rels.pop("stylesheet", [])
for stylesheet in response.dom.select("style"):
stylesheets.append(
{
"content_length": len(stylesheet.text),
"text": stylesheet.text,
}
)
whostyle = rels.pop("whostyle", None)
try:
title = response.dom.select("title")[0].text
except IndexError:
title = ""
update_details(
scripts=scripts, stylesheets=stylesheets, whostyle=whostyle, title=title
)
for ignored_rel in ignored_rels:
rels.pop(ignored_rel, None)
social_network = {}
for social_network_rel in social_network_rels:
if people_rels := rels.pop(social_network_rel, None):
social_network[social_network_rel] = people_rels
logging.debug("determining reciprocal rel=me..")
reciprocals = set()
rel_me_silos = []
for silo, silo_details in silos.items():
if len(silo_details) == 3:
rel_me_silos.append(silo_details[0])
rel_mes = rels.pop("me", [])
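    # A rel=me is reciprocal when the profile it points at links back to this
    # domain; each supported silo is checked below.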
url = webagt.uri(domain) # TODO XXX
for me_url in rel_mes:
        if not me_url.startswith(("http://", "https://")):
continue
me_url = webagt.uri(me_url)
logging.debug(f" rel=me {me_url}")
if (me_url.domain, me_url.suffix) == ("twitter", "com"):
if "/" in me_url.path:
continue
twitter_id = me_url.path.split("/")[0]
twitter_bearer = os.getenv("TWITTER")
twitter_profile = agent.get(
f"https://api.twitter.com/2/users"
f"/by/username/{twitter_id}?user.fields=url",
headers={"Authorization": f"Bearer {twitter_bearer}"},
).json["data"]
if twitter_profile_url := twitter_profile.get("url", None):
try:
recip_url = agent.get(twitter_profile_url).url
except requests.Timeout:
continue
if recip_url == url:
reciprocals.add(me_url.minimized)
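        # Wikipedia: compare the url property of the user page's first
        # Microformats item against this domain.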
if (me_url.subdomain, me_url.domain, me_url.suffix) == (
"en",
"wikipedia",
"org",
):
recip_url = agent.get(me_url).mf2json["items"][0]["properties"]["url"][0]
if recip_url == url:
reciprocals.add(me_url.minimized)
if me_url.host not in rel_me_silos:
continue
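        # Other known silos: fetch the profile and look for a rel=me pointing
        # back at this domain.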
try:
reverse_rel_mes = agent.get(me_url).mf2json["rels"]["me"]
except KeyError:
continue
for reverse_rel_me in reverse_rel_mes:
if webagt.uri(reverse_rel_me).minimized == url.minimized:
reciprocals.add(me_url.minimized)
update_details(
social_network=social_network, reciprocals=list(reciprocals), rel_me=rel_mes
)
    return
    # XXX The remainder of this function (feed discovery, WARC archiving,
    # IndieMark scoring) is unfinished and unreachable until this early
    # return is removed; `feed`, `details` and `site` are not yet defined.
# XXX feed = page.feed
# alt_feed_urls = set()
# if not feed["entries"]:
# try:
# alt_feed_urls = set(rels["home"]) & set(rels["alternate"])
# except KeyError:
# pass
# alternate_reprs = rels.pop("alternate", [])
# alternate_feeds = rels.pop("feed", [])
# if not feed["entries"]:
# for alt_feed_url in alternate_reprs + alternate_feeds:
# try:
# feed = agent.get(alt_feed_url).feed
# except ValueError: # XML feed
# pass
# finally:
# print("using", alt_feed_url)
# # rels.pop("alternate", None)
# for entry in feed["entries"]:
# try:
# published = entry["published"]
# permalink = entry["url"]
# entry.pop("published-str")
# except KeyError:
# continue
# entry.pop("uid", None)
# # TODO refresh_page(permalink)
# logging.debug("archiving to WARC..")
# warc_file = site_path / "warc_output"
# subprocess.run(
# [
# "wget",
# "-EHkpq",
# site,
# f"--warc-file={warc_file}",
# "--no-warc-compression",
# "--delete-after",
# ]
# )
logging.debug("calculating IndieMark score..")
scores = [
[(3, None)] * 10,
[(3, None)] * 10,
[(3, None)] * 10,
[(3, None)] * 10,
[(3, None)] * 10,
]
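    # scores[level][criterion] holds a (score, explanation) pair for each of
    # the five IndieMark levels; lower is better and 3 is the unset default.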
# L1 Identity
if card:
if "icon" in rels:
scores[0][0] = (0, "contact info and icon on home page")
else:
scores[0][0] = (1, "contact info but no icon on home page")
else:
scores[0][0] = (2, "no contact info on home page")
# L1 Authentication
if rel_mes:
scores[0][1] = (
1,
"<code>rel=me</code>s found but none for GitHub or Twitter",
)
for rel_me in rel_mes:
if rel_me.startswith(("https://github.com", "https://twitter.com/")):
scores[0][1] = (
0,
"<code>rel=me</code>s found for GitHub and/or Twitter",
)
break
else:
scores[0][1] = (2, "no <code>rel=me</code>s found")
# L1 Posts
if feed["entries"]:
if len(feed["entries"]) > 1:
scores[0][2] = (0, "more than one post")
else:
scores[0][2] = (1, "only one post")
else:
scores[0][2] = (2, "no posts")
# L1 Search
if details["ddg"]:
        scores[0][6] = (0, "your content was found on DuckDuckGo")
else:
scores[0][6] = (
1,
"your content was <strong>not</strong> found on DuckDuckgo",
)
# L1 Interactivity
scores[0][8] = (0, "content is accessible (select/copy text/permalinks)")
# L2 Identity
scores[1][0] = (0, "you've linked to silo profiles")
# L3 'h-card contact info and icon on homepage'
# L3 'multiple post types'
# L3 'POSSE'
# L3 'Posting UI'
    # L3 'Next/Previous Navigation between posts'
# L3 'Search box on your site'
# L3 'Embeds/aggregation'
# L3 'Web Actions'
# L4 'Send Webmentions'
# L4 'PubSubHubbub support'
# L4 'Display Search Results on your site'
# L4 'Display Reply Context'
# L5 'Automatic Webmentions'
# L5 'Handle Webmentions'
# L5 'Display full content rich reply-contexts'
# L5 'Search on your own search backend'
# L5 'Multiple Reply Types'
# L5 'Display Backfeed of Comments'
details["scores"] = scores
# logging.debug("dumping details..")
# details["stored"] = web.now().to_iso8601_string()
# web.dump(details, path=site_path / "details.json")
logging.debug("generating scoreboard..")
subprocess.run(["node", "../index.js", site])
def get_updater(url):
"""Return an update function catered to `domain`."""
def update_details(**kwargs):
"""Atomically update the resource's details with `kwargs`."""
keys = ", ".join([f"'$.{key}', json(?)" for key in kwargs.keys()])
tx.db.update(
"resources",
what=f"details = json_set(details, {keys})",
where="url = ?",
vals=[web.dump(v) for v in kwargs.values()] + [url],
)
return update_details
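# Example (hypothetical domain): get_updater("example.com")(title="Example")
# sets $.title in the stored details of the "example.com" resource.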
def query_whois(domain):
"""Update the creation date for the domain."""
logging.debug("querying WHOIS")
domain_created = whois.whois(domain)["creation_date"]
if isinstance(domain_created, list):
domain_created = domain_created[0]
try:
domain_created = domain_created.isoformat()
except AttributeError:
pass
get_updater(domain)(**{"domain.created": domain_created})
def get_media(domain, photo_url, icon_url):
"""Download the representative photo for the domain."""
site_path = sites_path / domain
if photo_url:
logging.debug("downloading representative photo..")
filename = photo_url.rpartition("/")[2]
suffix = filename.rpartition(".")[2]
if not suffix:
suffix = "jpg"
original = site_path / f"photo.{suffix}"
webagt.download(photo_url, original)
final = site_path / "photo.png"
if suffix != "png":
if suffix == "svg":
drawing = svg2rlg(original)
renderPM.drawToFile(drawing, final, fmt="PNG")
else:
try:
image = PIL.Image.open(original)
except PIL.UnidentifiedImageError:
pass
else:
image.save(final)
logging.debug("downloading iconography..")
final = site_path / "icon.png"
filename = icon_url.rpartition("/")[2]
suffix = filename.rpartition(".")[2]
original = site_path / f"icon.{suffix}"
try:
download = webagt.download(icon_url, original)
except web.ConnectionError:
pass
else:
if download.status == 200 and suffix != "png":
try:
image = PIL.Image.open(original)
except PIL.UnidentifiedImageError:
pass
else:
image.save(final)
def get_indieauth_metadata(domain, indieauth_metadata_endpoint):
"""Download IndieAuth metadata for the domain."""
logging.debug("downloading IndieAuth metadata..")
metadata = agent.get(indieauth_metadata_endpoint).json
get_updater(domain)(**{"indieauth": {"metadata": metadata}})
def get_search_description(domain, search_url):
"""Download OpenSearch description document at `search_url`."""
logging.debug("downloading OpenSearch description..")
search_xml = agent.get(search_url).xml
search_url = webagt.uri(search_xml.find("Url", search_xml.nsmap).attrib["template"])
search_endpoint = f"//{search_url.host}/{search_url.path}"
    name = None
    for key, values in search_url.query.items():
        if values[0] == "{searchTerms}":
            name = key
            break
get_updater(domain)(**{"search_url": [search_endpoint, name]})
def get_manifest(domain, manifest_url):
"""Download site manifest at `manifest_url`."""
logging.debug("downloading site manifest..")
# if "patches" in web.get(manifest_url).headers:
# get_updater(domain)(**{"manifest": "hot"})
webagt.download(manifest_url, sites_path / domain / "manifest.json")
def get_websub_hub(domain, endpoint, self):
"""Subscribe to site via WebSub `endpoint`."""
# TODO subscribe if not already
logging.debug("subscribing to WebSub hub..")
get_updater(domain)(**{"hub": [endpoint, self]})
def run_lighthouse(domain):
"""Run lighthouse for the domain."""
logging.debug("running lighthouse..")
subprocess.Popen(
[
"lighthouse",
f"https://{domain}",
"--output=json",
f"--output-path={sites_path}/{domain}/audits.json",
"--only-audits=total-byte-weight",
            "--chrome-flags=--headless",
"--quiet",
],
stdout=subprocess.PIPE,
).stdout.read()
def run_pa11y(domain):
"""Run pa11y for the domain."""
site_path = sites_path / domain
logging.debug("running pa11y..")
web.dump(
web.load(
subprocess.Popen(
[
"pa11y",
domain,
"--reporter",
"json",
"--screen-capture",
site_path / "site.png",
],
stdout=subprocess.PIPE,
).stdout.read()
),
path=site_path / "a11y.json",
)
found_icon = True # TODO XXX
logging.debug("finding most used color, generating images..")
try:
screenshot = PIL.Image.open(site_path / "site.png")
except FileNotFoundError:
pass
else:
colors = collections.Counter()
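        # Tally every pixel to find the screenshot's dominant color; a 1x1
        # swatch of it stands in for a missing icon and/or photo.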
for x in range(screenshot.width):
for y in range(screenshot.height):
colors[screenshot.getpixel((x, y))] += 1
most_used_color = colors.most_common()[0][0]
icon = PIL.Image.new("RGB", (1, 1), color=most_used_color)
if not found_icon:
icon.save(site_path / "icon.png")
if not (site_path / "photo.png").exists():
icon.save(site_path / "photo.png")
def get_activitypub(domain):
    """Fetch the domain's WebFinger document for ActivityPub discovery."""
    # TODO pass ?resource=acct:... as required by RFC 7033
    webfinger = agent.get(f"https://{domain}/.well-known/webfinger")
    logging.debug(webfinger.json)
@app.query
def get_categories(db):
    """Return a tally of categories across all crawled resources."""
categories = collections.Counter()
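    # json_extract() pulls each resource's stored `$.category` array so the
    # individual categories can be tallied.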
with db.transaction as cur:
for post in cur.cur.execute(
"select json_extract(resources.details, '$.category') "
"AS categories from resources"
):
if not post["categories"]:
continue
if post_categories := web.load(post["categories"]):
for post_category in post_categories:
categories[post_category] += 1
return categories
@app.query
def get_resources(db):
    """Return resources crawled within the past seven days, newest first."""
return db.select(
"resources",
where="crawled > ?",
vals=[web.now().subtract(days=7)],
order="crawled DESC",
)
@app.query
def get_posts(db):
    """Return crawled posts (not yet implemented)."""
return []
@app.query
def get_people(db):
    """Return each crawled resource's representative card, keyed by URL."""
return {
url: details["card"]
for url, details in tx.db.select(
"resources", what="url, details", order="url ASC"
)
}
@app.query
def get_people_details(db):
    """Return rows from the `people` table, ordered by URL."""
return tx.db.select("people", order="url ASC")
# @app._model.migrate(1)
# def add_redirects(db):
# db.create("redirects", "incoming TEXT UNIQUE NOT NULL, outgoing TEXT NOT NULL")