"""An opinionated Git interface."""
from __future__ import annotations

import collections
import difflib
import os
import re
import subprocess
import textwrap
import xml.sax.saxutils
from pathlib import Path

import pendulum
# TODO update subprocess usage
__all__ = ["get_repo", "clone_repo", "colorize_diff", "Repository"]
def get_repo(
    location: Path | str = ".", init=False, bare=False, gpg_home: Path | str = None
) -> Repository:
    """
    Return a Repository for the given location, optionally creating it.

    When `init` is true, `git init -b main` is run on `location` first
    (with `--bare` appended when `bare` is true). When `init` is false no
    git command is run and the location is not checked for existence.

    A truthy `gpg_home` is converted to a `Path` and handed to the
    Repository; a falsy one is passed through unchanged.

    """
    repo_path = Path(location)
    gpg_dir = Path(gpg_home) if gpg_home else gpg_home
    if init:
        init_command = ["git", "init", "-b", "main", str(repo_path)]
        init_command += ["--bare"] if bare else []
        # check_call raises CalledProcessError when git exits non-zero.
        subprocess.check_call(init_command)
    return Repository(repo_path, gpg_home=gpg_dir)
def clone_repo(source, destination, bare=False) -> Repository:
    """
    Clone source repository and return a Repository of destination.

    The clone is run to completion before anything else happens, so the
    returned Repository always refers to a finished clone. For non-bare
    clones a `main` branch is then checked out on a best-effort basis.

    Raises `subprocess.CalledProcessError` if `git clone` exits non-zero.

    """
    args = ["git", "clone", str(source), str(destination)]
    if bare:
        args.append("--bare")
    # Wait for the clone: a fire-and-forget Popen would let the checkout
    # below race against a destination that is still being populated.
    subprocess.check_call(args)
    if not bare:
        # Best effort only: `checkout -b` exits non-zero when `main` already
        # exists (the common case), so the return code is ignored on purpose.
        # Bare repositories have no work tree to check out into.
        subprocess.call(["git", "checkout", "-b", "main"], cwd=str(destination))
    return Repository(destination)
def colorize_diff(diff) -> list:
    """
    Return a per-file breakdown of the given unified (git) diff.

    `diff` is converted with `str()` and split on its `diff --git` header
    lines. One dict is returned per file, with the keys:

    - `index`: the `index ...` header line (`""` if absent)
    - `from`: the `--- ...` header line (`""` if absent)
    - `to`: the `+++ ...` header line (`""` if absent)
    - `changes`: a list of `(linenos, text)` pairs, one per hunk, where
      `linenos` is the 4-tuple of strings
      `(from_start, from_count, to_start, to_count)` and `text` is
      everything following the hunk's `@@ ... @@` header line.

    A count omitted from a hunk header (`@@ -3 +3 @@`) is reported as `"1"`.

    """
    files = []
    # Anchor on line starts and accept any path so that names containing
    # characters such as "-" or spaces still begin a new file section.
    sections = re.split(r"^diff --git .*\n", str(diff), flags=re.MULTILINE)[1:]
    for filediff in sections:
        # Five capture groups plus the text between matches: after the
        # leading header chunk, the result repeats in strides of six.
        parts = re.split(
            r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*)$",
            filediff,
            flags=re.MULTILINE,
        )
        current = {"changes": [], "index": "", "from": "", "to": ""}
        # Only the text before the first hunk is scanned, so hunk content
        # that happens to start with "--- " cannot be mistaken for a header.
        # Headers are found by prefix because extended header lines
        # ("new file mode ...", "similarity index ...") may precede them.
        for line in parts[0].split("\n"):
            if line.startswith("index "):
                current["index"] = line
            elif line.startswith("--- "):
                current["from"] = line
            elif line.startswith("+++ "):
                current["to"] = line
        for offset in range(1, len(parts), 6):
            from_start, from_count, to_start, to_count, _, changed_lines = parts[
                offset : offset + 6
            ]
            changed_linenos = (
                from_start,
                from_count or "1",
                to_start,
                to_count or "1",
            )
            current["changes"].append((changed_linenos, changed_lines))
        files.append(current)
    return files
