Satisfy pyright
Committed 5f3298
index ed8fdde..0000000
--- a/pyrightconfig.json
-{
- "reportGeneralTypeIssues": false
-}
--- a/tests/test_indieweb.py
+++ b/tests/test_indieweb.py
# import pytest
import requests
import web
-
-# from PIL import ImageGrab
+from PIL import ImageGrab
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
-from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support.wait import WebDriverWait
from web import tx
from webdriver_manager.firefox import GeckoDriverManager
--- a/web/framework/__init__.py
+++ b/web/framework/__init__.py
"serve",
"anti_csrf",
"form",
- "secure_form",
- "get_nonce",
- "get_token",
"best_match",
- # XXX "sessions",
"require_auth",
"tx",
"header",
"nbrandom",
"config_templates",
"generate_cert",
- "get_integrity_factory",
"now",
"dump",
"load",
"default_session_timeout",
"textslug",
"get_host_hash",
- "config_servers",
"b64encode",
"b64decode",
"timeslug",
"enqueue",
- "run_redis",
- "kill_redis",
"get_apps",
"get_sites",
"nb60_re",
return root
-# TODO rename
-def config_servers(root, web_server_config_handler=None):
- """Update symlinks to config files and reload nginx."""
- root_hash = get_host_hash(root)
- # TODO FIXME XXX figure out what to do here...
- nginx_conf = pathlib.Path("/home/admin/detritus/nginx/conf")
- sh.ln("-sf", (root / "etc/nginx.conf").resolve(), nginx_conf / "nginx.conf")
- if web_server_config_handler:
- web_server_config_handler(nginx_conf / "conf.d", root_hash)
- sh.sudo(
- "ln",
- "-sf",
- (root / "etc/supervisor.conf").resolve(),
- "/etc/supervisor/conf.d/{}.conf".format(root_hash),
- )
-
-
class StandaloneServer(gunicorn.app.base.BaseApplication):
"""A standalone gunicorn webapp context for the canopy."""
return dict(handlers)[best_match](*args, **kwargs)
-def get_integrity_factory(template_pkg):
- """Generate integrity hashes on the fly for local assets."""
-
- def handler(fn):
- static_assets_dir = pathlib.Path(template_pkg).parent.parent / "static"
- return sh.base64(
- sh.xxd(sh.sha256sum("-b", static_assets_dir / fn), "-r", "-p")
- ).strip()
-
- return handler
-
-
def parse_dt(dt: str) -> pendulum.datetime.DateTime:
"""Return the current datetime."""
- return pendulum.parse(dt)
+ return pendulum.parser.parse(dt)
def now() -> pendulum.datetime.DateTime:
yield
-def get_nonce():
- """"""
- return nbrandom(32)
-
-
-def get_token(nonce):
- """
- return a token for given `secret` and `nonce`
-
- secret can be a string or a Redis instance in which key `secret`
-
- """
- secret = str(kvdb["auth:secret"])
- return hashlib.sha1(bytes(secret + nonce, "utf-8")).hexdigest()
-
-
-def secure_form(*args, **kwargs):
- """"""
- args = list(args) + ["token", "nonce"]
- form = Form(*args, **kwargs)
- if not kvdb["auth:nonces"].add(form["nonce"]):
- raise OK("given `nonce` has already been used")
- if get_token(form["nonce"]) != form["token"]:
- raise OK("invalid `token` for given `nonce`")
- form.pop("token")
- form.pop("nonce")
- return form
-
-
class Form(dict):
-
- """ """
-
def __init__(self, *requireds, **defaults):
_data = tx.request.body._data
for required in requireds:
except AttributeError:
def loader():
- None
+ pass
data = loader()
self.data = data
return handler()._get(self, *args, **kwargs)
@classmethod
- def _get(resource, parent, **kwargs):
+ def _get(cls, parent, **kwargs):
""""""
header_shift = kwargs.pop("shift_headings", None)
if "_data" in parent.__dict__:
parent.__dict__.pop("data", None) # NOTE magic name "data" is reserved
kwargs.update(parent.__dict__)
try:
- handler = resource(**kwargs)
+ handler = cls(**kwargs)
handler.get_data()
content = handler.get()
except NotFound:
try:
content = bytes(content, "utf-8")
except TypeError:
- content
+ pass
return [content]
try:
if path.endswith("/") and not path.startswith("static/"):
raise PermanentRedirect("/" + path.rstrip("/"))
+ outer_self = self
+
class ResourceNotFound(Resource):
- def get(inner_self):
+ def get(self):
raise NotFound("Resource not found") # TODO XXX
- error = self.view.error
+ error = outer_self.view.error
# TODO recursively ascend app ancestors
# try:
- # error = self.parent_app.view.error
+ # error = outer_self.parent_app.view.error
# except AttributeError:
- # error = self.parent_app.parent_app.view.error
+ # error = outer_self.parent_app.parent_app.view.error
raise NotFound(error.resource_not_found())
def get_resource(controllers):
return resource(**unquoted)
return None
- if resource := get_resource(self.controllers):
+ if resource := get_resource(outer_self.controllers):
return resource
- for mount, app in self.mounts:
+ for mount, app in outer_self.mounts:
m = re.match(mount, path)
if m:
match_end = m.span()[1]
for k, v in m.groupdict().items():
setattr(controller, k, v)
return controller
- if resource := get_resource(self.after_controllers):
+ if resource := get_resource(outer_self.after_controllers):
return resource
- if resource := get_resource(self.unprefixed_controllers):
+ if resource := get_resource(outer_self.unprefixed_controllers):
return resource
return ResourceNotFound()
tx.response.status == "200 OK"
and tx.response.headers.content_type == "text/html"
):
- doc = parse(tx.response.body)
+ doc = webagt.parse(tx.response.body)
head = doc.select("head")[0]
head.append("<link rel=icon href=/icon.png>")
tx.response.body = doc.html
return payload
-def run_redis(socket):
- sh.redis_server("--daemonize", "yes", "--unixsocket", socket, "--port", 0)
-
-
-def kill_redis(socket):
- sh.redis_cli("-s", socket, "shutdown")
-
-
def enqueue(callable, *args, **kwargs):
"""
append a function call to the end of the job queue
--- a/web/framework/util.py
+++ b/web/framework/util.py
netloc = self.host.domain
except KeyError:
netloc = self.request.uri.netloc
- return (
- self.request.headers.get("X-Scheme", self.request.uri.scheme)
- + "://"
- + netloc
- )
+ scheme = self.request.headers.get("X-Scheme", self.request.uri.scheme)
+ if scheme:
+ return scheme + "://" + netloc
+ else:
+ return ""
# @property
# def owner(self):
--- a/web/host/__init__.py
+++ b/web/host/__init__.py
logging.debug(f"stdout: {stdout}")
logging.debug(f"stderr: {stderr}")
else:
- for line in proc.stdout:
- decoded_line = line.decode("utf-8").rstrip("\r\n")
- process.lines.append(decoded_line)
- logging.debug(decoded_line)
- if output_handler:
- output_handler(decoded_line)
+ if proc.stdout:
+ for line in proc.stdout:
+ decoded_line = line.decode("utf-8").rstrip("\r\n")
+ process.lines.append(decoded_line)
+ logging.debug(decoded_line)
+ if output_handler:
+ output_handler(decoded_line)
process.returncode = proc.returncode
return process
],
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
- ) as process:
- for line in process.stdout:
- logging.debug(line.decode("utf-8"))
- return process
+ ) as proc:
+ if proc.stdout:
+ for line in proc.stdout:
+ logging.debug(line.decode("utf-8"))
+ return proc
def setup_machine(self, locale="en_US.UTF-8"):
"""Upgrade the system, install system packages and configure the firewall."""
--- a/web/host/providers.py
+++ b/web/host/providers.py
import requests
+class MachineCreationError(Exception):
+ """Could not make initial connection."""
+
+
class TokenError(Exception):
"""Bad auth token."""
ssh_keys=None,
):
"""Create a machine."""
- # machine = None
machine_id = self._request(
"post",
"droplets",
break
time.sleep(2)
tries -= 1
+ else:
+ raise MachineCreationError()
return machine
def get_machine(self, machine_id):
--- a/web/response/__init__.py
+++ b/web/response/__init__.py
from .success import * # noqa
from .util import Status
-__all__ = ["Status"] + (
- informational.__all__
- + success.__all__
- + redirection.__all__
- + clienterror.__all__
- + servererror.__all__
-)
+# XXX __all__ = ["Status"] + (
+# XXX informational.__all__
+# XXX + success.__all__
+# XXX + redirection.__all__
+# XXX + clienterror.__all__
+# XXX + servererror.__all__
+# XXX )
def parse(response):
--- a/web/templating/__main__.py
+++ b/web/templating/__main__.py
import txt
-import web
+import web.templating
-main = txt.application("mm", mm.__doc__)
+main = txt.application("web-template", web.templating.__doc__)
@main.register()
)
def run(self, args, stdin):
- document = mm.Template(stdin)(*args.args)
+ document = web.templating.Template(stdin)(*args.args)
for wrapper in args.wrappers:
- document = mm.Template(wrapper)(document)
+ document = web.templating.Template(wrapper)(document)
print(document)
--- a/web/templating/templating.py
+++ b/web/templating/templating.py
from RestrictedPython.Eval import default_guarded_getitem, default_guarded_getiter
from RestrictedPython.Guards import safe_builtins
-from .. import markdown
+from .. import markdown, slrzd
from . import parse
safe_globals["_getiter_"] = default_guarded_getiter
"""
for dirpath, dirnames, filenames in os.walk(directory):
with open(os.path.join(dirpath, "__init__.py"), "w") as template:
- print("from mm.parse import *", file=template)
- print("from mm.templating import *", file=template)
+ print("from web.templating.parse import *", file=template)
+ print("from web.templating.templating import *", file=template)
for dirname in dirnames[:]:
if dirname.startswith("."):
dirnames.remove(dirname)
return Template(self._template + new_text, restricted=self._restricted)
def _repr_html_(self):
- return solarized.highlight(self._template, ".html")
+ return slrzd.highlight(self._template, ".html")
def __call__(self, *args, **kwargs):
"""
else:
compiled_code = compile(code, filename, "exec")
except SyntaxError as err:
- # display template line that caused the error along with traceback
- lineno = get_source_line(err.filename, err.lineno - 1)
- try:
- msg = "\n\nTemplate traceback:\n File {}, line {}\n {}"
- err.msg += msg.format(repr(err.filename), err.lineno - 5, lineno)
- except Exception:
- pass
+ if err.lineno:
+ # display template line that caused the error along with traceback
+ lineno = get_source_line(err.filename, err.lineno - 1)
+ try:
+ msg = "\n\nTemplate traceback:\n File {}, line {}\n {}"
+ err.msg += msg.format(repr(err.filename), err.lineno - 5, lineno)
+ except Exception:
+ pass
raise
return compiled_code