Bootstrap
Committed 2bad03
index 0000000..faba686
--- /dev/null
+poetry.lock
+.api_python.json
+.coverage
+.test_coverage.xml
+.test_results.xml
+_test_data
index 0000000..5247e24
--- /dev/null
+"""
+Tools for metamodern software development.
+
+Design in accordance with the meta system.
+
+"""
+
+from .analysis import test
+from .git import clone_repo, get_repo
+from .pkg import (get_current_packages, get_current_project,
+ strip_local_dev_deps)
+
+__all__ = [
+ "clone_repo",
+ "get_repo",
+ "test",
+ "get_current_project",
+ "get_current_packages",
+ "strip_local_dev_deps",
+]
index 0000000..3861318
--- /dev/null
+"""Packaging tools for the terminal."""
+
+import json
+import os
+import pathlib
+
+import txt
+
+import gmpg
+
+from . import pkg
+
+main = txt.application("gmpg", gmpg.__doc__)
+
+
def complete_distname(prefix, **kwargs):
    """Yield installed project names starting with `prefix`.

    Shell-completion hook; `**kwargs` absorbs the extra keyword arguments
    completion frameworks pass to completers.
    """
    # NOTE(review): assumes get_environment() yields distribution objects
    # with a `project_name` attribute -- confirm.  The original filtered on
    # `dist.startswith(prefix)` while yielding `dist.project_name`, which
    # is inconsistent: the name, not the object, must be prefix-matched.
    return (
        dist.project_name
        for dist in gmpg.listing.get_environment()
        if dist.project_name.startswith(prefix)
    )
+
+
@main.register()
class Test:
    """Test a package."""

    def run(self, stdin, log):
        # Delegates to gmpg.test(), which runs the pytest-based suite
        # against the current directory.
        gmpg.test()
+
+
@main.register()
class Analyze:
    """Analyze the API of a package."""

    def run(self, stdin, log):
        # The first entry of get_current_packages() is taken as the project.
        project_name = gmpg.get_current_packages()[0]
        # NOTE(review): the ignore list earlier in this commit tracks
        # ".api_python.json" (leading dot) but this writes "api_python.json"
        # -- confirm which filename is intended.
        with open("api_python.json", "w") as fp:
            json.dump(gmpg.analysis.get_api(project_name), fp)
+
+
@main.register()
class Graph:
    """Graph the dependencies of a package."""

    def run(self, stdin, log):
        # The first current package is the project whose graph is rendered.
        packages = gmpg.get_current_packages()
        gmpg.analysis.generate_dependency_graph(packages[0])
+
+
@main.register()
class Commit:
    """Commit a change to the package."""

    def run(self, stdin, log):
        # Not implemented yet; Publish currently handles commit+push.
        pass  # TODO pare down publish to only push
+
+
@main.register()
class Publish:
    """Publish a package to PyPI and GitHub."""

    def setup(self, add_arg):
        add_arg(
            "rule",
            choices=["patch", "minor", "major"],
            help="increment to bump the version",
        )

    def run(self, stdin, log):
        # NOTE(review): `sh` is used throughout but does not appear in this
        # module's visible imports -- confirm it is provided elsewhere.
        stashed = False
        # A dirty tree would pollute the release commit; offer to stash.
        try:
            sh.git("diff", "--quiet")
        except sh.ErrorReturnCode_1:
            if input("Repo is dirty! Stash? [y/N] ").lower() == "y":
                sh.git("stash", "push", "--keep-index")
                stashed = True
            else:
                return 1
        # Run the test suite first; exit code 5 is tolerated (presumably
        # pytest's "no tests collected" -- confirm).
        try:
            print(
                sh.poetry(
                    "run",
                    "pytest",
                    "--doctest-modules",
                    doctest_glob="README*",
                    cov=".",
                )
            )
        except sh.ErrorReturnCode_5:
            pass
        env = os.environ.copy()
        env.update(NO_COLOR="1", GH_PAGER="cat")
        # Private repos publish to the internal "gaea" index instead of PyPI.
        private = json.loads(
            str(sh.gh("repo", "view", "--json", "isPrivate", _env=env))
        )["isPrivate"]
        print(sh.poetry("version", self.rule))
        print(sh.poetry("build"))
        if private:
            print(sh.poetry("publish", "-r", "gaea"))
        else:
            print(sh.poetry("publish"))
        version = str(sh.poetry("version", "-s")).strip()
        print(sh.git("commit", "-a", "-m", f"Release {version}"))
        print(sh.git("push"))
        # Newest wheel in dist/ is the artifact for the (disabled) release.
        dist_dir = (
            pathlib.Path(str(sh.git("rev-parse", "--show-toplevel")).strip()) / "dist"
        )
        asset = str(sh.tail(sh.grep(sh.ls("-1tr", dist_dir), ".whl"), "-n1")).strip()
        # Changelog spans commits since the previous tag; `git describe`
        # exits 128 when no tag exists yet, in which case all history is used.
        try:
            previous_version_string = (
                str(sh.git("describe", "--tags", abbrev=0)).strip() + ".."
            )
        except sh.ErrorReturnCode_128:
            previous_version_string = ""
        changelog = sh.git(
            "--no-pager",
            "log",
            "--no-color",
            f"{previous_version_string}HEAD^",
            "--oneline",
            "--no-decorate",
        )
        # XXX print(sh.gh("release", "create", version, f"dist/{asset}", notes=changelog))
        print(sh.git("pull"))
        print(f"Published release {version}!")
        if stashed:
            try:
                sh.git("stash", "pop", "--index")
            except sh.ErrorReturnCode_1:
                pass
                # NOTE(review): indentation reconstructed from a
                # whitespace-mangled source -- mergetool here runs only when
                # the stash pop conflicts; confirm intent.
                sh.git("mergetool", "pyproject.toml")
        return 0
+
+
@main.register()
class List:

    """list installed distributions"""

    def setup(self, add_arg):
        # add_arg(
        #     "dists",
        #     nargs="*",
        #     completer=complete_distname,
        #     help="name of distribution(s) to list",
        # )
        add_arg("-d", "--deps", action="store_true", help="include dependencies")
        add_arg(
            "-v", "--verbosity", action="count", default=0, help="show more details"
        )

    def run(self, stdin, log):
        # NOTE(review): the "dists" positional is commented out in setup(),
        # so it is unclear how self.dists (and self.machine below) get set
        # by the framework -- confirm.
        get_dist = pkg.listing.get_distributions
        for dist in sorted(
            self.dists if self.dists else get_dist(dependencies=self.deps)
        ):
            self.print_dist(dist)
        return 0

    def print_dist(self, dist):
        """Print one distribution, colorized or machine-readable.

        NOTE(review): indentation of the verbosity blocks below was
        reconstructed from a whitespace-mangled source -- confirm nesting.
        """
        d = pkg.listing.get_distribution(dist)
        details = d.details
        name = details.pop("name")
        version = details.pop("version")
        if not self.machine:
            print("/c/{}".format(name), end=" ")

        def machined_print(
            *prefixes, value, color="d,w", prefix=True, indent="", end="\n"
        ):
            """
            machinable human output

            Emits `value` either as "name:prefix:value" lines (machine
            mode) or as colorized, optionally indented human output.
            """
            prefixes = list(prefixes)
            if self.machine:
                prefixes.insert(0, name)
                end = "\n"
            else:
                if len(prefixes) > 1:
                    prefixes = prefixes[1:]
                if not prefix:
                    prefixes = []

            padding = " " if prefixes and not self.machine else ""

            if isinstance(value, list):
                _vals = ["/{}/{}/X/".format(color, v) for v in value]
                value = [padding + (" " if self.machine else ", ").join(_vals)]
            else:
                value = ["{}/{}/{}/X/".format(padding, color, value)]
            if not self.machine and indent:
                # `indent` may be an int (spaces) or already a string.
                try:
                    indent = " " * indent
                except TypeError:
                    pass
                print(indent, end="")

            print(
                *(["/lg/{}/X/".format(p) for p in prefixes] + value),
                sep="/X//lg/:/X/",
                end=end,
            )

        machined_print(
            "version",
            value=version,
            color="lr,Cle",
            prefix=None,
            end=": " if self.verbosity else "",
        )

        if self.verbosity:
            summary = details.pop("summary")
            # Strip a redundant leading "<name>:" from the summary.
            if summary.lower().startswith(name + " "):
                summary = summary[len(name) :].lstrip(":")
            summary = summary.strip(". ")
            machined_print("summary", value=summary, prefix=None)

            reqs = sorted(details["reqs"])
            if reqs:
                machined_print("requires", value=reqs, color="lm", indent=2)

            mods = sorted(details["mods"])
            if mods:
                machined_print("provides", value=mods, color="m", indent=2)
            entrances = details["entry-points"]
            # Prefer the "term.apps" group; fold console_scripts into it
            # when term.apps is absent.
            if "term.apps" in entrances:
                entrances.pop("console_scripts", None)
            else:
                try:
                    entrances["term.apps"] = entrances["console_scripts"]
                    entrances.pop("console_scripts")
                except KeyError:
                    pass
            if entrances:
                for e_cls, e_points in sorted(entrances.items()):
                    if self.machine:
                        for e_pnt, e_obj in sorted(e_points.items()):
                            o = "{}={}:{}".format(e_pnt, e_obj[0], e_obj[1][0])
                            machined_print(
                                "entrances", e_cls, indent=4, color="g", value=o
                            )
                    else:
                        indent = 6 + len(e_cls)
                        joiner = "\n" + (" " * (indent))
                        o = joiner.join(
                            "/g/{}/X/ /d,lg/{}:{}" "/X/".format(ep, eo[0], eo[1][0])
                            for ep, eo in sorted(e_points.items())
                        )
                        machined_print("entrances", e_cls, value=o, indent=4, color="g")

            location = str(d.location)
            # Abbreviate the user's home directory to "~".
            home_dir = str(d.location.home())
            if location.startswith(home_dir):
                location = "~" + location[len(home_dir) :]
            if d.is_dirty():
                location += " /r/*/X/"  # flag uncommitted changes
            machined_print("installed", indent=2, color="d,b", value=location)

        if self.verbosity > 1:
            if details["url"]:
                machined_print(
                    "website", value=details["url"].rstrip("/"), color="b", indent=2
                )
            if details["download_url"]:
                machined_print(
                    "download",
                    color="b",
                    indent=2,
                    value=details["download_url"].rstrip("/"),
                )

            raw_license = details.pop("license")
            if raw_license != "UNKNOWN":
                # Map each name in "X and Y" / "X or Y" compounds to its
                # abbreviation; fall back to the raw text when unknown.
                try:
                    if " and " in raw_license:
                        license = "/".join(
                            pkg.licensing.get_license(l).abbr
                            for l in raw_license.split(" and ")
                        )
                    elif " or " in raw_license:
                        license = "/".join(
                            pkg.licensing.get_license(l).abbr
                            for l in raw_license.split(" or ")
                        )
                    else:
                        license = pkg.licensing.get_license(raw_license).abbr
                except KeyError:
                    license = raw_license
                machined_print("license", value=license, color="y", indent=2)

            for prs, prs_det in sorted(details["people"].items()):
                email = list(prs_det.items())[0][1]
                o = "/lr/{}/X/ /lg/</X//b/{}/X//lg/>/X/".format(prs, email)
                machined_print("authors", value=o, color="lg", indent=2)
        print()
