my eye

Bootstrap

Committed fd7f4c

index 0000000..4711d41
--- /dev/null

+poetry.lock
+.coverage
+.test_coverage.xml
+.test_results.xml

index 0000000..53e7e76
--- /dev/null

+"""A simple asynchronous background job queue."""
+
+from gevent import monkey
+
+monkey.patch_all()
+
+import argparse  # noqa: E402
+import importlib  # noqa: E402
+import pathlib  # noqa: E402
+import time  # noqa: E402
+import traceback  # noqa: E402
+
+import gevent.queue  # noqa: E402
+import pendulum  # noqa: E402
+import sqlyte  # noqa: E402
+import txt  # noqa: E402
+import web  # noqa: E402
+import webagt  # noqa: E402
+
+# XXX queue = gevent.queue.PriorityQueue()
+worker_count = 20
+main = txt.application("webqueue", __doc__)
+
+
+def handle_job(host, job_run_id, db, cache_db, browser):
+    """Handle a freshly dequeued job."""
+    # TODO handle retries
+    web.tx.host.name = host
+    web.tx.host.db = db
+    web.tx.host.cache = webagt.cache(db=cache_db)
+    web.tx.browser = browser
+    job = web.tx.db.select(
+        "job_runs AS r",
+        what="s.rowid, *",
+        join="job_signatures AS s ON s.rowid = r.job_signature_id",
+        where="r.job_id = ?",
+        vals=[job_run_id],
+    )[0]
+    if job["started"]:
+        return
+    web.tx.db.update(
+        "job_runs",
+        where="job_id = ?",
+        vals=[job_run_id],
+        what="started = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')",
+    )
+    _module = job["module"]
+    _object = job["object"]
+    _args = web.load(job["args"])
+    _kwargs = web.load(job["kwargs"])
+    print(
+        f"{host}/{_module}:{_object}",
+        *(_args + list(f"{k}={v}" for k, v in _kwargs.items())),
+        sep="\n  ",
+        flush=True,
+    )
+    status = 0
+    try:
+        output = getattr(importlib.import_module(_module), _object)(*_args, **_kwargs)
+    except Exception as err:
+        status = 1
+        output = str(err)
+        traceback.print_exc()
+    web.tx.db.update(
+        "job_runs",
+        vals=[status, output, job_run_id],
+        what="finished = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'), status = ?, output = ?",
+        where="job_id = ?",
+    )
+    run = web.tx.db.select("job_runs", where="job_id = ?", vals=[job_run_id])[0]
+    st, rt = run["started"] - run["created"], run["finished"] - run["started"]
+    web.tx.db.update(
+        "job_runs",
+        where="job_id = ?",
+        what="start_time = ?, run_time = ?",
+        vals=[
+            f"{st.seconds}.{st.microseconds}",
+            f"{rt.seconds}.{rt.microseconds}",
+            job_run_id,
+        ],
+    )
+    # XXX print(flush=True)
+
+
+def run_scheduler(browser):
+    """Check all schedules every minute and enqueue any scheduled jobs."""
+    while True:
+        now = pendulum.now()
+        if now.second:
+            time.sleep(0.9)
+            continue
+        # TODO schedule_jobs()
+        time.sleep(1)
+
+
+def schedule_jobs(browser):
+    """Check schedule and enqueue jobs if necessary."""
+    # TODO support for days of month, days of week
+    print("checking schedule")
+    # for host in get_hosts():
+    #     web.tx = canopy.contextualize(host)
+    #     jobs = web.tx.db.select("job_schedules AS sch",
+    #                         join="""job_signatures AS sig ON
+    #                                 sch.job_signature_id = sig.rowid""")
+    #     for job in jobs:
+    #         run = True
+    #         minute = job["minute"]
+    #         hour = job["hour"]
+    #         month = job["month"]
+    #         if minute[:2] == "*/":
+    #             if now.minute % int(minute[2]) == 0:
+    #                 run = True
+    #             else:
+    #                 run = False
+    #         if hour[:2] == "*/":
+    #             if now.hour % int(hour[2]) == 0 and now.minute == 0:
+    #                 run = True
+    #             else:
+    #                 run = False
+    #         if month[:2] == "*/":
+    #             if now.month % int(month[2]) == 0 and now.hour == 0 \
+    #                and now.minute == 0:
+    #                 run = True
+    #             else:
+    #                 run = False
+    #         if run:
+    #             canopy.enqueue(getattr(importlib.import_module(job["module"]),
+    #                                    job["object"]))
+    # time.sleep(.9)
+
+
+@main.register()
+class Run:
+    """Run the job queue."""
+
+    def setup(self, add_arg):
+        add_arg(
+            "--browser",
+            action=argparse.BooleanOptionalAction,
+            help="include a web browser",
+        )
+
+    def run(self, stdin, log):
+        """Serve the queue."""
+        browser = None
+        if self.browser:
+            browser = web.agent.Firefox()
+            # TODO gevent.spawn(run_scheduler, browser)
+
+        def run_worker(domain, db, cache_db, browser):
+            while True:
+                for job in db.select("job_runs", where="started is null"):
+                    handle_job(domain, job["job_id"], db, cache_db, browser)
+                time.sleep(0.5)
+
+        while not pathlib.Path("site.db").exists():
+            time.sleep(1)
+        # for db_filename in pathlib.Path().glob("site.db"):
+        domain = "localhost"  # XXX db_filename.stem.removeprefix("site-")
+        log.info(f"Watching for jobs at: {domain}")
+        # XXX sqldb = sqlyte.db(f"site-{domain}.db")
+        # XXX cache_db = sqlyte.db(f"cache-{domain}.db")
+        sqldb = sqlyte.db("site.db")
+        cache_db = sqlyte.db("cache.db")
+        for _ in range(worker_count):
+            gevent.spawn(run_worker, domain, sqldb, cache_db, browser)
+        try:
+            while True:
+                time.sleep(1)
+        except KeyboardInterrupt:  # TODO capture supervisord's kill signal
+            if self.browser:
+                browser.quit()

