"""Audio player using QMediaPlayer + yt-dlp with queue and local-file playback.

Instead of streaming YouTube audio URLs directly (which expire within seconds
and cause "Connection reset by peer" / "Demuxing failed" errors), we download
each song to a temporary file and play from the local filesystem.  This is
reliable because QMediaPlayer never touches the network directly.

Architecture
------------
A single persistent ``DownloadWorker`` lives on a background ``QThread`` for
the entire player lifetime.  Work is sent via Qt signals — no per-download
thread creation/teardown, no signal-disconnect dances, no per-download
``quit()``/``wait()`` timeouts.  Every download is tagged with a
monotonically-increasing ``task_id``; stale results (from superseded
requests) are simply discarded.

State machine::

    IDLE ──▶ LOADING ──▶ READY ──▶ PLAYING ──▶ ENDED

    LOADING ──▶ FAILED    download error; the next queued song is tried
    PLAYING ◀─▶ PAUSED    pause() / resume() / toggle_pause()
    ENDED   ──▶ LOADING   another song is queued (READY at once if prefetched)
    ENDED   ──▶ IDLE      (queue empty)

Set ``TEST_MODE = True`` below to bypass yt-dlp and generate a short test-tone
WAV instead — useful for UI development without network access.
"""

import atexit
import logging
import math
import os
import shutil
import struct
import tempfile
import threading
from typing import Optional

import yt_dlp
from PySide6.QtCore import QObject, QThread, Signal, Slot, QUrl
from PySide6.QtMultimedia import QMediaPlayer, QAudioOutput

log = logging.getLogger("tunetti.player")

# ── User-settable flags ────────────────────────────────────────────────────
TEST_MODE: bool = False  # skip yt-dlp, use local test-tone WAV instead
VERBOSE: bool = False  # enable yt-dlp debug output


# ═══════════════════════════════════════════════════════════════════════════════
# Temp-file housekeeping
# ═══════════════════════════════════════════════════════════════════════════════

_TUNETTI_TMP: str | None = None


def _tmp_dir() -> str:
    """Return a scoped temp directory that lives for the process lifetime."""
    global _TUNETTI_TMP
    if _TUNETTI_TMP is None:
        _TUNETTI_TMP = tempfile.mkdtemp(prefix="tunetti_")
        atexit.register(shutil.rmtree, _TUNETTI_TMP, ignore_errors=True)
    return _TUNETTI_TMP


def _video_id_of(song: Optional[dict]) -> str:
    """Return the video id of *song* (``videoId`` or ``video_id``), or ``""``."""
    if not song:
        return ""
    return song.get("videoId") or song.get("video_id") or ""


def _remove_file(path: Optional[str]) -> None:
    """Best-effort delete of *path*; a missing or locked file is not an error."""
    if path and os.path.isfile(path):
        try:
            os.unlink(path)
        except OSError as exc:
            log.debug("Could not remove %s: %s", path, exc)


def _stale_files(video_id: str) -> list[str]:
    """Return paths of existing non-temp files for *video_id*, newest first.

    Only names of the form ``<video_id>.<something>`` are matched.  An empty
    *video_id* matches nothing; with a bare prefix test it would match (and
    let callers delete) every file in the temp directory.
    """
    if not video_id:
        return []
    td = _tmp_dir()
    prefix = f"{video_id}."
    found: list[tuple[float, str]] = []
    for fn in os.listdir(td):
        if not fn.startswith(prefix) or ".temp." in fn:
            continue
        path = os.path.join(td, fn)
        try:
            if os.path.isfile(path) and os.path.getsize(path) > 0:
                found.append((os.path.getmtime(path), path))
        except OSError:
            # The file vanished between listing and stat-ing it.
            continue
    found.sort(reverse=True)
    return [path for _, path in found]


# ═══════════════════════════════════════════════════════════════════════════════
# Test-tone generator (used when TEST_MODE = True)
# ═══════════════════════════════════════════════════════════════════════════════