def _colorize_diff(diff): # NoQA FIXME
lines = diff.splitlines()
lines.reverse()
while lines and not lines[-1].startswith("@@"):
lines.pop()
yield "<div class=diff>"
while lines:
line = lines.pop()
klass = ""
if line.startswith("@@"):
klass = "control"
elif line.startswith("-"):
klass = "delete"
if lines:
_next = []
while lines and len(_next) < 2:
_next.append(lines.pop())
if _next[0].startswith("+") and (
len(_next) == 1 or _next[1][0] not in ("+", "-")
):
aline, bline = _line_diff(line[1:], _next.pop(0)[1:])
yield "<div class=delete>-{}</div>".format(aline)
yield "<div class=insert>+{}</div>".format(bline)
if _next:
lines.append(_next.pop())
continue
lines.extend(reversed(_next))
elif line.startswith("+"):
klass = "insert"
yield "<div class={}>{}</div>".format(klass, _escape(line))
yield "</div>"
def _line_diff(a, b):
aline = []
bline = []
tpl = "<span class=highlight>{}</span>"
for tag, i1, i2, j1, j2 in difflib.SequenceMatcher(a=a, b=b).get_opcodes():
if tag == "equal":
aline.append(_escape(a[i1:i2]))
bline.append(_escape(b[j1:j2]))
continue
aline.append(tpl.format(_escape(a[i1:i2])))
bline.append(tpl.format(_escape(b[j1:j2])))
return "".join(aline), "".join(bline)
def _escape(text):
return xml.sax.saxutils.escape(text, {" ": " "})
class Repository:
    """
    A git repository driven through the `git` command line client.

    Every command is run as `git -C <location> ...`. When a GPG home was
    given, it is exported to git as `GNUPGHOME`.

    """

    # Path handed to `git -C`.
    location: Path
    # Alternate GnuPG home; only set on the instance when one was provided.
    gpg_home: Path

    def __init__(self, location: Path | str, gpg_home: Path | str | None = None):
        """
        Return a Repository instance for git repository at given location.

        Use gpg_home to provide an alternate GPG directory.

        """
        self.location = Path(location)
        if gpg_home:
            self.gpg_home = Path(gpg_home)

    def _environment(self):
        """Return the environment for git subprocesses (None = inherit)."""
        gpg_home = getattr(self, "gpg_home", None)
        if gpg_home is None:
            return None
        # Extend rather than replace the current environment: passing only
        # GNUPGHOME would strip PATH, HOME etc. from the git process.
        return {**os.environ, "GNUPGHOME": str(gpg_home)}

    def git(self, *command_args):
        """
        Return the lines of output from running git with `command_args`.

        Lines are decoded as UTF-8 and returned as a list without their
        line endings. Raises `subprocess.CalledProcessError` if git exits
        with a non-zero status.

        """
        output = subprocess.check_output(
            ["git", "-C", str(self.location), *command_args],
            env=self._environment(),
        )
        # Split the raw bytes first so only real line endings separate lines.
        return [line.decode("utf-8") for line in output.splitlines()]

    def exists(self):
        """Return whether a `.git` entry exists directly under location."""
        return (self.location / ".git").exists()

    def add(self, *files):
        """Add files to the index (the pathspec `*` when none are given)."""
        if not files:
            files = ["*"]
        return self.git("add", *files)

    def config(self, name, value):
        """Set repository options."""
        return self.git("config", name, value)

    def commit(self, message, author=None, key=None):
        """
        Record changes to the repository and return the new commit.

        `author` is passed to `--author`; `key` selects the GPG key to sign
        with (`-S<key>`). The return value is the log entry of the commit,
        as produced by `log`.

        """
        args = []
        if author:
            args.extend(["--author", author])
        if key:
            args.append(f"-S{key}")
        self.git("commit", "-m", message, *args)
        # Ask git for the new hash instead of scraping the abbreviated one
        # from the summary line, whose length is not fixed at 7 characters.
        return self[self.git("rev-parse", "HEAD")[0]]

    def fetch_into_bare(self, repository="origin", refspec="master:master"):
        """Run `git fetch` for `refspec` from `repository`."""  # TODO
        self.git("fetch", repository, refspec)

    def push(self):
        """Update remote refs along with associated objects."""
        self.git("push")

    def pull(self):
        """Fetch from and integrate with another repository or branch."""
        self.git("pull")

    def show(self, gitobject):
        """Show various types of objects."""
        return self.git("--no-pager", "show", gitobject)

    def diff(self, start=None, end=None):
        """
        Show changes between commits.

        With no arguments, diff `HEAD^` against `HEAD`. With only `start`,
        diff that commit against its first parent. With both, diff `start`
        against `end`. Returns the lines of git's uncoloured output.

        """
        if start is None:
            start = "HEAD"
        if end is None:
            # A single commit is compared against its first parent.
            start, end = start + "^", start
        args = [start, end] if start and end else []
        return self.git("--no-pager", "diff", "--no-color", *args)

    @property
    def files(self):
        """Return the paths listed by `git ls-files`, relative to location."""
        return [Path(path) for path in self.git("ls-files")]

    @property
    def status(self):
        """Show the working tree status."""
        return self.git("status", "--porcelain")

    @property
    def changed_files(self):
        """Compare files in the working tree and the index."""
        return self.git("diff-files")

    @property
    def remotes(self):
        """Yield 3-tuples of a remote's `name`, `url` and `context`."""
        for remote in self._gitlines("--no-pager", "remote", "-v"):
            if not remote:
                continue
            name, url, context = remote.split()
            yield name, url, context.strip("()")

    def update_server_info(self):
        """Run `git update-server-info`."""
        self.git("update-server-info")

    def drift_from_push_remote(self):
        """
        Return 2-tuple containing (direction, distance) from push remote.

        Direction is `ahead` or `behind`. Distance is the number of commits
        as a string of digits. Returns None when the tracking output does
        not begin with a single `[ahead N]` / `[behind N]` marker.

        """
        match = re.match(
            r"\[(ahead|behind) (\d+)\]",
            "\n".join(
                self.git("for-each-ref", "--format", "%(push:track)", "refs/heads")
            ),
        )
        if match:
            return match.groups()

    def create_branch(self, name):
        """Create a new branch."""
        return self.git("branch", name)

    @property
    def branches(self):
        """Return a list of `(name, active)` tuples for all branches."""
        branches = []
        for branch in self._gitlines("branch", "-a", "--no-color"):
            if not branch:
                continue
            # `git branch` prefixes each name with a two-column marker
            # ("* " for the checked-out branch, "  " otherwise).
            marker, name = branch[:1].strip(), branch[2:].strip()
            branches.append((name, bool(marker)))
        return branches

    @property
    def tags(self):
        """
        Return a list of tags, most recently listed last-to-first.

        Each item is `(tag_id, object_hash, timestamp, key)`, read from the
        output of `git tag -v`. Raises `subprocess.CalledProcessError` when
        git cannot verify a tag.

        """
        tags = []
        for tag_id in reversed(self._gitlines("--no-pager", "tag")):
            if not tag_id:
                continue
            # The tag contents arrive on stdout and the GPG report on
            # stderr, so both streams have to be captured separately.
            result = subprocess.run(
                ["git", "-C", str(self.location), "tag", "-v", tag_id],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=self._environment(),
                check=True,
            )
            details = [
                line.strip() for line in result.stdout.decode("utf-8").splitlines()
            ]
            signature = [
                line.strip() for line in result.stderr.decode("utf-8").splitlines()
            ]
            # object / type / tag / tagger, then a blank line and the
            # message, which may span any number of lines.
            tag_object, _, _, tag_tagger, *_ = details
            timestamp = pendulum.from_timestamp(float(tag_tagger.split()[-2]))
            tags.append(
                (tag_id, tag_object.split()[1], timestamp, signature[1].split()[-1])
            )
        return tags

    def log(self, selector=None) -> collections.OrderedDict:
        """
        Return commits keyed by hash, in the order git lists them.

        `selector` can be a number of recent commits (-1, -2, etc.) or the
        hash of a specific commit.

        Each value is a dict with `hash`, `message`, `author_name`,
        `author_email`, `timestamp` (UTC) and, for signed commits, `pubkey`.
        An empty mapping is returned when git exits with an error.

        """
        entries = collections.OrderedDict()
        args = [str(selector)] if selector else []
        try:
            lines = self.git(
                "--no-pager",
                "log",
                "--date=iso",
                "--no-color",
                "--show-signature",
                *args,
            )
        except subprocess.CalledProcessError:
            return entries
        entry = None
        for line in lines:
            if line.startswith("commit "):
                current_hash = line.split()[1]
                entry = entries[current_hash] = {"hash": current_hash, "message": ""}
            elif entry is None:
                # Nothing can be attributed before the first commit header.
                continue
            elif line.startswith("gpg:") and line[4:].lstrip().startswith("using"):
                # gpg may pad this line to align with the one above it.
                entry["pubkey"] = line.split()[-1]
            elif line.startswith("Author:"):
                entry["author_name"], _, entry["author_email"] = (
                    line.partition(": ")[2].strip(">\n").partition(" <")
                )
            elif line.startswith("Date:"):
                # git pads the value after "Date:"; strip before parsing.
                dt = pendulum.from_format(
                    line.partition(": ")[2].strip(), "YYYY-MM-DD HH:mm:ss Z"
                )
                entry["timestamp"] = dt.in_timezone("UTC")
            elif not line.startswith("gpg: "):
                entry["message"] += line + "\n"
        for entry in entries.values():
            entry["message"] = textwrap.dedent(entry["message"]).strip()
        return entries

    def __getitem__(self, hash):
        """Return the commit for given hash."""
        return list(self.log(hash).values())[0]

    def _gitlines(self, *args, **kwargs) -> list:
        """
        Return the output lines of a git command without trailing blanks.

        `git` already returns a list of lines; this only drops blank lines
        from the end, so empty output yields an empty list.

        """
        lines = list(self.git(*args, **kwargs))
        while lines and not lines[-1].strip():
            lines.pop()
        return lines