Files
Transmutate/transmutate_app/engine/ffmpeg_engine.py
2026-06-01 02:06:49 +03:00

729 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""FFmpeg engine — command builders, probing, and execution for Transmutate."""
from __future__ import annotations
import json
import os
import subprocess
from dataclasses import dataclass, field
from importlib import import_module
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class StreamInfo:
    """Describe one stream of a media file: index, kind, codec and language."""

    index: int
    codec_type: str
    codec_name: str
    language: Optional[str] = None

    def label(self) -> str:
        """Build the display text for this stream.

        The text is the capitalised stream type, ``#<index>``, an em dash and
        the codec name, followed by the language in parentheses when one is
        set (``None`` and the empty string both count as "not set").
        """
        suffix = f" ({self.language})" if self.language else ""
        return f"{self.codec_type.capitalize()} #{self.index} \u2014 {self.codec_name}{suffix}"
@dataclass
class ProbeResult:
    """Collect what probing a media file found out about it."""

    # One entry per stream found in the file; starts out as a fresh empty list.
    streams: list[StreamInfo] = field(default_factory=list)
    # True once the file has been recognised as a multi-frame image.
    is_animated: bool = False
    # MIME type associated with the file; empty string when unknown.
    mime_type: str = ""
# ---------------------------------------------------------------------------
# Helper — _parse_time_to_seconds (used in progress reporting)
# ---------------------------------------------------------------------------
def _parse_time_to_seconds(t: str) -> float:
"""Parse an ``HH:MM:SS.frac`` / ``MM:SS.frac`` / ``SS.frac`` / ``SS``
timecode to a float of seconds. Returns 0.0 on failure."""
if not t:
return 0.0
try:
parts = t.split(":")
if len(parts) == 3:
h, m, s = parts
return int(h) * 3600 + int(m) * 60 + float(s)
elif len(parts) == 2:
m, s = parts
return int(m) * 60 + float(s)
else:
return float(parts[0])
except (ValueError, IndexError):
return 0.0
# ---------------------------------------------------------------------------
# FFmpeg / ffmpeg-python availability
# ---------------------------------------------------------------------------
_ffmpeg = None
try:
_ffmpeg = import_module("ffmpeg")
except ImportError:
pass
def has_ffmpeg() -> bool:
    """Report whether ``ffmpeg -version`` can be run successfully.

    Failing to launch the binary, a non-zero exit status, or a run that
    exceeds ten seconds all yield ``False``.
    """
    version_cmd = ["ffmpeg", "-version"]
    try:
        subprocess.run(version_cmd, capture_output=True, check=True, timeout=10)
    except Exception:
        return False
    else:
        return True
# ---------------------------------------------------------------------------
# MIME detection
# ---------------------------------------------------------------------------
def _get_magic_mime(filepath: str) -> str:
"""Attempt to get the MIME type via ``file --mime-type`` / ``file -b``.
Falls back to extension-based detection.
"""
# Try Python ``filetype`` / ``mimetypes`` first
try:
import filetype # type: ignore[import-not-found]
kind = filetype.guess(filepath)
if kind is not None and kind.mime:
return kind.mime
except ImportError:
pass
# Try ``file --mime-type``
try:
result = subprocess.run(
["file", "--mime-type", "-b", filepath],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
return result.stdout.strip()
except Exception:
pass
# Fallback: extension-based
ext = Path(filepath).suffix.lstrip(".").lower()
_EXT_MIME = {
"png": "image/png",
"jpg": "image/jpeg",
"jpeg": "image/jpeg",
"gif": "image/gif",
"webp": "image/webp",
"bmp": "image/bmp",
"tiff": "image/tiff",
"tif": "image/tiff",
"avif": "image/avif",
"heic": "image/heic",
"mp4": "video/mp4",
"mkv": "video/x-matroska",
"webm": "video/webm",
"avi": "video/x-msvideo",
"mov": "video/quicktime",
"flv": "video/x-flv",
"wmv": "video/x-ms-wmv",
"m4v": "video/x-m4v",
"mpg": "video/mpeg",
"mpeg": "video/mpeg",
"3gp": "video/3gpp",
"ts": "video/mp2t",
"ogv": "video/ogg",
"m2ts": "video/MP2T",
"mp3": "audio/mpeg",
"flac": "audio/flac",
"wav": "audio/wav",
"ogg": "audio/ogg",
"m4a": "audio/mp4",
"aac": "audio/aac",
"opus": "audio/opus",
"wma": "audio/x-ms-wma",
"aiff": "audio/aiff",
"ape": "audio/ape",
"alac": "audio/x-alac",
}
return _EXT_MIME.get(ext, "")
def detect_mime(filepath: str) -> str:
    """Return a type string for *filepath*, or ``""`` when none is found.

    The answer of ``_get_magic_mime`` is used when it is non-empty.
    Otherwise the ``format`` section of ``_probe_json`` is consulted and its
    ``mime_type`` entry, or failing that its ``format_name`` entry, is
    returned as-is.
    """
    sniffed = _get_magic_mime(filepath)
    if sniffed:
        return sniffed

    probed = _probe_json(filepath)
    if not probed:
        return ""

    container = probed.get("format", {})
    candidates = (container.get(key) for key in ("mime_type", "format_name"))
    return next((value for value in candidates if value), "")
def detect_media_type(mime: str) -> str:
    """Classify a MIME type as ``"image"``, ``"video"`` or ``"audio"``.

    The decision is made on the top-level type (the part before ``/``),
    compared case-insensitively and ignoring surrounding whitespace.

    Args:
        mime: A MIME type such as ``"video/mp4"``.

    Returns:
        ``"image"``, ``"video"`` or ``"audio"``; ``""`` for empty input, for
        text without a ``/``, and for any other top-level type.
    """
    if not mime:
        return ""
    top_level, slash, _subtype = mime.strip().lower().partition("/")
    if slash and top_level in ("image", "video", "audio"):
        return top_level
    return ""
# ---------------------------------------------------------------------------
# Probing — ffprobe
# ---------------------------------------------------------------------------
def _probe_json(filepath: str, timeout: int = 10) -> Optional[dict]:
    """Return ffprobe's JSON description of *filepath*, or ``None``.

    When the ``ffmpeg`` Python module is loaded, its ``probe`` function is
    tried first and its answer is used if it is a dict with a non-empty
    ``streams`` entry. In every other case ``ffprobe`` is run as a
    subprocess, limited to *timeout* seconds, and its standard output is
    parsed as UTF-8 JSON. Any error on either path is swallowed.
    """
    if _ffmpeg is not None:
        try:
            probed = _ffmpeg.probe(filepath)  # type: ignore[attr-defined]
            if isinstance(probed, dict) and probed.get("streams"):
                return probed
        except Exception:  # noqa: BLE001
            pass

    ffprobe_cmd = [
        "ffprobe", "-v", "quiet",
        "-show_format", "-show_streams",
        "-of", "json",
        filepath,
    ]
    try:
        completed = subprocess.run(
            ffprobe_cmd, capture_output=True, text=False, timeout=timeout
        )
        if completed.returncode != 0:
            return None
        return json.loads(completed.stdout.decode("utf-8"))
    except Exception:  # noqa: BLE001
        return None
def _get_animated_image_mime(filepath: str) -> Optional[str]:
"""Attempt to detect if an image is animated using Pillow, and return its MIME."""
try:
from PIL import Image # type: ignore[import-not-found]
img = Image.open(filepath)
# Animated images have multiple frames (GIF, WebP)
if hasattr(img, "n_frames") and int(getattr(img, "n_frames", 0)) > 1:
ext = Path(filepath).suffix.lower()
return {
".gif": "image/gif",
".webp": "image/webp",
}.get(ext)
if hasattr(img, "is_animated") and img.is_animated:
ext = Path(filepath).suffix.lower()
return {
".gif": "image/gif",
".webp": "image/webp",
}.get(ext)
except Exception:
pass
return None
def probe_file(filepath: str, mime: str = "") -> ProbeResult:
    """Gather stream and animation details for *filepath*.

    *mime* is stored on the result. When it is ``image/gif`` or
    ``image/webp`` the file is additionally checked for animation, and a
    positive answer sets ``is_animated`` and replaces the stored MIME type.
    Every stream reported by ``_probe_json`` becomes a :class:`StreamInfo`;
    a language tag of ``und`` is recorded as ``None``. If probing yields
    nothing, the result is returned without streams.
    """
    report = ProbeResult()
    report.mime_type = mime

    if mime in ("image/gif", "image/webp"):
        animated_as = _get_animated_image_mime(filepath)
        if animated_as:
            report.is_animated = True
            report.mime_type = animated_as

    probed = _probe_json(filepath)
    if probed:
        for entry in probed.get("streams", []):
            language = entry.get("tags", {}).get("language", None)
            report.streams.append(
                StreamInfo(
                    index=entry.get("index", 0),
                    codec_type=entry.get("codec_type", "unknown"),
                    codec_name=entry.get("codec_name", "unknown"),
                    language=None if language == "und" else language,
                )
            )
    return report
# ---------------------------------------------------------------------------
# Preflight — make sure ffmpeg can read the source file
# ---------------------------------------------------------------------------
def preflight_check(filepath: str) -> tuple[bool, str]:
    """Check that *filepath* exists and that ffprobe accepts it.

    Returns a ``(ok, error_message)`` pair; the message is empty when the
    check passes. ffprobe is given at most 30 seconds.
    """
    if not os.path.isfile(filepath):
        return False, f"File does not exist: {filepath}"

    ffprobe_cmd = [
        "ffprobe", "-v", "error",
        "-show_format", "-show_streams",
        filepath,
    ]
    try:
        completed = subprocess.run(
            ffprobe_cmd, capture_output=True, text=True, timeout=30
        )
    except subprocess.TimeoutExpired:
        return False, "ffprobe timed out"
    except Exception as exc:
        return False, f"ffprobe error: {exc}"

    if completed.returncode == 0:
        return True, ""
    return False, f"ffprobe failed: {completed.stderr.strip()}"
# ---------------------------------------------------------------------------
# Execution — run_command
# ---------------------------------------------------------------------------
def run_command(cmd: list[str]) -> tuple[bool, str]:
    """Run *cmd* as a subprocess and report whether it succeeded.

    Args:
        cmd: The program and its arguments.

    Returns:
        ``(True, "")`` when the process exits with status 0. Otherwise
        ``(False, message)`` with a non-empty message: the last ten lines of
        the process's standard error, the exit status when nothing was
        written there, or a description of why the process could not be run
        (including exceeding the one-hour limit).
    """
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Bytes that are invalid in the locale encoding must not turn a
            # finished run into a reported failure.
            errors="replace",
            timeout=3600,  # generous 1-hour timeout
        )
    except subprocess.TimeoutExpired:
        return False, "Conversion timed out (exceeded 1 hour)"
    except Exception as exc:
        return False, str(exc) or type(exc).__name__

    if proc.returncode == 0:
        return True, ""

    # Summarise the last few lines of output for the user.
    stderr = proc.stderr.strip()
    if not stderr:
        # An empty message would be indistinguishable from success.
        return False, f"Command exited with status {proc.returncode}"
    return False, "\n".join(stderr.split("\n")[-10:])
# ---------------------------------------------------------------------------
# Quality helpers
# ---------------------------------------------------------------------------
def _quality_to_crf(quality: int) -> int:
"""Map user quality (0100) to a CRF value (051).
Higher quality → lower CRF.
"""
# Invert so quality=100 → CRF=0, quality=0 → CRF=51
return max(0, min(51, 51 - int(quality * 51 / 100)))
def _audio_quality_to_abr(codec: str, quality: int) -> tuple[str, str]:
"""Return (param_flag, value) for audio quality.
Supports AAC, MP3, Opus, FLAC, Vorbis.
"""
quality = max(0, min(100, quality))
if codec == "aac":
# AAC uses -b:a with a rate mapping
# Map 0-100 → 32-320 kbps
abr = max(32, int(32 + quality * 288 / 100))
return ("-b:a", f"{abr}k")
elif codec == "libmp3lame":
# MP3 uses -q:a (VBR) 0-9 → map quality
q = max(0, min(9, int(9 * (100 - quality) / 100)))
return ("-q:a", str(q))
elif codec == "libvorbis":
# Vorbis uses -q:a 0-10 → map quality
q = max(0, min(10, round(quality * 10 / 100)))
return ("-q:a", str(q))
elif codec == "libopus":
# Opus uses -b:a
abr = max(32, int(32 + quality * 256 / 100))
return ("-b:a", f"{abr}k")
elif codec == "flac":
# FLAC is lossless — quality is irrelevant, just use default
return ("-compression_level", "5")
elif codec == "pcm_s16le":
# WAV is lossless
return ("", "")
else:
# Generic: use bitrate
abr = max(32, int(32 + quality * 288 / 100))
return ("-b:a", f"{abr}k")
# ---------------------------------------------------------------------------
# Image → Image command builder
# ---------------------------------------------------------------------------
def _build_image_command(
src: str,
dst: str,
fmt: str,
quality: int,
is_animated: bool,
loop: bool,
mime: str = "",
) -> list[str]:
"""Build an ffmpeg command for image → image conversion.
Handles both single-frame and animated images.
"""
ext = fmt.lower()
cmd = ["ffmpeg", "-y", "-i", src]
if is_animated:
# Animated: preserve all frames
cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p"])
if ext == "png":
# For animated PNG, extract frames then use ImageMagick or ffmpeg
# We'll use ffmpeg with a numbered sequence
pass
elif ext == "webp":
cmd.extend(["-c:v", "libwebp_anim", "-loop", "0"])
elif ext == "gif":
# Palettegen-based GIF
return _build_video_gif(src, dst, quality)
elif ext in ("mp4", "mkv", "webm", "avi", "mov"):
cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p"])
if ext == "webm":
cmd.extend(["-c:a", "libvorbis"])
if loop and ext in ("webp", "gif"):
cmd.extend(["-loop", "0"])
else:
# Single frame — just encode it
cmd.extend(["-frames:v", "1"])
if ext == "png":
cmd.extend(["-c:v", "png"])
elif ext in ("jpg", "jpeg"):
q = _quality_to_crf(quality)
cmd.extend(["-c:v", "libjpeg-turbo", "-q:v", str(q)])
elif ext == "webp":
lossless = quality >= 100
if lossless:
cmd.extend(["-c:v", "libwebp", "-lossless", "1"])
else:
q = _quality_to_crf(quality)
cmd.extend(["-c:v", "libwebp", "-lossless", "0", "-q:v", str(q)])
elif ext == "avif":
q = _quality_to_crf(quality)
cmd.extend(["-c:v", "libaom-av1", "-cpu-used", "4", "-q:v", str(q)])
elif ext == "bmp":
cmd.extend(["-c:v", "bmp"])
cmd.extend(["-pix_fmt", "yuv420p", dst])
return cmd
# ---------------------------------------------------------------------------
# Video → Video command builder
# ---------------------------------------------------------------------------
def _build_video_command(
    src: str,
    dst: str,
    fmt: str,
    quality: int,
    audio_quality: int,
    audio_streams: list[int],
    sub_streams: list[int],
    mime: str = "",
) -> list[str]:
    """Build the ffmpeg command that converts video *src* into *dst*.

    Args:
        src: Input file path.
        dst: Output file path.
        fmt: Target container name such as ``"mp4"`` (case-insensitive).
        quality: Video quality 0-100, translated by ``_quality_to_crf``.
        audio_quality: Audio quality 0-100, translated by
            ``_audio_quality_to_abr``.
        audio_streams: Input stream indices to keep as audio. When empty,
            the first audio stream is kept if the input has one.
        sub_streams: Input stream indices to keep as subtitles. When empty,
            all subtitle streams are kept if the input has any.
        mime: Accepted for interface compatibility; not used.

    Returns:
        The argument list for ffmpeg.
    """
    ext = fmt.lower()
    cmd = ["ffmpeg", "-y", "-i", src]

    # Once any -map is given ffmpeg stops choosing streams on its own, so
    # the video stream has to be mapped explicitly as well.
    cmd.extend(["-map", "0:v:0"])

    # Audio. The caller's indices count all streams of the input, which is
    # the ``0:<index>`` form; ``0:a:N`` would count audio streams only.
    if audio_streams:
        for idx in audio_streams:
            cmd.extend(["-map", f"0:{idx}"])
    else:
        # Trailing "?" keeps inputs without audio from failing.
        cmd.extend(["-map", "0:a:0?"])

    # Subtitles, using the same index convention.
    if sub_streams:
        for idx in sub_streams:
            cmd.extend(["-map", f"0:{idx}"])
    else:
        cmd.extend(["-map", "0:s?"])
    # NOTE(review): some containers (AVI in particular) may reject mapped
    # subtitle streams; confirm whether they should be dropped there.

    # Video encoding.
    crf = _quality_to_crf(quality)
    if ext == "webm":
        # WebM cannot carry H.264. VP9's CRF scale is 0-63, so the 0-51
        # value is rescaled; ``-b:v 0`` selects constant-quality mode.
        vp9_crf = round(crf * 63 / 51)
        cmd.extend([
            "-c:v", "libvpx-vp9",
            "-pix_fmt", "yuv420p",
            "-crf", str(vp9_crf),
            "-b:v", "0",
        ])
    else:
        cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p", "-crf", str(crf)])

    # Audio encoding. The codec is always named explicitly so that the
    # quality option below matches the encoder actually used.
    audio_codec_map = {
        "mp4": "aac",
        "mkv": "aac",
        "webm": "libvorbis",
        "avi": "aac",
        "mov": "aac",
    }
    ac = audio_codec_map.get(ext, "aac")
    cmd.extend(["-c:a", ac])
    af, av = _audio_quality_to_abr(ac, audio_quality)
    if af:
        cmd.extend([af, av])

    # Container-specific options.
    if ext == "mp4":
        cmd.extend(["-movflags", "+faststart"])
        cmd.extend(["-c:s", "mov_text"])

    cmd.append(dst)
    return cmd
# ---------------------------------------------------------------------------
# Video → Audio command builder
# ---------------------------------------------------------------------------
def _build_audio_command(
    src: str,
    dst: str,
    fmt: str,
    quality: int,
    audio_streams: list[int],
) -> list[str]:
    """Build the ffmpeg command that writes the audio of *src* to *dst*.

    Args:
        src: Input file path (audio or video).
        dst: Output file path.
        fmt: Target format name such as ``"mp3"`` (case-insensitive).
            Formats without an entry in the codec table are encoded as AAC.
        quality: Audio quality 0-100, translated by
            ``_audio_quality_to_abr`` for the chosen codec.
        audio_streams: Input stream indices to keep. When empty, the first
            audio stream is used.

    Returns:
        The argument list for ffmpeg.
    """
    cmd = ["ffmpeg", "-y", "-i", src]

    # The caller's indices count all streams of the input, which is the
    # ``0:<index>`` form; ``0:a:N`` would count audio streams only.
    if audio_streams:
        for idx in audio_streams:
            cmd.extend(["-map", f"0:{idx}"])
    else:
        cmd.extend(["-map", "0:a:0"])

    # Discard video: keep audio only.
    cmd.append("-vn")

    codec_by_format = {
        "mp3": "libmp3lame",
        "flac": "flac",
        "wav": "pcm_s16le",
        "ogg": "libvorbis",
        "opus": "libopus",  # an .opus file can only hold Opus audio
        "m4a": "aac",
        "aac": "aac",
    }
    codec = codec_by_format.get(fmt.lower(), "aac")
    cmd.extend(["-c:a", codec])

    # The quality helper yields an empty option for codecs that have no
    # quality setting (uncompressed PCM).
    flag, value = _audio_quality_to_abr(codec, quality)
    if flag:
        cmd.extend([flag, value])

    cmd.append(dst)
    return cmd
# ---------------------------------------------------------------------------
# Video → GIF (palettegen-based)
# ---------------------------------------------------------------------------
def _build_video_gif(src: str, dst: str, quality: int) -> list[str]:
"""Build a palettegen-based ffmpeg command for video → GIF.
Uses a two-pass approach: first generate a palette, then use it.
"""
quality = max(0, min(100, quality))
# Map quality to dithering mode and palette quality
# Higher quality = better palette + more dithering
if quality > 80:
dither = "bayer"
bayer_scale = "5"
elif quality > 60:
dither = "sierra2_4a"
bayer_scale = "4"
elif quality > 40:
dither = "burkes"
bayer_scale = "3"
else:
dither = "none"
bayer_scale = "1"
# Palette generation
palette_temp = dst + ".palette"
# Two-pass command:
# Pass 1: generate palette
# Pass 2: use palette for GIF
cmd = [
"ffmpeg", "-y",
"-i", src,
"-vf",
"fps=24,scale=640:-1:flags=lanczos,palettegen=max_colors=256:stats_mode=diff",
"-y", palette_temp,
]
# Run pass 1 separately, then append pass 2
subprocess.run(cmd, capture_output=True, timeout=300)
# Pass 2: apply palette
cmd = [
"ffmpeg", "-y",
"-i", src,
"-i", palette_temp,
"-lavfi",
f"fps=24,scale=640:-1:flags=lanczos [x]; [x][1:v] paletteuse=dither={dither}:bayer_scale={bayer_scale}:diff_mode=rectangle",
dst,
]
# Cleanup palette temp file
try:
os.remove(palette_temp)
except OSError:
pass
return cmd
# ---------------------------------------------------------------------------
# Video → Animated WebP (libwebp_anim)
# ---------------------------------------------------------------------------
def _build_video_webp(
src: str,
dst: str,
quality: int,
loop: bool,
) -> list[str]:
"""Build an ffmpeg command for video → animated WebP using libwebp_anim."""
quality = max(0, min(100, quality))
# WebP uses lossless/lossy quality
# Map quality: 0-100 → lossless or quality factor 0-100
lossiness = max(0, int((100 - quality) * 10))
cmd = [
"ffmpeg", "-y",
"-i", src,
"-c:v", "libwebp_anim",
"-pix_fmt", "yuv420p",
"-lossless", "0",
"-lossiness", str(lossiness),
]
if loop:
cmd.extend(["-loop", "0"])
# Frame rate
cmd.extend(["-r", "24"])
# Output
cmd.append(dst)
return cmd