Bootstrap
Committed fd7f4c
index 0000000..4711d41
--- /dev/null
+poetry.lock
+.coverage
+.test_coverage.xml
+.test_results.xml
index 0000000..53e7e76
--- /dev/null
+"""A simple asynchronous background job queue."""
+
+from gevent import monkey
+
+monkey.patch_all()
+
+import argparse # noqa: E402
+import importlib # noqa: E402
+import pathlib # noqa: E402
+import time # noqa: E402
+import traceback # noqa: E402
+
+import gevent.queue # noqa: E402
+import pendulum # noqa: E402
+import sqlyte # noqa: E402
+import txt # noqa: E402
+import web # noqa: E402
+import webagt # noqa: E402
+
+# XXX queue = gevent.queue.PriorityQueue()
# Number of worker greenlets spawned by `Run.run`.
worker_count = 20
# Command-line application object; commands attach to it via `main.register()`.
main = txt.application("webqueue", __doc__)
+
+
def handle_job(host, job_run_id, db, cache_db, browser):
    """Run a freshly dequeued job and record its outcome in `job_runs`.

    The transaction context (`web.tx`) is pointed at the given host,
    database, cache and browser; the job's callable is imported and invoked
    with its stored arguments; the run's timestamps, status, output and
    timings are then written back to the database.

    Args:
        host: name assigned to `web.tx.host.name` for the duration of the job.
        job_run_id: `job_id` of the run to execute.
        db: database holding the `job_runs` and `job_signatures` tables.
        cache_db: database handed to `webagt.cache`.
        browser: object exposed to the job as `web.tx.browser` (may be None).

    A run that already has a `started` timestamp is skipped. A job that
    raises is recorded with status 1 and the exception text as its output;
    otherwise status is 0 and the output is the callable's return value.
    """
    # TODO handle retries
    web.tx.host.name = host
    web.tx.host.db = db
    web.tx.host.cache = webagt.cache(db=cache_db)
    web.tx.browser = browser
    job = web.tx.db.select(
        "job_runs AS r",
        what="s.rowid, *",
        join="job_signatures AS s ON s.rowid = r.job_signature_id",
        where="r.job_id = ?",
        vals=[job_run_id],
    )[0]
    if job["started"]:
        # Already picked up (possibly by another worker); nothing to do.
        return
    web.tx.db.update(
        "job_runs",
        where="job_id = ?",
        vals=[job_run_id],
        what="started = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')",
    )
    _module = job["module"]
    _object = job["object"]
    _args = web.load(job["args"])
    _kwargs = web.load(job["kwargs"])
    print(
        f"{host}/{_module}:{_object}",
        *(_args + [f"{k}={v}" for k, v in _kwargs.items()]),
        sep="\n  ",
        flush=True,
    )
    status = 0
    try:
        output = getattr(importlib.import_module(_module), _object)(*_args, **_kwargs)
    except Exception as err:
        # Boundary handler: a failing job must not take the worker down.
        status = 1
        output = str(err)
        traceback.print_exc()
    web.tx.db.update(
        "job_runs",
        vals=[status, output, job_run_id],
        what="finished = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'), status = ?, output = ?",
        where="job_id = ?",
    )
    # Re-read the row: all three timestamps are generated by the database.
    run = web.tx.db.select("job_runs", where="job_id = ?", vals=[job_run_id])[0]
    start_delay = run["started"] - run["created"]
    run_duration = run["finished"] - run["started"]
    web.tx.db.update(
        "job_runs",
        where="job_id = ?",
        what="start_time = ?, run_time = ?",
        vals=[
            # `total_seconds()` keeps the microseconds zero-padded and includes
            # whole days; "{seconds}.{microseconds}" turned 1 s + 5000 us
            # into "1.5000".
            f"{start_delay.total_seconds():.6f}",
            f"{run_duration.total_seconds():.6f}",
            job_run_id,
        ],
    )
    # XXX print(flush=True)
+
+
def run_scheduler(browser):
    """Poll the clock forever, pausing a full second at each top-of-minute tick.

    No jobs are enqueued yet (see the TODO); `browser` is currently unused.
    """
    while True:
        if pendulum.now().second == 0:
            # TODO schedule_jobs()
            time.sleep(1)
        else:
            # Not at second zero yet; check again shortly.
            time.sleep(0.9)