index 0000000..5e59006
--- /dev/null

+[tool.poetry]
+name = "bgq"
+version = "0.1.3"
+description = "a simple asynchronous background job queue"
+homepage = "https://ragt.ag/code/projects/bgq"
+repository = "https://ragt.ag/code/projects/bgq.git"
+documentation = "https://ragt.ag/code/projects/bgq/api"
+authors = ["Angelo Gladding <angelo@ragt.ag>"]
+license = "BSD-2-Clause"
+packages = [{include="bgq.py"}, {include="webint_jobs"}]
+
+[tool.poetry.scripts]
+bgq = "bgq:main"
+
+[tool.poetry.plugins."webapps"]
+jobs = "webint_jobs:app"
+
+[tool.poetry.dependencies]
+python = ">=3.10,<3.11"
+sqlyte = ">0.0.50"
+txtint = ">0.0.68"
+webagt = ">0.0.5"
+webint = ">0.0.569"
+pendulum = "^2.1.2"
+gevent = "^23.7.0"
+
+[tool.poetry.group.dev.dependencies]
+gmpg = {path="../gmpg", develop=true}
+sqlyte = {path="../sqlyte", develop=true}
+txtint = {path="../txtint", develop=true}
+webagt = {path="../webagt", develop=true}
+webint = {path="../webint", develop=true}
+newmath = {path="../newmath", develop=true}
+microformats = {path="../python-microformats", develop=true}
+
+# [[tool.poetry.source]]
+# name = "main"
+# url = "https://ragt.ag/code/pypi"
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"

index 0000000..f18a383
--- /dev/null