+
+
+@main.register()
+class Add:
+
+ """add distributions"""
+
+ def setup(self, add_arg):
+ add_arg("dist", nargs="+", help="name of distribution(s) to install")
+
+ def run(self, stdin, log):
+ for dist in self.dist:
+ print("Adding:")
+ pkg.add(dist)
+
+
+@main.register()
+class Remove:
+
+ """remove distributions"""
+
+ def setup(self, add_arg):
+ ...
+ # add_arg(
+ # "dist",
+ # nargs="+",
+ # completer=complete_distname,
+ # help="name of distribution(s) to uninstall",
+ # )
+
+ def run(self, stdin, log):
+ for dist in self.dist:
+ print("Removing:")
+ d = pkg.install.pkg_resources.get_distribution(dist)
+ self.print_tree(d, pkg.install.get_orphans(dist))
+ pkg.remove(dist, clean_reqs=True)
+
+ def print_tree(self, dist, orphans, indent=0):
+ """"""
+ print(
+ " " * indent * 4,
+ "{0.project_name} {0.version} ({0.location})".format(dist),
+ sep="",
+ )
+ for req in pkg.listing._requires(dist):
+ if req in orphans:
+ self.print_tree(req, orphans, indent + 1)
index 0000000..e69de29
index 0000000..e823048
--- /dev/null
+"""
+install & uninstall distributions
+
+"""
+
+# TODO CHEESESHOP = "https://lahacker.net/software"
+
+import os
+import shlex
+
+import pkg_resources
+
+from . import listing
+
+# XXX import pip.__main__
+
+
+__all__ = ["add", "remove", "get_orphans"]
+
+
class VirtualEnvironmentError(Exception):

    """
    raised when an action is attempted outside a virtual environment

    """
+
+
def add(*distributions, editable=False):
    """
    instruct `pip` to install given `distributions`

    `pip` will automatically fetch and install required dependencies.
    Pass `editable` True to install each distribution with `-e`.

    """
    # TODO verify w/ GPG
    args = ["install"]
    if editable:
        args.append("-e")
    for dist in distributions:
        # NOTE(review): pipmain() depends on `pip.__main__`, whose import
        # is commented out above -- confirm before relying on this.
        pipmain(*args, str(dist))
        # TODO "--no-index -f", CHEESESHOP,
        # TODO "-i https://pypi.python.org/simple/",
    write_log("installed", distributions)
+
+
def remove(*distributions, clean_reqs=False):
    """
    instruct `pip` to uninstall given `distributions`

    Set `clean_reqs` True to remove distributions' orphaned requirements.

    """
    for distribution in distributions:
        if clean_reqs:
            # get_orphans() yields the distribution itself plus any
            # requirements left unused once it is gone.
            for dist in get_orphans(distribution):
                pipmain("uninstall", "-y", dist.project_name)
        else:
            # Previously orphans were always removed, silently ignoring
            # `clean_reqs`; honor the flag and remove only the named dist.
            pipmain("uninstall", "-y", distribution)
    write_log("remove", distributions)
+
+
def pipmain(*args):
    """Run the `pip` CLI with the given arguments and return its status."""
    # NOTE(review): `pip.__main__` is only imported in a commented-out line
    # above, so this raises NameError as written -- confirm the import.
    status = pip.__main__._main(shlex.split(" ".join(args)))
    # XXX pip.logger.consumers = [] # reset accumulated loggers after each run
    return status
+
+
def write_log(action, distributions):
    """Append an `action: dist ...` line to the virtualenv's package.log.

    Raises VirtualEnvironmentError when no virtual environment is active.
    """
    # TODO timestamp & sign w/ GPG
    if "VIRTUAL_ENV" not in os.environ:
        raise VirtualEnvironmentError()
    env_root = os.environ["VIRTUAL_ENV"]
    entry = " ".join(str(dist) for dist in distributions)
    with open(os.path.join(env_root, "package.log"), "a") as logfile:
        print(f"{action}:", entry, file=logfile)
+
+
def get_orphans(distribution):
    """Return distributions left unused if `distribution` were removed.

    The result includes `distribution` itself (it seeds the dead set).
    """
    dist = pkg_resources.get_distribution(distribution)
    return _find_all_dead(listing.get_graph(), set([dist]))
+
+
def _find_all_dead(graph, start):
    """Expand `start` to the full closure of distributions with no live dependents."""

    def step(dead):
        return _find_dead(graph, dead)

    return _fixed_point(step, start)
+
+
def _fixed_point(f, x):
    """Repeatedly apply `f` until the value stops changing, then return it."""
    y = f(x)
    while y != x:
        x, y = y, f(y)
    return y
+
+
def _find_dead(graph, dead):
    """Grow `dead` by every node whose dependents are all already dead.

    `graph` maps each node to the set of nodes that depend on it; a node
    with no dependents is a root and is never added here.
    """
    doomed = set(dead)
    for node, dependents in graph.items():
        if dependents and dependents <= dead:
            doomed.add(node)
    return doomed
index 0000000..8d7c0e7
--- /dev/null
+"""
+terms of use for software, content & data
+
+"""
+
+import collections
+import distutils.version
+import json
+import pkg_resources
+import re
+
+__all__ = ["get_license"]
+
+
index = {}  # license name -> {version -> details}; populated by _load()
uris = {}  # license URI -> (name, version); populated by _load()
features = collections.defaultdict(list)  # feature tag -> [(name, version)]
# Parses identifiers like "Affero General Public License version 3",
# "AGPL v3" or "MIT": group 1 captures the name, group 6 the optional
# version (whitespace in the pattern is ignored under (?x)).
pattern = re.compile(
    r"""(?xi)
    ^
    ([\s\w]+?)
    (
        \s?
        (license|l)
        \s?
    )?
    (
        \s?
        (version|v)
        \s?
        ([.\w]+)
    )?
    $"""
)
+
+
def get_license(identifier):
    """
    return best matching license for given `identifier` (name *or* URI)

    An identifier may contain version information.

    >>> long = get_license("Affero General Public License version 3")
    >>> short = get_license("Affero General Public")
    >>> abbreviated = get_license("AGPL")
    >>> long == short == abbreviated
    True
    >>> long
    <licensing.License: Affero General Public License v3>

    """
    _load()  # lazily populate the module-level index on first use

    # A slash without spaces means the identifier is a URI, not a name.
    if "/" in identifier and " " not in identifier:
        if identifier.startswith(("http://", "https://", "//")):
            identifier = identifier.partition("//")[2]  # drop the scheme
        try:
            req_name, req_version = uris[identifier]
        except KeyError:
            raise KeyError("`{}` not found in index".format(identifier))
    else:
        identifier = identifier.replace("_", " ")
        try:
            # groups()[::5] picks group 1 (name) and group 6 (version).
            req_name, req_version = pattern.match(identifier).groups()[::5]
        except AttributeError:
            raise KeyError("`{}` not found in index".format(identifier))

    def cmp(kv):
        # Sort version keys numerically ("1.1" < "2") rather than lexically.
        # NOTE(review): distutils is deprecated/removed in newer Pythons.
        return distutils.version.LooseVersion(str(kv[0]))

    for name, versions in index.items():
        # Match the full name or its initialism (e.g. "AGPL"), ignoring "of".
        abbreviation = "".join(w[0] for w in name.split() if w not in {"of"}).lower()
        if req_name.lower().strip() not in (name.lower(), abbreviation):
            continue
        for version, details in sorted(versions.items(), key=cmp):
            if version == req_version:
                break
        # With no requested version the loop falls through leaving the
        # highest version bound to `version`/`details`.
        if req_version and version != req_version:
            err_msg = "unknown version `{}` for `{}`"
            raise KeyError(err_msg.format(req_version, name))
        return _License(
            name=name,
            version=version,
            abbr=details["abbr"],
            uri=details["uri"],
            features=details.get("features", []),
        )
    raise KeyError("`{}` not found in index".format(identifier))