+
+
def schedule_jobs(browser):
    """Check schedule and enqueue jobs if necessary.

    Currently a stub: it only prints a marker line. The cron-style matching
    below is a commented-out sketch and `browser` is unused.
    """
    # TODO support for days of month, days of week
    print("checking schedule")
    # NOTE(review): the disabled sketch below references names that are not
    # defined in this module (`get_hosts`, `canopy`, `now`), and
    # `int(minute[2])` reads only one digit of a `*/N` step -- revisit both
    # before enabling it.
    # for host in get_hosts():
    #     web.tx = canopy.contextualize(host)
    #     jobs = web.tx.db.select("job_schedules AS sch",
    #                             join="""job_signatures AS sig ON
    #                                     sch.job_signature_id = sig.rowid""")
    #     for job in jobs:
    #         run = True
    #         minute = job["minute"]
    #         hour = job["hour"]
    #         month = job["month"]
    #         if minute[:2] == "*/":
    #             if now.minute % int(minute[2]) == 0:
    #                 run = True
    #             else:
    #                 run = False
    #         if hour[:2] == "*/":
    #             if now.hour % int(hour[2]) == 0 and now.minute == 0:
    #                 run = True
    #             else:
    #                 run = False
    #         if month[:2] == "*/":
    #             if now.month % int(month[2]) == 0 and now.hour == 0 \
    #                     and now.minute == 0:
    #                 run = True
    #             else:
    #                 run = False
    #         if run:
    #             canopy.enqueue(getattr(importlib.import_module(job["module"]),
    #                                    job["object"]))
    # time.sleep(.9)
+
+
@main.register()
class Run:
    """Run the job queue."""

    def setup(self, add_arg):
        """Declare the optional `--browser` flag for this command."""
        add_arg(
            "--browser",
            action=argparse.BooleanOptionalAction,
            help="include a web browser",
        )

    def run(self, stdin, log):
        """Serve the queue.

        Waits for `site.db` to appear, spawns `worker_count` polling
        greenlets against it, then idles until interrupted from the keyboard.
        """
        browser = web.agent.Firefox() if self.browser else None
        # TODO gevent.spawn(run_scheduler, browser)

        def run_worker(domain, db, cache_db, browser):
            """Poll for unstarted runs twice a second and handle each one."""
            while True:
                pending = db.select("job_runs", where="started is null")
                for job in pending:
                    handle_job(domain, job["job_id"], db, cache_db, browser)
                time.sleep(0.5)

        site_db_path = pathlib.Path("site.db")
        while not site_db_path.exists():
            time.sleep(1)
        # XXX per-domain databases (`site-{domain}.db` / `cache-{domain}.db`)
        # are disabled; a single local site is assumed.
        domain = "localhost"
        log.info(f"Watching for jobs at: {domain}")
        sqldb = sqlyte.db("site.db")
        cache_db = sqlyte.db("cache.db")
        for _ in range(worker_count):
            gevent.spawn(run_worker, domain, sqldb, cache_db, browser)
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:  # TODO capture supervisord's kill signal
            if self.browser:
                browser.quit()