+""""""
+
+import importlib
+
+import web
+
+app = web.application(
+    __name__,
+    prefix="jobs",
+    args={
+        "job_module": r"[\w.]+",
+        "job_object": r"\w+",
+        "job_arghash": r"\w+",
+        "job_run_id": r"\!+",
+    },
+    model={
+        "job_signatures": {
+            "module": "TEXT",
+            "object": "TEXT",
+            "args": "BLOB",
+            "kwargs": "BLOB",
+            "arghash": "TEXT",
+            "unique": ("module", "object", "arghash"),
+        },
+        "job_runs": {
+            "job_signature_id": "INTEGER",
+            "job_id": "TEXT UNIQUE",
+            "created": """DATETIME NOT NULL
+                          DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'))""",
+            "started": "DATETIME",
+            "finished": "DATETIME",
+            "start_time": "REAL",
+            "run_time": "REAL",
+            "status": "INTEGER",
+            "output": "TEXT",
+        },
+        "job_schedules": {
+            "job_signature_id": "INTEGER",
+            "minute": "TEXT",
+            "hour": "TEXT",
+            "day_of_month": "TEXT",
+            "month": "TEXT",
+            "day_of_week": "TEXT",
+            "unique": (
+                "job_signature_id",
+                "minute",
+                "hour",
+                "day_of_month",
+                "month",
+                "day_of_week",
+            ),
+        },
+    },
+)
+
+
+@app.control("")
+class Jobs:
+    """"""
+
+    owner_only = ["get"]
+
+    def get(self):
+        active = web.tx.db.select(
+            "job_runs AS jr",
+            join="job_signatures AS js ON js.rowid = jr.job_signature_id",
+            where="started IS NOT NULL AND finished IS NULL",
+        )
+        finished = web.tx.db.select(
+            "job_runs AS jr",
+            join="job_signatures AS js ON js.rowid = jr.job_signature_id",
+            where="finished IS NOT NULL",
+            order="finished DESC",
+            limit=20,
+        )
+        return app.view.index(active, finished)
+
+
+@app.control("slow")
+class SlowJobs:
+    """"""
+
+    owner_only = ["get"]
+
+    def get(self):
+        duration = web.form(duration=60).duration
+        slowest = web.tx.db.select(
+            "job_runs AS jr",
+            join="job_signatures AS js ON js.rowid = jr.job_signature_id",
+            where="finished IS NOT NULL AND run_time > ?",
+            vals=[duration],
+            order="run_time DESC",
+        )
+        return app.view.slow(slowest)
+
+
+@app.control("schedules")
+class Schedules:
+    """"""
+
+    owner_only = ["get"]
+
+    def get(self):
+        schedules = web.tx.db.select(
+            "job_schedules AS sc",
+            join="job_signatures AS si ON si.rowid = sc.job_signature_id",
+        )
+        return app.view.schedules(schedules)
+
+
+@app.control("{job_module}")
+class ByModule:
+    """"""
+
+    owner_only = ["get"]
+
+    def get(self, job_module):
+        jobs = web.tx.db.select(
+            "job_signatures",
+            what="rowid AS id, *",
+            where="module = ?",
+            vals=[job_module],
+        )
+        return app.view.by_module(job_module, jobs)
+
+
+@app.control("{job_module}/{job_object}")
+class ByObject:
+    """"""
+
+    owner_only = ["get"]
+
+    def get(self, job_module, job_object):
+        callable = getattr(importlib.import_module(job_module), job_object)
+        jobs = web.tx.db.select(
+            "job_signatures",
+            what="rowid AS id, *",
+            where="module = ? AND object = ?",
+            vals=[job_module, job_object],
+        )
+        return app.view.by_object(job_module, job_object, callable, jobs)
+
+
+@app.control("{job_module}/{job_object}/{job_arghash}")
+class Job:
+    """"""
+
+    owner_only = ["get"]
+
+    def get(self, job_module, job_object, job_arghash):
+        callable = getattr(importlib.import_module(job_module), job_object)
+        job = web.tx.db.select(
+            "job_signatures",
+            what="rowid AS id, *",
+            where="module = ? AND object = ? AND arghash LIKE ?",
+            vals=[job_module, job_object, job_arghash + "%"],
+        )[0]
+        runs = web.tx.db.select(
+            "job_runs",
+            where="job_signature_id = ?",
+            vals=[job["id"]],
+            order="finished DESC",
+            limit=100,
+        )
+        total = web.tx.db.select(
+            "job_runs",
+            what="count(*) AS count",
+            where="job_signature_id = ?",
+            vals=[job["id"]],
+            order="finished DESC",
+        )[0]["count"]
+        return app.view.job(job_module, job_object, callable, job, runs, total)
+
+
+@app.control("{job_module}/{job_object}/{job_arghash}/{job_run_id}")
+class JobRun:
+    """"""
+
+    owner_only = ["get"]
+
+    def get(self, job_module, job_object, job_arghash, job_run_id):
+        callable = getattr(importlib.import_module(job_module), job_object)
+        job = web.tx.db.select(
+            "job_signatures",
+            what="rowid AS id, *",
+            where="module = ? AND object = ? AND arghash LIKE ?",
+            vals=[job_module, job_object, job_arghash + "%"],
+        )[0]
+        run = web.tx.db.select(
+            "job_runs",
+            what="rowid, *",
+            where="job_id = ?",
+            vals=[job_run_id],
+            order="finished DESC",
+        )[0]
+        return app.view.run(job_module, job_object, callable, job, run)

