my eye

Bootstrap TTS/STT and text readability

Committed daf149

index 0000000..d90ee90
--- /dev/null

+model
+vosk-model-*

index 0000000..810989e
--- /dev/null

+"""
+Media analysis and manipulation
+
+"""

index 0000000..6694f4a
--- /dev/null

+"""
+utility functions for breaking down a given block of text
+into it's component syntactic parts.
+
+"""
+
+import math
+import re
+import time
+
+import nltk
+from nltk.tokenize import RegexpTokenizer
+
+__all__ = ["Readability"]
+
+TOKENIZER = RegexpTokenizer(r"(?u)\W+|\$[\d\.]+|\S+")
+SPECIAL_CHARS = [".", ",", "!", "?"]
+
+
+def get_smog(text):
+    if not text:
+        return 0
+    return round(Readability(text).SMOGIndex())
+
+
+def get_char_count(words):
+    characters = 0
+    for word in words:
+        characters += len(word)
+    return characters
+
+
+def get_words(text=""):
+    words = []
+    words = TOKENIZER.tokenize(text)
+    filtered_words = []
+    for word in words:
+        if word in SPECIAL_CHARS or word == " ":
+            pass
+        else:
+            new_word = word.replace(",", "").replace(".", "")
+            new_word = new_word.replace("!", "").replace("?", "")
+            filtered_words.append(new_word)
+    return filtered_words
+
+
+def get_sentences(text=""):
+    try:
+        tokenizer = nltk.data.load("tokenizers/punkt/english.pickle")
+    except LookupError:
+        nltk.download("punkt")
+        while True:
+            try:
+                tokenizer = nltk.data.load("tokenizers/punkt/english.pickle")
+                break
+            except LookupError:
+                time.sleep(1)
+    sentences = tokenizer.tokenize(text)
+    return sentences
+
+
+def count_syllables(words):
+    syllableCount = 0
+    for word in words:
+        syllableCount += count(word)
+    return syllableCount
+
+
+# This method must be enhanced. At the moment it only
+# considers the number of syllables in a word.
+# This often results in that too many complex words are detected.
+def count_complex_words(text=""):
+    words = get_words(text)
+    sentences = get_sentences(text)
+    complex_words = 0
+    found = False
+    cur_word = []
+
+    for word in words:
+        cur_word.append(word)
+        if count_syllables(cur_word) >= 3:
+
+            # Checking proper nouns. If a word starts with a capital letter
+            # and is NOT at the beginning of a sentence we don't add it
+            # as a complex word.
+            if not (word[0].isupper()):
+                complex_words += 1
+            else:
+                for sentence in sentences:
+                    if str(sentence).startswith(word):
+                        found = True
+                        break
+                if found:
+                    complex_words += 1
+                    found = False
+
+        cur_word.remove(word)
+    return complex_words
+
+
+"""
+Fallback syllable counter
+
+This is based on the algorithm in Greg Fast's perl module
+Lingua::EN::Syllable.
+
+"""
+
+specialSyllables_en = """tottered 2
+chummed 1
+peeped 1
+moustaches 2
+shamefully 3
+messieurs 2
+satiated 4
+sailmaker 4
+sheered 1
+disinterred 3
+propitiatory 6
+bepatched 2
+particularized 5
+caressed 2
+trespassed 2
+sepulchre 3
+flapped 1
+hemispheres 3
+pencilled 2
+motioned 2
+poleman 2
+slandered 2
+sombre 2
+etc 4
+sidespring 2
+mimes 1
+effaces 2
+mr 2
+mrs 2
+ms 1
+dr 2
+st 1
+sr 2
+jr 2
+truckle 2
+foamed 1
+fringed 2
+clattered 2
+capered 2
+mangroves 2
+suavely 2
+reclined 2
+brutes 1
+effaced 2
+quivered 2
+h'm 1
+veriest 3
+sententiously 4
+deafened 2
+manoeuvred 3
+unstained 2
+gaped 1
+stammered 2
+shivered 2
+discoloured 3
+gravesend 2
+60 2
+lb 1
+unexpressed 3
+greyish 2
+unostentatious 5
+"""
+
+fallback_cache = {}
+
+fallback_subsyl = ["cial", "tia", "cius", "cious", "gui", "ion", "iou", "sia$", ".ely$"]
+
+fallback_addsyl = [
+    "ia",
+    "riet",
+    "dien",
+    "iu",
+    "io",
+    "ii",
+    "[aeiouy]bl$",
+    "mbl$",
+    "[aeiou]{3}",
+    "^mc",
+    "ism$",
+    "(.)(?!\\1)([aeiouy])\\2l$",
+    "[^l]llien",
+    "^coad.",
+    "^coag.",
+    "^coal.",
+    "^coax.",
+    "(.)(?!\\1)[gq]ua(.)(?!\\2)[aeiou]",
+    "dnt$",
+]
+
+
+# Compile our regular expressions
+for i in range(len(fallback_subsyl)):
+    fallback_subsyl[i] = re.compile(fallback_subsyl[i])
+for i in range(len(fallback_addsyl)):
+    fallback_addsyl[i] = re.compile(fallback_addsyl[i])
+
+
+def _normalize_word(word):
+    return word.strip().lower()
+
+
+# Read our syllable override file and stash that info in the cache
+for line in specialSyllables_en.splitlines():
+    line = line.strip()
+    if line:
+        toks = line.split()
+        assert len(toks) == 2
+        fallback_cache[_normalize_word(toks[0])] = int(toks[1])
+
+
+def count(word):
+    word = _normalize_word(word)
+    if not word:
+        return 0
+
+    # Check for a cached syllable count
+    count = fallback_cache.get(word, -1)
+    if count > 0:
+        return count
+
+    # Remove final silent "e"
+    if word[-1] == "e":
+        word = word[:-1]
+
+    # Count vowel groups
+    count = 0
+    prev_was_vowel = 0
+    for c in word:
+        is_vowel = c in ("a", "e", "i", "o", "u", "y")
+        if is_vowel and not prev_was_vowel:
+            count += 1
+        prev_was_vowel = is_vowel
+
+    # Add & subtract syllables
+    for r in fallback_addsyl:
+        if r.search(word):
+            count += 1
+    for r in fallback_subsyl:
+        if r.search(word):
+            count -= 1
+
+    # Cache the syllable count
+    fallback_cache[word] = count
+
+    return count
+
+
+class Readability:
+    """"""
+
+    def __init__(self, text):
+        self.analyze(text)
+
+    def analyze(self, text):
+        words = get_words(text)
+        char_count = get_char_count(words)
+        word_count = len(words)
+        sentence_count = len(get_sentences(text))
+        syllable_count = count_syllables(words)
+        complexwords_count = count_complex_words(text)
+        avg_words_p_sentence = word_count / sentence_count
+        self.stats = {
+            "words": words,
+            "char_cnt": float(char_count),
+            "word_cnt": float(word_count),
+            "sentence_cnt": float(sentence_count),
+            "syllable_cnt": float(syllable_count),
+            "complex_word_cnt": float(complexwords_count),
+            "avg_words_p_sentence": float(avg_words_p_sentence),
+        }
+
+    @property
+    def metrics(self):
+        return {
+            metric: getattr(self, metric)()
+            for metric in (
+                "ARI",
+                "FleschReadingEase",
+                "FleschKincaidGradeLevel",
+                "GunningFogIndex",
+                "SMOGIndex",
+                "ColemanLiauIndex",
+                "LIX",
+                "RIX",
+            )
+        }
+
+    def ARI(self):
+        score = 0.0
+        if self.stats["word_cnt"] > 0.0:
+            score = (
+                4.71 * (self.stats["char_cnt"] / self.stats["word_cnt"])
+                + 0.5 * (self.stats["word_cnt"] / self.stats["sentence_cnt"])
+                - 21.43
+            )
+        return score
+
+    def FleschReadingEase(self):
+        score = 0.0
+        if self.stats["word_cnt"] > 0.0:
+            score = (
+                206.835
+                - (1.015 * self.stats["avg_words_p_sentence"])
+                - (84.6 * (self.stats["syllable_cnt"] / self.stats["word_cnt"]))
+            )
+        return round(score, 4)
+
+    def FleschKincaidGradeLevel(self):
+        score = 0.0
+        if self.stats["word_cnt"] > 0.0:
+            score = (
+                0.39 * self.stats["avg_words_p_sentence"]
+                + 11.8 * (self.stats["syllable_cnt"] / self.stats["word_cnt"])
+                - 15.59
+            )
+        return round(score, 4)
+
+    def GunningFogIndex(self):
+        score = 0.0
+        if self.stats["word_cnt"] > 0.0:
+            score = 0.4 * (
+                self.stats["avg_words_p_sentence"]
+                + (100 * (self.stats["complex_word_cnt"] / self.stats["word_cnt"]))
+            )
+        return round(score, 4)
+
+    def SMOGIndex(self):
+        score = 0.0
+        if self.stats["word_cnt"] > 0.0:
+            score = (
+                math.sqrt(
+                    self.stats["complex_word_cnt"] * (30 / self.stats["sentence_cnt"])
+                )
+                + 3
+            )
+        return score
+
+    def ColemanLiauIndex(self):
+        score = 0.0
+        if self.stats["word_cnt"] > 0.0:
+            score = (
+                (5.89 * (self.stats["char_cnt"] / self.stats["word_cnt"]))
+                - (30 * (self.stats["sentence_cnt"] / self.stats["word_cnt"]))
+                - 15.8
+            )
+        return round(score, 4)
+
+    def LIX(self):
+        longwords = 0.0
+        score = 0.0
+        if self.stats["word_cnt"] > 0.0:
+            for word in self.stats["words"]:
+                if len(word) >= 7:
+                    longwords += 1.0
+            score = (
+                self.stats["word_cnt"] / self.stats["sentence_cnt"]
+                + float(100 * longwords) / self.stats["word_cnt"]
+            )
+        return score
+
+    def RIX(self):
+        longwords = 0.0
+        score = 0.0
+        if self.stats["word_cnt"] > 0.0:
+            for word in self.stats["words"]:
+                if len(word) >= 7:
+                    longwords += 1.0
+            score = longwords / self.stats["sentence_cnt"]
+        return score

