All checks were successful
SonarQube Code Quality Scan / SonarQube Scan (push) Successful in 3m36s
183 lines
5.9 KiB
Python
183 lines
5.9 KiB
Python
"""Discord Rich Presence integration for Tunetti."""
|
||
|
||
import logging
|
||
import threading
|
||
import time
|
||
from typing import Optional
|
||
|
||
from pypresence import Presence, DiscordNotFound
|
||
from pypresence.types import ActivityType
|
||
|
||
log = logging.getLogger("tunetti.rpc")
|
||
|
||
|
||
def _safe_artists(song: Optional[dict]) -> str:
    """Return the song's artist names joined with ``", "``.

    Args:
        song: Song metadata mapping, or ``None``. The ``"artists"`` entry
            is only used when it is a list; each element is only used
            when it is a dict carrying a non-empty string under ``"name"``.

    Returns:
        The comma-separated artist names, or ``""`` when *song* is falsy,
        ``"artists"`` is missing / not a list, or no usable name is found.
    """
    if not song:
        return ""

    artists = song.get("artists")
    if not isinstance(artists, list):
        return ""

    candidates = (
        entry.get("name") for entry in artists if isinstance(entry, dict)
    )
    # Only non-empty *strings* are kept: str.join() raises TypeError on
    # anything else, and this helper must never raise on malformed metadata.
    return ", ".join(
        name for name in candidates if isinstance(name, str) and name
    )