def _generate_test_wav(path: str, duration_s: int = 15) -> None:
    """Write a 440 Hz sine wave to *path* as a 16-bit mono WAV file."""
    # NOTE(review): everything after the first struct.pack() call was lost
    # from the source under review; the rest is a standard 44-byte PCM WAV
    # header plus sine samples — confirm against the original file.
    sample_rate = 44100
    num_samples = sample_rate * duration_s
    frequency = 440.0
    amplitude = 0.3
    peak = int(amplitude * 32767)
    step = 2.0 * math.pi * frequency / sample_rate
    with open(path, "wb") as f:
        data_size = num_samples * 2  # 16-bit mono → 2 bytes per sample
        f.write(b"RIFF")
        f.write(struct.pack("<I", 36 + data_size))
        f.write(b"WAVE")
        f.write(b"fmt ")
        # chunk size 16, PCM (1), mono (1), sample rate, byte rate,
        # block align (2), bits per sample (16)
        f.write(struct.pack("<IHHIIHH", 16, 1, 1,
                            sample_rate, sample_rate * 2, 2, 16))
        f.write(b"data")
        f.write(struct.pack("<I", data_size))
        samples = (int(peak * math.sin(step * i)) for i in range(num_samples))
        f.write(struct.pack(f"<{num_samples}h", *samples))


# ═══════════════════════════════════════════════════════════════════════════════
# Download worker
# ═══════════════════════════════════════════════════════════════════════════════