index 0000000..fce90d9
--- /dev/null

+import inspect
+from json import loads as load_json
+from pprint import pformat
+
+__all__ = ["load_json", "pformat", "get_doc"]
+
+
+def get_doc(obj):
+    """Return a two-tuple of object's first line and rest of docstring."""
+    docstring = obj.__doc__
+    if not docstring:
+        return "", ""
+    return inspect.cleandoc(docstring).partition("\n\n")[::2]

index 0000000..262f1c2
--- /dev/null

+$def with (module, jobs)
+$var title: $module
+
+<ul>
+$for job in jobs:
+    <li><a href=/$module/$job["object"]>$job["object"]</a></li>
+</ul>

index 0000000..e2ec2ab
--- /dev/null

+$def with (module, object, callable, jobs)
+$var breadcrumbs = (module, f"<code><b>{module}</b></code>")
+$var title = object
+
+$ short_desc, long_desc = get_doc(callable)
+
+<div class=shortdesc>
+$:mkdn(short_desc)
+</div>
+
+<div class=longdesc>
+$:mkdn(long_desc)
+</div>
+
+<style>
+.shortdesc {
+    font-size: 1.25em; }
+</style>
+
+<ul>
+$for job in jobs:
+    $ args = load_json(job["args"])
+    $ kwargs = load_json(job["kwargs"])
+    $ shorthash = job["arghash"][:16]
+    <li>
+    $if args:
+        $", ".join(args)
+    $if kwargs:
+        $kwargs
+    <small><a href=/$module/$object/$shorthash>$shorthash</a></small></li>
+</ul>

index 0000000..cb68ca1
--- /dev/null

+$def with (active, finished)
+$var title: Jobs
+
+$def render_job(job):
+    $ args = load_json(job["args"])
+    $ kwargs = load_json(job["kwargs"])
+    <a href=/$job["module"]/$job["object"]/$job["arghash"][:16]/$job["job_id"]><code>\
+    $job["module"]:${job["object"]}\
+    $for arg in args:
+        <br><span title="$type(arg)">$arg</span>\
+    $for k, v in kwargs.items():
+        <br><span title="$type(v)">$k=$v</span>\
+    </code></a>
+
+$def aside():
+    <h2>Active</h2>
+    $if active:
+        <ul>
+        $for job in active:
+            <li>
+            $:render_job(job)
+            <p><small>$job["created"], $job["started"]</small></p>
+            </li>
+        </ul>
+    $else:
+        <p>no active jobs</p>
+
+$:aside()
+
+<h2>Finished</h2>
+<p><a href=/slow>slowest jobs</a></p>
+$if finished:
+    <ul>
+    $for job in finished:
+        <li><p>
+        $:render_job(job)<br>
+        <small>$job["finished"].diff_for_humans(),
+        took $job["run_time"] seconds</small>
+        </p></li>
+    </ul>
+$else:
+    <p>no finished jobs</p>
+
+$# $var aside = aside

index 0000000..d5116b0
--- /dev/null

+$def with (module, object, callable, job, runs, total)
+$var breadcrumbs = (module, f"<code><b>{module}</b></code>", object, f"<code><b>{object}</b></code>")
+$var title: $module:$object
+$var subtitle: <code><strong>$job["arghash"][:16]</strong> $job["arghash"][16:]</code>
+
+<p>$total total runs.</p>
+
+<ul>
+$for run in runs:
+    <li>
+    <p>Created: <time datetime=$run["created"]>$run["created"]</time>
+    <br>Started: <time datetime=$run["started"]>$run["started"]</time>
+    $if run["finished"]:
+        <br>Finished: <time datetime=$run["finished"]>$run["finished"]</time>
+    </p>
+    $if not run["finished"]:
+        <form method=post action=/>
+        <button>Cancel</button>
+        </form>
+    <small><a href=/$module/$object/\
+    $job["arghash"][:16]/$run["job_id"]>$run["job_id"]</a></small>
+    </li>
+</ul>
+
+$def aside():
+    $ args = load_json(job["args"])
+    $ kwargs = load_json(job["kwargs"])
+
+    <ol>
+    $for arg in args:
+        <li>$arg</li>
+    </ol>
+
+    <dl>
+    $for key, val in kwargs:
+        <dt>$key</dt>
+        <dd>$val</dd>
+    </dl>
+$var aside = aside

