♻ | New project structure
This commit is contained in:
728
transmutate_app/engine/ffmpeg_engine.py
Normal file
728
transmutate_app/engine/ffmpeg_engine.py
Normal file
@@ -0,0 +1,728 @@
|
||||
"""FFmpeg engine — command builders, probing, and execution for Transmutate."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
from dataclasses import dataclass, field
|
||||
from importlib import import_module
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data classes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class StreamInfo:
|
||||
"""Metadata for a single media stream."""
|
||||
|
||||
index: int
|
||||
codec_type: str
|
||||
codec_name: str
|
||||
language: Optional[str] = None
|
||||
|
||||
def label(self) -> str:
|
||||
"""Return a human-readable label for this stream."""
|
||||
part = f"#{self.index} \u2014 {self.codec_name}"
|
||||
lang = self.language if self.language else None
|
||||
if lang:
|
||||
part += f" ({lang})"
|
||||
return f"{self.codec_type.capitalize()} {part}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProbeResult:
|
||||
"""Result of probing a media file with ffprobe."""
|
||||
|
||||
streams: list[StreamInfo] = field(default_factory=list)
|
||||
is_animated: bool = False
|
||||
mime_type: str = ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helper — _parse_time_to_seconds (used in progress reporting)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _parse_time_to_seconds(t: str) -> float:
|
||||
"""Parse an ``HH:MM:SS.frac`` / ``MM:SS.frac`` / ``SS.frac`` / ``SS``
|
||||
timecode to a float of seconds. Returns 0.0 on failure."""
|
||||
if not t:
|
||||
return 0.0
|
||||
try:
|
||||
parts = t.split(":")
|
||||
if len(parts) == 3:
|
||||
h, m, s = parts
|
||||
return int(h) * 3600 + int(m) * 60 + float(s)
|
||||
elif len(parts) == 2:
|
||||
m, s = parts
|
||||
return int(m) * 60 + float(s)
|
||||
else:
|
||||
return float(parts[0])
|
||||
except (ValueError, IndexError):
|
||||
return 0.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FFmpeg / ffmpeg-python availability
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_ffmpeg = None
|
||||
|
||||
try:
|
||||
_ffmpeg = import_module("ffmpeg")
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def has_ffmpeg() -> bool:
|
||||
"""Return ``True`` when the ``ffmpeg`` binary is available in PATH."""
|
||||
try:
|
||||
subprocess.run(
|
||||
["ffmpeg", "-version"],
|
||||
capture_output=True,
|
||||
check=True,
|
||||
timeout=10,
|
||||
)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MIME detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _get_magic_mime(filepath: str) -> str:
|
||||
"""Attempt to get the MIME type via ``file --mime-type`` / ``file -b``.
|
||||
|
||||
Falls back to extension-based detection.
|
||||
"""
|
||||
# Try Python ``filetype`` / ``mimetypes`` first
|
||||
try:
|
||||
import filetype # type: ignore[import-not-found]
|
||||
kind = filetype.guess(filepath)
|
||||
if kind is not None and kind.mime:
|
||||
return kind.mime
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Try ``file --mime-type``
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["file", "--mime-type", "-b", filepath],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
return result.stdout.strip()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fallback: extension-based
|
||||
ext = Path(filepath).suffix.lstrip(".").lower()
|
||||
_EXT_MIME = {
|
||||
"png": "image/png",
|
||||
"jpg": "image/jpeg",
|
||||
"jpeg": "image/jpeg",
|
||||
"gif": "image/gif",
|
||||
"webp": "image/webp",
|
||||
"bmp": "image/bmp",
|
||||
"tiff": "image/tiff",
|
||||
"tif": "image/tiff",
|
||||
"avif": "image/avif",
|
||||
"heic": "image/heic",
|
||||
"mp4": "video/mp4",
|
||||
"mkv": "video/x-matroska",
|
||||
"webm": "video/webm",
|
||||
"avi": "video/x-msvideo",
|
||||
"mov": "video/quicktime",
|
||||
"flv": "video/x-flv",
|
||||
"wmv": "video/x-ms-wmv",
|
||||
"m4v": "video/x-m4v",
|
||||
"mpg": "video/mpeg",
|
||||
"mpeg": "video/mpeg",
|
||||
"3gp": "video/3gpp",
|
||||
"ts": "video/mp2t",
|
||||
"ogv": "video/ogg",
|
||||
"m2ts": "video/MP2T",
|
||||
"mp3": "audio/mpeg",
|
||||
"flac": "audio/flac",
|
||||
"wav": "audio/wav",
|
||||
"ogg": "audio/ogg",
|
||||
"m4a": "audio/mp4",
|
||||
"aac": "audio/aac",
|
||||
"opus": "audio/opus",
|
||||
"wma": "audio/x-ms-wma",
|
||||
"aiff": "audio/aiff",
|
||||
"ape": "audio/ape",
|
||||
"alac": "audio/x-alac",
|
||||
}
|
||||
return _EXT_MIME.get(ext, "")
|
||||
|
||||
|
||||
def detect_mime(filepath: str) -> str:
|
||||
"""Detect the MIME type of *filepath*."""
|
||||
mime = _get_magic_mime(filepath)
|
||||
if mime:
|
||||
return mime
|
||||
# Also try ffprobe as fallback
|
||||
probe_result = _probe_json(filepath)
|
||||
if probe_result:
|
||||
fmt = probe_result.get("format", {})
|
||||
for k in ("mime_type", "format_name"):
|
||||
v = fmt.get(k)
|
||||
if v:
|
||||
return v
|
||||
return ""
|
||||
|
||||
|
||||
def detect_media_type(mime: str) -> str:
|
||||
"""Return ``image``, ``video``, or ``audio`` based on MIME, or empty."""
|
||||
if not mime:
|
||||
return ""
|
||||
if mime.startswith("image/"):
|
||||
return "image"
|
||||
if mime.startswith("video/"):
|
||||
return "video"
|
||||
if mime.startswith("audio/"):
|
||||
return "audio"
|
||||
# Map some common format names
|
||||
name_map = {
|
||||
"image/png": "image",
|
||||
"image/jpeg": "image",
|
||||
"image/gif": "image",
|
||||
"image/webp": "image",
|
||||
"image/avif": "image",
|
||||
"image/bmp": "image",
|
||||
"image/tiff": "image",
|
||||
"video/mp4": "video",
|
||||
"video/x-matroska": "video",
|
||||
"video/webm": "video",
|
||||
"video/x-msvideo": "video",
|
||||
"video/quicktime": "video",
|
||||
"video/x-flv": "video",
|
||||
"video/x-ms-wmv": "video",
|
||||
"video/mpeg": "video",
|
||||
"video/3gpp": "video",
|
||||
"video/ogg": "video",
|
||||
"audio/mpeg": "audio",
|
||||
"audio/flac": "audio",
|
||||
"audio/wav": "audio",
|
||||
"audio/ogg": "audio",
|
||||
"audio/mp4": "audio",
|
||||
"audio/aac": "audio",
|
||||
"audio/opus": "audio",
|
||||
}
|
||||
return name_map.get(mime, "")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Probing — ffprobe
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _probe_json(filepath: str, timeout: int = 10) -> Optional[dict]:
|
||||
"""Run ffprobe via ffmpeg-python and return parsed JSON, or None on failure.
|
||||
|
||||
Falls back to a raw subprocess call if ffmpeg-python fails.
|
||||
"""
|
||||
# Try ffmpeg-python first
|
||||
if _ffmpeg is not None:
|
||||
try:
|
||||
metadata: dict = _ffmpeg.probe(filepath) # type: ignore[attr-defined]
|
||||
if isinstance(metadata, dict) and metadata.get("streams"):
|
||||
return metadata
|
||||
except Exception: # noqa: BLE001
|
||||
pass
|
||||
|
||||
# Fallback: use ffprobe directly via subprocess
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["ffprobe", "-v", "quiet", "-show_format", "-show_streams",
|
||||
"-of", "json", filepath],
|
||||
capture_output=True, text=False, timeout=timeout,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
return json.loads(result.stdout.decode("utf-8"))
|
||||
except Exception: # noqa: BLE001
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _get_animated_image_mime(filepath: str) -> Optional[str]:
|
||||
"""Attempt to detect if an image is animated using Pillow, and return its MIME."""
|
||||
try:
|
||||
from PIL import Image # type: ignore[import-not-found]
|
||||
img = Image.open(filepath)
|
||||
# Animated images have multiple frames (GIF, WebP)
|
||||
if hasattr(img, "n_frames") and int(getattr(img, "n_frames", 0)) > 1:
|
||||
ext = Path(filepath).suffix.lower()
|
||||
return {
|
||||
".gif": "image/gif",
|
||||
".webp": "image/webp",
|
||||
}.get(ext)
|
||||
if hasattr(img, "is_animated") and img.is_animated:
|
||||
ext = Path(filepath).suffix.lower()
|
||||
return {
|
||||
".gif": "image/gif",
|
||||
".webp": "image/webp",
|
||||
}.get(ext)
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def probe_file(filepath: str, mime: str = "") -> ProbeResult:
|
||||
"""Probe *filepath* with ffprobe and return a :class:`ProbeResult`.
|
||||
|
||||
Handles both media files and animated images (GIF/WebP).
|
||||
"""
|
||||
result = ProbeResult()
|
||||
result.mime_type = mime
|
||||
|
||||
# Special case: animated images
|
||||
if mime in ("image/gif", "image/webp"):
|
||||
animated_mime = _get_animated_image_mime(filepath)
|
||||
if animated_mime:
|
||||
result.is_animated = True
|
||||
result.mime_type = animated_mime
|
||||
|
||||
data = _probe_json(filepath)
|
||||
if not data:
|
||||
return result
|
||||
|
||||
# Parse streams
|
||||
for s in data.get("streams", []):
|
||||
stype = s.get("codec_type", "unknown")
|
||||
cname = s.get("codec_name", "unknown")
|
||||
lang = s.get("tags", {}).get("language", None)
|
||||
if lang == "und":
|
||||
lang = None
|
||||
idx = s.get("index", 0)
|
||||
result.streams.append(StreamInfo(
|
||||
index=idx,
|
||||
codec_type=stype,
|
||||
codec_name=cname,
|
||||
language=lang,
|
||||
))
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Preflight — make sure ffmpeg can read the source file
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def preflight_check(filepath: str) -> tuple[bool, str]:
|
||||
"""Verify that *filepath* is readable by ffmpeg.
|
||||
|
||||
Returns ``(ok, error_message)``.
|
||||
"""
|
||||
if not os.path.isfile(filepath):
|
||||
return False, f"File does not exist: {filepath}"
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["ffprobe", "-v", "error", "-show_format", "-show_streams",
|
||||
filepath],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return False, f"ffprobe failed: {result.stderr.strip()}"
|
||||
return True, ""
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, "ffprobe timed out"
|
||||
except Exception as exc:
|
||||
return False, f"ffprobe error: {exc}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Execution — run_command
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def run_command(cmd: list[str]) -> tuple[bool, str]:
|
||||
"""Execute *cmd* (a list of strings) via subprocess.
|
||||
|
||||
Returns ``(success, message)``. *message* is empty on success.
|
||||
"""
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=3600, # generous 1-hour timeout
|
||||
)
|
||||
if proc.returncode == 0:
|
||||
return True, ""
|
||||
|
||||
stderr = proc.stderr.strip()
|
||||
# Summarise the last few lines of ffmpeg output for the user
|
||||
lines = stderr.split("\n")
|
||||
summary = "\n".join(lines[-10:]) if len(lines) > 10 else stderr
|
||||
return False, summary
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, "Conversion timed out (exceeded 1 hour)"
|
||||
except Exception as exc:
|
||||
return False, str(exc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Quality helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _quality_to_crf(quality: int) -> int:
|
||||
"""Map user quality (0–100) to a CRF value (0–51).
|
||||
|
||||
Higher quality → lower CRF.
|
||||
"""
|
||||
# Invert so quality=100 → CRF=0, quality=0 → CRF=51
|
||||
return max(0, min(51, 51 - int(quality * 51 / 100)))
|
||||
|
||||
|
||||
def _audio_quality_to_abr(codec: str, quality: int) -> tuple[str, str]:
|
||||
"""Return (param_flag, value) for audio quality.
|
||||
|
||||
Supports AAC, MP3, Opus, FLAC, Vorbis.
|
||||
"""
|
||||
quality = max(0, min(100, quality))
|
||||
|
||||
if codec == "aac":
|
||||
# AAC uses -b:a with a rate mapping
|
||||
# Map 0-100 → 32-320 kbps
|
||||
abr = max(32, int(32 + quality * 288 / 100))
|
||||
return ("-b:a", f"{abr}k")
|
||||
elif codec == "libmp3lame":
|
||||
# MP3 uses -q:a (VBR) 0-9 → map quality
|
||||
q = max(0, min(9, int(9 * (100 - quality) / 100)))
|
||||
return ("-q:a", str(q))
|
||||
elif codec == "libvorbis":
|
||||
# Vorbis uses -q:a 0-10 → map quality
|
||||
q = max(0, min(10, round(quality * 10 / 100)))
|
||||
return ("-q:a", str(q))
|
||||
elif codec == "libopus":
|
||||
# Opus uses -b:a
|
||||
abr = max(32, int(32 + quality * 256 / 100))
|
||||
return ("-b:a", f"{abr}k")
|
||||
elif codec == "flac":
|
||||
# FLAC is lossless — quality is irrelevant, just use default
|
||||
return ("-compression_level", "5")
|
||||
elif codec == "pcm_s16le":
|
||||
# WAV is lossless
|
||||
return ("", "")
|
||||
else:
|
||||
# Generic: use bitrate
|
||||
abr = max(32, int(32 + quality * 288 / 100))
|
||||
return ("-b:a", f"{abr}k")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Image → Image command builder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_image_command(
|
||||
src: str,
|
||||
dst: str,
|
||||
fmt: str,
|
||||
quality: int,
|
||||
is_animated: bool,
|
||||
loop: bool,
|
||||
mime: str = "",
|
||||
) -> list[str]:
|
||||
"""Build an ffmpeg command for image → image conversion.
|
||||
|
||||
Handles both single-frame and animated images.
|
||||
"""
|
||||
ext = fmt.lower()
|
||||
cmd = ["ffmpeg", "-y", "-i", src]
|
||||
|
||||
if is_animated:
|
||||
# Animated: preserve all frames
|
||||
cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p"])
|
||||
if ext == "png":
|
||||
# For animated PNG, extract frames then use ImageMagick or ffmpeg
|
||||
# We'll use ffmpeg with a numbered sequence
|
||||
pass
|
||||
elif ext == "webp":
|
||||
cmd.extend(["-c:v", "libwebp_anim", "-loop", "0"])
|
||||
elif ext == "gif":
|
||||
# Palettegen-based GIF
|
||||
return _build_video_gif(src, dst, quality)
|
||||
elif ext in ("mp4", "mkv", "webm", "avi", "mov"):
|
||||
cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p"])
|
||||
if ext == "webm":
|
||||
cmd.extend(["-c:a", "libvorbis"])
|
||||
if loop and ext in ("webp", "gif"):
|
||||
cmd.extend(["-loop", "0"])
|
||||
else:
|
||||
# Single frame — just encode it
|
||||
cmd.extend(["-frames:v", "1"])
|
||||
if ext == "png":
|
||||
cmd.extend(["-c:v", "png"])
|
||||
elif ext in ("jpg", "jpeg"):
|
||||
q = _quality_to_crf(quality)
|
||||
cmd.extend(["-c:v", "libjpeg-turbo", "-q:v", str(q)])
|
||||
elif ext == "webp":
|
||||
lossless = quality >= 100
|
||||
if lossless:
|
||||
cmd.extend(["-c:v", "libwebp", "-lossless", "1"])
|
||||
else:
|
||||
q = _quality_to_crf(quality)
|
||||
cmd.extend(["-c:v", "libwebp", "-lossless", "0", "-q:v", str(q)])
|
||||
elif ext == "avif":
|
||||
q = _quality_to_crf(quality)
|
||||
cmd.extend(["-c:v", "libaom-av1", "-cpu-used", "4", "-q:v", str(q)])
|
||||
elif ext == "bmp":
|
||||
cmd.extend(["-c:v", "bmp"])
|
||||
|
||||
cmd.extend(["-pix_fmt", "yuv420p", dst])
|
||||
return cmd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Video → Video command builder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_video_command(
|
||||
src: str,
|
||||
dst: str,
|
||||
fmt: str,
|
||||
quality: int,
|
||||
audio_quality: int,
|
||||
audio_streams: list[int],
|
||||
sub_streams: list[int],
|
||||
mime: str = "",
|
||||
) -> list[str]:
|
||||
"""Build an ffmpeg command for video → video conversion.
|
||||
|
||||
Handles audio stream selection and subtitle streams.
|
||||
"""
|
||||
cmd = ["ffmpeg", "-y"]
|
||||
|
||||
# Input
|
||||
cmd.extend(["-i", src])
|
||||
|
||||
# Select audio streams if specified
|
||||
# audio_streams contains ffprobe stream indices (global), e.g. [1]
|
||||
# Use the global-index form ``0:<index>`` — not ``0:a:N`` (invalid syntax).
|
||||
if audio_streams:
|
||||
for idx in audio_streams:
|
||||
cmd.extend(["-map", f"0:{idx}"])
|
||||
else:
|
||||
# No specific audio selection — copy first audio stream if present
|
||||
cmd.extend(["-map", "0:a:0"])
|
||||
|
||||
# Subtitles
|
||||
# sub_streams contains ffprobe stream indices (global), e.g. [2]
|
||||
if sub_streams:
|
||||
for idx in sub_streams:
|
||||
cmd.extend(["-map", f"0:{idx}"])
|
||||
else:
|
||||
# Try to copy subtitle streams
|
||||
cmd.extend(["-map", "0:s?"])
|
||||
|
||||
# Video encoding
|
||||
cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p"])
|
||||
|
||||
# CRF mode
|
||||
crf = _quality_to_crf(quality)
|
||||
cmd.extend(["-crf", str(crf)])
|
||||
|
||||
# Audio encoding
|
||||
ext = fmt.lower()
|
||||
audio_codec_map = {
|
||||
"mp4": "aac",
|
||||
"mkv": "aac",
|
||||
"webm": "libvorbis",
|
||||
"avi": "aac",
|
||||
"mov": "aac",
|
||||
}
|
||||
ac = audio_codec_map.get(ext, "aac")
|
||||
|
||||
af, av = _audio_quality_to_abr(ac, audio_quality)
|
||||
if af:
|
||||
cmd.extend([af, av])
|
||||
|
||||
# Container-specific options
|
||||
if ext == "webm":
|
||||
cmd.extend(["-c:a", "libvorbis"])
|
||||
elif ext == "mp4":
|
||||
cmd.extend(["-movflags", "+faststart"])
|
||||
|
||||
cmd.extend(["-c:s", "mov_text"])
|
||||
|
||||
# Output
|
||||
cmd.append(dst)
|
||||
return cmd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Video → Audio command builder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_audio_command(
|
||||
src: str,
|
||||
dst: str,
|
||||
fmt: str,
|
||||
quality: int,
|
||||
audio_streams: list[int],
|
||||
) -> list[str]:
|
||||
"""Build an ffmpeg command for audio extraction / format conversion.
|
||||
|
||||
Can be called from video→audio or audio→audio conversions.
|
||||
"""
|
||||
cmd = ["ffmpeg", "-y", "-i", src]
|
||||
|
||||
# Select audio stream(s)
|
||||
# audio_streams contains ffprobe stream indices (global), e.g. [1]
|
||||
# We must use the global-index form ``0:<index>`` — not ``0:a:N``
|
||||
# which is invalid ffmpeg syntax.
|
||||
if audio_streams:
|
||||
for idx in audio_streams:
|
||||
cmd.extend(["-map", f"0:{idx}"])
|
||||
else:
|
||||
cmd.extend(["-map", "0:a:0"])
|
||||
|
||||
# Discard video — keep audio only
|
||||
cmd.extend(["-vn"])
|
||||
|
||||
# Determine codec from format
|
||||
fmt_lower = fmt.lower()
|
||||
_codec_defaults: dict[str, tuple[str, str, str]] = {
|
||||
"mp3": ("libmp3lame", "", ""),
|
||||
"flac": ("flac", "-compression_level", "5"),
|
||||
"wav": ("pcm_s16le", "", ""),
|
||||
"ogg": ("libvorbis", "", ""),
|
||||
"m4a": ("aac", "", ""),
|
||||
"aac": ("aac", "", ""),
|
||||
}
|
||||
ac, af1, af2 = _codec_defaults.get(fmt_lower, ("aac", "-b:a", "128k"))
|
||||
|
||||
cmd.extend(["-c:a", ac])
|
||||
if af1:
|
||||
cmd.extend([af1])
|
||||
if af2:
|
||||
cmd.extend([af2])
|
||||
|
||||
# Quality override
|
||||
qf, qv = _audio_quality_to_abr(ac, quality)
|
||||
if qf:
|
||||
# Replace the default codec params with quality-based ones
|
||||
cmd = cmd[:-2] if len(cmd) >= 2 and cmd[-2] == af1 else cmd
|
||||
cmd.extend([qf, qv])
|
||||
|
||||
# Output
|
||||
cmd.append(dst)
|
||||
return cmd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Video → GIF (palettegen-based)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_video_gif(src: str, dst: str, quality: int) -> list[str]:
|
||||
"""Build a palettegen-based ffmpeg command for video → GIF.
|
||||
|
||||
Uses a two-pass approach: first generate a palette, then use it.
|
||||
"""
|
||||
quality = max(0, min(100, quality))
|
||||
|
||||
# Map quality to dithering mode and palette quality
|
||||
# Higher quality = better palette + more dithering
|
||||
if quality > 80:
|
||||
dither = "bayer"
|
||||
bayer_scale = "5"
|
||||
elif quality > 60:
|
||||
dither = "sierra2_4a"
|
||||
bayer_scale = "4"
|
||||
elif quality > 40:
|
||||
dither = "burkes"
|
||||
bayer_scale = "3"
|
||||
else:
|
||||
dither = "none"
|
||||
bayer_scale = "1"
|
||||
|
||||
# Palette generation
|
||||
palette_temp = dst + ".palette"
|
||||
|
||||
# Two-pass command:
|
||||
# Pass 1: generate palette
|
||||
# Pass 2: use palette for GIF
|
||||
cmd = [
|
||||
"ffmpeg", "-y",
|
||||
"-i", src,
|
||||
"-vf",
|
||||
"fps=24,scale=640:-1:flags=lanczos,palettegen=max_colors=256:stats_mode=diff",
|
||||
"-y", palette_temp,
|
||||
]
|
||||
# Run pass 1 separately, then append pass 2
|
||||
subprocess.run(cmd, capture_output=True, timeout=300)
|
||||
|
||||
# Pass 2: apply palette
|
||||
cmd = [
|
||||
"ffmpeg", "-y",
|
||||
"-i", src,
|
||||
"-i", palette_temp,
|
||||
"-lavfi",
|
||||
f"fps=24,scale=640:-1:flags=lanczos [x]; [x][1:v] paletteuse=dither={dither}:bayer_scale={bayer_scale}:diff_mode=rectangle",
|
||||
dst,
|
||||
]
|
||||
|
||||
# Cleanup palette temp file
|
||||
try:
|
||||
os.remove(palette_temp)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
return cmd
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Video → Animated WebP (libwebp_anim)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_video_webp(
|
||||
src: str,
|
||||
dst: str,
|
||||
quality: int,
|
||||
loop: bool,
|
||||
) -> list[str]:
|
||||
"""Build an ffmpeg command for video → animated WebP using libwebp_anim."""
|
||||
quality = max(0, min(100, quality))
|
||||
|
||||
# WebP uses lossless/lossy quality
|
||||
# Map quality: 0-100 → lossless or quality factor 0-100
|
||||
lossiness = max(0, int((100 - quality) * 10))
|
||||
cmd = [
|
||||
"ffmpeg", "-y",
|
||||
"-i", src,
|
||||
"-c:v", "libwebp_anim",
|
||||
"-pix_fmt", "yuv420p",
|
||||
"-lossless", "0",
|
||||
"-lossiness", str(lossiness),
|
||||
]
|
||||
|
||||
if loop:
|
||||
cmd.extend(["-loop", "0"])
|
||||
|
||||
# Frame rate
|
||||
cmd.extend(["-r", "24"])
|
||||
|
||||
# Output
|
||||
cmd.append(dst)
|
||||
return cmd
|
||||
Reference in New Issue
Block a user