index 0000000..627ea49
--- /dev/null

+"""
+Voice input/output.
+
+"""
+
+import functools
+import logging
+import pathlib
+import queue
+import subprocess
+import sys
+
+import gtts
+import pyglet
+import requests
+import sounddevice
+import vosk
+import webagt
+
+ELEVENLABS_API = "https://api.elevenlabs.io/v1"
+
+vosk.SetLogLevel(-1)
+
+data_dir = pathlib.Path(".")  # pathlib.Path(__file__).parent
+vosk_base_url = "https://alphacephei.com/vosk/models/"
+vosk_archive_name = "vosk-model-small-en-us-0.15.zip"
+vosk_model_dir = data_dir / "model"
+
+
+def install():
+    """Ensure the models are present."""
+    if vosk_model_dir.exists():
+        return
+    logging.debug("installing Vosk model")
+    webagt.download(
+        f"{vosk_base_url}/{vosk_archive_name}", data_dir / vosk_archive_name
+    )
+    subprocess.run(["unzip", vosk_archive_name], cwd=data_dir)
+    subprocess.run(["mv", vosk_archive_name[:-4], "model"], cwd=data_dir)
+
+
+def speak(message, voice="google", elevenlabs_key=None):
+    """Play message in voice."""
+    audio = data_dir / "speech.wav"
+    if voice == "google":
+        gtts.gTTS(message).save(audio)
+    else:
+        get_audio(message, elevenlabs_key, filename=audio)
+    pyglet.resource.media(str(audio)).play()
+    audio.unlink()
+
+
+def transcribe():
+    """
+    Return a list of phrasal voice inputs.
+
+    - say "try again" to try the previous phrase again
+    - say "new paragraph" to start a new paragraph
+    - say "finished" when done
+
+    """
+    install()
+
+    phrases = []
+    paragraphs = []
+
+    q = queue.Queue()
+
+    def callback(indata, frames, time, status):
+        if status:
+            print(status, file=sys.stderr)
+        q.put(bytes(indata))
+
+    device = None
+    device_info = sounddevice.query_devices(device, "input")
+    samplerate = int(device_info["default_samplerate"])
+
+    with sounddevice.RawInputStream(
+        samplerate=samplerate,
+        blocksize=8000,
+        device=device,
+        dtype="int16",
+        channels=1,
+        callback=callback,
+    ):
+        rec = vosk.KaldiRecognizer(vosk.Model(str(vosk_model_dir)), samplerate)
+        while True:
+            data = q.get()
+            if rec.AcceptWaveform(data):
+                words = rec.Result()[14:-3]
+                if words == "try again":
+                    phrases.pop()
+                elif words == "new paragraph":
+                    paragraphs.append(phrases)
+                    phrases = []
+                    print(" " * 13 + "\n", end="\r", file=sys.stderr)
+                elif words == "finished":
+                    if phrases:
+                        paragraphs.append(phrases)
+                    print("", end="\r", file=sys.stderr)
+                    return paragraphs
+                else:
+                    if words:
+                        phrases.append(words)
+                        print(words, file=sys.stderr)
+            else:
+                words = rec.PartialResult()[17:-3]
+                if words.endswith("wait try again"):
+                    rec.Reset()
+                print(words, end="\r", file=sys.stderr)
+
+
+@functools.cache
+def get_voice(key):
+    return [
+        v
+        for v in requests.get(
+            f"{ELEVENLABS_API}/voices",
+            headers={"Accept": "application/json", "xi-api-key": key},
+        ).json()["voices"]
+        if v["name"] == "Angelo"
+    ][0]["voice_id"]
+
+
+def get_audio(text, key, filename=None):
+    audio = requests.post(
+        f"{ELEVENLABS_API}/text-to-speech/{get_voice(key)}/stream",
+        headers={
+            "Accept": "audio/mpeg",
+            "Content-Type": "application/json",
+            "xi-api-key": key,
+        },
+        json={
+            "text": text,
+            "model_id": "eleven_multilingual_v1",
+            "voice_settings": {"stability": 1, "similarity_boost": 1},
+        },
+    )
+    if filename:
+        with open(filename, "wb") as fp:
+            fp.write(audio.content)
+    else:
+        return audio.content
+
+
+if __name__ == "__main__":
+    print(transcribe())