+
+
class _License(collections.namedtuple("License", "name version abbr uri features")):

    """An immutable license record: name, version, abbr, uri and features."""

    @property
    def canonical(self):
        """Full display name, e.g. "Affero General Public License v3".

        Version "0" means "unversioned" and is omitted from the name.
        """
        suffix = "" if self.version == "0" else " v" + self.version
        return f"{self.name} License{suffix}"

    def is_compatible(self, compatibility):
        """Return True when this license carries the given feature tag."""
        return compatibility in self.features

    def __repr__(self):
        return f"<licensing.License: {self.canonical}>"

    def __str__(self):
        return f"{self.canonical} ({self.uri})"
+
+
def _load():
    """
    populate license index

    Reads the `licenses.json` resource shipped with the package into the
    module-level `index`, `uris` and `features` mappings.  No-op once the
    index has been populated.

    """
    global index
    global uris
    global features
    if index:  # already loaded
        return
    licenses = json.loads(
        pkg_resources.resource_string(__name__, "licenses.json").decode("utf-8")
    )
    index.update(licenses.items())
    for name, versions in licenses.items():
        for version, details in versions.items():
            license = name, version
            uris[details["uri"]] = license
            for feature in details.get("features", []):
                features[feature].append(license)
index 0000000..23eec8b
--- /dev/null
+{
+ "Affero General Public": {
+ "3": {
+ "abbr": "AGPLv3",
+ "features": [
+ "dfsg",
+ "gpl",
+ "fsf",
+ "osi",
+ "copyleft"
+ ],
+ "uri": "gnu.org/licenses/agpl"
+ }
+ },
+ "Apache Software": {
+ "1": {
+ "abbr": "Apache 1.0",
+ "features": [
+ "fsf",
+ "osi"
+ ],
+ "uri": "apache.org/licenses/LICENSE-1.0"
+ },
+ "1.1": {
+ "abbr": "Apache 1.1",
+ "features": [
+ "fsf",
+ "osi"
+ ],
+ "uri": "apache.org/licenses/LICENSE-1.1"
+ },
+ "2": {
+ "abbr": "Apache 2.0",
+ "features": [
+ "dfsg",
+ "gpl",
+ "fsf",
+ "osi"
+ ],
+ "uri": "apache.org/licenses/LICENSE-2.0"
+ }
+ },
+ "Berkeley Software Distribution": {
+ "0": {
+ "abbr": "BSD",
+ "features": [
+ "dfsg",
+ "gpl",
+ "fsf",
+ "osi"
+ ],
+ "uri": "opensource.org/licenses/bsd-license"
+ }
+ },
+ "Massachusetts Institute of Technology": {
+ "0": {
+ "abbr": "MIT",
+ "date": 1988,
+ "features": [
+ "dfsg",
+ "fsf",
+ "osi",
+ "gpl"
+ ],
+ "uri": "opensource.org/licenses/MIT"
+ }
+ },
+ "Python Software Foundation": {
+ "0": {
+ "abbr": "PSF",
+ "uri": "opensource.org/licenses/Python-2.0"
+ }
+ },
+ "Zope Public": {
+ "2.1": {
+ "abbr": "ZPL",
+ "uri": "foundation.zope.org/agreements/ZPL_2.1.pdf"
+ }
+ }
+}
\ No newline at end of file
index 0000000..6d57f43
--- /dev/null
+"""
+list distributions
+
+"""
+
+import collections
+import json
+import os
+import pathlib
+import re
+
+import pkg_resources
+
+# from .discover import PackageRepoError, gitsh
+
+__all__ = ["get_graph", "get_distributions", "get_distribution"]
+
+
def get_graph():
    """Map every installed distribution to the set of its dependents.

    Each key is a distribution from the working set; its value holds the
    distributions that require it (empty for top-level installs).
    """
    installed = list(pkg_resources.working_set)
    graph = {dist: set() for dist in installed}
    for dist in installed:
        for requirement in _requires(dist):
            graph[requirement].add(dist)
    return graph
+
+
def _requires(dist):
    """Resolve `dist`'s declared requirements to installed distributions."""
    return [pkg_resources.get_distribution(req) for req in dist.requires()]
+
+
def get_distributions(dependencies=False):
    """
    return a list of installed distributions

    With `dependencies` True every distribution in the environment is
    returned; otherwise only top-level project names (those no other
    installed distribution requires) are listed.

    """
    if dependencies:
        return list(pkg_resources.Environment())
    return [
        dist.project_name
        for dist, required_by in get_graph().items()
        if not required_by
    ]
    # and not dist.location.startswith("/usr/lib/")]
+
+
def get_distribution(name):
    """
    return a dictionary containing details of given installed distribution

    (Actually returns a `Distribution` object, which supports dict-style
    item access to the same details.)

    >>> dist = get_distribution("gmpg")
    >>> dist["name"]
    'gmpg'

    # >>> dist["home-page"]
    # 'https://angelo.lahacker.net/software/source/projects/gmpg'
    # >>> dist["summary"]
    # 'a library for software packaging'
    # >>> dist["license"]
    # 'GNU Affero General Public License v3'

    """
    return Distribution(name)
+
+
class Distribution:

    """Details of one installed distribution, read from its metadata files."""

    def __init__(self, name):
        dist = pkg_resources.get_distribution(name)
        self.location = pathlib.Path(dist.location)
        # TODO check if system installation
        try:
            env = pathlib.Path(os.environ["VIRTUAL_ENV"]).resolve()
        except KeyError:
            env = None
        # True when the distribution lives inside the active virtualenv.
        self.in_env = self.location in env.parents if env else False

        try:
            # Splitting PKG-INFO on "Key: " headers yields alternating
            # key/value strings; pair them back up below.
            key = None
            metadata = {}
            for match in re.split(
                r"^([A-Z][A-Za-z-]+): ",
                dist.get_metadata("PKG-INFO"),
                flags=re.MULTILINE,
            )[3:]:
                if key:
                    metadata[key.lower()] = match.rstrip()
                    key = None
                else:
                    key = match
        except FileNotFoundError:
            # Fall back through older metadata formats, ending with a
            # minimal stub built from the dist object itself.
            try:
                metadata = json.loads(dist.get_metadata("metadata.json"))
            except FileNotFoundError:
                try:
                    metadata = json.loads(dist.get_metadata("pydist.json"))
                except FileNotFoundError:
                    metadata = {
                        "name": dist.project_name,
                        "version": dist.version,
                        "summary": "",
                    }
        details = {
            "name": metadata["name"],
            "version": metadata["version"],
            "summary": metadata["summary"],
            "license": metadata.get("license", "UNKNOWN"),
            "url": metadata.get("home-page", ""),
            "download_url": metadata.get("download-url", ""),
            "people": collections.defaultdict(dict),
        }

        if "contacts" in metadata:  # for flake8 & requests package formats
            for contact in metadata["contacts"]:
                person = details["people"][contact["name"]]
                person[contact["role"]] = contact["email"]
        else:
            try:
                author = metadata["author"]
                author_email = metadata["author-email"]
                details["people"][author]["author"] = author_email
            except KeyError:
                pass
            try:
                maintainer = metadata["maintainer"]
                maintainer_email = metadata["maintainer-email"]
                details["people"][maintainer]["maintainer"] = maintainer_email
            except KeyError:
                pass
        details["people"] = dict(details["people"])

        try:
            dep_links = dist.get_metadata("dependency_links.txt")
            details["deps"] = dep_links.strip().splitlines()
        except (KeyError, FileNotFoundError):
            pass
        mods = []
        try:
            mods = dist.get_metadata("top_level.txt").strip().splitlines()
        except (KeyError, FileNotFoundError):
            pass
        finally:
            # "tests" is packaging noise, not a provided module.
            details["mods"] = [mod for mod in mods if mod != "tests"]
        details["reqs"] = {
            r.project_name: [[list(s) for s in r.specs], list(r.extras)]
            for r in dist.requires()
        }
        details["entry-points"] = entry_points = dict(dist.get_entry_map())
        for group, group_eps in dist.get_entry_map().items():
            entry_points[group] = {
                n: (ep.module_name, ep.attrs) for n, ep in group_eps.items()
            }
        self.details = details

    def __getitem__(self, name):
        # Dict-style access proxies straight to the details mapping.
        return self.details[name]

    def is_dirty(self):
        """Return True when the dist's working tree has uncommitted changes."""
        # NOTE(review): `gitsh` and `PackageRepoError` come from the
        # commented-out `.discover` import above, so this raises NameError
        # as written -- confirm the import.
        try:
            dirty = bool(gitsh("status --porcelain", self.location))
        except PackageRepoError:
            dirty = False
        return dirty
