my eye

analysis.py

Raw

"""
Tools for metamodern software development.

Includes support for testing, syntax checking and metrics measurement
using pytest, flake8, radon respectively.

Provides code analysis and package/interface introspection.

"""

# TODO issue tracking, code review
# TODO code analysis via pysonar2, psydiff
# TODO facilitate anonymous A/B testing in the canopy

import __future__

import collections
import importlib
import inspect
import json
import os
import pathlib
import pkgutil
import re
import subprocess
import sys
import textwrap
import types
import xml.etree.ElementTree

import radon.complexity
import radon.metrics
import radon.raw
from radon.complexity import cc_rank as rank_cc
from radon.metrics import mi_rank as rank_mi

from . import git

__all__ = ["git", "get_api", "get_metrics", "rank_cc", "rank_mi"]


languages = {"py": "Python", "c": "C", "html": "HTML", "css": "CSS", "js": "Javascript"}


def get_metrics(code):
    """
    Return metrics for given code.

    Uses radon to analyze line counts, complexity and maintainability.

    """
    return {
        "lines": radon.raw.analyze(code),
        "maintainability": radon.metrics.mi_visit(code, True),
        "complexity": {o[0]: o[-1] for o in radon.complexity.cc_visit(code)},
    }


def generate_dependency_graph(project_name, project_dir="."):
    project_dir = pathlib.Path(project_dir)
    proc = subprocess.Popen(
        [
            "pydeps",
            project_name,
            "--no-show",
            "--reverse",
            "--rankdir",
            "BT",
            "--pylib",
            "-o",
            "deps.svg",
            "--show-deps",
        ],
        cwd=project_dir,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    with (project_dir / "deps.json").open("w") as fp:
        for line in proc.communicate():
            print(line.decode("utf-8"), file=fp)


def test(pkgdir="."):
    """Test pkgdir with pytest and return test results."""
    # TODO packages = pkg.discover(pkgdir).pop("packages", [])
    proc = subprocess.Popen(  # TODO use .run()
        [
            "pytest-gevent",
            "--doctest-modules",
            "-s",
            "-vv",
            "--ignore",
            "setup.py",
            # XXX "--pep8",
            "--cov",
            ".",  # TODO ",".join(packages),
            "--cov-report",
            "xml:test_coverage.xml",
            "--junit-xml",
            "test_results.xml",
            "--doctest-glob",
            "README*",
        ],
        env=os.environ,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    for line in proc.communicate():
        print(line.decode("utf-8"))
    return proc.returncode


def _parse_junit(path="test_results.xml"):
    suite_tag = xml.etree.ElementTree.parse(str(path)).find("testsuite")
    _suite = dict(suite_tag.attrib)
    suite = {
        "tests": int(_suite["tests"]),
        "errors": int(_suite["errors"]),
        "failures": int(_suite["failures"]),
        "skipped": int(_suite["skipped"]),
        "time": _suite["time"],
        "cases": collections.defaultdict(collections.OrderedDict),
    }
    if not suite_tag:
        return
    for case_tag in suite_tag:
        case = dict(case_tag.attrib)
        case["type"] = "success"
        for child in case_tag:
            if child.tag == "failure":
                case["type"] = "failure"
                case["message"] = child.attrib["message"]
            elif child.tag == "system-out":
                ...
            if child.text:
                case["output"] = child.text
        test_identifier = ":".join((case.pop("classname"), case.pop("name")))
        suite["cases"][test_identifier] = case
        # XXX details = {"line": case["line"], "time": case["time"], "outcome": outcome}
        # XXX suite["cases"][case["file"]][test_identifier] = details
    return suite


def _parse_coverage(path="test_coverage.xml"):
    coverages = {}
    for package in list(list(xml.etree.ElementTree.parse(str(path)).getroot())[1]):
        for case in list(list(package)[0]):
            lines = []
            for line in list(list(case)[1]):
                lines.append((line.attrib["number"], line.attrib["hits"]))
            coverages[case.attrib["filename"]] = (
                round(float(case.attrib["line-rate"]) * 100, 1),
                lines,
            )
    return coverages


# def count_sloc(self):
#     """
#     count Source Lines Of Code
#
#     """
#     # TODO accrue statistics
#     line_counts = collections.defaultdict(int)
#
#     def handle(file):
#         line_count = 0
#         suffix = file.suffix.lstrip(".")
#         if suffix in languages:
#             with file.open() as fp:
#                 lines = fp.readlines()
#                 for line in lines[:10]:
#                     if line.rstrip() == "# noqa":
#                         break
#                 else:
#                     line_count = len(lines)
#                     line_counts[suffix] += line_count
#         yield
#         if line_count:
#             print(" /d,lg/{}/X/".format(line_count), end="")
#             self.position += 3 + len(str(line_count))
#         yield
#
#     def summarize():
#         # TODO commify
#         print("Source Lines of Code:")
#         # print("--------------------", end="\n\n")  TODO markdown output
#         # (`cli` feature to uniform output to HTML for pipe to web agent)
#         total = 0
#         for suffix, line_count in line_counts.items():
#             print("  {:15}{:>10}".format(languages[suffix], line_count))
#             total += line_count
#         print("  {:>25}".format(total))
#
#     return handle, summarize


def get_api(mod, pkg=None) -> dict:
    """Return a dictionary containing contents of given module."""
    mod = mod.removesuffix(".py")
    if pkg:
        mod = ".".join((pkg, mod))
    try:
        module = importlib.import_module(mod)
    except Exception as err:
        print(err)
        module = None
    members = []
    if module:
        members = _get_namespace_members(module)
    details = {"name": mod, "mod": module, "members": members, "descendants": {}}
    try:
        mod_location = module.__path__
        for _, _mod, __ in pkgutil.iter_modules(mod_location):
            details["descendants"][_mod] = get_api(_mod, pkg=mod)
    except AttributeError:
        pass
    return json.loads(JSONEncoder().encode(details))


class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        details = {"doc": inspect.getdoc(obj)}
        if callable(obj):
            try:
                details["sig"] = str(inspect.signature(obj))
            except ValueError:
                print(f"can't get signature for builtin {obj}")
        if isinstance(obj, types.ModuleType):
            metrics = None
            if obj.__name__ not in sys.stdlib_module_names:
                metrics = get_metrics(get_code(obj))
            details.update(
                **{
                    "type": "module",
                    "all": getattr(obj, "__all__", []),
                    "metrics": metrics,
                }
            )
        elif isinstance(obj, type):
            details.update(**{"type": "class"})
        elif isinstance(obj, types.FunctionType):
            details.update(**{"type": "function"})
        elif isinstance(obj, object):
            details.update(**{"type": "object"})
        elif isinstance(obj, __future__._Feature):
            details.update(**{"type": "future feature"})
        else:
            return json.JSONEncoder.default(self, obj)
        return details


# def get_api(mod, pkg=None) -> dict:
#     """Return a dictionary containing contents of given module."""
#     if pkg:
#         mod = ".".join((pkg, mod))
#     try:
#         module = importlib.import_module(mod)
#     except Exception as err:
#         print(err)
#         module = None
#     members = []
#     if module:
#         members = _get_namespace_members(module)
#     details = {"name": mod, "mod": module, "members": members, "descendants": {}}
#     try:
#         mod_location = module.__path__
#         for _, _mod, __ in pkgutil.iter_modules(mod_location):
#             details["descendants"][_mod] = get_api(_mod, pkg=mod)
#     except AttributeError:
#         pass
#     return details


def get_doc(obj):
    """Return a two-tuple of object's first line and rest of docstring."""
    docstring = obj.__doc__
    if not docstring:
        return "", ""
    return inspect.cleandoc(docstring).partition("\n\n")[::2]


def _get_namespace_members(mod):  # NOQA FIXME
    modules = inspect.getmembers(mod, inspect.ismodule)
    # for name, m in inspect.getmembers(m, inspect.ismodule):
    #     if inspect.getmodule(mod) != m:
    #         continue
    #     modules.append((name, m))
    exceptions = []
    for name, exc in inspect.getmembers(mod, _isexception):
        if inspect.getmodule(exc) != mod:
            continue
        exceptions.append((name, exc))
    functions = []
    for name, func in get_members(mod, "function"):
        if inspect.getmodule(func) != mod:
            continue
        functions.append((name, func))
    classes = []
    for name, cls in get_members(mod, "class"):
        # if inspect.getmodule(cls) != mod:
        #     continue
        if (name, cls) in exceptions:
            continue
        classes.append((name, cls))
    global_mems = []
    defaults = (
        "__all__",
        "__builtins__",
        "__cached__",
        "__doc__",
        "__file__",
        "__loader__",
        "__name__",
        "__package__",
        "__spec__",
    )
    for global_mem in inspect.getmembers(mod):
        if (
            global_mem in modules
            or global_mem in exceptions
            or global_mem in functions
            or global_mem in classes
            or global_mem[0] in defaults
        ):
            continue
        global_mems.append(global_mem)
    return modules, global_mems, exceptions, functions, classes


def _isexception(obj):
    return inspect.isclass(obj) and issubclass(obj, Exception)


# XXX def _isfunction_or_datadescriptor(obj):
# XXX     return inspect.isfunction(obj) or inspect.isdatadescriptor(obj)


def get_members(obj, pred, hidden=True):
    """Return a list of object's members."""
    pub = []
    hid = []
    keywords = {
        "function": ("def ", "("),
        "class": ("class ", ":("),
        "datadescriptor": ("def ", "("),
        "function_or_datadescriptor": ("def ", "("),
    }
    document_order = []
    for line in get_code(obj).splitlines():
        keyword, delimiter = keywords[pred]
        if line.lstrip().startswith(keyword):
            match = re.search(r" ([A-Za-z0-9_]+)[{}]".format(delimiter), line)
            document_order.append(match.groups()[0])
    try:
        pred_handler = getattr(inspect, "is" + pred)
    except AttributeError:
        pred_handler = globals().get("is" + pred)
    members = dict(inspect.getmembers(obj, pred_handler))
    for name in document_order:
        try:
            _obj = members[name]
        except KeyError:
            continue
        (hid if name.startswith("_") else pub).append((name, _obj))
    return (pub + hid) if hidden else pub


def get_source(obj):
    """
    Return the string representation of given object's code.

    Comments are stripped and code is dedented for easy parsing.

    """
    lines, lineno = inspect.getsourcelines(obj)
    code = "".join(line for line in lines if not line.lstrip().startswith("#"))
    docstring = getattr(obj, "__doc__", None)
    if docstring is not None:
        code = code.replace('"""{}"""'.format(docstring), "", 1)
    return textwrap.dedent(code), lineno


def get_code(obj):
    """
    Return a string containing the source code of given object.

    The declaration statement and any associated docstring will be removed.

    """
    # TODO use sourcelines to return line start no
    try:
        source = inspect.getsource(obj)
    except (OSError, TypeError):
        source = ""
    if obj.__doc__:
        source = source.partition('"""')[2].partition('"""')[2]
    if not source.strip():
        source = source.partition("\n")[2]
    return textwrap.dedent(source)