index 0000000..5e59006
--- /dev/null
+[tool.poetry]
+name = "bgq"
+version = "0.1.3"
+description = "a simple asynchronous background job queue"
+homepage = "https://ragt.ag/code/projects/bgq"
+repository = "https://ragt.ag/code/projects/bgq.git"
+documentation = "https://ragt.ag/code/projects/bgq/api"
+authors = ["Angelo Gladding <angelo@ragt.ag>"]
+license = "BSD-2-Clause"
+packages = [{include="bgq.py"}, {include="webint_jobs"}]
+
+[tool.poetry.scripts]
+bgq = "bgq:main"
+
+[tool.poetry.plugins."webapps"]
+jobs = "webint_jobs:app"
+
+[tool.poetry.dependencies]
+python = ">=3.10,<3.11"
+sqlyte = ">0.0.50"
+txtint = ">0.0.68"
+webagt = ">0.0.5"
+webint = ">0.0.569"
+pendulum = "^2.1.2"
+gevent = "^23.7.0"
+
+[tool.poetry.group.dev.dependencies]
+gmpg = {path="../gmpg", develop=true}
+sqlyte = {path="../sqlyte", develop=true}
+txtint = {path="../txtint", develop=true}
+webagt = {path="../webagt", develop=true}
+webint = {path="../webint", develop=true}
+newmath = {path="../newmath", develop=true}
+microformats = {path="../python-microformats", develop=true}
+
+# [[tool.poetry.source]]
+# name = "main"
+# url = "https://ragt.ag/code/pypi"
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
index 0000000..f18a383
--- /dev/null
+""""""
+
+import importlib
+
+import web
+
# Web application exposing job signatures, runs and schedules under "jobs".
app = web.application(
    __name__,
    prefix="jobs",
    # Regular expressions for the URL placeholders used by the controllers.
    args={
        "job_module": r"[\w.]+",
        "job_object": r"\w+",
        "job_arghash": r"\w+",
        # Was r"\!+", which matches only exclamation marks, so the run route
        # could not match ordinary ids.
        # NOTE(review): assumes job ids consist of word characters -- confirm
        # against the code that generates `job_runs.job_id`.
        "job_run_id": r"\w+",
    },
    model={
        # One row per distinct callable + arguments combination.
        "job_signatures": {
            "module": "TEXT",
            "object": "TEXT",
            "args": "BLOB",
            "kwargs": "BLOB",
            "arghash": "TEXT",
            "unique": ("module", "object", "arghash"),
        },
        # One row per execution of a signature.
        "job_runs": {
            "job_signature_id": "INTEGER",
            "job_id": "TEXT UNIQUE",
            "created": """DATETIME NOT NULL
                          DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'))""",
            "started": "DATETIME",
            "finished": "DATETIME",
            "start_time": "REAL",
            "run_time": "REAL",
            "status": "INTEGER",
            "output": "TEXT",
        },
        # Cron-style schedule fields attached to a signature.
        "job_schedules": {
            "job_signature_id": "INTEGER",
            "minute": "TEXT",
            "hour": "TEXT",
            "day_of_month": "TEXT",
            "month": "TEXT",
            "day_of_week": "TEXT",
            "unique": (
                "job_signature_id",
                "minute",
                "hour",
                "day_of_month",
                "month",
                "day_of_week",
            ),
        },
    },
)
+
+
@app.control("")
class Jobs:
    """Overview of in-progress and recently finished job runs."""

    owner_only = ["get"]

    def get(self):
        """Render the index view with active runs and the 20 latest finished runs."""
        signature_join = "job_signatures AS js ON js.rowid = jr.job_signature_id"
        # Active: started but not yet finished.
        active = web.tx.db.select(
            "job_runs AS jr",
            join=signature_join,
            where="started IS NOT NULL AND finished IS NULL",
        )
        finished = web.tx.db.select(
            "job_runs AS jr",
            join=signature_join,
            where="finished IS NOT NULL",
            order="finished DESC",
            limit=20,
        )
        return app.view.index(active, finished)
+
+
@app.control("slow")
class SlowJobs:
    """Finished job runs whose run time exceeds a threshold."""

    owner_only = ["get"]

    def get(self):
        """Render runs slower than the `duration` form value (default 60), slowest first."""
        threshold = web.form(duration=60).duration
        return app.view.slow(
            web.tx.db.select(
                "job_runs AS jr",
                join="job_signatures AS js ON js.rowid = jr.job_signature_id",
                where="finished IS NOT NULL AND run_time > ?",
                vals=[threshold],
                order="run_time DESC",
            )
        )
+
+
@app.control("schedules")
class Schedules:
    """All job schedules joined with their signatures."""

    owner_only = ["get"]

    def get(self):
        """Render every schedule row together with its job signature."""
        return app.view.schedules(
            web.tx.db.select(
                "job_schedules AS sc",
                join="job_signatures AS si ON si.rowid = sc.job_signature_id",
            )
        )