index 0000000..ac2f506
--- /dev/null
#!/usr/bin/env bash
# Run a command inside the given virtual environment.
# Usage: <script> VENV COMMAND [ARGS...]
VENV=$1
. "${VENV}/bin/activate"
shift 1
# exec replaces this shell with COMMAND, so nothing after it can ever run;
# the unreachable `deactivate` that followed has been dropped.
exec "$@"
index 0000000..7c8a2a1
--- /dev/null
+"""
+interrogate debian system packages
+
+"""
+
+# TODO use `src.git` to interrogate `etckeeper` git log
+
+__all__ = ["get_apt_history"]
+
+
def get_apt_history(path="/var/log/apt/history.log"):
    """Return apt's install/upgrade history as a list of dicts.

    Each entry maps header names (e.g. "Start-Date", "Commandline") to
    their values.  `path` defaults to the system history log and exists so
    the parser can be pointed at other files (and tested).
    """
    log = []
    with open(path) as fp:
        contents = fp.read().strip()
    # Entries are blank-line separated; each line is "Key: value".
    # (The original re-read the already-exhausted file handle here, which
    # always yielded an empty string, so no real entries were ever parsed.)
    if contents:
        for raw_entry in contents.split("\n\n"):
            log.append(
                dict(line.partition(": ")[::2] for line in raw_entry.split("\n"))
            )
    return log
index 0000000..89c34d0
--- /dev/null
+"""
+Tools for metamodern software development.
+
+Includes support for testing, syntax checking and metrics measurement
+using pytest, flake8, radon respectively.
+
+Provides code analysis and package/interface introspection.
+
+"""
+
+# TODO issue tracking, code review
+# TODO code analysis via pysonar2, psydiff
+# TODO facilitate anonymous A/B testing in the canopy
+
+import __future__
+
+import collections
+import importlib
+import inspect
+import json
+import os
+import pkgutil
+import re
+import subprocess
+import sys
+import textwrap
+import types
+import xml.etree.ElementTree
+
+import radon.complexity
+import radon.metrics
+import radon.raw
+from radon.complexity import cc_rank as rank_cc
+from radon.metrics import mi_rank as rank_mi
+
+from . import git
+
+__all__ = ["git", "get_api", "get_metrics", "rank_cc", "rank_mi"]
+
+
+languages = {"py": "Python", "c": "C", "html": "HTML", "css": "CSS", "js": "Javascript"}
+
+
def get_metrics(code):
    """
    Return metrics for given code.

    Uses radon to analyze line counts, complexity and maintainability.

    Keys: "lines" (radon raw stats), "maintainability" (MI score) and
    "complexity" (per-block cyclomatic complexity).
    """
    return {
        "lines": radon.raw.analyze(code),
        "maintainability": radon.metrics.mi_visit(code, True),
        # o is a radon block record; first field is presumably the block
        # name, last its complexity -- confirm against radon's API.
        "complexity": {o[0]: o[-1] for o in radon.complexity.cc_visit(code)},
    }
+
+
def generate_dependency_graph(project_name, project_dir="."):
    """Render a pydeps dependency graph for `project_name` in `project_dir`."""
    subprocess.run(
        [
            # XXX "/srv/poetry/bin/poetry",
            # XXX "run",
            "pydeps",
            project_name,
            "--show-deps",
            "--noshow",
            "--max-bacon",
            "2",
            "--pylib",
            "-x",  # exclude these modules from the graph
            "os",
            "re",  # previously passed as a single "re types" argument,
            "types",  # which pydeps would treat as one (nonexistent) name
            "_*",
            "enum",
        ],
        cwd=project_dir,
    )