|
||
|
||
|
||
class DiscordRPC:
    """Manages Discord Rich Presence for the currently playing song.

    A daemon thread owns the Discord connection. It re-sends the current
    presence every ``REFRESH_INTERVAL`` seconds, and the public mutators
    (:meth:`update_song`, :meth:`clear`, :meth:`seek_to`) wake it so a
    change is pushed without waiting for the next refresh. While Discord
    is unreachable the thread retries every ``RECONNECT_INTERVAL`` seconds.
    """

    REFRESH_INTERVAL = 15  # seconds between re-sends
    RECONNECT_INTERVAL = 10  # seconds between connection attempts
    MAX_FIELD_LENGTH = 128  # longest ``details`` / ``state`` text sent

    def __init__(self, client_id: str):
        """Prepare the state; nothing connects until :meth:`start`.

        Args:
            client_id: Discord application id handed to ``Presence``.
        """
        self._client_id = client_id
        self._rpc: Optional[Presence] = None
        self._connected = False
        self._song: Optional[dict] = None
        self._start_ts: Optional[int] = None
        # Guards _song / _start_ts, which are written by callers and read
        # by the background thread.
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._wake_event = threading.Event()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def start(self) -> None:
        """Start the background update loop (no-op if already running)."""
        if self._thread and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._wake_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True,
                                        name="discord-rpc")
        self._thread.start()

    def stop(self) -> None:
        """Stop the background loop and close the Discord connection.

        Waits up to 3 seconds for the thread to exit, then disconnects
        regardless so the connection is not left open.
        """
        self._stop_event.set()
        # Also wake the loop in case it is sleeping on the refresh timer.
        self._wake_event.set()
        if self._thread:
            self._thread.join(timeout=3)
        self._disconnect()

    # ------------------------------------------------------------------
    # Public update
    # ------------------------------------------------------------------
    def update_song(self, song: Optional[dict],
                    reset_start: bool = True) -> None:
        """Set the currently playing song (or None to clear).

        When *reset_start* is ``True`` (default), the start timestamp
        is set to now so Discord shows a fresh progress bar (or cleared
        when *song* is ``None``).  Pass ``False`` on resume to keep the
        original start time.

        The background loop is woken so the new presence is sent without
        waiting for the next refresh.
        """
        with self._lock:
            self._song = song
            if reset_start:
                self._start_ts = int(time.time()) if song else None
        self._wake_event.set()

    def clear(self) -> None:
        """Clear the presence entirely."""
        with self._lock:
            self._song = None
            self._start_ts = None
        self._wake_event.set()

    def seek_to(self, position_ms: int) -> None:
        """Recalculate the start timestamp after a seek.

        Args:
            position_ms: New playback position in milliseconds. Negative
                values are treated as 0 so the start time is never moved
                into the future.

        Has no effect on the timestamp when no start time is recorded.
        """
        with self._lock:
            if self._start_ts is not None:
                elapsed_s = max(0, int(position_ms)) // 1000
                self._start_ts = int(time.time()) - elapsed_s
        self._wake_event.set()

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------
    def _connect(self) -> bool:
        """Open a fresh connection to Discord.

        Returns:
            ``True`` when connected, ``False`` otherwise (failures are
            logged, never raised).
        """
        # Close whatever client is left over from a previous attempt;
        # otherwise every reconnect would leak the old one.
        self._disconnect()
        try:
            client = Presence(self._client_id)
            self._rpc = client
            client.connect()
        except DiscordNotFound:
            log.warning("Discord not running – RPC unavailable")
            self._disconnect()
            return False
        except Exception as exc:
            log.debug("Discord RPC connect failed: %s", exc)
            self._disconnect()
            return False
        self._connected = True
        log.info("Connected to Discord RPC")
        return True

    def _disconnect(self) -> None:
        """Close the current client, if any, and mark us disconnected."""
        client, self._rpc = self._rpc, None
        self._connected = False
        if client is None:
            return
        try:
            client.close()
        except Exception as exc:
            # Best effort: the client may never have connected or may
            # already be gone.
            log.debug("Discord RPC close failed: %s", exc)

    @staticmethod
    def _end_timestamp(start, duration) -> Optional[int]:
        """Return ``start + duration``, or ``None`` when it cannot be computed.

        ``None`` is returned when *start* is falsy or *duration* is not a
        positive number, so malformed metadata never raises.
        """
        if not start:
            return None
        try:
            length = int(duration)
        except (TypeError, ValueError, OverflowError):
            return None
        return start + length if length > 0 else None

    def _send_presence(self) -> None:
        """Push the current song (or a cleared presence) to Discord.

        Any failure while talking to Discord marks the connection as
        lost so the run loop reconnects on its next iteration.
        """
        with self._lock:
            song = self._song
            start = self._start_ts

        client = self._rpc

        if not song:
            if self._connected and client is not None:
                try:
                    client.clear()
                except Exception as exc:
                    log.debug("RPC clear failed: %s", exc)
                    self._connected = False
            return

        if client is None:
            return

        limit = self.MAX_FIELD_LENGTH
        artists_str = _safe_artists(song)
        # ``or`` also covers a "title" key that is present but None/empty.
        title = str(song.get("title") or "Unknown")[:limit]
        state = (f"by {artists_str}" if artists_str else "YouTube Music")[:limit]
        # Omit the image rather than sending an empty string.
        thumbnail = song.get("thumbnail") or None
        end_ts = self._end_timestamp(start, song.get("duration", 0))

        try:
            client.update(
                activity_type=ActivityType.LISTENING,
                details=title,
                state=state,
                large_image=thumbnail,
                start=start,
                end=end_ts,
            )
        except Exception as exc:
            log.debug("RPC update failed: %s", exc)
            self._connected = False

    def _run(self) -> None:
        """Background loop: (re)connect, send presence, sleep, repeat."""
        while not self._stop_event.is_set():
            if not self._connected:
                self._connect()
                if not self._connected:
                    # Waiting on the stop event keeps shutdown prompt.
                    self._stop_event.wait(self.RECONNECT_INTERVAL)
                    continue

            self._send_presence()
            # Wait for either the refresh interval or an immediate wake-up
            self._wake_event.wait(self.REFRESH_INTERVAL)
            self._wake_event.clear()

        self._disconnect()
|