class DownloadWorker(QObject):
    """Downloads one song at a time to the temp directory.

    ``AudioPlayer`` moves the single instance to a background ``QThread`` and
    drives it through ``download_requested`` / ``cancel_requested``; results
    come back through ``download_succeeded(task_id, song)`` and
    ``download_failed(task_id)``.

    Because ``_do_cancel`` is delivered as a queued slot on the same thread
    that runs ``_do_download``, a cancel can only take effect between
    downloads.  Superseded results are filtered by ``task_id`` on the
    receiving side.
    """

    # NOTE(review): the class header, signal declarations and __init__ were
    # lost from the source under review.  They are reconstructed from how
    # AudioPlayer and the surviving methods use them — confirm against the
    # original file.
    download_requested = Signal(int, dict)
    cancel_requested = Signal()
    download_succeeded = Signal(int, dict)
    download_failed = Signal(int)

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        self._cancelled = False
        self._task_id: Optional[int] = None

    def _emit_unless_cancelled(self, signal, *args) -> None:
        """Emit *signal* with *args* unless the current task was cancelled."""
        with self._lock:
            if not self._cancelled:
                signal.emit(*args)

    @Slot(int, dict)
    def _do_download(self, task_id: int, song: dict) -> None:
        """Entry point — invoked on the worker thread.

        Downloads *song* (or writes a test tone in ``TEST_MODE``) and emits
        ``download_succeeded(task_id, resolved)`` where *resolved* is a copy
        of *song* with ``_local_path`` and refreshed metadata, or
        ``download_failed(task_id)`` on any error.
        """
        self._task_id = task_id
        with self._lock:
            self._cancelled = False
        video_id = _video_id_of(song)
        temp_path: Optional[str] = None
        try:
            if not video_id:
                raise ValueError("song has no video id")

            # Keep the stale cleanup at the start of each new download.
            for stale in _stale_files(video_id):
                _remove_file(stale)

            # ── TEST MODE ────────────────────────────────────────────
            if TEST_MODE:
                wav_path = os.path.join(_tmp_dir(), f"{video_id}.wav")
                _generate_test_wav(wav_path)
                resolved = dict(song)
                resolved["_local_path"] = wav_path
                resolved["duration"] = 15
                resolved["thumbnail"] = ""
                resolved["title"] = song.get("title", f"Test {video_id}")
                self._emit_unless_cancelled(
                    self.download_succeeded, task_id, resolved
                )
                return

            # ── Real download ────────────────────────────────────────
            tmpl = os.path.join(_tmp_dir(), f"{video_id}.%(ext)s")
            ydl_opts: dict = {
                "quiet": not VERBOSE,
                "no_warnings": not VERBOSE,
                "verbose": VERBOSE,
                "format": "worstaudio/worst",
                "outtmpl": tmpl,
                "noprogress": not VERBOSE,
                "extract_flat": False,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(
                    f"https://music.youtube.com/watch?v={video_id}",
                    download=True,
                )
            if not info:
                raise RuntimeError(f"No metadata returned for {video_id}")

            # Construct the expected path from the output template.
            # The outtmpl is <tmp>/<video_id>.%(ext)s and yt-dlp resolves
            # %(ext)s from the downloaded format.  We avoid relying on
            # info["requested_downloads"][0]["filepath"] because that key
            # may be set *before* post-processor fixup renames the file.
            actual_ext = info.get("ext", "")
            expected_path = (
                os.path.join(_tmp_dir(), f"{video_id}.{actual_ext}")
                if actual_ext else ""
            )
            temp_path = self._resolve_temp_path(video_id, expected_path)
            if not temp_path:
                raise RuntimeError(
                    f"Download produced empty/missing file for {video_id}"
                )

            resolved = self._build_resolved_dict(song, info, temp_path)
            self._emit_unless_cancelled(
                self.download_succeeded, task_id, resolved
            )
        except Exception as exc:
            # Top-level boundary of the worker slot: report, clean up, signal.
            log.error("Download error for %s: %s", video_id, exc)
            self._cleanup_path(temp_path)
            self._emit_unless_cancelled(self.download_failed, task_id)

    def _resolve_temp_path(self, video_id: str,
                           expected_path: str) -> Optional[str]:
        """Resolve the actual temp file path after a download.

        First checks the *expected_path* (built from the extension yt-dlp
        reported), then falls back to scanning the temp directory for the
        newest non-empty file belonging to *video_id*.  Returns ``None`` when
        nothing usable exists.
        """
        # Debug: list every file in the temp dir for this video_id
        td = _tmp_dir()
        try:
            all_matches = sorted(
                (fn, os.path.getsize(os.path.join(td, fn)))
                for fn in os.listdir(td)
                if fn.startswith(video_id)
            )
            log.debug("Temp files for %s: %s", video_id, all_matches)
        except OSError as exc:
            log.debug("Temp dir listing failed: %s", exc)

        if expected_path:
            log.debug("Checking expected path: %s", expected_path)
            if (os.path.isfile(expected_path)
                    and os.path.getsize(expected_path) > 0):
                return expected_path

        # Fallback: scan the temp directory for the most recent valid file.
        candidates = _stale_files(video_id)
        log.debug("_stale_files fallback for %s: %s", video_id, candidates)
        return candidates[0] if candidates else None

    @staticmethod
    def _build_resolved_dict(song: dict, info: dict, temp_path: str) -> dict:
        """Build the resolved song dict with local path and metadata.

        Values reported by yt-dlp win; empty or missing ones fall back to
        what *song* already carried.
        """
        resolved = dict(song)
        resolved["_local_path"] = temp_path
        resolved["duration"] = info.get("duration") or song.get("duration", 0)
        resolved["thumbnail"] = (
            info.get("thumbnail") or song.get("thumbnail", "")
        )
        # ``or`` (not a .get default) so a None title also falls back.
        resolved["title"] = info.get("title") or song.get("title", "")
        return resolved

    @Slot()
    def _do_cancel(self) -> None:
        """Mark the current download as cancelled."""
        with self._lock:
            self._cancelled = True

    @staticmethod
    def _cleanup_path(path: Optional[str]) -> None:
        """Delete *path* if it exists (best effort)."""
        _remove_file(path)


# ═══════════════════════════════════════════════════════════════════════════════
# Search worker
# ═══════════════════════════════════════════════════════════════════════════════

class SearchWorker(QObject):
    """Runs a ytmusicapi search in a background thread.

    Searches both songs and videos, returning categorised results as a dict
    with ``"songs"`` and ``"videos"`` keys each containing a list of
    normalised song dicts.  Emits ``results_ready(dict)`` on success and
    ``failed(str)`` on any error.
    """

    results_ready = Signal(dict)
    failed = Signal(str)

    def __init__(self, query: str):
        super().__init__()
        self._query = query

    @Slot()
    def run(self) -> None:
        """Perform the search and emit ``results_ready`` or ``failed``."""
        if TEST_MODE:
            dummy = [
                {
                    "videoId": f"dummy_{i}",
                    "title": f"Test Song {i}",
                    "artists": [{"name": f"Artist {i}"}],
                    "album": None,
                    "duration_seconds": 15,
                    "duration": "0:15",
                    "thumbnails": [],
                    "videoType": "MUSIC_VIDEO_TYPE_ATV",
                    "resultType": "song",
                }
                for i in range(5)
            ]
            # Normalise like real results so consumers see one dict shape.
            self.results_ready.emit(
                {"songs": [_build_song_dict(r) for r in dummy], "videos": []}
            )
            return

        try:
            # Imported here so a missing package is reported via ``failed``.
            from ytmusicapi import YTMusic

            yt = YTMusic()
            # Always search both categories so the UI can display them
            # side by side with a "Videos" and "Songs" header.
            raw_songs = yt.search(self._query, filter="songs", limit=20)
            raw_videos = yt.search(self._query, filter="videos", limit=20)
            songs = [_build_song_dict(r) for r in raw_songs if r.get("videoId")]
            videos = [
                _build_song_dict(r) for r in raw_videos if r.get("videoId")
            ]
            self.results_ready.emit({"songs": songs, "videos": videos})
        except Exception as exc:
            log.error("Search failed: %s", exc)
            self.failed.emit(str(exc))


# ═══════════════════════════════════════════════════════════════════════════════
# Main AudioPlayer
# ═══════════════════════════════════════════════════════════════════════════════

class AudioPlayer(QObject):
    """QObject-based audio player wrapping QMediaPlayer with local-file playback.

    Every song is downloaded to a temporary file before playing.  A single
    persistent ``DownloadWorker`` on a background ``QThread`` handles all
    downloads — no per-download thread creation/teardown and no
    signal-disconnect races.

    Signals
    -------
    loading(video_id)
    song_loaded(bool)
    song_started(song_dict)
    song_ended(video_id)
    paused(song_dict)
    resumed(song_dict)
    error(dict)
    position_changed(ms: int)
    duration_changed(ms: int)
    playback_state_changed(state: str)  — "playing"|"paused"|"stopped"
    seeked(ms: int)
    """

    loading = Signal(str)
    song_loaded = Signal(bool)
    song_started = Signal(dict)
    song_ended = Signal(str)
    paused = Signal(dict)
    resumed = Signal(dict)
    error = Signal(dict)
    position_changed = Signal(int)
    duration_changed = Signal(int)
    playback_state_changed = Signal(str)
    seeked = Signal(int)  # position_ms after seek

    def __init__(self, parent: Optional[QObject] = None):
        super().__init__(parent)

        # ── QMediaPlayer ──────────────────────────────────────────────
        self._player = QMediaPlayer(self)
        self._audio_output = QAudioOutput(self)
        self._player.setAudioOutput(self._audio_output)
        self._audio_output.setVolume(0.5)

        self._player.mediaStatusChanged.connect(self._on_media_status)
        self._player.positionChanged.connect(self._on_position)
        self._player.durationChanged.connect(self._on_duration)
        self._player.playbackStateChanged.connect(self._on_playback_state)
        self._player.errorOccurred.connect(self._on_player_error_occurred)

        # ── Queue & state ─────────────────────────────────────────────
        self._queue: list[dict] = []
        self._current: Optional[dict] = None
        self._history: list[dict] = []
        self._loop_mode = False

        # ── Download worker (single persistent thread) ────────────────
        self._next_task_id = 0
        # The task ID that is "current" — results with other IDs are stale.
        self._active_task_id: Optional[int] = None
        # Video id of the active download (reported in error payloads).
        self._active_vid: str = ""
        # The task ID of the in-progress prefetch (if any).
        self._prefetch_task_id: Optional[int] = None
        # Video id of the in-progress prefetch (avoids duplicate requests).
        self._prefetch_vid: str = ""
        # Cache of prefetched results: video_id → resolved dict with _local_path.
        self._prefetch_cache: dict[str, dict] = {}

        self._worker_thread = QThread(self)
        # Created without a parent so it can be moved to the worker thread.
        self._worker = DownloadWorker()
        self._worker.moveToThread(self._worker_thread)

        # Connect worker signals (cross-thread → queued delivery).
        self._worker.download_requested.connect(self._worker._do_download)
        self._worker.cancel_requested.connect(self._worker._do_cancel)
        self._worker.download_succeeded.connect(self._on_download_succeeded)
        self._worker.download_failed.connect(self._on_download_failed)

        self._worker_thread.start()

    # ═══════════════════════════════════════════════════════════════════════
    # Public API
    # ═══════════════════════════════════════════════════════════════════════

    # ── Volume ──────────────────────────────────────────────────────────────

    def set_volume(self, percent: int) -> None:
        """Set the output volume; *percent* is clamped to 0–100."""
        self._audio_output.setVolume(max(0, min(100, percent)) / 100.0)

    def volume(self) -> int:
        """Return the output volume as a percentage (0–100)."""
        # round(), not int(): truncation can report one less than the value
        # that was set because of float representation error.
        return round(self._audio_output.volume() * 100)

    # ── Seek ────────────────────────────────────────────────────────────────

    def seek(self, ms: int) -> None:
        """Jump to *ms* (negative values clamp to 0) and emit ``seeked``."""
        target = max(0, ms)
        self._player.setPosition(target)
        self.seeked.emit(target)

    # ── Play / Queue ────────────────────────────────────────────────────────

    def play(self, song: dict) -> None:
        """Clear the queue and play *song* immediately."""
        # Only prefetched files are removed here.  The file of the song that
        # is still playing is removed by _start_playback once its replacement
        # starts; deleting it now would pull it from under the player when
        # the request below is ignored as a duplicate.
        self._cleanup_prefetch_files()
        self._queue.clear()
        self._prefetch_cache.clear()
        self._loop_mode = False
        self._request_download(song)

    def queue_song(self, song: dict) -> None:
        """Append *song*; start it right away if nothing is playing or loading."""
        self._queue.append(song)
        if self._current is None and self._active_task_id is None:
            self._pop_and_play()

    def queue_next(self, song: dict) -> None:
        """Insert *song* to play immediately after the current one."""
        self._queue.insert(0, song)

    def queue_list(self, songs: list[dict]) -> None:
        """Append all *songs* to the queue without starting playback."""
        self._queue.extend(songs)

    def skip(self) -> None:
        """Skip to the next song."""
        was_loading = self._active_task_id is not None
        # Push current song to history before moving on.
        if self._current is not None:
            self._history.append(self._current)
        self._cancel_active_downloads()
        self._player.stop()
        if self._current is not None:
            self._advance()
        elif was_loading or self._queue:
            # Nothing was playing yet (the song was still downloading):
            # _advance() would be a no-op, so move on explicitly.
            self._try_next_song()

    def previous(self) -> None:
        """Go back to the previous song, if available."""
        if not self._history:
            return
        prev = self._history.pop()
        self._cancel_active_downloads()
        self._player.stop()
        self._queue.insert(0, prev)
        self._try_next_song()

    def has_previous(self) -> bool:
        """Whether a previous song is available."""
        return len(self._history) > 0

    def is_loaded(self) -> bool:
        """Whether any song is currently loaded (playing or paused)."""
        return self._current is not None

    def stop_playback(self) -> None:
        """Soft stop: halt playback, clear queue, cancel downloads."""
        self._cancel_active_downloads()
        self._cleanup_temp_files()
        self._queue.clear()
        self._prefetch_cache.clear()
        self._history.clear()
        self._loop_mode = False
        self._current = None
        self._player.stop()
        self.song_loaded.emit(False)
        self.playback_state_changed.emit("stopped")

    @Slot()
    def _pop_and_play(self) -> None:
        """Pop the next song from the queue and start downloading it.

        If it was already prefetched, playback starts directly from the
        cached result.
        """
        if not self._queue:
            return
        song = self._queue.pop(0)
        cached = self._prefetch_cache.pop(_video_id_of(song), None)
        if cached is not None:
            # Prefetched — play directly without downloading.
            self._start_playback(cached)
        else:
            self._request_download(song)

    # ── Pause / Resume ──────────────────────────────────────────────────────

    def pause(self) -> None:
        """Pause if currently playing."""
        if self.is_playing():
            self._player.pause()

    def resume(self) -> None:
        """Resume if currently paused."""
        if self.is_paused():
            self._player.play()

    def toggle_pause(self) -> bool:
        """Toggle play/pause; return ``True`` when the player is now paused."""
        state = self._player.playbackState()
        if state == QMediaPlayer.PlaybackState.PlayingState:
            self._player.pause()
            return True
        if state == QMediaPlayer.PlaybackState.PausedState:
            self._player.play()
        return False

    def is_paused(self) -> bool:
        """Whether the underlying player is in the paused state."""
        return (self._player.playbackState()
                == QMediaPlayer.PlaybackState.PausedState)

    def is_playing(self) -> bool:
        """Whether the underlying player is in the playing state."""
        return (self._player.playbackState()
                == QMediaPlayer.PlaybackState.PlayingState)

    def is_stopped(self) -> bool:
        """Whether the underlying player is in the stopped state."""
        return (self._player.playbackState()
                == QMediaPlayer.PlaybackState.StoppedState)

    # ── Queue introspection ─────────────────────────────────────────────────

    def get_current(self) -> Optional[dict]:
        """Return the resolved dict of the current song, or ``None``."""
        return self._current

    def get_queue(self) -> list[dict]:
        """Return a shallow copy of the pending queue."""
        return list(self._queue)

    def get_queue_length(self) -> int:
        """Return the number of pending songs."""
        return len(self._queue)

    def get_loop(self) -> bool:
        """Whether loop mode is enabled."""
        return self._loop_mode

    def get_position(self) -> int:
        """Return the current playback position in milliseconds."""
        return self._player.position()

    def set_loop(self, enabled: bool) -> None:
        """Enable or disable re-queuing of each song when it ends."""
        self._loop_mode = enabled

    # ═══════════════════════════════════════════════════════════════════════
    # Download lifecycle
    # ═══════════════════════════════════════════════════════════════════════

    def _request_download(self, song: dict) -> None:
        """Request a download via the persistent worker.

        Cancels any in-progress download and prefetch first, then assigns a
        fresh task ID and emits the request signal.
        """
        vid = _video_id_of(song)

        # Guard: don't re-download what's already playing.
        if vid and vid == _video_id_of(self._current):
            log.debug("Ignoring duplicate download request for %s", vid)
            return

        # Fast path: already has a local file (e.g. a song replayed from
        # history or by loop mode).  The file may have been cleaned up since;
        # in that case fall through and download it again.
        local_path = song.get("_local_path")
        if local_path:
            if os.path.isfile(local_path):
                self._start_playback(song)
                return
            log.debug("Local file for %s is gone, re-downloading", vid)

        # Cancel any in-flight work — this is a new intent.
        self._cancel_active_downloads()

        task_id = self._next_task_id
        self._next_task_id += 1
        self._active_task_id = task_id
        self._active_vid = vid

        self.loading.emit(vid)
        self._worker.download_requested.emit(task_id, song)

    def _cancel_active_downloads(self) -> None:
        """Tell the worker to discard its current work and forget about it."""
        if (self._active_task_id is not None
                or self._prefetch_task_id is not None):
            self._worker.cancel_requested.emit()
        self._active_task_id = None
        self._active_vid = ""
        self._prefetch_task_id = None
        self._prefetch_vid = ""

    @Slot(int, dict)
    def _on_download_succeeded(self, task_id: int, song: dict) -> None:
        """Called on the main thread when a download succeeds.

        Checks *task_id* against ``_active_task_id`` and ``_prefetch_task_id``
        to discard stale results from superseded requests.
        """
        log.debug("Download succeeded: task_id=%d, active=%s, prefetch=%s",
                  task_id, self._active_task_id, self._prefetch_task_id)

        # ── Stale check for active download ──────────────────────────
        if task_id == self._active_task_id:
            self._active_task_id = None
            self._active_vid = ""
            local_path = song.get("_local_path")
            if not local_path or not os.path.isfile(local_path):
                log.error("Download succeeded but file missing: %s",
                          local_path)
                self.error.emit({
                    "video_id": _video_id_of(song),
                    "error": "Downloaded file went missing",
                })
                self._try_next_song()
                return
            self._start_playback(song)
            return

        # ── Stale check for prefetch download ────────────────────────
        if task_id == self._prefetch_task_id:
            self._prefetch_task_id = None
            self._prefetch_vid = ""
            vid = _video_id_of(song)
            # Only cache if the song is still in the active queue.
            if any(_video_id_of(queued) == vid for queued in self._queue):
                log.debug("Caching prefetched result for %s (task %d)",
                          vid, task_id)
                self._prefetch_cache[vid] = song
            else:
                log.debug("Prefetched %s no longer in queue, discarding", vid)
            return

        # ── Otherwise: truly stale — discard ─────────────────────────
        log.debug("Discarding stale download result for task %d (task_id=%d, active=%s, prefetch=%s)",
                  task_id, task_id, self._active_task_id,
                  self._prefetch_task_id)

    @Slot(int)
    def _on_download_failed(self, task_id: int) -> None:
        """Called on the main thread when a download fails."""
        log.debug("Download failed: task_id=%d, active=%s, prefetch=%s",
                  task_id, self._active_task_id, self._prefetch_task_id)
        if task_id == self._active_task_id:
            failed_vid = self._active_vid
            self._active_task_id = None
            self._active_vid = ""
            log.warning("Download failed for task %d", task_id)
            self.error.emit({
                "video_id": failed_vid,
                "error": "Failed to resolve audio URL",
            })
            self._try_next_song()
        elif task_id == self._prefetch_task_id:
            self._prefetch_task_id = None
            self._prefetch_vid = ""
            log.debug("Prefetch failed for task %d (harmless)", task_id)
        else:
            log.debug("Ignoring stale download failure for task %d", task_id)

    # ── Playback ────────────────────────────────────────────────────────────

    def _start_playback(self, song: dict) -> None:
        """Begin playing a resolved song (already has ``_local_path``).

        Always called on the main thread.
        """
        local_path = song.get("_local_path", "")
        # Remove the previous song's temp file BEFORE updating _current —
        # unless it is the very file we are about to play.
        self._cleanup_current_temp(keep=local_path)

        have_file = bool(local_path) and os.path.isfile(local_path)
        log.debug("Starting playback: path=%s, exists=%s, size=%s",
                  local_path,
                  os.path.isfile(local_path) if local_path else "N/A",
                  os.path.getsize(local_path) if have_file else "N/A")

        self._player.stop()
        self._current = song
        self._player.setSource(QUrl.fromLocalFile(local_path))
        self._player.play()

        self.song_started.emit(song)
        self.song_loaded.emit(True)

        # Kick off prefetch for the next queued song.
        self._start_prefetch()

    # ═══════════════════════════════════════════════════════════════════════
    # End-of-song handling
    # ═══════════════════════════════════════════════════════════════════════

    def _on_media_status(self, status: QMediaPlayer.MediaStatus) -> None:
        """Advance at end of media; report and advance on invalid media."""
        if status == QMediaPlayer.MediaStatus.EndOfMedia:
            self._advance()
        elif status == QMediaPlayer.MediaStatus.InvalidMedia:
            if self._current is None:
                return
            if self.is_stopped():
                return
            self.error.emit({
                "video_id": _video_id_of(self._current),
                "error": "Invalid media",
            })
            # Advance immediately rather than waiting for the player to
            # reach the stopped state.
            self._advance()

    def _advance(self) -> None:
        """Move to the next song (or stop if the queue is empty).

        This is the single exit point from an actively-playing song.  It
        handles loop re-queuing, song_ended emission, and queue popping.
        """
        if self._current is None:
            return

        finished_song = self._current
        self._current = None
        self.song_ended.emit(_video_id_of(finished_song))

        # Re-queue for loop mode.
        if self._loop_mode:
            self._queue.insert(0, finished_song)

        self._try_next_song()

    def _try_next_song(self) -> None:
        """Attempt to play the next song, or stop if the queue is empty."""
        if self._queue:
            self._pop_and_play()
        else:
            self.playback_state_changed.emit("stopped")

    # ── Position / duration / state ─────────────────────────────────────────

    @Slot(int)
    def _on_position(self, ms: int) -> None:
        """Relay the player's position as ``position_changed``."""
        self.position_changed.emit(ms)

    @Slot(int)
    def _on_duration(self, ms: int) -> None:
        """Relay the player's duration as ``duration_changed``."""
        self.duration_changed.emit(ms)

    @Slot(QMediaPlayer.PlaybackState)
    def _on_playback_state(self, state: QMediaPlayer.PlaybackState) -> None:
        """Translate player state changes into this class's signals."""
        if state == QMediaPlayer.PlaybackState.PlayingState:
            self.playback_state_changed.emit("playing")
            if self._current:
                self.resumed.emit(self._current)
        elif state == QMediaPlayer.PlaybackState.PausedState:
            self.playback_state_changed.emit("paused")
            if self._current:
                self.paused.emit(self._current)
        else:
            self.playback_state_changed.emit("stopped")

    # ── errorOccurred handler ───────────────────────────────────────────────

    @Slot(QMediaPlayer.Error, str)
    def _on_player_error_occurred(
        self, error: QMediaPlayer.Error, error_string: str
    ) -> None:
        """Handle QMediaPlayer error signals.

        Network and resource errors are reported through ``error`` and the
        player advances; other error kinds are ignored here.
        """
        if error in (QMediaPlayer.Error.NetworkError,
                     QMediaPlayer.Error.ResourceError):
            log.warning("Player error (%s): %s — advancing",
                        error, error_string)
            self.error.emit({
                "video_id": _video_id_of(self._current),
                "error": f"{error_string} (local file issue)",
            })
            self._advance()

    # ── Prefetch ────────────────────────────────────────────────────────────

    def _start_prefetch(self) -> None:
        """Request a download of the next queued song (if any).

        Called after playback of the current song starts.  Uses the same
        persistent worker — no separate thread needed.
        """
        if not self._queue:
            return
        next_song = self._queue[0]
        vid = _video_id_of(next_song)

        # Already cached or already downloading.
        if vid in self._prefetch_cache:
            return
        if self._prefetch_task_id is not None and vid == self._prefetch_vid:
            return

        # While an active download is in flight the single worker is busy,
        # so a prefetch request would be redundant.
        if self._active_task_id is not None:
            return

        task_id = self._next_task_id
        self._next_task_id += 1
        self._prefetch_task_id = task_id
        self._prefetch_vid = vid
        self._worker.download_requested.emit(task_id, next_song)

    # ═══════════════════════════════════════════════════════════════════════
    # Temp-file cleanup
    # ═══════════════════════════════════════════════════════════════════════

    def _cleanup_current_temp(self, keep: Optional[str] = None) -> None:
        """Remove the temp file of the song we're about to replace.

        *keep* names a path that must survive (the file about to be played).
        """
        if self._current is None:
            return
        lp = self._current.get("_local_path")
        if lp and lp != keep:
            _remove_file(lp)

    def _cleanup_prefetch_files(self) -> None:
        """Remove the temp files referenced by the prefetch cache."""
        for cached in self._prefetch_cache.values():
            _remove_file(cached.get("_local_path"))

    def _cleanup_temp_files(self) -> None:
        """Remove all temp files we know about (current + prefetch cache)."""
        if self._current:
            _remove_file(self._current.get("_local_path"))
        self._cleanup_prefetch_files()

    # ── Shutdown ────────────────────────────────────────────────────────────

    def shutdown(self) -> None:
        """Clean shutdown — stop playback, quit worker thread, remove temps."""
        self._player.stop()
        self._cancel_active_downloads()
        self._cleanup_temp_files()
        self._queue.clear()
        self._prefetch_cache.clear()
        self._current = None
        self._loop_mode = False

        # Tear down the persistent worker thread.  quit() only takes effect
        # once the thread returns to its event loop, so a download that is
        # still running can outlast the timeout.
        self._worker_thread.quit()
        if not self._worker_thread.wait(3000):
            log.warning("Download worker thread did not stop within 3 s")
        # The thread object is parented to self; the worker has no parent
        # (it was moved to the thread) and is kept alive by self._worker.
# ── Helpers ─────────────────────────────────────────────────────────────────── def _best_thumbnail(thumbnails) -> str: if not thumbnails: return "" if isinstance(thumbnails, str): return thumbnails if isinstance(thumbnails, list): best = thumbnails[-1] return best.get("url", "") if isinstance(best, dict) else str(best) return str(thumbnails) def _build_song_dict(search_result: dict) -> dict: """Normalise a ytmusicapi result into our standard song dict.""" return { "videoId": search_result.get("videoId", ""), "title": search_result.get("title", "Unknown"), "artists": search_result.get("artists") or [], "album": search_result.get("album"), "duration": search_result.get("duration_seconds", 0), "duration_label": search_result.get("duration", "?"), "thumbnail": _best_thumbnail(search_result.get("thumbnails", [])), "videoType": search_result.get("videoType", ""), "resultType": search_result.get("resultType", "song"), }