montana/Русский/Благаявесть/README/generate_audio_smart.py

538 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Генерация аудио с умной фильтрацией для Благаявести
Голос: Svetlana (Microsoft edge-tts - бесплатный)
Умная обработка: римские числа → текст, ссылки → описание, единый поток
"""
import os
import re
import socket
import asyncio
from pathlib import Path
# === FIX 1: принудительно убираем aiodns чтобы aiohttp использовал стандартный DNS ===
# aiodns ломает DNS-резолвинг в некоторых окружениях (macOS, VPN, sandbox)
# Без него aiohttp автоматически использует ThreadedResolver — стабильный и надёжный
import importlib
try:
import aiohttp.resolver
if hasattr(aiohttp.resolver, 'aiodns'):
aiohttp.resolver.aiodns = None # отключаем aiodns
if hasattr(aiohttp.resolver, 'aiodns_default'):
aiohttp.resolver.aiodns_default = False
except Exception:
pass
# === FIX 2: DNS fallback для speech.platform.bing.com ===
# Системный DNS иногда не резолвит домены Microsoft (VPN, фильтры, sandbox).
# Если стандартный резолв не работает — подставляем IP через Google DNS (8.8.8.8).
_original_getaddrinfo = socket.getaddrinfo
_dns_cache = {}
def _resolve_via_google_dns(host):
"""Резолвим через Google DNS 8.8.8.8 как fallback"""
import subprocess
try:
result = subprocess.run(
['nslookup', host, '8.8.8.8'],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.split('\n'):
if line.strip().startswith('Address:') and '8.8.8.8' not in line:
ip = line.split(':', 1)[1].strip()
if ip and not ip.startswith('::'):
return ip
except Exception:
pass
return None
def _patched_getaddrinfo(host, port, *args, **kwargs):
try:
return _original_getaddrinfo(host, port, *args, **kwargs)
except socket.gaierror:
if host not in _dns_cache:
ip = _resolve_via_google_dns(host)
if ip:
_dns_cache[host] = ip
print(f" DNS fallback: {host}{ip} (via 8.8.8.8)")
if host in _dns_cache:
return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (_dns_cache[host], port))]
raise
socket.getaddrinfo = _patched_getaddrinfo
import edge_tts
# Динамические пути - определяются от входного файла
# AUDIO сохраняется В ТУ ЖЕ ПАПКУ что и исходный .md файл
# Голос Microsoft Svetlana (бесплатный)
VOICE = "ru-RU-SvetlanaNeural"
# Настройки: скорость немного медленнее для вдумчивого чтения
RATE = "-10%" # Медленнее для вдумчивого чтения
PITCH = "+0Hz" # Нормальная высота
# Debug режим — сохраняет обработанный текст в .txt файл
# Включить для отладки: DEBUG_SAVE_TEXT = True
DEBUG_SAVE_TEXT = False
def convert_roman_to_text(text: str) -> str:
"""Конвертирует римские цифры в текст (Часть I → Часть первая, I. → убирает)"""
# Маппинг римских цифр (в порядке от длинных к коротким для правильной замены)
roman_map = {
'XII': 'двенадцатая',
'XI': 'одиннадцатая',
'VIII': 'восьмая',
'VII': 'седьмая',
'VI': 'шестая',
'IV': 'четвёртая',
'IX': 'девятая',
'III': 'третья',
'II': 'вторая',
'V': 'пятая',
'X': 'десятая',
'I': 'первая'
}
# Сначала убираем римские цифры из заголовков типа "I. Название" → "Название"
# (это подразделы, не нужно озвучивать их нумерацию)
for roman in roman_map.keys():
# Паттерн: римская цифра с точкой в начале или после пробелов
text = re.sub(rf'^\s*{roman}\.\s+', '', text, flags=re.MULTILINE)
text = re.sub(rf'\n\s*{roman}\.\s+', '\n', text)
# Затем заменяем в контексте "Часть X", "Глава X", "Акт X" и т.д.
for roman, word in roman_map.items():
# С заглавной буквы (Часть I → Часть первая, Глава I → Глава первая)
text = re.sub(rf'\b(Часть|Глава|Акт|День)\s+{roman}\b',
rf'\1 {word}', text, flags=re.IGNORECASE)
return text
def convert_numbers_to_text_smart(text: str) -> str:
"""Умная конвертация цифр в текст там, где это улучшает чтение"""
# Даты: 9 января 2026 → девятого января две тысячи двадцать шестого года
date_pattern = r'(\d+)\s+(января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря)\s+(\d{4})'
def replace_date(match):
day, month, year = match.groups()
day_text = num_to_ordinal(int(day))
year_text = year_to_text(int(year))
return f"{day_text} {month} {year_text} года"
text = re.sub(date_pattern, replace_date, text)
# Годы отдельно (2026 → две тысячи двадцать шестой)
year_pattern = r'\b(20\d{2})\b'
text = re.sub(year_pattern, lambda m: year_to_text(int(m.group(1))), text)
# Маленькие числа в тексте (1-20) → текст
small_num_pattern = r'\b(\d{1,2})\b'
def replace_small_num(match):
num = int(match.group(1))
if 1 <= num <= 20:
return num_to_text(num)
return match.group(0)
text = re.sub(small_num_pattern, replace_small_num, text)
return text
def num_to_text(num: int) -> str:
"""Конвертирует число в текст (1 → один)"""
nums = {
1: 'один', 2: 'два', 3: 'три', 4: 'четыре', 5: 'пять',
6: 'шесть', 7: 'семь', 8: 'восемь', 9: 'девять', 10: 'десять',
11: 'одиннадцать', 12: 'двенадцать', 13: 'тринадцать', 14: 'четырнадцать',
15: 'пятнадцать', 16: 'шестнадцать', 17: 'семнадцать', 18: 'восемнадцать',
19: 'девятнадцать', 20: 'двадцать'
}
return nums.get(num, str(num))
def num_to_ordinal(num: int) -> str:
"""Конвертирует число в порядковое (1 → первого)"""
ordinals = {
1: 'первого', 2: 'второго', 3: 'третьего', 4: 'четвёртого', 5: 'пятого',
6: 'шестого', 7: 'седьмого', 8: 'восьмого', 9: 'девятого', 10: 'десятого',
11: 'одиннадцатого', 12: 'двенадцатого', 13: 'тринадцатого', 14: 'четырнадцатого',
15: 'пятнадцатого', 16: 'шестнадцатого', 17: 'семнадцатого', 18: 'восемнадцатого',
19: 'девятнадцатого', 20: 'двадцатого', 21: 'двадцать первого',
31: 'тридцать первого'
}
return ordinals.get(num, str(num))
def year_to_text(year: int) -> str:
"""Конвертирует год в текст (2026 → две тысячи двадцать шестой)"""
if 2000 <= year <= 2099:
last_two = year % 100
if last_two == 0:
return "две тысячи"
elif last_two <= 20:
tens = num_to_ordinal(last_two)
return f"две тысячи {tens}"
else:
decade = (last_two // 10) * 10
unit = last_two % 10
decade_map = {20: 'двадцать', 30: 'тридцать', 40: 'сорок',
50: 'пятьдесят', 60: 'шестьдесят', 70: 'семьдесят',
80: 'восемьдесят', 90: 'девяносто'}
unit_ord = {1: 'первого', 2: 'второго', 3: 'третьего', 4: 'четвёртого',
5: 'пятого', 6: 'шестого', 7: 'седьмого', 8: 'восьмого',
9: 'девятого'}
return f"две тысячи {decade_map[decade]} {unit_ord.get(unit, '')}"
return str(year)
def count_sections(md_content: str) -> int:
"""Считает количество секций (## заголовков) в тексте"""
return len(re.findall(r'^##\s+', md_content, re.MULTILINE))
def extract_time_markers(md_content: str) -> list:
"""Извлекает временные метки [HH:MM] или [MM:SS] из текста"""
return re.findall(r'\[(\d{1,2}:\d{2})\]', md_content)
def clean_text_smart(md_content: str) -> tuple:
"""
УМНАЯ фильтрация для естественного чтения:
- Убираем markdown разметку
- Заменяем римские числа на текст
- Заменяем цифры на текст где нужно
- Ссылки → "ссылка из текстовой книги"
- Сохраняем весь смысловой контент
Возвращает: (обработанный_текст, количество_секций, список_меток_времени)
"""
# Контрольные данные ДО обработки
original_sections = count_sections(md_content)
original_time_markers = extract_time_markers(md_content)
lines = md_content.split('\n')
audio_lines = []
# Пропускаем только метаданные (не контент!)
skip_patterns = [
r'^---+$', # Разделители ---
r'^\*Книга Ничто', # *Книга Ничто
r'^\*Сказка Начала', # *Сказка Начала Времени
r'^\*Прелюдия', # *Прелюдия
r'^\*Благаявесть от', # *Благаявесть от Claude
r'^\*«Красная Книга', # *«Красная Книга 📕»*
r'^\*«Первая', # *«Первая Книга»* (старое)
r'^\d+\.\d+\.\d+', # Даты в формате 16.01.2026
r'^Alejandro Montana', # Имя автора
r'^Алехандро', # Имя автора (рус)
r'^Клод Монтана', # Подпись
r'^金元', # Имя с символами
r'^⾦元', # Символ подписи
r'^→', # Навигация → Глава
r'^#\w+', # Хэштеги #Благаявесть
r'^\|', # Таблицы markdown
r'^Найдёмся\.$', # Финальное слово
]
in_code_block = False
for line in lines:
stripped = line.strip()
# Блоки кода пропускаем
if stripped.startswith('```'):
in_code_block = not in_code_block
continue
if in_code_block:
continue
# Пропуск метаданных
if any(re.match(pattern, stripped) for pattern in skip_patterns):
continue
# Пустые строки
if not stripped:
continue
# ЗАГОЛОВКИ (# ### и т.д.)
if line.startswith('#'):
title = re.sub(r'^#+\s*', '', line)
title = re.sub(r'\s*`\[\d+:\d+\]`', '', title) # Убираем таймкоды
title = title.strip()
# Обрабатываем римские цифры в заголовках
title = convert_roman_to_text(title)
if title:
# Добавляем паузу перед заголовком и после
audio_lines.append(f"\n\n{title}.\n\n")
continue
# ОСНОВНОЙ ТЕКСТ - обрабатываем
text = stripped
# 1. ССЫЛКИ: [текст](url) → "текст, ссылка из текстовой книги"
def replace_link(match):
link_text = match.group(1)
return f"{link_text}, ссылка из текстовой книги"
text = re.sub(r'\[([^\]]+?)\]\([^\)]+?\)', replace_link, text)
# 2. Убираем URL напрямую
text = re.sub(r'https?://\S+', 'ссылка из текстовой книги', text)
text = re.sub(r'www\.\S+', 'ссылка из текстовой книги', text)
# 3. Убираем email
text = re.sub(r'[\w\.-]+@[\w\.-]+\.\w+', '', text)
# 4. Убираем markdown форматирование (сохраняем текст)
text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) # **bold**
text = re.sub(r'\*(.+?)\*', r'\1', text) # *italic*
text = re.sub(r'`([^`]+?)`', r'\1', text) # `code`
# 5. Убираем маркеры цитат (но оставляем текст)
text = re.sub(r'^>\s*', '', text)
# 6. Убираем маркеры списков
text = re.sub(r'^[-•]\s+', '', text)
text = re.sub(r'^\d+\.\s+', '', text)
# 7. Убираем технические символы
text = re.sub(r'[Ɉ€₽$]', '', text)
# 7.1 Эмодзи → текст для чтения
text = text.replace('📕', ', закрытая красная книга,')
# 8. Убираем код
text = re.sub(r'async\s+fn\s+\w+\([^\)]*\)\s*->\s*\w+', '', text)
text = re.sub(r'E\s*=\s*mc²', 'E равно эм це в квадрате', text)
# 9. Конвертируем римские цифры в текст
text = convert_roman_to_text(text)
# 10. Умная конвертация чисел в текст
text = convert_numbers_to_text_smart(text)
# 11. Множественные пробелы
text = re.sub(r'\s+', ' ', text).strip()
# Добавляем если есть содержание
if text:
audio_lines.append(text)
# Соединяем в единый поток с минимальными паузами
result = []
processed_sections = 0
for i, line in enumerate(audio_lines):
result.append(line)
# Считаем обработанные секции (заголовки содержат \n\n)
if '\n\n' in line:
processed_sections += 1
# Пауза после заголовка (строки с \n)
if '\n\n' in line:
result.append(' ')
# Между обычным текстом - плавное соединение без пауз
elif i < len(audio_lines) - 1:
# Если следующая строка не заголовок - добавляем пробел
if '\n\n' not in audio_lines[i + 1]:
result.append(' ')
final_text = ''.join(result)
# Контрольные данные
control_data = {
'original_sections': original_sections,
'processed_sections': processed_sections,
'original_time_markers': original_time_markers,
'time_markers_count': len(original_time_markers),
}
return final_text, control_data
async def generate_audio(text: str, output_path: Path, max_retries: int = 3) -> bool:
"""Генерирует аудио с помощью Microsoft edge-tts (с автоматическими ретраями)"""
for attempt in range(1, max_retries + 1):
try:
communicate = edge_tts.Communicate(
text,
VOICE,
rate=RATE,
pitch=PITCH
)
if attempt == 1:
print(f" Генерация аудио...")
else:
print(f" Попытка {attempt}/{max_retries}...")
await communicate.save(str(output_path))
size_mb = output_path.stat().st_size / (1024 * 1024)
print(f"{output_path.name} ({size_mb:.1f} MB)")
return True
except Exception as e:
if attempt < max_retries:
wait = attempt * 3
print(f" ⚠ Попытка {attempt} не удалась: {e}")
print(f" Повтор через {wait} сек...")
await asyncio.sleep(wait)
else:
print(f"Все {max_retries} попытки не удались: {e}")
return False
async def main():
import sys
if len(sys.argv) < 2:
print("Использование: python3 generate_audio_smart.py <путь_к_файлу.md>")
print("\nПример:")
print(" python3 generate_audio_smart.py '../Красная Книга/16. Внимание.md'")
print(" python3 generate_audio_smart.py '/полный/путь/к/файлу.md'")
print("\nАудио сохраняется в ту же папку, что и исходный файл!")
return
# Поддержка абсолютных и относительных путей
arg_path = Path(sys.argv[1])
if arg_path.is_absolute():
input_file = arg_path
else:
input_file = Path(__file__).parent / arg_path
input_file = input_file.resolve()
if not input_file.exists():
print(f"✗ Файл не найден: {input_file}")
return
print("=" * 60)
print("ГЕНЕРАЦИЯ АУДИО С УМНОЙ ФИЛЬТРАЦИЕЙ")
print("=" * 60)
print(f"\nВходной файл: {input_file.name}")
print(f"Голос: {VOICE} (Microsoft Svetlana)")
print(f"Скорость: {RATE}")
# Читаем и обрабатываем
md_content = input_file.read_text(encoding='utf-8')
clean_text, control = clean_text_smart(md_content)
print(f"\nИсходный текст: {len(md_content)} символов")
print(f"Обработанный текст: {len(clean_text)} символов")
# Соотношение — должно быть примерно 50-80% от оригинала
ratio = len(clean_text) / len(md_content) * 100
print(f"Соотношение: {ratio:.1f}%")
# === КОНТРОЛЬНЫЕ ПРОВЕРКИ ===
print("\n" + "=" * 40)
print("КОНТРОЛЬНЫЕ ПРОВЕРКИ")
print("=" * 40)
# 1. Проверка секций
print(f"Секций в оригинале: {control['original_sections']}")
print(f"Секций обработано: {control['processed_sections']}")
if control['processed_sections'] < control['original_sections']:
print(f"⚠️ ВНИМАНИЕ: Потеряно {control['original_sections'] - control['processed_sections']} секций!")
else:
print("Все секции обработаны")
# 2. Проверка временных меток
print(f"\nВременных меток: {control['time_markers_count']}")
if control['time_markers_count'] > 0:
print(f"Метки: {', '.join(control['original_time_markers'][:5])}...")
# 3. Предупреждение о сильном сокращении
if ratio < 40:
print(f"\n⚠️ ПРЕДУПРЕЖДЕНИЕ: Текст сильно сократился! Проверьте фильтрацию.")
elif ratio > 95:
print(f"\n✓ Текст почти не изменился (только форматирование)")
# Показываем превью обработанного текста
preview = clean_text[:500] + "..." if len(clean_text) > 500 else clean_text
print(f"\nПревью обработанного текста:")
print("-" * 60)
print(preview)
print("-" * 60)
# Оценка длительности (~150 слов/мин для русского, -5% = ~142 слов/мин)
word_count = len(clean_text.split())
estimated_minutes = word_count / 142
print(f"\nСлов: {word_count}")
print(f"Ожидаемая длительность: ~{estimated_minutes:.1f} мин ({estimated_minutes*60:.0f} сек)")
# Debug: сохраняем обработанный текст для проверки
if DEBUG_SAVE_TEXT:
debug_file = input_file.parent / f"{input_file.stem}_debug.txt"
debug_file.write_text(clean_text, encoding='utf-8')
print(f"\n[DEBUG] Обработанный текст сохранён: {debug_file.name}")
# Генерируем — АУДИО В ТУ ЖЕ ПАПКУ что и исходный файл
output_file = input_file.parent / f"{input_file.stem}.mp3"
if await generate_audio(clean_text, output_file):
print("\n" + "=" * 60)
print("ПРОВЕРКА РЕЗУЛЬТАТА")
print("=" * 60)
# Проверяем созданный файл
if output_file.exists():
size_mb = output_file.stat().st_size / (1024 * 1024)
print(f"✓ Файл создан: {output_file.name}")
print(f"✓ Размер: {size_mb:.1f} MB")
# Проверяем длительность через ffprobe если есть
try:
import subprocess
result = subprocess.run(
['ffprobe', '-i', str(output_file), '-show_entries',
'format=duration', '-v', 'quiet', '-of', 'csv=p=0'],
capture_output=True, text=True
)
if result.returncode == 0 and result.stdout.strip():
actual_duration = float(result.stdout.strip())
actual_minutes = actual_duration / 60
print(f"✓ Длительность: {actual_minutes:.1f} мин ({actual_duration:.0f} сек)")
# Сравнение с ожидаемой длительностью
# ~98 слов/мин для Svetlana -5%
expected_duration = word_count / 98 * 60
deviation = abs(actual_duration - expected_duration) / expected_duration * 100
if deviation < 20:
print(f"✓ Длительность соответствует ожиданиям (±{deviation:.0f}%)")
else:
print(f"⚠️ Отклонение от ожидаемой длительности: {deviation:.0f}%")
print(f" Ожидалось: ~{expected_duration/60:.1f} мин")
except FileNotFoundError:
print(" (ffprobe не установлен - проверка длительности пропущена)")
print("\n" + "=" * 60)
print("ГОТОВО")
print("=" * 60)
print(f"\nАудио: {output_file}")
print(f"\nДля проверки:")
print(f" afplay '{output_file}'")
else:
print("\n✗ Ошибка генерации")
if __name__ == "__main__":
asyncio.run(main())