index 0000000..e5c47fe
--- /dev/null

+$def with (module, object, callable, job, run)
+$ shorthash = job["arghash"][:16]
+$var breadcrumbs = (module, f"<code><b>{module}</b></code>", object, f"<code><b>{object}</b></code>", shorthash, f"<code><b>{shorthash}</b></code>")
+$var title: $module:$object
+$var subtitle: <code><strong>$shorthash</strong>$job["arghash"][16:] : $run["job_id"]</code>
+
+$ args = load_json(job["args"])
+$ kwargs = load_json(job["kwargs"])
+
+<ol>
+$for arg in args:
+    <li>$arg</li>
+</ol>
+
+<dl>
+$for key, val in kwargs:
+    <dt>$key</dt>
+    <dd>$val</dd>
+</dl>
+
+$if run["status"]:
+    <p>Error code <code>$run["status"]</code></p>
+<pre>$run["output"]</pre>
+
+$def aside():
+    <p>Created: $run["created"].diff_for_humans()
+    $ duration = run["started"] - run["created"]
+    <br>Started: $run["started"].diff_for_humans() <small>in $(duration.seconds).$duration.microseconds seconds</small>
+    $if run["finished"]:
+        $ duration = run["finished"] - run["started"]
+        <br>Finished: $run["finished"].diff_for_humans() <small>in $(duration.seconds).$duration.microseconds seconds</small>
+    </p>
+    $if not run["finished"]:
+        <form method=post action=/>
+        <button>Cancel</button>
+        </form>
+$var aside = aside

index 0000000..ddaed40
--- /dev/null

+$def with (schedules)
+$var breadcrumbs = ("system", "System", "jobs", "Jobs")
+$var title: Schedules
+
+$def humanize(s):
+    $ min = s["minute"]
+    $ hr = s["hour"]
+    $ day = s["day_of_month"]
+    $ mon = s["month"]
+    $ dow = s["day_of_week"]
+    <abbr title="$min $hr $day $mon $dow">
+    $if min == "*":
+        every minute
+    $if "/" in min:
+        every $min.partition("/")[2] minutes
+    $# $if hr == "*":
+    $#     every hour
+    $# $if day == "*" and dow == "*":
+    $#     every day
+    $# $if mon == "*":
+    $#     every month
+    </abbr>
+
+<ul>
+$for schedule in schedules:
+    <li><a
+    href=$schedule["module"]/$schedule["object"]>$schedule["object"]</a><br>
+    <small><a href=$schedule["module"]>$schedule["module"]</a></small><br>
+    $if schedule["args"] != "[]" or schedule["kwargs"] != "{}":
+        <small>\
+        $if schedule["args"] != "[]":
+            $schedule["args"]<br>
+        $if schedule["kwargs"] != "{}":
+            $schedule["kwargs"]<br>
+        </small>
+    $:humanize(schedule)
+    </li>
+</ul>

index 0000000..556e7ee
--- /dev/null

+$def with (slowest)
+$var title: Slowest Jobs
+
+$def render_job(job):
+    $ args = load_json(job["args"])
+    $ kwargs = load_json(job["kwargs"])
+    <a href=/$job["module"]/$job["object"]/$job["arghash"][:16]/$job["job_id"]><code>\
+    $job["module"]:${job["object"]}\
+    $for arg in args:
+        <br><span title="$type(arg)">$arg</span>\
+    $for k, v in kwargs.items():
+        <br><span title="$type(v)">$k=$v</span>\
+    </code></a>
+
+<ul>
+$for job in slowest:
+    <li><p>
+    $:render_job(job)<br>
+    <small>$job["finished"].diff_for_humans(),
+    took $job["run_time"] seconds</small>
+    </p></li>
+</ul>