+
+
@app.control("{job_module}")
class ByModule:
    """Job signatures recorded for one module."""

    owner_only = ["get"]

    def get(self, job_module):
        """Render the signatures whose `module` equals `job_module`."""
        signatures = web.tx.db.select(
            "job_signatures",
            what="rowid AS id, *",
            where="module = ?",
            vals=[job_module],
        )
        return app.view.by_module(job_module, signatures)
+
+
@app.control("{job_module}/{job_object}")
class ByObject:
    """Job signatures recorded for one callable of a module."""

    owner_only = ["get"]

    def get(self, job_module, job_object):
        """Import the callable and render its recorded signatures."""
        module = importlib.import_module(job_module)
        target = getattr(module, job_object)
        signatures = web.tx.db.select(
            "job_signatures",
            what="rowid AS id, *",
            where="module = ? AND object = ?",
            vals=[job_module, job_object],
        )
        return app.view.by_object(job_module, job_object, target, signatures)
+
+
@app.control("{job_module}/{job_object}/{job_arghash}")
class Job:
    """A single job signature together with its recorded runs."""

    owner_only = ["get"]

    def get(self, job_module, job_object, job_arghash):
        """Render the signature, up to 100 of its runs and the total run count.

        `job_arghash` is matched as a prefix of the stored `arghash`.
        """
        target = getattr(importlib.import_module(job_module), job_object)
        matches = web.tx.db.select(
            "job_signatures",
            what="rowid AS id, *",
            where="module = ? AND object = ? AND arghash LIKE ?",
            vals=[job_module, job_object, f"{job_arghash}%"],
        )
        job = matches[0]
        runs = web.tx.db.select(
            "job_runs",
            where="job_signature_id = ?",
            vals=[job["id"]],
            order="finished DESC",
            limit=100,
        )
        count_rows = web.tx.db.select(
            "job_runs",
            what="count(*) AS count",
            where="job_signature_id = ?",
            vals=[job["id"]],
            order="finished DESC",
        )
        total = count_rows[0]["count"]
        return app.view.job(job_module, job_object, target, job, runs, total)
+
+
@app.control("{job_module}/{job_object}/{job_arghash}/{job_run_id}")
class JobRun:
    """A single run of a job signature."""

    owner_only = ["get"]

    def get(self, job_module, job_object, job_arghash, job_run_id):
        """Render one run, looked up by `job_id`, alongside its signature.

        `job_arghash` is matched as a prefix of the stored `arghash`.
        """
        target = getattr(importlib.import_module(job_module), job_object)
        matches = web.tx.db.select(
            "job_signatures",
            what="rowid AS id, *",
            where="module = ? AND object = ? AND arghash LIKE ?",
            vals=[job_module, job_object, f"{job_arghash}%"],
        )
        job = matches[0]
        run_rows = web.tx.db.select(
            "job_runs",
            what="rowid, *",
            where="job_id = ?",
            vals=[job_run_id],
            order="finished DESC",
        )
        run = run_rows[0]
        return app.view.run(job_module, job_object, target, job, run)
index 0000000..fce90d9
--- /dev/null
+import inspect
+from json import loads as load_json
+from pprint import pformat
+
+__all__ = ["load_json", "pformat", "get_doc"]
+
+
def get_doc(obj):
    """Return a two-tuple of object's summary and the rest of its docstring.

    The docstring is dedented with `inspect.cleandoc` and split at the first
    blank line; a separator line holding only whitespace counts as blank.
    Everything before that line is the summary, everything after it is the
    rest. Objects without a docstring yield ``("", "")``.
    """
    docstring = getattr(obj, "__doc__", None)
    if not docstring:
        return "", ""
    lines = inspect.cleandoc(docstring).split("\n")
    for index, line in enumerate(lines):
        # A literal "\n\n" search would miss a separator line made of spaces.
        if not line.strip():
            return "\n".join(lines[:index]), "\n".join(lines[index + 1 :])
    return "\n".join(lines), ""
index 0000000..262f1c2
--- /dev/null
$def with (module, jobs)
$var title: $module

