"""FFmpeg engine — command builders, probing, and execution for Transmutate.""" from __future__ import annotations import json import os import subprocess from dataclasses import dataclass, field from importlib import import_module from pathlib import Path from typing import Optional # --------------------------------------------------------------------------- # Data classes # --------------------------------------------------------------------------- @dataclass class StreamInfo: """Metadata for a single media stream.""" index: int codec_type: str codec_name: str language: Optional[str] = None def label(self) -> str: """Return a human-readable label for this stream.""" part = f"#{self.index} \u2014 {self.codec_name}" lang = self.language if self.language else None if lang: part += f" ({lang})" return f"{self.codec_type.capitalize()} {part}" @dataclass class ProbeResult: """Result of probing a media file with ffprobe.""" streams: list[StreamInfo] = field(default_factory=list) is_animated: bool = False mime_type: str = "" # --------------------------------------------------------------------------- # Helper — _parse_time_to_seconds (used in progress reporting) # --------------------------------------------------------------------------- def _parse_time_to_seconds(t: str) -> float: """Parse an ``HH:MM:SS.frac`` / ``MM:SS.frac`` / ``SS.frac`` / ``SS`` timecode to a float of seconds. Returns 0.0 on failure.""" if not t: return 0.0 try: parts = t.split(":") if len(parts) == 3: h, m, s = parts return int(h) * 3600 + int(m) * 60 + float(s) elif len(parts) == 2: m, s = parts return int(m) * 60 + float(s) else: return float(parts[0]) except (ValueError, IndexError): return 0.0 # --------------------------------------------------------------------------- # FFmpeg / ffmpeg-python availability # --------------------------------------------------------------------------- _ffmpeg = None try: _ffmpeg = import_module("ffmpeg") except ImportError: pass def has_ffmpeg() -> bool: """Return ``True`` when the ``ffmpeg`` binary is available in PATH.""" try: subprocess.run( ["ffmpeg", "-version"], capture_output=True, check=True, timeout=10, ) return True except Exception: return False # --------------------------------------------------------------------------- # MIME detection # --------------------------------------------------------------------------- def _get_magic_mime(filepath: str) -> str: """Attempt to get the MIME type via ``file --mime-type`` / ``file -b``. Falls back to extension-based detection. """ # Try Python ``filetype`` / ``mimetypes`` first try: import filetype # type: ignore[import-not-found] kind = filetype.guess(filepath) if kind is not None and kind.mime: return kind.mime except ImportError: pass # Try ``file --mime-type`` try: result = subprocess.run( ["file", "--mime-type", "-b", filepath], capture_output=True, text=True, timeout=10, ) if result.returncode == 0: return result.stdout.strip() except Exception: pass # Fallback: extension-based ext = Path(filepath).suffix.lstrip(".").lower() _EXT_MIME = { "png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg", "gif": "image/gif", "webp": "image/webp", "bmp": "image/bmp", "tiff": "image/tiff", "tif": "image/tiff", "avif": "image/avif", "heic": "image/heic", "mp4": "video/mp4", "mkv": "video/x-matroska", "webm": "video/webm", "avi": "video/x-msvideo", "mov": "video/quicktime", "flv": "video/x-flv", "wmv": "video/x-ms-wmv", "m4v": "video/x-m4v", "mpg": "video/mpeg", "mpeg": "video/mpeg", "3gp": "video/3gpp", "ts": "video/mp2t", "ogv": "video/ogg", "m2ts": "video/MP2T", "mp3": "audio/mpeg", "flac": "audio/flac", "wav": "audio/wav", "ogg": "audio/ogg", "m4a": "audio/mp4", "aac": "audio/aac", "opus": "audio/opus", "wma": "audio/x-ms-wma", "aiff": "audio/aiff", "ape": "audio/ape", "alac": "audio/x-alac", } return _EXT_MIME.get(ext, "") def detect_mime(filepath: str) -> str: """Detect the MIME type of *filepath*.""" mime = _get_magic_mime(filepath) if mime: return mime # Also try ffprobe as fallback probe_result = _probe_json(filepath) if probe_result: fmt = probe_result.get("format", {}) for k in ("mime_type", "format_name"): v = fmt.get(k) if v: return v return "" def detect_media_type(mime: str) -> str: """Return ``image``, ``video``, or ``audio`` based on MIME, or empty.""" if not mime: return "" if mime.startswith("image/"): return "image" if mime.startswith("video/"): return "video" if mime.startswith("audio/"): return "audio" # Map some common format names name_map = { "image/png": "image", "image/jpeg": "image", "image/gif": "image", "image/webp": "image", "image/avif": "image", "image/bmp": "image", "image/tiff": "image", "video/mp4": "video", "video/x-matroska": "video", "video/webm": "video", "video/x-msvideo": "video", "video/quicktime": "video", "video/x-flv": "video", "video/x-ms-wmv": "video", "video/mpeg": "video", "video/3gpp": "video", "video/ogg": "video", "audio/mpeg": "audio", "audio/flac": "audio", "audio/wav": "audio", "audio/ogg": "audio", "audio/mp4": "audio", "audio/aac": "audio", "audio/opus": "audio", } return name_map.get(mime, "") # --------------------------------------------------------------------------- # Probing — ffprobe # --------------------------------------------------------------------------- def _probe_json(filepath: str, timeout: int = 10) -> Optional[dict]: """Run ffprobe via ffmpeg-python and return parsed JSON, or None on failure. Falls back to a raw subprocess call if ffmpeg-python fails. """ # Try ffmpeg-python first if _ffmpeg is not None: try: metadata: dict = _ffmpeg.probe(filepath) # type: ignore[attr-defined] if isinstance(metadata, dict) and metadata.get("streams"): return metadata except Exception: # noqa: BLE001 pass # Fallback: use ffprobe directly via subprocess try: result = subprocess.run( ["ffprobe", "-v", "quiet", "-show_format", "-show_streams", "-of", "json", filepath], capture_output=True, text=False, timeout=timeout, ) if result.returncode == 0: return json.loads(result.stdout.decode("utf-8")) except Exception: # noqa: BLE001 pass return None def _get_animated_image_mime(filepath: str) -> Optional[str]: """Attempt to detect if an image is animated using Pillow, and return its MIME.""" try: from PIL import Image # type: ignore[import-not-found] img = Image.open(filepath) # Animated images have multiple frames (GIF, WebP) if hasattr(img, "n_frames") and int(getattr(img, "n_frames", 0)) > 1: ext = Path(filepath).suffix.lower() return { ".gif": "image/gif", ".webp": "image/webp", }.get(ext) if hasattr(img, "is_animated") and img.is_animated: ext = Path(filepath).suffix.lower() return { ".gif": "image/gif", ".webp": "image/webp", }.get(ext) except Exception: pass return None def probe_file(filepath: str, mime: str = "") -> ProbeResult: """Probe *filepath* with ffprobe and return a :class:`ProbeResult`. Handles both media files and animated images (GIF/WebP). """ result = ProbeResult() result.mime_type = mime # Special case: animated images if mime in ("image/gif", "image/webp"): animated_mime = _get_animated_image_mime(filepath) if animated_mime: result.is_animated = True result.mime_type = animated_mime data = _probe_json(filepath) if not data: return result # Parse streams for s in data.get("streams", []): stype = s.get("codec_type", "unknown") cname = s.get("codec_name", "unknown") lang = s.get("tags", {}).get("language", None) if lang == "und": lang = None idx = s.get("index", 0) result.streams.append(StreamInfo( index=idx, codec_type=stype, codec_name=cname, language=lang, )) return result # --------------------------------------------------------------------------- # Preflight — make sure ffmpeg can read the source file # --------------------------------------------------------------------------- def preflight_check(filepath: str) -> tuple[bool, str]: """Verify that *filepath* is readable by ffmpeg. Returns ``(ok, error_message)``. """ if not os.path.isfile(filepath): return False, f"File does not exist: {filepath}" try: result = subprocess.run( ["ffprobe", "-v", "error", "-show_format", "-show_streams", filepath], capture_output=True, text=True, timeout=30, ) if result.returncode != 0: return False, f"ffprobe failed: {result.stderr.strip()}" return True, "" except subprocess.TimeoutExpired: return False, "ffprobe timed out" except Exception as exc: return False, f"ffprobe error: {exc}" # --------------------------------------------------------------------------- # Execution — run_command # --------------------------------------------------------------------------- def run_command(cmd: list[str]) -> tuple[bool, str]: """Execute *cmd* (a list of strings) via subprocess. Returns ``(success, message)``. *message* is empty on success. """ try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=3600, # generous 1-hour timeout ) if proc.returncode == 0: return True, "" stderr = proc.stderr.strip() # Summarise the last few lines of ffmpeg output for the user lines = stderr.split("\n") summary = "\n".join(lines[-10:]) if len(lines) > 10 else stderr return False, summary except subprocess.TimeoutExpired: return False, "Conversion timed out (exceeded 1 hour)" except Exception as exc: return False, str(exc) # --------------------------------------------------------------------------- # Quality helpers # --------------------------------------------------------------------------- def _quality_to_crf(quality: int) -> int: """Map user quality (0–100) to a CRF value (0–51). Higher quality → lower CRF. """ # Invert so quality=100 → CRF=0, quality=0 → CRF=51 return max(0, min(51, 51 - int(quality * 51 / 100))) def _audio_quality_to_abr(codec: str, quality: int) -> tuple[str, str]: """Return (param_flag, value) for audio quality. Supports AAC, MP3, Opus, FLAC, Vorbis. """ quality = max(0, min(100, quality)) if codec == "aac": # AAC uses -b:a with a rate mapping # Map 0-100 → 32-320 kbps abr = max(32, int(32 + quality * 288 / 100)) return ("-b:a", f"{abr}k") elif codec == "libmp3lame": # MP3 uses -q:a (VBR) 0-9 → map quality q = max(0, min(9, int(9 * (100 - quality) / 100))) return ("-q:a", str(q)) elif codec == "libvorbis": # Vorbis uses -q:a 0-10 → map quality q = max(0, min(10, round(quality * 10 / 100))) return ("-q:a", str(q)) elif codec == "libopus": # Opus uses -b:a abr = max(32, int(32 + quality * 256 / 100)) return ("-b:a", f"{abr}k") elif codec == "flac": # FLAC is lossless — quality is irrelevant, just use default return ("-compression_level", "5") elif codec == "pcm_s16le": # WAV is lossless return ("", "") else: # Generic: use bitrate abr = max(32, int(32 + quality * 288 / 100)) return ("-b:a", f"{abr}k") # --------------------------------------------------------------------------- # Image → Image command builder # --------------------------------------------------------------------------- def _build_image_command( src: str, dst: str, fmt: str, quality: int, is_animated: bool, loop: bool, mime: str = "", ) -> list[str]: """Build an ffmpeg command for image → image conversion. Handles both single-frame and animated images. """ ext = fmt.lower() cmd = ["ffmpeg", "-y", "-i", src] if is_animated: # Animated: preserve all frames cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p"]) if ext == "png": # For animated PNG, extract frames then use ImageMagick or ffmpeg # We'll use ffmpeg with a numbered sequence pass elif ext == "webp": cmd.extend(["-c:v", "libwebp_anim", "-loop", "0"]) elif ext == "gif": # Palettegen-based GIF return _build_video_gif(src, dst, quality) elif ext in ("mp4", "mkv", "webm", "avi", "mov"): cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p"]) if ext == "webm": cmd.extend(["-c:a", "libvorbis"]) if loop and ext in ("webp", "gif"): cmd.extend(["-loop", "0"]) else: # Single frame — just encode it cmd.extend(["-frames:v", "1"]) if ext == "png": cmd.extend(["-c:v", "png"]) elif ext in ("jpg", "jpeg"): q = _quality_to_crf(quality) cmd.extend(["-c:v", "libjpeg-turbo", "-q:v", str(q)]) elif ext == "webp": lossless = quality >= 100 if lossless: cmd.extend(["-c:v", "libwebp", "-lossless", "1"]) else: q = _quality_to_crf(quality) cmd.extend(["-c:v", "libwebp", "-lossless", "0", "-q:v", str(q)]) elif ext == "avif": q = _quality_to_crf(quality) cmd.extend(["-c:v", "libaom-av1", "-cpu-used", "4", "-q:v", str(q)]) elif ext == "bmp": cmd.extend(["-c:v", "bmp"]) cmd.extend(["-pix_fmt", "yuv420p", dst]) return cmd # --------------------------------------------------------------------------- # Video → Video command builder # --------------------------------------------------------------------------- def _build_video_command( src: str, dst: str, fmt: str, quality: int, audio_quality: int, audio_streams: list[int], sub_streams: list[int], mime: str = "", ) -> list[str]: """Build an ffmpeg command for video → video conversion. Handles audio stream selection and subtitle streams. """ cmd = ["ffmpeg", "-y"] # Input cmd.extend(["-i", src]) # Select audio streams if specified # audio_streams contains ffprobe stream indices (global), e.g. [1] # Use the global-index form ``0:`` — not ``0:a:N`` (invalid syntax). if audio_streams: for idx in audio_streams: cmd.extend(["-map", f"0:{idx}"]) else: # No specific audio selection — copy first audio stream if present cmd.extend(["-map", "0:a:0"]) # Subtitles # sub_streams contains ffprobe stream indices (global), e.g. [2] if sub_streams: for idx in sub_streams: cmd.extend(["-map", f"0:{idx}"]) else: # Try to copy subtitle streams cmd.extend(["-map", "0:s?"]) # Video encoding cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p"]) # CRF mode crf = _quality_to_crf(quality) cmd.extend(["-crf", str(crf)]) # Audio encoding ext = fmt.lower() audio_codec_map = { "mp4": "aac", "mkv": "aac", "webm": "libvorbis", "avi": "aac", "mov": "aac", } ac = audio_codec_map.get(ext, "aac") af, av = _audio_quality_to_abr(ac, audio_quality) if af: cmd.extend([af, av]) # Container-specific options if ext == "webm": cmd.extend(["-c:a", "libvorbis"]) elif ext == "mp4": cmd.extend(["-movflags", "+faststart"]) cmd.extend(["-c:s", "mov_text"]) # Output cmd.append(dst) return cmd # --------------------------------------------------------------------------- # Video → Audio command builder # --------------------------------------------------------------------------- def _build_audio_command( src: str, dst: str, fmt: str, quality: int, audio_streams: list[int], ) -> list[str]: """Build an ffmpeg command for audio extraction / format conversion. Can be called from video→audio or audio→audio conversions. """ cmd = ["ffmpeg", "-y", "-i", src] # Select audio stream(s) # audio_streams contains ffprobe stream indices (global), e.g. [1] # We must use the global-index form ``0:`` — not ``0:a:N`` # which is invalid ffmpeg syntax. if audio_streams: for idx in audio_streams: cmd.extend(["-map", f"0:{idx}"]) else: cmd.extend(["-map", "0:a:0"]) # Discard video — keep audio only cmd.extend(["-vn"]) # Determine codec from format fmt_lower = fmt.lower() _codec_defaults: dict[str, tuple[str, str, str]] = { "mp3": ("libmp3lame", "", ""), "flac": ("flac", "-compression_level", "5"), "wav": ("pcm_s16le", "", ""), "ogg": ("libvorbis", "", ""), "m4a": ("aac", "", ""), "aac": ("aac", "", ""), } ac, af1, af2 = _codec_defaults.get(fmt_lower, ("aac", "-b:a", "128k")) cmd.extend(["-c:a", ac]) if af1: cmd.extend([af1]) if af2: cmd.extend([af2]) # Quality override qf, qv = _audio_quality_to_abr(ac, quality) if qf: # Replace the default codec params with quality-based ones cmd = cmd[:-2] if len(cmd) >= 2 and cmd[-2] == af1 else cmd cmd.extend([qf, qv]) # Output cmd.append(dst) return cmd # --------------------------------------------------------------------------- # Video → GIF (palettegen-based) # --------------------------------------------------------------------------- def _build_video_gif(src: str, dst: str, quality: int) -> list[str]: """Build a palettegen-based ffmpeg command for video → GIF. Uses a two-pass approach: first generate a palette, then use it. """ quality = max(0, min(100, quality)) # Map quality to dithering mode and palette quality # Higher quality = better palette + more dithering if quality > 80: dither = "bayer" bayer_scale = "5" elif quality > 60: dither = "sierra2_4a" bayer_scale = "4" elif quality > 40: dither = "burkes" bayer_scale = "3" else: dither = "none" bayer_scale = "1" # Palette generation palette_temp = dst + ".palette" # Two-pass command: # Pass 1: generate palette # Pass 2: use palette for GIF cmd = [ "ffmpeg", "-y", "-i", src, "-vf", "fps=24,scale=640:-1:flags=lanczos,palettegen=max_colors=256:stats_mode=diff", "-y", palette_temp, ] # Run pass 1 separately, then append pass 2 subprocess.run(cmd, capture_output=True, timeout=300) # Pass 2: apply palette cmd = [ "ffmpeg", "-y", "-i", src, "-i", palette_temp, "-lavfi", f"fps=24,scale=640:-1:flags=lanczos [x]; [x][1:v] paletteuse=dither={dither}:bayer_scale={bayer_scale}:diff_mode=rectangle", dst, ] # Cleanup palette temp file try: os.remove(palette_temp) except OSError: pass return cmd # --------------------------------------------------------------------------- # Video → Animated WebP (libwebp_anim) # --------------------------------------------------------------------------- def _build_video_webp( src: str, dst: str, quality: int, loop: bool, ) -> list[str]: """Build an ffmpeg command for video → animated WebP using libwebp_anim.""" quality = max(0, min(100, quality)) # WebP uses lossless/lossy quality # Map quality: 0-100 → lossless or quality factor 0-100 lossiness = max(0, int((100 - quality) * 10)) cmd = [ "ffmpeg", "-y", "-i", src, "-c:v", "libwebp_anim", "-pix_fmt", "yuv420p", "-lossless", "0", "-lossiness", str(lossiness), ] if loop: cmd.extend(["-loop", "0"]) # Frame rate cmd.extend(["-r", "24"]) # Output cmd.append(dst) return cmd