"""SQLite-backed database for song history, favourites, and play stats.""" import sqlite3 import threading from datetime import datetime, timezone from typing import Optional class MusicDB: """Thread-safe database layer for Tunetti.""" def __init__(self, db_path: str): self._lock = threading.Lock() self.conn = sqlite3.connect(db_path, check_same_thread=False) self.conn.row_factory = sqlite3.Row self.conn.execute("PRAGMA journal_mode=WAL") self.conn.execute("PRAGMA foreign_keys=ON") self._create_tables() # ------------------------------------------------------------------ # Schema # ------------------------------------------------------------------ def _create_tables(self) -> None: with self._lock: self.conn.executescript(""" CREATE TABLE IF NOT EXISTS songs ( video_id TEXT PRIMARY KEY, title TEXT NOT NULL, artists TEXT NOT NULL, -- JSON array of {name, id} album TEXT, -- JSON {name, id} or NULL duration INTEGER NOT NULL DEFAULT 0, thumbnail TEXT, is_favourite INTEGER NOT NULL DEFAULT 0, play_count INTEGER NOT NULL DEFAULT 0, first_played TIMESTAMP, last_played TIMESTAMP ); CREATE TABLE IF NOT EXISTS play_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_id TEXT NOT NULL REFERENCES songs(video_id), played_at TIMESTAMP NOT NULL DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_history_video ON play_history(video_id); CREATE INDEX IF NOT EXISTS idx_history_date ON play_history(played_at); CREATE INDEX IF NOT EXISTS idx_songs_fav ON songs(is_favourite); """) self.conn.commit() # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _upsert_song(self, video_id: str, title: str, artists: str, album: Optional[str], duration: int, thumbnail: Optional[str]) -> None: now = datetime.now(timezone.utc).isoformat() self.conn.execute(""" INSERT INTO songs (video_id, title, artists, album, duration, thumbnail, first_played, last_played) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(video_id) DO UPDATE SET title = excluded.title, artists = excluded.artists, album = excluded.album, duration = excluded.duration, thumbnail = excluded.thumbnail, last_played = excluded.last_played """, (video_id, title, artists, album, duration, thumbnail, now, now)) self.conn.execute(""" UPDATE songs SET play_count = play_count + 1 WHERE video_id = ? """, (video_id,)) def _get_song_row(self, video_id: str) -> Optional[sqlite3.Row]: cur = self.conn.execute( "SELECT * FROM songs WHERE video_id = ?", (video_id,)) return cur.fetchone() # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def record_play(self, video_id: str, title: str, artists: list[dict], album: Optional[dict] = None, duration: int = 0, thumbnail: Optional[str] = None) -> None: """Record that a song was played (upsert + history row).""" import json artists_json = json.dumps(artists, ensure_ascii=False) album_json = json.dumps(album) if album else None with self._lock: self._upsert_song(video_id, title, artists_json, album_json, duration, thumbnail) self.conn.execute( "INSERT INTO play_history (video_id) VALUES (?)", (video_id,)) self.conn.commit() def toggle_favourite(self, video_id: str) -> bool: """Toggle the favourite flag for a song. Returns the new state.""" with self._lock: row = self._get_song_row(video_id) if row is None: return False new_val = 0 if row["is_favourite"] else 1 self.conn.execute( "UPDATE songs SET is_favourite = ? WHERE video_id = ?", (new_val, video_id)) self.conn.commit() return bool(new_val) def set_favourite(self, video_id: str, value: bool) -> bool: """Explicitly set favourite state. Returns True if song exists.""" with self._lock: row = self._get_song_row(video_id) if row is None: return False self.conn.execute( "UPDATE songs SET is_favourite = ? WHERE video_id = ?", (1 if value else 0, video_id)) self.conn.commit() return True def get_favourites(self) -> list[sqlite3.Row]: """Return all favourited songs ordered by last played desc.""" with self._lock: cur = self.conn.execute(""" SELECT * FROM songs WHERE is_favourite = 1 ORDER BY last_played DESC """) return cur.fetchall() def is_favourite(self, video_id: str) -> bool: """Check if a song is favourited.""" with self._lock: row = self._get_song_row(video_id) return bool(row and row["is_favourite"]) def get_history(self, limit: int = 50) -> list[sqlite3.Row]: """Return recent play history with song details. Each video_id appears at most once (keeps the most recent play). """ with self._lock: cur = self.conn.execute(""" SELECT h.played_at, s.video_id, s.title, s.artists, s.album, s.duration, s.thumbnail, s.is_favourite FROM play_history h JOIN songs s ON s.video_id = h.video_id WHERE h.id IN ( SELECT MAX(id) FROM play_history GROUP BY video_id ) ORDER BY h.id DESC LIMIT ? """, (limit,)) return cur.fetchall() def get_stats(self) -> dict: """Return aggregate listening statistics.""" with self._lock: cur = self.conn.execute(""" SELECT COUNT(DISTINCT video_id) AS unique_songs, SUM(play_count) AS total_plays FROM songs """) total = cur.fetchone() cur = self.conn.execute(""" SELECT COUNT(*) AS fav_count FROM songs WHERE is_favourite = 1 """) favs = cur.fetchone() cur = self.conn.execute(""" SELECT video_id, title, artists, thumbnail, play_count FROM songs ORDER BY play_count DESC LIMIT 10 """) top = cur.fetchall() cur = self.conn.execute(""" SELECT s.title, s.artists, h.played_at FROM play_history h JOIN songs s ON s.video_id = h.video_id ORDER BY h.played_at DESC LIMIT 5 """) recent = cur.fetchall() cur = self.conn.execute(""" SELECT date(played_at) AS day, COUNT(*) AS plays FROM play_history GROUP BY day ORDER BY day DESC LIMIT 30 """) by_day = cur.fetchall() return { "unique_songs": total["unique_songs"], "total_plays": total["total_plays"], "favourites": favs["fav_count"], "top_played": [ { "video_id": r["video_id"], "title": r["title"], "artists": r["artists"], "thumbnail": r["thumbnail"] or "", "play_count": r["play_count"] } for r in top ], "recent": [ { "title": r["title"], "artists": r["artists"], "played_at": r["played_at"] } for r in recent ], "plays_by_day": [ {"day": r["day"], "plays": r["plays"]} for r in by_day ], } def close(self) -> None: self.conn.close()