$# One list item per entry in `jobs`, linking to that entry's object page.
<ul>
$for job in jobs:
    <li><a href=/$module/$job["object"]>$job["object"]</a></li>
</ul>
index 0000000..e2ec2ab
--- /dev/null
$def with (module, object, callable, jobs)
$var breadcrumbs = (module, f"<code><b>{module}</b></code>")
$var title = object

$# Split the callable's docstring into its summary and the remaining text.
$ short_desc, long_desc = get_doc(callable)

<div class=shortdesc>
$:mkdn(short_desc)
</div>

<div class=longdesc>
$:mkdn(long_desc)
</div>

<style>
.shortdesc {
  font-size: 1.25em; }
</style>

$# One entry per signature: its arguments, then a link labelled with the
$# first 16 characters of the argument hash.
<ul>
$for job in jobs:
    $ args = load_json(job["args"])
    $ kwargs = load_json(job["kwargs"])
    $ shorthash = job["arghash"][:16]
    <li>
    $if args:
        $# NOTE(review): join() assumes every positional argument is a string -- confirm
        $", ".join(args)
    $if kwargs:
        $kwargs
    <small><a href=/$module/$object/$shorthash>$shorthash</a></small></li>
</ul>
index 0000000..cb68ca1
--- /dev/null
$def with (active, finished)
$var title: Jobs

$# Render one run as a link to its run page: "module:object", followed by
$# each positional and keyword argument on its own line.
$def render_job(job):
    $ args = load_json(job["args"])
    $ kwargs = load_json(job["kwargs"])
    <a href=/$job["module"]/$job["object"]/$job["arghash"][:16]/$job["job_id"]><code>\
    $job["module"]:${job["object"]}\
    $for arg in args:
        <br><span title="$type(arg)">$arg</span>\
    $for k, v in kwargs.items():
        <br><span title="$type(v)">$k=$v</span>\
    </code></a>

$# List of the runs passed in as `active`, with created and started timestamps.
$def aside():
    <h2>Active</h2>
    $if active:
        <ul>
        $for job in active:
            <li>
            $:render_job(job)
            <p><small>$job["created"], $job["started"]</small></p>
            </li>
        </ul>
    $else:
        <p>no active jobs</p>

$# The active list is rendered inline here rather than exported as `aside`.
$:aside()

<h2>Finished</h2>
<p><a href=/slow>slowest jobs</a></p>
$if finished:
    <ul>
    $for job in finished:
        <li><p>
        $:render_job(job)<br>
        <small>$job["finished"].diff_for_humans(),
        took $job["run_time"] seconds</small>
        </p></li>
    </ul>
$else:
    <p>no finished jobs</p>

$# $var aside = aside
index 0000000..d5116b0
--- /dev/null
$def with (module, object, callable, job, runs, total)
$var breadcrumbs = (module, f"<code><b>{module}</b></code>", object, f"<code><b>{object}</b></code>")
$var title: $module:$object
$var subtitle: <code><strong>$job["arghash"][:16]</strong> $job["arghash"][16:]</code>

<p>$total total runs.</p>

$# One entry per run: timestamps, a cancel form while unfinished, and a link
$# to the run page.
<ul>
$for run in runs:
    <li>
    <p>Created: <time datetime=$run["created"]>$run["created"]</time>
    <br>Started: <time datetime=$run["started"]>$run["started"]</time>
    $if run["finished"]:
        <br>Finished: <time datetime=$run["finished"]>$run["finished"]</time>
    </p>
    $if not run["finished"]:
        <form method=post action=/>
        <button>Cancel</button>
        </form>
    <small><a href=/$module/$object/\
    $job["arghash"][:16]/$run["job_id"]>$run["job_id"]</a></small>
    </li>
</ul>

$# Sidebar: the signature's positional and keyword arguments.
$def aside():
    $ args = load_json(job["args"])
    $ kwargs = load_json(job["kwargs"])

    <ol>
    $for arg in args:
        <li>$arg</li>
    </ol>

    <dl>
    $# Iterate items(): looping over the dict itself yields bare keys, which
    $# cannot be unpacked into (key, val).
    $for key, val in kwargs.items():
        <dt>$key</dt>
        <dd>$val</dd>
    </dl>
