"""Search the web from your website."""
import collections
import random
import re
import sqlite3
import string
import subprocess
import black
import easyuri
import eng_to_ipa
import nltk
import pint
import pronouncing
import requests
import typesense
import web
import webagt
import webint_owner
import wn
import youtube_search
from RestrictedPython import (
compile_restricted,
limited_builtins,
safe_builtins,
utility_builtins,
)
from RestrictedPython.Eval import (
default_guarded_getattr,
default_guarded_getitem,
default_guarded_getiter,
)
from RestrictedPython.PrintCollector import PrintCollector
app = web.application(__name__, prefix="search")
client = typesense.Client(
{
"nodes": [
{
"host": "localhost",
"port": "8108",
"protocol": "http",
}
],
"api_key": "hpAnnsIdJse2NejW8RFKKRZ8z2lfhRjWCNtWWvwNFWXTyB1Y",
"connection_timeout_seconds": 2,
}
)
ureg = pint.UnitRegistry()
books_schema = {
"name": "books",
"fields": [
{"name": "title", "type": "string"},
{"name": "authors", "type": "string[]", "facet": True},
{"name": "publication_year", "type": "int32", "facet": True},
{"name": "ratings_count", "type": "int32"},
{"name": "average_rating", "type": "float"},
],
"default_sorting_field": "ratings_count",
}
# client.collections.create(books_schema)
# with open("/tmp/books.jsonl") as jsonl_file:
# client.collections["books"].documents.import_(jsonl_file.read().encode("utf-8"))
@app.wrap
def linkify_head(handler, main_app):
"""Ensure OpenSearch document is referenced from homepage."""
yield
if web.tx.request.uri.path == "":
web.add_rel_links(
search=(
"/search/opensearch.xml",
{
"type": "application/opensearchdescription+xml",
"title": "Angelo Gladding",
},
)
)
def search_youtube(query):
return youtube_search.YoutubeSearch(query, max_results=10).to_dict()
IW_HANDLE_RE = r"^@(?P<domain>[\w.]+)$"
AP_HANDLE_RE = r"^@(?P<user>[\w.]+)@(?P<domain>[\w.]+)$"
def iw_lookup(handle):
match = re.match(IW_HANDLE_RE, handle)
if match is None:
return
(domain,) = match.groups()
return webagt.get(domain).card
def ap_lookup(handle):
match = re.match(AP_HANDLE_RE, handle)
if match is None:
return
user, domain = match.groups()
for link in requests.get(
f"https://{domain}/.well-known/webfinger?resource=acct:{user}@{domain}",
headers={"Accept": "application/activity+json"},
).json()["links"]:
if link["rel"] == "self":
identity_url = link["href"]
break
else:
return
return webint_owner.ap_request(identity_url)
@app.control("")
class Search:
"""Search everything."""
def get(self):
"""Return an index of data sources."""
try:
form = web.form("q")
except web.BadRequest:
return app.view.index()
query = form.q
if not query:
raise web.SeeOther("/search")
conversion = None
units = {
# length
"fm": ("femtometer", "femtometer", "femtometers"),
"pm": ("picometer", "picometer", "picometers"),
"nm": ("nanometer", "nanometer", "nanometers"),
"μm": ("micron", "micron", "microns"),
"mm": ("millimeter", "millimeter", "millimeters"),
"cm": ("centimeter", "centimeter", "centimeters"),
"dm": ("decimeter", "decimeter", "decimeters"),
"m": ("meter", "meter", "meters"),
"dam": ("decameter", "decameter", "decameters"),
"hm": ("hectometer", "hectometer", "hectometers"),
"km": ("kilometer", "kilometer", "kilometers"),
"au": ("astronomical unit", "astronomical unit", "astronomical units"),
"ly": ("light year", "light year", "light years"),
"pc": ("parsec", "parsec", "parsecs"),
"mil": ("mil", "mil", "mils"),
"in": ("inch", "inch", "inches"),
"ft": ("foot", "foot", "feet"),
"yd": ("yard", "yard", "yards"),
"mi": ("mile", "mile", "miles"),
# temperature
"f": ("fahrenheit", "°F"),
"c": ("celsius", "°C"),
"k": ("kelvin", "°K"),
# area
r"sq\ in": ("square inch", "square inch", "square inches"),
r"sq\ ft": ("square foot", "square foot", "square feet"),
r"sq\ yd": ("square yard", "square yard", "square yards"),
r"sq\ mi": ("square mile", "square mile", "square miles"),
"acre": ("acre", "acre", "acres"),
"hectare": ("hectare", "hectare", "hectares"),
# volume
r"cu\ in": ("cubic inch", "cubic inch", "cubic inches"),
r"cu\ ft": ("cubic foot", "cubic foot", "cubic feet"),
r"cu\ yd": ("cubic yard", "cubic yard", "cubic yards"),
r"cu\ cm": ("cubic centimeter", "cubic centimeter", "cubic centimeter"),
"μl": ("microliter", "microliter", "microliters"),
"l": ("liter", "liter", "liters"),
"gal": ("gallon", "gallon", "gallons"),
"qt": ("quart", "quart", "quarts"),
"pt": ("pint", "pint", "pints"),
"cup": ("cup", "cup", "cups"),
"fl oz": ("fluid ounce", "fluid ounce", "fluid ounces"),
"tbls": ("tablespoon", "tablespoon", "tablespoons"),
"tspn": ("teaspoon", "teaspoon", "teaspoons"),
# weight
"μg": ("microgram", "microgram", "micrograms"),
"mg": ("milligram", "milligram", "millagrams"),
"g": ("gram", "gram", "grams"),
"kg": ("kilogram", "kilogram", "kilograms"),
"ct": ("carat", "carat", "carats"),
"oz": ("ounce", "ounce", "ounces"),
"lb": ("pound", "pound", "pounds"),
"tn": ("short ton", "short ton", "short tons"),
"t": ("metric ton", "metric ton", "metric tons"),
"lt": ("long ton", "long ton", "long tons"),
# time
"as": ("attosecond", "attosecond", "attoseconds"),
"fs": ("femtosecond", "femtosecond", "femtoseconds"),
"ps": ("picosecond", "picosecond", "picoseconds"),
"ns": ("nanosecond", "nanosecond", "nanoseconds"),
"sh": ("shake", "shake", "shakes"),
"μs": ("microsecond", "microsecond", "microseconds"),
"ms": ("millisecond", "millisecond", "milliseconds"),
"s": ("second", "second", "seconds"),
"min": ("minute", "minute", "minutes"),
"h": ("hour", "hour", "hours"),
"d": ("day", "day", "days"),
"wk": ("week", "week", "weeks"),
"fn": ("fortnight", "fortnight", "fortnights"),
"mo": ("month", "month", "months"),
"yr": ("year", "year", "years"),
"dec": ("decade", "decade", "decades"),
"cen": ("century", "century", "centuries"),
"ml": ("millennium", "millennium", "millennia"),
# data
"bit": ("bit", "bit", "bits"),
"kb": ("kilobit", "kilobit", "kilobits"),
"Mb": ("megabit", "megabit", "megabits"),
"Gb": ("gigabit", "gigabit", "gigabits"),
"Tb": ("terabit", "terabit", "terabits"),
"Pb": ("petabit", "petabit", "petabit"),
"Eb": ("exabit", "exabit", "exabit"),
"byte": ("byte", "bytes", "bytes"),
"kB": ("kilobyte", "kilobyte", "kilobytes"),
"MB": ("megabyte", "megabyte", "megabytes"),
"GB": ("gigabyte", "gigabyte", "gigabytes"),
"TB": ("terabyte", "terabyte", "terabytes"),
"PB": ("petabyte", "petabyte", "petabytes"),
"EB": ("exabyte", "exabyte", "exabytes"),
# speed
"kph": ("kilometer per hour", "kilometer/hour", "kilometers/hour"),
"kps": ("kilometer per second", "kilometer/second", "kilometers/second"),
"mph": ("mile per hour", "mile/hour", "miles/hour"),
}
if match := re.match(
rf"""^(?P<quantity>[\d.]+)(?P<from>({'|'.join(units)}))
\ to\ (?P<to>({'|'.join(units)}))$""",
query,
re.VERBOSE,
):
matches = match.groupdict()
from_quantity = float(matches["quantity"])
from_sig = len(matches["quantity"].partition(".")[2])
from_unit = units[matches["from"].replace(" ", r"\ ")]
to_unit = units[matches["to"].replace(" ", r"\ ")]
to_quantity = ureg.convert(
float(from_quantity),
getattr(ureg, from_unit[0].replace(" ", "_")),
getattr(ureg, to_unit[0].replace(" ", "_")),
)
output_from_unit = from_unit[1]
if len(from_unit) == 3 and from_quantity != 1:
output_from_unit = from_unit[2]
output_to_unit = to_unit[1]
if len(to_unit) == 3 and to_quantity != 1:
output_to_unit = to_unit[2]
conversion = (
f"{round(from_quantity, from_sig):n} {output_from_unit}",
f"{round(to_quantity, from_sig):n} {output_to_unit}",
)
iw_profile = iw_lookup(query)
ap_profile = ap_lookup(query)
builtins = dict(safe_builtins)
builtins.update(**limited_builtins)
builtins.update(**utility_builtins)
env = {
"__builtins__": builtins,
"_getiter_": default_guarded_getiter,
"_getattr_": default_guarded_getattr,
"_getitem_": default_guarded_getitem,
}
secret = "".join(random.choices(string.ascii_lowercase, k=20))
try:
formatted_query = black.format_str(query, mode=black.mode.Mode()).rstrip()
except black.parsing.InvalidInput:
formatted_query = None
try:
exec(compile_restricted(f"{secret} = {query}", "<string>", "exec"), env)
except Exception as err:
result = None
else:
result = env[secret]
if re.match(r"^[0-9A-Za-z_-]{10}[048AEIMQUYcgkosw]$", query):
raise web.SeeOther(f"/player/{query}")
if query.startswith("!"):
bang, _, query = query[1:].partition(" ")
match bang:
case "yt":
return app.view.youtube_results(query, search_youtube(query))
case "imdb":
web.tx.response.headers["Referrer-Policy"] = "no-referrer"
url = easyuri.parse("https://www.imdb.com/find/")
url["q"] = query
raise web.SeeOther(url)
case "ud":
web.tx.response.headers["Referrer-Policy"] = "no-referrer"
url = easyuri.parse("https://www.urbandictionary.com/define.php")
url["term"] = query
raise web.SeeOther(url)
word = query
snow = nltk.stem.SnowballStemmer("english")
stem = snow.stem(query)
ipa_pronunciation = None
cmu_pronunciation = None
definition = None
rhymes = []
try:
en = wn.Wordnet("oewn:2022")
except (sqlite3.OperationalError, wn.Error):
web.enqueue(subprocess.run, ["python", "-m", "wn", "download", "oewn:2022"])
else:
try:
definition = en.synsets(query)[0].definition()
except IndexError:
try:
definition = en.synsets(stem)[0].definition()
except IndexError:
pass
if definition:
ipa_pronunciation = eng_to_ipa.convert(query)
try:
cmu_pronunciation = pronouncing.phones_for_word(query)[0]
except IndexError:
pass
rhymes = pronouncing.rhymes(query)
if web.tx.user.is_owner:
web_results = [
(
webagt.uri(webagt.uri(result.element.attrib["href"])["uddg"][0]),
result.element.text if result.element.text is not None else "",
)
for result in webagt.get(
f"https://html.duckduckgo.com/html?q={query}"
).dom.select(".result__a")
]
else:
web_results = None
code_projects = collections.Counter()
code_files = collections.defaultdict(list)
for code_project, code_file in web.application("webint_code").model.search(
query
):
code_projects[code_project] += 1
code_files[code_project].append(code_file)
# books = client.collections["books"].documents.search(
# {
# "q": query,
# "query_by": "authors,title",
# "sort_by": "ratings_count:desc",
# }
# )
books = {}
return app.view.results(
query,
# scope,
conversion,
iw_profile,
ap_profile,
formatted_query,
result,
ipa_pronunciation,
cmu_pronunciation,
definition,
rhymes,
web_results,
code_projects,
code_files,
books,
)
@app.control("opensearch.xml")
class OpenSearch:
""""""
def get(self):
web.header("Content-Type", "application/xml; charset=utf-8")
return bytes(str(app.view.opensearch()), "utf-8")
@app.control("collections")
class Collections:
""""""
def get(self):
return app.view.collections(client.collections.retrieve())