🎉 | Project added
All checks were successful
SonarQube Code Quality Scan / SonarQube Scan (push) Successful in 3m36s
All checks were successful
SonarQube Code Quality Scan / SonarQube Scan (push) Successful in 3m36s
This commit is contained in:
182
discord_rpc.py
Normal file
182
discord_rpc.py
Normal file
@@ -0,0 +1,182 @@
|
||||
"""Discord Rich Presence integration for Tunetti."""
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from pypresence import Presence, DiscordNotFound
|
||||
from pypresence.types import ActivityType
|
||||
|
||||
log = logging.getLogger("tunetti.rpc")
|
||||
|
||||
|
||||
def _safe_artists(song: Optional[dict]) -> str:
|
||||
"""Extract a comma-separated artist string, handling None / missing keys."""
|
||||
if not song:
|
||||
return ""
|
||||
artists = song.get("artists")
|
||||
if not isinstance(artists, list):
|
||||
return ""
|
||||
names = []
|
||||
for a in artists:
|
||||
if isinstance(a, dict):
|
||||
name = a.get("name", "")
|
||||
if name:
|
||||
names.append(name)
|
||||
return ", ".join(names)
|
||||
|
||||
|
||||
class DiscordRPC:
|
||||
"""Manages Discord Rich Presence for the currently playing song.
|
||||
|
||||
Presence updates are normally batched every ``REFRESH_INTERVAL``
|
||||
seconds, but calling :meth:`update_song` immediately signals the
|
||||
background loop to send the new state right away.
|
||||
"""
|
||||
|
||||
REFRESH_INTERVAL = 15 # seconds between re-sends
|
||||
|
||||
def __init__(self, client_id: str):
|
||||
self._client_id = client_id
|
||||
self._rpc: Optional[Presence] = None
|
||||
self._connected = False
|
||||
self._song: Optional[dict] = None
|
||||
self._start_ts: Optional[int] = None
|
||||
self._lock = threading.Lock()
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
self._stop_event = threading.Event()
|
||||
self._wake_event = threading.Event()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
def start(self) -> None:
|
||||
"""Connect to Discord and start the background update loop."""
|
||||
if self._thread and self._thread.is_alive():
|
||||
return
|
||||
self._stop_event.clear()
|
||||
self._wake_event.clear()
|
||||
self._thread = threading.Thread(target=self._run, daemon=True,
|
||||
name="discord-rpc")
|
||||
self._thread.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
self._stop_event.set()
|
||||
self._wake_event.set()
|
||||
if self._thread:
|
||||
self._thread.join(timeout=3)
|
||||
self._disconnect()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public update
|
||||
# ------------------------------------------------------------------
|
||||
def update_song(self, song: Optional[dict],
|
||||
reset_start: bool = True) -> None:
|
||||
"""Set the currently playing song (or None to clear).
|
||||
|
||||
When *reset_start* is ``True`` (default), the start timestamp
|
||||
is set to now so Discord shows a fresh progress bar. Pass
|
||||
``False`` on resume to keep the original start time.
|
||||
|
||||
The presence is sent to Discord *immediately*.
|
||||
"""
|
||||
with self._lock:
|
||||
self._song = song
|
||||
if reset_start:
|
||||
self._start_ts = int(time.time()) if song else None
|
||||
self._wake_event.set()
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Clear the presence entirely."""
|
||||
with self._lock:
|
||||
self._song = None
|
||||
self._start_ts = None
|
||||
self._wake_event.set()
|
||||
|
||||
def seek_to(self, position_ms: int) -> None:
|
||||
"""Recalculate start_ts after a seek so Discord's progress bar
|
||||
shows the correct elapsed time from the new position."""
|
||||
with self._lock:
|
||||
if self._start_ts is not None:
|
||||
self._start_ts = int(time.time()) - (position_ms // 1000)
|
||||
self._wake_event.set()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal
|
||||
# ------------------------------------------------------------------
|
||||
def _connect(self) -> bool:
|
||||
try:
|
||||
self._rpc = Presence(self._client_id)
|
||||
self._rpc.connect()
|
||||
self._connected = True
|
||||
log.info("Connected to Discord RPC")
|
||||
return True
|
||||
except DiscordNotFound:
|
||||
log.warning("Discord not running – RPC unavailable")
|
||||
self._connected = False
|
||||
return False
|
||||
except Exception as exc:
|
||||
log.debug("Discord RPC connect failed: %s", exc)
|
||||
self._connected = False
|
||||
return False
|
||||
|
||||
def _disconnect(self) -> None:
|
||||
if self._rpc:
|
||||
try:
|
||||
self._rpc.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._rpc = None
|
||||
self._connected = False
|
||||
|
||||
def _send_presence(self) -> None:
|
||||
with self._lock:
|
||||
song = self._song
|
||||
start = self._start_ts
|
||||
|
||||
if not song:
|
||||
if self._connected and self._rpc is not None:
|
||||
try:
|
||||
self._rpc.clear()
|
||||
except Exception:
|
||||
self._connected = False
|
||||
return
|
||||
|
||||
artists_str = _safe_artists(song)
|
||||
title = song.get("title", "Unknown")
|
||||
thumbnail = song.get("thumbnail") or ""
|
||||
duration = song.get("duration", 0)
|
||||
|
||||
end_ts = (start + duration) if start and duration else None
|
||||
|
||||
if self._rpc is None:
|
||||
return
|
||||
|
||||
try:
|
||||
self._rpc.update(
|
||||
activity_type=ActivityType.LISTENING,
|
||||
details=title,
|
||||
state=f"by {artists_str}" if artists_str else "YouTube Music",
|
||||
large_image=thumbnail,
|
||||
start=start,
|
||||
end=end_ts,
|
||||
)
|
||||
except Exception as exc:
|
||||
log.debug("RPC update failed: %s", exc)
|
||||
self._connected = False
|
||||
|
||||
def _run(self) -> None:
|
||||
while not self._stop_event.is_set():
|
||||
if not self._connected:
|
||||
self._connect()
|
||||
if not self._connected:
|
||||
self._stop_event.wait(10)
|
||||
continue
|
||||
|
||||
self._send_presence()
|
||||
# Wait for either the refresh interval or an immediate wake-up
|
||||
self._wake_event.wait(self.REFRESH_INTERVAL)
|
||||
self._wake_event.clear()
|
||||
|
||||
self._disconnect()
|
||||
Reference in New Issue
Block a user