+
+
def test(pkgdir="."):
    """Test pkgdir with pytest and return test results."""
    # NOTE(review): `pkgdir` is currently unused -- pytest runs in the CWD
    # (no cwd= is passed below); confirm intent.
    # TODO packages = pkg.discover(pkgdir).pop("packages", [])
    proc = subprocess.Popen(  # TODO use .run()
        [
            "pytest-gevent",
            "--doctest-modules",
            "--ignore",
            "setup.py",
            # XXX "--pep8",
            "--cov",
            ".",  # TODO ",".join(packages),
            "--cov-report",
            "xml:.test_coverage.xml",
            "--junit-xml",
            ".test_results.xml",
            "--doctest-glob",
            "README*",
        ],
        env=os.environ,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    _, err = [x.decode("utf-8") for x in proc.communicate()]
    # NOTE(review): the bare return below short-circuits the function, so
    # the parsed results on the next line are never produced -- looks like
    # debug leftover; confirm before removing.
    return
    return _parse_junit(), _parse_coverage(), err
+
+
def _parse_junit(path=".test_results.xml"):
    """Parse a pytest junit-xml report into a summary dict.

    Returns suite-level totals plus a "cases" mapping of
    "classname:testname" to each case's attributes, outcome type,
    failure message and captured output.
    """
    suite_tag = xml.etree.ElementTree.parse(str(path)).find("testsuite")
    _suite = dict(suite_tag.attrib)
    suite = {
        "tests": int(_suite["tests"]),
        "errors": int(_suite["errors"]),
        "failures": int(_suite["failures"]),
        "skipped": int(_suite["skipped"]),
        "time": _suite["time"],
        "cases": collections.defaultdict(collections.OrderedDict),
    }
    for case_tag in suite_tag:
        case = dict(case_tag.attrib)
        case["type"] = "success"  # overwritten below when a failure is seen
        for child in case_tag:
            if child.tag == "failure":
                case["type"] = "failure"
                case["message"] = child.attrib["message"]
            elif child.tag == "system-out":
                ...
                # NOTE(review): nesting reconstructed from mangled source --
                # captured stdout is recorded when present; confirm.
                if child.text:
                    case["output"] = child.text
        test_identifier = ":".join((case.pop("classname"), case.pop("name")))
        suite["cases"][test_identifier] = case
        # XXX details = {"line": case["line"], "time": case["time"], "outcome": outcome}
        # XXX suite["cases"][case["file"]][test_identifier] = details
    return suite
+
+
def _parse_coverage(path=".test_coverage.xml"):
    """Parse a coverage XML report into {filename: (percent, lines)}.

    `lines` is a list of (line number, hit count) string pairs.
    """
    coverages = {}
    # NOTE(review): navigation is positional -- root[1] is assumed to be
    # <packages>, package[0] <classes>, case[1] <lines>; this depends on
    # the coverage.py XML layout and breaks if it changes.  Confirm.
    for package in list(list(xml.etree.ElementTree.parse(str(path)).getroot())[1]):
        for case in list(list(package)[0]):
            lines = []
            for line in list(list(case)[1]):
                lines.append((line.attrib["number"], line.attrib["hits"]))
            coverages[case.attrib["filename"]] = (
                round(float(case.attrib["line-rate"]) * 100, 1),
                lines,
            )
    return coverages
+
+
+# def count_sloc(self):
+# """
+# count Source Lines Of Code
+#
+# """
+# # TODO accrue statistics
+# line_counts = collections.defaultdict(int)
+#
+# def handle(file):
+# line_count = 0
+# suffix = file.suffix.lstrip(".")
+# if suffix in languages:
+# with file.open() as fp:
+# lines = fp.readlines()
+# for line in lines[:10]:
+# if line.rstrip() == "# noqa":
+# break
+# else:
+# line_count = len(lines)
+# line_counts[suffix] += line_count
+# yield
+# if line_count:
+# print(" /d,lg/{}/X/".format(line_count), end="")
+# self.position += 3 + len(str(line_count))
+# yield
+#
+# def summarize():
+# # TODO commify
+# print("Source Lines of Code:")
+# # print("--------------------", end="\n\n") TODO markdown output
+# # (`cli` feature to uniform output to HTML for pipe to web agent)
+# total = 0
+# for suffix, line_count in line_counts.items():
+# print(" {:15}{:>10}".format(languages[suffix], line_count))
+# total += line_count
+# print(" {:>25}".format(total))
+#
+# return handle, summarize
+
+
def get_api(mod, pkg=None) -> dict:
    """Return a dictionary containing contents of given module.

    Imports `mod` (optionally relative to package `pkg`), collects its
    members via `_get_namespace_members`, recurses into submodules, and
    returns the JSON-serializable form produced by `JSONEncoder`.
    Import failures are printed and yield an entry with no members.
    """
    mod = mod.removesuffix(".py")
    if pkg:
        mod = ".".join((pkg, mod))
    try:
        module = importlib.import_module(mod)
    except Exception as err:  # importing arbitrary code can fail any way
        print(err)
        module = None
    members = []
    if module:
        members = _get_namespace_members(module)
    details = {"name": mod, "mod": module, "members": members, "descendants": {}}
    try:
        # Only packages have __path__; plain modules raise AttributeError.
        mod_location = module.__path__
        for _, _mod, __ in pkgutil.iter_modules(mod_location):
            details["descendants"][_mod] = get_api(_mod, pkg=mod)
    except AttributeError:
        pass
    return json.loads(JSONEncoder().encode(details))
+
+
class JSONEncoder(json.JSONEncoder):
    """Serialize introspected module members for `get_api`.

    Objects the stock encoder can't handle become a details dict holding
    the docstring, the call signature (when obtainable) and a kind tag.
    """

    def default(self, obj):
        details = {"doc": inspect.getdoc(obj)}
        if callable(obj):
            try:
                details["sig"] = str(inspect.signature(obj))
            except ValueError:  # some builtins expose no signature
                print(f"can't get signature for builtin {obj}")
        if isinstance(obj, types.ModuleType):
            metrics = None
            # Only measure project code; stdlib modules are skipped.
            if obj.__name__ not in sys.stdlib_module_names:
                metrics = get_metrics(get_code(obj))
            details.update(
                **{
                    "type": "module",
                    "all": getattr(obj, "__all__", []),
                    "metrics": metrics,
                }
            )
        elif isinstance(obj, type):
            details.update(**{"type": "class"})
        elif isinstance(obj, types.FunctionType):
            details.update(**{"type": "function"})
        elif isinstance(obj, __future__._Feature):
            # Must be tested before the `object` catch-all below; it was
            # previously last and therefore unreachable (everything is an
            # instance of object).
            details.update(**{"type": "future feature"})
        elif isinstance(obj, object):
            details.update(**{"type": "object"})
        else:
            return json.JSONEncoder.default(self, obj)
        return details
+
+
+# def get_api(mod, pkg=None) -> dict:
+# """Return a dictionary containing contents of given module."""
+# if pkg:
+# mod = ".".join((pkg, mod))
+# try:
+# module = importlib.import_module(mod)
+# except Exception as err:
+# print(err)
+# module = None
+# members = []
+# if module:
+# members = _get_namespace_members(module)
+# details = {"name": mod, "mod": module, "members": members, "descendants": {}}
+# try:
+# mod_location = module.__path__
+# for _, _mod, __ in pkgutil.iter_modules(mod_location):
+# details["descendants"][_mod] = get_api(_mod, pkg=mod)
+# except AttributeError:
+# pass
+# return details
+
+
def get_doc(obj):
    """Split an object's docstring into (first paragraph, remainder).

    Returns ("", "") when the object has no docstring.
    """
    raw = obj.__doc__
    if not raw:
        return "", ""
    cleaned = inspect.cleandoc(raw)
    head, _, rest = cleaned.partition("\n\n")
    return head, rest
+
+
def _get_namespace_members(mod):  # NOQA FIXME
    """Return (modules, globals, exceptions, functions, classes) of `mod`.

    Each element is a list of (name, object) pairs; exceptions and
    functions are limited to those actually defined in `mod`.
    """
    modules = inspect.getmembers(mod, inspect.ismodule)
    # for name, m in inspect.getmembers(m, inspect.ismodule):
    #     if inspect.getmodule(mod) != m:
    #         continue
    #     modules.append((name, m))
    exceptions = []
    for name, exc in inspect.getmembers(mod, _isexception):
        if inspect.getmodule(exc) != mod:  # skip re-exported exceptions
            continue
        exceptions.append((name, exc))
    functions = []
    for name, func in get_members(mod, "function"):
        if inspect.getmodule(func) != mod:  # skip imported functions
            continue
        functions.append((name, func))
    classes = []
    for name, cls in get_members(mod, "class"):
        # if inspect.getmodule(cls) != mod:
        #     continue
        if (name, cls) in exceptions:  # exceptions are reported separately
            continue
        classes.append((name, cls))
    global_mems = []
    # Module-dunder housekeeping names that shouldn't count as globals.
    defaults = (
        "__all__",
        "__builtins__",
        "__cached__",
        "__doc__",
        "__file__",
        "__loader__",
        "__name__",
        "__package__",
        "__spec__",
    )
    for global_mem in inspect.getmembers(mod):
        if (
            global_mem in modules
            or global_mem in exceptions
            or global_mem in functions
            or global_mem in classes
            or global_mem[0] in defaults
        ):
            continue
        global_mems.append(global_mem)
    return modules, global_mems, exceptions, functions, classes
+
+
def _isexception(obj):
    """Return True when `obj` is an exception class (subclass of Exception)."""
    if not inspect.isclass(obj):
        return False
    return issubclass(obj, Exception)
+
+
+# XXX def _isfunction_or_datadescriptor(obj):
+# XXX return inspect.isfunction(obj) or inspect.isdatadescriptor(obj)
+
+
def get_members(obj, pred, hidden=True):
    """Return a list of object's members.

    Members matching `pred` ("function", "class", ...) are returned as
    (name, object) pairs in source-document order: public names first,
    underscore-prefixed names after (omitted when `hidden` is False).
    """
    pub = []
    hid = []
    # Leading keyword and the delimiter expected right after the name,
    # used to scrape definition order out of the raw source text.
    keywords = {
        "function": ("def ", "("),
        "class": ("class ", ":("),
        "datadescriptor": ("def ", "("),
        "function_or_datadescriptor": ("def ", "("),
    }
    document_order = []
    for line in get_code(obj).splitlines():
        keyword, delimiter = keywords[pred]
        if line.lstrip().startswith(keyword):
            match = re.search(r" ([A-Za-z0-9_]+)[{}]".format(delimiter), line)
            document_order.append(match.groups()[0])
    # Prefer inspect's own predicate (e.g. inspect.isfunction); fall back
    # to a module-level "is<pred>" helper for custom predicates.
    try:
        pred_handler = getattr(inspect, "is" + pred)
    except AttributeError:
        pred_handler = globals().get("is" + pred)
    members = dict(inspect.getmembers(obj, pred_handler))
    for name in document_order:
        try:
            _obj = members[name]
        except KeyError:  # scraped name not matched by the predicate
            continue
        (hid if name.startswith("_") else pub).append((name, _obj))
    return (pub + hid) if hidden else pub
+
+
def get_source(obj):
    """
    Return the string representation of given object's code.

    Comments are stripped and code is dedented for easy parsing.

    Returns a two-tuple of (code, starting line number).
    """
    lines, lineno = inspect.getsourcelines(obj)
    # Drop full-line comments; inline trailing comments are kept.
    code = "".join(line for line in lines if not line.lstrip().startswith("#"))
    docstring = getattr(obj, "__doc__", None)
    if docstring is not None:
        # NOTE(review): only removes the docstring when it appears verbatim
        # as one triple-quoted literal; indented docstrings survive.
        code = code.replace('"""{}"""'.format(docstring), "", 1)
    return textwrap.dedent(code), lineno
+
+
def get_code(obj):
    """
    Return a string containing the source code of given object.

    The declaration statement and any associated docstring will be removed.
    """
    # TODO use sourcelines to return line start no
    try:
        source = inspect.getsource(obj)
    except (OSError, TypeError):  # e.g. builtins or REPL-defined objects
        source = ""
    if obj.__doc__:
        # Skip past the closing quotes of a triple-quoted docstring.
        # NOTE(review): assumes double-quote style ('"""'); confirm.
        source = source.partition('"""')[2].partition('"""')[2]
        if not source.strip():
            source = source.partition("\n")[2]
    return textwrap.dedent(source)
index 0000000..fcf44fc
--- /dev/null
+"""An opinionated Git interface."""
+
+from __future__ import annotations
+
+import collections
+import difflib
+import re
+import subprocess
+import textwrap
+import xml.sax.saxutils
+from pathlib import Path
+
+import pendulum
+
+# TODO update subprocess usage
+
+__all__ = ["get_repo", "clone_repo", "colorize_diff", "Repository"]
+
+
def get_repo(
    location: Path | str = ".", init=False, bare=False, gpg_home: Path | str = None
) -> Repository:
    """Return a Repository for given location.

    When `init` is True a fresh repo (optionally `bare`) is created at
    `location` with `main` as the default branch.  `gpg_home`, when
    given, is passed through to the Repository.
    """
    location = Path(location)
    if gpg_home:
        gpg_home = Path(gpg_home)
    # if not Path(location).exists():
    if init:
        args = ["git", "init", "-b", "main", str(location)]
        if bare:
            args.append("--bare")
        subprocess.check_call(args)
    # else:
    #     raise FileNotFoundError("repository does not exist "
    #                             "at {}".format(str(location)))
    return Repository(location, gpg_home=gpg_home)
+
+
def clone_repo(source, destination, bare=False) -> Repository:
    """
    Clone source repository and return a Repository of destination.

    Commands run synchronously: the previous fire-and-forget `Popen`
    calls let the checkout race the (possibly unfinished) clone.

    """
    args = ["git", "clone", str(source), str(destination)]
    if bare:
        args.append("--bare")
    subprocess.check_call(args)
    # Best effort: create a `main` branch (a non-zero exit when the
    # branch already exists is ignored, matching prior behavior).
    subprocess.run(["git", "checkout", "-b", "main"], cwd=str(destination))
    return Repository(destination)
+
+
def colorize_diff(diff) -> list:
    """
    Return a list of per-file change dicts parsed from a unified diff.

    Each dict has keys `index`, `from`, `to` and `changes`, where
    `changes` is a list of ((from_start, from_len, to_start, to_len),
    changed_lines) tuples (line numbers as strings).

    NOTE(review): despite the name, this returns parsed data rather
    than HTML; the HTML rendering below was abandoned.

    """
    files = []
    # Split the diff into per-file chunks; element 0 is whatever text
    # precedes the first "diff --git" header, so it is discarded.
    for filediff in re.split(r"diff --git [\w/._]+ [\w/._]+\n", str(diff))[1:]:
        lines = filediff.split("\n")
        current = {"changes": []}
        current["index"] = lines[0]
        current["from"], current["to"] = lines[1], lines[2]
        # Split the remainder on hunk headers; with two capture groups,
        # re.split yields (linespec, trailing context, hunk body) triples.
        changes = re.split(
            r"^@@ (-\d+,\d+ \+\d+,\d+) @@(.*)$",
            "\n".join(lines[3:]),
            flags=re.MULTILINE,
        )[1:]
        grouped_changes = zip(*(changes[i::3] for i in (0, 1, 2)))
        for changed_linespec, _, changed_lines in grouped_changes:
            # e.g. "-12,4 +12,6" -> ("12", "4", "12", "6")
            changed_linenos = re.match(
                r"-(\d+),(\d+) \+(\d+),(\d+)", changed_linespec
            ).groups()
            current["changes"].append((changed_linenos, changed_lines))
        # diff_spec = re.match(r"@@ -(\d+),(\d+) +(\d+),(\d+) @@.+", lines[3])
        # fromstart, fromlength, tostart, tolength = diff_spec.groups()
        # current["from"].append()
        # current["lines"] = [first_line]
        # current["lines"].extend(lines[4:-1])
        files.append(current)
    return files

    # html = ["<div class=diff>"]
    # for line in diff.split("\n"):
    #     html.append("<div class=''>{}</div>".format(line))
    # html.append("</div>")
    # return "\n".join(html)
+
+
def _colorize_diff(diff):  # NoQA FIXME
    """
    Yield HTML `<div>` lines presenting given unified diff.

    Adjacent delete/insert pairs are rendered with intra-line
    highlighting via `_line_diff`.

    """
    lines = diff.splitlines()
    lines.reverse()  # use the list as a stack: pop() yields the next line
    # Discard everything before the first hunk header.
    while lines and not lines[-1].startswith("@@"):
        lines.pop()
    yield "<div class=diff>"
    while lines:
        line = lines.pop()
        klass = ""
        if line.startswith("@@"):
            klass = "control"
        elif line.startswith("-"):
            klass = "delete"
            if lines:
                # Peek at up to two following lines to detect a paired
                # insert directly after this delete.
                _next = []
                while lines and len(_next) < 2:
                    _next.append(lines.pop())
                if _next[0].startswith("+") and (
                    len(_next) == 1 or _next[1][0] not in ("+", "-")
                ):
                    # A lone "+" right after a "-" is a modified line:
                    # emit both with character-level highlights.
                    aline, bline = _line_diff(line[1:], _next.pop(0)[1:])
                    yield "<div class=delete>-{}</div>".format(aline)
                    yield "<div class=insert>+{}</div>".format(bline)
                    if _next:
                        lines.append(_next.pop())
                    continue
                lines.extend(reversed(_next))  # push peeked lines back
        elif line.startswith("+"):
            klass = "insert"
        yield "<div class={}>{}</div>".format(klass, _escape(line))
    yield "</div>"
+
+
def _line_diff(a, b):
    """Return (a_html, b_html) with differing spans wrapped for highlighting."""
    highlight = "<span class=highlight>{}</span>"
    a_parts = []
    b_parts = []
    matcher = difflib.SequenceMatcher(a=a, b=b)
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        a_chunk = _escape(a[i1:i2])
        b_chunk = _escape(b[j1:j2])
        if tag != "equal":
            a_chunk = highlight.format(a_chunk)
            b_chunk = highlight.format(b_chunk)
        a_parts.append(a_chunk)
        b_parts.append(b_chunk)
    return "".join(a_parts), "".join(b_parts)
+
+
def _escape(text):
    """
    Escape XML/HTML special characters in `text`.

    The extra entity map substitutes each space (presumably with a
    non-breaking-space entity so diff alignment survives HTML rendering
    — TODO confirm the replacement glyph survives copy/paste intact).

    """
    return xml.sax.saxutils.escape(text, {" ": " "})
+
+
class Repository:
    """A git repository."""

    location: Path
    gpg_home: Path | None

    def __init__(self, location: Path | str, gpg_home: Path | str | None = None):
        """
        Return a Repository instance for git repository at given location.

        Use gpg_home to provide an alternate GPG directory.

        """
        self.location = Path(location)
        self.gpg_home = Path(gpg_home) if gpg_home else None

    def git(self, *command_args):
        """Return a list of stripped output lines from git `command_args`."""
        option_args = {}
        if self.gpg_home:
            # Extend (not replace) the environment so PATH et al. survive;
            # the old code passed a one-key env dict.
            option_args["env"] = dict(os.environ, GNUPGHOME=str(self.gpg_home))
        output = subprocess.check_output(
            ["git", "-C", str(self.location)] + list(command_args), **option_args
        )
        return [line.strip().decode("utf-8") for line in output.splitlines()]

    def exists(self):
        """Return True when a `.git` directory exists at the location."""
        return (self.location / ".git").exists()

    def add(self, *files):
        """Add files to the index (all files when none are given)."""
        if not files:
            files = ["*"]
        return self.git("add", *files)

    def config(self, name, value):
        """Set repository options."""
        return self.git("config", name, value)

    def commit(self, message, author=None, key=None):
        """Record changes to the repository and return the new commit."""
        args = []
        if author:
            args.extend(["--author", author])
        if key:
            args.append(f"-S{key}")
        details = self._gitlines("commit", "-m", message, *args)
        # First output line looks like "[main 1a2b3c4] message".
        short_hash = re.match(r".+ ([\w\d]{7})\]", details[0]).group(1)
        return self[short_hash]

    def fetch_into_bare(self, repository="origin", refspec="master:master"):
        """Fetch `refspec` from `repository` into this (bare) repository."""
        self.git("fetch", repository, refspec)

    def push(self):
        """Update remote refs along with associated objects."""
        self.git("push")

    def pull(self):
        """Fetch from and integrate with another repository or branch."""
        self.git("pull")

    def show(self, gitobject):
        """Show various types of objects."""
        return self.git("--no-pager", "show", gitobject)

    def diff(self, start=None, end=None):
        """Show changes between commits, commit and working tree, etc."""
        args = []
        if start is None:
            start = "HEAD"
        if end is None:
            # Default to the changes introduced by `start` itself.
            end = start
            start = end + "^"
        if start and end:
            args.extend((start, end))
        return self.git("--no-pager", "diff", "--no-color", *args)

    @property
    def files(self):
        """Return paths of files in the index and the working tree."""
        return [
            (Path(self.location) / path).relative_to(self.location)
            for path in self.git("ls-files")
        ]

    @property
    def status(self):
        """Show the working tree status."""
        return self.git("status", "--porcelain")

    @property
    def changed_files(self):
        """Compare files in the working tree and the index."""
        return self.git("diff-files")

    @property
    def remotes(self):
        """Yield 3-tuples of a remote's `name`, `url` and `context`."""
        for remote in self._gitlines("--no-pager", "remote", "-v"):
            if not remote:
                continue
            name, url, context = remote.split()
            yield name, url, context.strip("()")

    def update_server_info(self):
        """Update auxiliary info files to help serving over dumb HTTP."""
        self.git("update-server-info")

    def drift_from_push_remote(self):
        """
        Return 2-tuple containing (direction, distance) from push remote.

        Direction is `ahead` or `behind`. Distance is an integer of
        commits. Return None when there is no drift to report.

        """
        match = re.match(
            r"\[(ahead|behind) (\d+)\]",
            "\n".join(
                self.git("for-each-ref", "--format", "%(push:track)", "refs/heads")
            ),
        )
        if match:
            direction, distance = match.groups()
            # The docstring promises an integer distance; the old code
            # returned the raw string.
            return direction, int(distance)

    def create_branch(self, name):
        """Create a new branch."""
        return self.git("branch", name)

    @property
    def branches(self):
        """Return a list of (name, is_active) branch tuples."""
        branches = []
        for branch in self._gitlines("branch", "-a", "--no-color"):
            if not branch:
                continue
            # `git()` strips each line, so the active branch is marked by
            # a leading "* " while other branches are bare names. (The
            # old `partition(" ")` parse yielded ("", truthy) for them.)
            branches.append((branch.lstrip("* "), branch.startswith("*")))
        return branches

    @property
    def tags(self):
        """Return a list of (id, object, timestamp, signer) tag tuples."""
        tags = []
        for tag_id in reversed(self._gitlines("--no-pager", "tag")):
            if not tag_id:
                continue
            # `git tag -v` prints tag details on stdout and the GPG
            # verification report on stderr. (The old code passed sh-style
            # `_out`/`_err` callbacks that `git()` never accepted.)
            result = subprocess.run(
                ["git", "-C", str(self.location), "tag", "-v", tag_id],
                capture_output=True,
            )
            details = [
                line.strip() for line in result.stdout.decode("utf-8").splitlines()
            ]
            signature = [
                line.strip() for line in result.stderr.decode("utf-8").splitlines()
            ]
            tag_object, tag_type, _, tag_tagger, _, tag_message = details
            timestamp = pendulum.from_timestamp(float(tag_tagger.split()[-2]))
            tags.append(
                (tag_id, tag_object.split()[1], timestamp, signature[1].split()[-1])
            )
        return tags

    @property
    def log(self) -> collections.OrderedDict:
        """Return an ordered mapping of all commits, newest first."""
        # Delegate so `__getitem__` can pass a selector; the old code made
        # `log` a property *and* called it with an argument, a TypeError.
        return self._log()

    def _log(self, selector=None) -> collections.OrderedDict:
        """
        Return a mapping of commits keyed by full hash.

        `selector` can be a number of recent commits (-1, -2, etc.) or the
        hash of a specific commit.

        """
        entries = collections.OrderedDict()
        current_hash = None

        def parse_line(line):
            nonlocal current_hash
            if line.startswith("commit "):
                current_hash = line.split()[1]
                entries[current_hash] = {"hash": current_hash, "message": ""}
            elif line.startswith("gpg: using"):
                entries[current_hash]["pubkey"] = line.split()[-1]
            elif line.startswith("Author:"):
                (
                    entries[current_hash]["author_name"],
                    _,
                    entries[current_hash]["author_email"],
                ) = (
                    line.partition(": ")[2].strip(">\n").partition(" <")
                )
            elif line.startswith("Date:"):
                dt = pendulum.from_format(
                    line.partition(": ")[2], "YYYY-MM-DD HH:mm:ss Z"
                )
                entries[current_hash]["timestamp"] = dt.in_timezone("UTC")
            elif not line.startswith("gpg: "):
                entries[current_hash]["message"] += line + "\n"

        args = []
        if selector:
            args.append(selector)
        try:
            for line in self.git(
                "--no-pager",
                "log",
                "--date=iso",
                "--no-color",
                "--show-signature",
                *args,
            ):
                parse_line(line)
        except subprocess.CalledProcessError:
            pass  # e.g. an empty repository with no commits yet
        else:
            for commit in entries:
                entries[commit]["message"] = textwrap.dedent(
                    entries[commit]["message"]
                ).strip()
        return entries

    def __getitem__(self, hash):
        """Return the commit for given hash."""
        return list(self._log(hash).values())[0]

    def _gitlines(self, *args, **kwargs) -> list:
        """Return a list of the result of a git command split by lines."""
        # `git()` already returns a list of lines; the old implementation
        # called `.rstrip().split("\n")` on that list and crashed.
        return self.git(*args, **kwargs)
index 0000000..e38e54e
--- /dev/null
+"""Spawn Integrated Development Environment."""
+
+import os
+import pathlib
+import subprocess
+
+HOME = pathlib.Path("~").expanduser()
+WORKING = HOME / "code/working"
+TUNNEL_SERVER = "159.89.143.168"
+# XXX WIDTH = "54"
+
+# TODO CREATE THE TMUX and media pane
+# TODO bell inside bangarang (social reader) rings term for mentions
+
+
def new_window(title, working_dir, command):
    """Create a new tmux window titled `title` running `command`."""
    created = _create("new-window", "-c", working_dir, "-n", title, "-d", "-P")
    _title_and_run(created, working_dir, command)
    return created
+
+
def split_window(reference_pane_id, orientation, quantity, working_dir, command):
    """Split the pane `reference_pane_id` and run `command` in the new pane."""
    # `quantity` is e.g. "-l 54" or "-p 20": a size flag and its value.
    size_flag, _, size_value = quantity.partition(" ")
    new_pane = _create(
        "split-window",
        "-c",
        working_dir,
        f"-{orientation}",
        size_flag,
        size_value,
        "-d",
        "-t",
        reference_pane_id,
        "-P",
    )
    _title_and_run(new_pane, working_dir, command)
    return new_pane
+
+
def _create(*args):
    """Create a pane using `args` and return its pane id."""
    completed = subprocess.run(["tmux", *args], stdout=subprocess.PIPE)
    return completed.stdout.decode().strip()
+
+
def _title_and_run(pane_id, working_dir, command):
    """Set the pane title to the working directory's name and run `command`."""
    title = f" {working_dir.name} "
    subprocess.run(["tmux", "select-pane", "-t", pane_id, "-T", title])
    subprocess.run(["tmux", "send-keys", "-t", pane_id, command, "ENTER"])
+
+
# Libraries & tools, opened in the leftmost tmux column.
libs_tools = (
    "canopy",
    "easyuri",
    "gfxint",
    "gmpg",
    "newmath",
    "python-indieauth",
    "python-microformats",
    "python-micropub",
    "python-webmention",
    "sqlyte",
    "txtint",
    "understory",
    "webagt",
    "webint",
)
# Web applications; each corresponds to a `webint-<name>` project.
apps = [
    "auth",
    "code",
    "data",
    "editor",
    "guests",
    "live",
    "media",
    "mentions",
    "owner",
    "posts",
]
# Websites served locally during development (disabled ones commented out).
sites = (
    "ragt.ag",
    # "1856.house",
    # "indieweb.rocks",
    # "canopy.garden",
)
+
+
+# XXX def send_keys(*args):
+# XXX return subprocess.run(("tmux", "send-keys") + args)
+
+
+# XXX def open_pyproject(ref):
+# XXX return send_keys("-t", ref, ":vsp pyproject.toml", "ENTER")
+
+
def main():
    """Spawn all windows."""
    # Leftmost column: libraries & tools, first one opens immediately.
    lib_col = new_window("code", WORKING / libs_tools[0], "vi -S Session.vim")

    # Middle column: one dev-server pane per site.
    site_col = None
    for index, site in enumerate(sites):
        port = 4010 + (index * 10)
        tunnel_port = port + 1000  # reserved for the (disabled) ssh tunnel
        module = site.replace(".", "_")
        pane_args = [
            WORKING / site / site,
            f"WEBCTX=dev poetry run web dev {module}:app"
            f" --port {port} --watch {WORKING}",
        ]
        if index == 0:
            site_col = split_window(lib_col, "h", "-l 54", *pane_args)
        else:
            split_window(site_col, "v", "-p 20", *pane_args)

    # Rightmost column: editors for the web applications and remaining
    # sites, stacked beneath an editor for the first site.
    app_col = split_window(
        lib_col, "h", "-l 60", WORKING / sites[0], "vi -S Session.vim"
    )
    for app in reversed(apps):
        split_window(
            app_col, "v", "-p 15", WORKING / f"webint-{app}", "vi -S Session.vim"
        )
    for site in reversed(sites[1:]):
        split_window(app_col, "v", "-p 15", WORKING / site, "vi -S Session.vim")

    # Stack editors for the remaining libraries & tools in the first column.
    for lib_tool in reversed(libs_tools[1:]):
        split_window(
            lib_col, "v", "-p 15", WORKING / lib_tool, "vi -S Session.vim"
        )
+
+
if __name__ == "__main__":
    # Inside an existing tmux session, spawn the panes directly;
    # otherwise start a detached 200-column tmux session that re-runs
    # this script (which will then take the first branch).
    if os.getenv("TMUX"):
        main()
    else:
        subprocess.run(["tmux", "new", "-x", "200", "-d", "python3", __file__])
index 0000000..2a15201
--- /dev/null
+"""
+Tools for metamodern software packaging.
+
+Package detail discovery and automated setup.
+
+"""
+
+import inspect
+import pathlib
+import re
+import subprocess
+import typing
+from importlib.machinery import SourceFileLoader
+
+import pydeps
+import toml
+
+# XXX from pkg_resources import DistributionNotFound
+# XXX from pkg_resources import iter_entry_points as get_entry_points
+
+# XXX from .install import add, remove
+# XXX from .listing import get_distribution
+# XXX from .system import get_apt_history
+
+__all__ = [
+ # XXX "DistributionNotFound",
+ # XXX "get_entry_points",
+ "auto_discover",
+ "discover",
+ "get_repo_files",
+ # XXX "add",
+ # XXX "remove",
+ # XXX "get_distribution",
+ # XXX "get_apt_history",
+]
+
+currently_discovering = False
+
+
def get_current_project(project_dir=".") -> typing.MutableMapping:
    """Return a dict of `pyproject.toml` in `project_dir`."""
    pyproject = pathlib.Path(project_dir) / "pyproject.toml"
    with pyproject.open() as fp:
        return toml.load(fp)
+
+
def get_current_packages(project_dir=".") -> list:
    """Return a list of packages declared in `pyproject.toml` at `project_dir`."""
    poetry = get_current_project(project_dir)["tool"]["poetry"]
    try:
        return [entry["include"] for entry in poetry["packages"]]
    except KeyError:
        # No explicit package table: fall back to the project name,
        # using `<name>.py` when no package directory exists.
        name = poetry["name"].replace(".", "_")
        if not (pathlib.Path(project_dir) / name).exists():
            name = f"{name}.py"
        return [name]
+
+
def strip_local_dev_deps(project_dir="."):
    """Remove path-based development dependencies and add gmpg."""
    pyproject_path = pathlib.Path(project_dir) / "pyproject.toml"
    try:
        with pyproject_path.open() as fp:
            pyproject = toml.load(fp)
    except FileNotFoundError:
        return
    try:
        dev_deps = pyproject["tool"]["poetry"]["group"]["dev"]["dependencies"]
    except KeyError:
        return  # no dev group: nothing to strip
    # Iterate over a snapshot since entries are removed while looping.
    for name, spec in list(dev_deps.items()):
        if isinstance(spec, dict) and "path" in spec:
            dev_deps.pop(name)
    dev_deps["gmpg"] = "^0.0"
    with pyproject_path.open("w") as fp:
        toml.dump(pyproject, fp)
+
+
def detail_package(self):
    """
    a knowledge tree extension for detailing the contents of Python packages

    Returns a (handle, summarize) pair for the hosting framework.

    NOTE(review): `self` is never used and nothing appends to
    `packages`, so `summarize` always reports zero packages — confirm
    whether `handle` was meant to record discoveries.

    """
    packages = []

    def handle(file):
        # Generator-based handler: discovery output is printed around
        # the two yields (presumably a pre/post visit protocol of the
        # knowledge tree framework — TODO confirm).
        z = discover(file)
        print(z)
        yield
        print("XXX")
        yield

    def summarize():
        print("{} packages found: {}".format(len(packages), ", ".join(packages)))

    return handle, summarize
+
+
class PackageRepoError(Exception):
    """Raised when there exists a halting flaw in the package design."""
+
+
def discover(pkgdir: str) -> dict:
    """
    return a dictionary containing package details discovered at `pkgdir`

    Executes the package's `setup.py` with `setuptools.setup`
    temporarily monkeypatched so the keyword arguments it supplies can
    be captured, then augments them with metadata parsed from the setup
    script's comments and from the Git history.

    """
    # TODO gpg verify
    # TODO author from first commit and maintainer from last tag's commit
    # TODO url=`hg paths default`; verify against ^https://{gpg.comment}/
    # TODO dirty versions
    # TODO long_description = inspect.getdoc(setup_mod)
    # TODO kwargs["package_data"] = {"": ["*.dat", "*.json", "*.yaml"]}
    pkgdir = pathlib.Path(pkgdir)
    if pkgdir.name == "setup.py":
        pkgdir = pkgdir.parent

    import setuptools

    global discover
    currently_supplied_args = None

    def get_supplied(**args):
        # Capture the kwargs the setup script passes to setuptools.setup.
        nonlocal currently_supplied_args
        currently_supplied_args = args

    # Swap in the capture function, and neutralize `discover` itself so
    # a `discover=` setup keyword cannot recurse while the script runs.
    _setup, setuptools.setup = setuptools.setup, get_supplied
    _discover, discover = discover, lambda x, **y: {}
    _setup_loader = SourceFileLoader("setup", str(pkgdir / "setup.py"))
    setup_mod = _setup_loader.load_module()
    setuptools.setup, discover = _setup, _discover

    # Name and URL are mined from reference-style comments atop
    # setup.py, e.g. "# [`name`][1]" ... "# [1]: https://example.org".
    comments = inspect.getcomments(setup_mod)
    name = re.match(r"^# \[`(.*?)`\]\[1\]", comments).groups()[0]
    description = "TODO use setup.py docstring"
    # XXX re.match(r"^# \[`.*`\]\[1\]: (.*)", comments).groups()[0]
    license_match = re.search(r"%\[([A-Za-z ]+)\]", comments)
    try:
        license = license_match.groups()[0]
    except AttributeError:
        license = "Unknown"
    url = re.search(r"^# \[1\]: (.*)$", comments, re.M).groups()[0]
    if url.startswith("//"):
        url = "https:" + url
    download_url = "{}.git".format(url)

    install_requires = currently_supplied_args.get("requires", [])
    entry_points = currently_supplied_args.get("provides", {})
    try:
        entry_points["console_scripts"] = entry_points["term.apps"]
    except KeyError:
        pass

    # Most recent tag (minus any leading "v") becomes the version;
    # "0.0" when the repository has no tags yet.
    versions = gitsh("tag -l --sort -version:refname", pkgdir)
    version = versions.splitlines()[0].lstrip("v") if versions else "0.0"

    committers = gitsh(
        "--no-pager log --no-color | grep " '"^Author: " --color=never', pkgdir
    ).splitlines()

    def get_committer(index):
        return re.match(r"Author: (.*) <(.*)>", committers[index]).groups()

    # Oldest committer is taken as author, newest as maintainer.
    author, author_email = get_committer(-1)
    maintainer, maintainer_email = get_committer(0)

    packages = setuptools.find_packages(str(pkgdir))
    py_modules = [
        p.stem for p in pkgdir.iterdir() if p.suffix == ".py" and p.stem != "setup"
    ]

    kwargs = {}
    if packages:
        kwargs["packages"] = packages
    if py_modules:
        kwargs["py_modules"] = py_modules

    return dict(
        name=name,
        version=version,
        description=description,
        url=url,
        download_url=download_url,
        install_requires=install_requires,
        entry_points=entry_points,
        license=license,
        author=author,
        author_email=author_email,
        maintainer=maintainer,
        maintainer_email=maintainer_email,
        **kwargs,
    )
+
+
def auto_discover(dist, _, setup_file):
    """
    a `distutils` setup keyword for automatic discovery using `discover`

    >>> import setuptools  # doctest: +SKIP
    >>> setuptools.setup(discover=__file__)  # doctest: +SKIP

    """
    global currently_discovering
    currently_discovering = True
    details = discover(setup_file)
    # `packages`/`py_modules` belong on the distribution itself, not its
    # metadata, so pop them before the bulk updates below.
    dist.packages = details.pop("packages", [])
    dist.py_modules = details.pop("py_modules", [])
    dist.metadata.author = details.get("author", "")
    dist.metadata.author_email = details.get("author_email", "")
    for target in (dist.__dict__, dist.metadata.__dict__):
        target.update(details)
+
+
def get_repo_files(setup_dir):
    """
    a `setuptools` file finder for finding installable files from a Git repo

    Returns a list of repository-tracked paths; empty unless discovery
    is in progress.

    """
    if not currently_discovering:
        return []
    if not setup_dir:
        setup_dir = "."
    # `gitsh` returns one newline-joined string; a setuptools file
    # finder must return an iterable of paths, so split it into
    # individual file names (the old code returned the raw string,
    # which iterates character by character).
    return gitsh("ls-files", setup_dir).splitlines()
+
+
def gitsh(command, working_dir):
    """
    return the output of running Git `command` in `working_dir`

    Raises PackageRepoError when the command fails (e.g. `working_dir`
    is not a Git repository).

    """
    # NOTE: `command` may contain shell constructs (discover() pipes
    # `git log` through grep), so shell=True is intentional here; only
    # call this with trusted, programmer-supplied commands.
    raw_cmd = "git -C {} {}".format(working_dir, command)
    try:
        return subprocess.check_output(
            raw_cmd, stderr=subprocess.STDOUT, shell=True
        ).decode("utf-8")
    except subprocess.CalledProcessError as err:
        # Chain the original error so the failing command is visible.
        raise PackageRepoError("no Git repo at `{}`".format(working_dir)) from err
index 0000000..77cf599
--- /dev/null
+[tool.poetry]
+name = "gmpg"
+version = "0.1.10"
+description = "tools for metamodern software development"
+keywords = ["Git", "Poetry"]
+homepage = "https://ragt.ag/code/projects/gmpg"
+repository = "https://ragt.ag/code/projects/gmpg.git"
+documentation = "https://ragt.ag/code/projects/gmpg/api"
+authors = ["Angelo Gladding <angelo@ragt.ag>"]
+license = "BSD-2-Clause"
+
+[tool.poetry.scripts]
+gmpg = "gmpg.__main__:main"
+
+[tool.poetry.dependencies]
+python = ">=3.8,<3.11"
+black = "^22.12.0"
+isort = "^5.11.4"
+pendulum = "^2.1.2"
+pydeps = "^1.11.0"
+pyright = "^1.1.291"
+mock = "^5.0.1"
+pytest-cov = "^4.0.0"
+pytest-pep8 = "^1.0.6"
+pytest-gevent = "^1.1.0"
+pytest = "^7.2.1"
+radon = "^5.1.0"
+responses = "^0.22.0"
+toml = "^0.10.2"
+txtint = ">=0.0.0"
+ipython = "^8.8.0"
+
+[tool.poetry.group.dev.dependencies]
+txtint = {path="../txtint", develop=true}
+
+# [[tool.poetry.source]]
+# name = "main"
+# url = "https://ragt.ag/code/pypi"
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
index 0000000..7fb8a64
--- /dev/null
+import gmpg
+
+
+def test_repos():
+ repo = gmpg.get_repo("_test_data", init=True)
+ assert len(repo.files) == 0