$var aside = aside
index 0000000..e5c47fe
--- /dev/null
$def with (module, object, callable, job, run)
$ shorthash = job["arghash"][:16]
$var breadcrumbs = (module, f"<code><b>{module}</b></code>", object, f"<code><b>{object}</b></code>", shorthash, f"<code><b>{shorthash}</b></code>")
$var title: $module:$object
$var subtitle: <code><strong>$shorthash</strong>$job["arghash"][16:] : $run["job_id"]</code>

$ args = load_json(job["args"])
$ kwargs = load_json(job["kwargs"])

<ol>
$for arg in args:
    <li>$arg</li>
</ol>

<dl>
$# Iterate items(): looping over the dict itself yields bare keys, which
$# cannot be unpacked into (key, val).
$for key, val in kwargs.items():
    <dt>$key</dt>
    <dd>$val</dd>
</dl>

$if run["status"]:
    <p>Error code <code>$run["status"]</code></p>
<pre>$run["output"]</pre>

$# Sidebar: run timeline. Durations use total_seconds() because printing
$# "seconds.microseconds" drops the zero padding (1 s + 5000 us read "1.5000").
$def aside():
    <p>Created: $run["created"].diff_for_humans()
    $ duration = run["started"] - run["created"]
    <br>Started: $run["started"].diff_for_humans() <small>in $duration.total_seconds() seconds</small>
    $if run["finished"]:
        $ duration = run["finished"] - run["started"]
        <br>Finished: $run["finished"].diff_for_humans() <small>in $duration.total_seconds() seconds</small>
    </p>
    $if not run["finished"]:
        <form method=post action=/>
        <button>Cancel</button>
        </form>
$var aside = aside
index 0000000..ddaed40
--- /dev/null
$def with (schedules)
$var breadcrumbs = ("system", "System", "jobs", "Jobs")
$var title: Schedules

$# Describe a schedule's minute field in words; the raw five fields go in the
$# <abbr> title. The hour/day/month cases are still disabled below.
$def humanize(s):
    $ min = s["minute"]
    $ hr = s["hour"]
    $ day = s["day_of_month"]
    $ mon = s["month"]
    $ dow = s["day_of_week"]
    <abbr title="$min $hr $day $mon $dow">
    $if min == "*":
        every minute
    $if "/" in min:
        every $min.partition("/")[2] minutes
    $# $if hr == "*":
    $#     every hour
    $# $if day == "*" and dow == "*":
    $#     every day
    $# $if mon == "*":
    $#     every month
    </abbr>

$# One entry per schedule: object and module links, any non-empty arguments,
$# then the humanized schedule.
<ul>
$for schedule in schedules:
    <li><a
    href=$schedule["module"]/$schedule["object"]>$schedule["object"]</a><br>
    <small><a href=$schedule["module"]>$schedule["module"]</a></small><br>
    $if schedule["args"] != "[]" or schedule["kwargs"] != "{}":
        <small>\
        $if schedule["args"] != "[]":
            $schedule["args"]<br>
        $if schedule["kwargs"] != "{}":
            $schedule["kwargs"]<br>
        </small>
    $:humanize(schedule)
    </li>
</ul>
index 0000000..556e7ee
--- /dev/null
$def with (slowest)
$var title: Slowest Jobs

$# Render one run as a link to its run page: "module:object", followed by
$# each positional and keyword argument on its own line.
$def render_job(job):
    $ args = load_json(job["args"])
    $ kwargs = load_json(job["kwargs"])
    <a href=/$job["module"]/$job["object"]/$job["arghash"][:16]/$job["job_id"]><code>\
    $job["module"]:${job["object"]}\
    $for arg in args:
        <br><span title="$type(arg)">$arg</span>\
    $for k, v in kwargs.items():
        <br><span title="$type(v)">$k=$v</span>\
    </code></a>

$# One entry per run in `slowest`, with when it finished and how long it took.
<ul>
$for job in slowest:
    <li><p>
    $:render_job(job)<br>
    <small>$job["finished"].diff_for_humans(),
    took $job["run_time"] seconds</small>
    </p></li>
</ul>