index 0000000..96c5bdb
--- /dev/null

+"""Transcription command line application."""
+
+import sys
+
+from mahnamahna import voice
+
+
+def transcribe():
+    print("\r\n\r\n".join(". ".join(phrases) + "." for phrases in voice.transcribe()))
+    sys.exit()
+
+
+if __name__ == "__main__":
+    transcribe()

index 0000000..60cb855
--- /dev/null

+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
+
+[tool.poetry]
+name = "mahnamahna"
+version = "0.0.1"
+description = "Media analysis and manipulation"
+homepage = "https://ragt.ag/code/projects/mahnamahna"
+repository = "https://ragt.ag/code/projects/mahnamahna.git"
+documentation = "https://ragt.ag/code/projects/mahnamahna/api"
+authors = ["Angelo Gladding <angelo@ragt.ag>"]
+license = "AGPL-3.0-or-later"
+
+[tool.pyright]
+reportGeneralTypeIssues = false
+reportOptionalMemberAccess = false
+
+[tool.poetry.scripts]
+transcribe = "mahnamahna.voice:transcribe"
+
+[[tool.poetry.source]]
+name = "main"
+url = "https://ragt.ag/code/pypi"
+
+[tool.poetry.dependencies]
+python = ">=3.10,<3.11"
+vosk = "^0.3.32"
+gTTS = "^2.2.3"
+pyttsx3 = "^2.90"
+sounddevice = "^0.4.4"
+nltk = "^3.8.1"
+pyglet = "^2.0.10"
+webagt = "^0.2.3"