All checks were successful
SonarQube Code Quality Scan / SonarQube Scan (push) Successful in 3m36s
226 lines
8.7 KiB
Python
226 lines
8.7 KiB
Python
"""SQLite-backed database for song history, favourites, and play stats."""
|
|
|
|
import sqlite3
|
|
import threading
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
|
|
class MusicDB:
    """Thread-safe database layer for Tunetti.

    A single SQLite connection is shared between threads
    (``check_same_thread=False``); every method that touches it does so
    while holding ``self._lock``.  Write operations additionally run inside
    the connection's context manager, so they are committed on success and
    rolled back if any statement raises.

    Two tables are maintained:

    * ``songs`` -- one row per ``video_id`` holding metadata, the favourite
      flag, a play counter and first/last played timestamps (ISO-8601 UTC
      strings produced in Python).
    * ``play_history`` -- one row per play; ``played_at`` defaults to
      SQLite's ``datetime('now')``.
    """

    def __init__(self, db_path: str):
        """Open (or create) the database at *db_path* and ensure the schema.

        Args:
            db_path: Path handed to ``sqlite3.connect``.

        Raises:
            sqlite3.Error: If the connection cannot be configured or the
                schema cannot be created.  The connection is closed before
                the error propagates.
        """
        self._lock = threading.Lock()
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        try:
            self.conn.row_factory = sqlite3.Row
            self.conn.execute("PRAGMA journal_mode=WAL")
            self.conn.execute("PRAGMA foreign_keys=ON")
            self._create_tables()
        except Exception:
            # Do not leak the handle when setup fails half-way.
            self.conn.close()
            raise

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------
    def _create_tables(self) -> None:
        """Create tables and indexes if they do not exist yet."""
        with self._lock:
            self.conn.executescript("""
                CREATE TABLE IF NOT EXISTS songs (
                    video_id     TEXT PRIMARY KEY,
                    title        TEXT NOT NULL,
                    artists      TEXT NOT NULL,  -- JSON array of {name, id}
                    album        TEXT,           -- JSON {name, id} or NULL
                    duration     INTEGER NOT NULL DEFAULT 0,
                    thumbnail    TEXT,
                    is_favourite INTEGER NOT NULL DEFAULT 0,
                    play_count   INTEGER NOT NULL DEFAULT 0,
                    first_played TIMESTAMP,
                    last_played  TIMESTAMP
                );

                CREATE TABLE IF NOT EXISTS play_history (
                    id        INTEGER PRIMARY KEY AUTOINCREMENT,
                    video_id  TEXT NOT NULL REFERENCES songs(video_id),
                    played_at TIMESTAMP NOT NULL DEFAULT (datetime('now'))
                );

                CREATE INDEX IF NOT EXISTS idx_history_video
                    ON play_history(video_id);
                CREATE INDEX IF NOT EXISTS idx_history_date
                    ON play_history(played_at);
                CREATE INDEX IF NOT EXISTS idx_songs_fav
                    ON songs(is_favourite);
            """)
            self.conn.commit()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _upsert_song(self, video_id: str, title: str, artists: str,
                     album: Optional[str], duration: int,
                     thumbnail: Optional[str]) -> None:
        """Insert or refresh a song row and bump its play counter.

        The caller must hold ``self._lock`` and is responsible for
        committing.  On conflict the metadata and ``last_played`` are
        overwritten, ``first_played`` is preserved and ``play_count`` is
        incremented -- all in a single statement.

        Args:
            video_id: Primary key of the song.
            title: Song title.
            artists: JSON-encoded artist list.
            album: JSON-encoded album object, or ``None``.
            duration: Duration value stored as an integer.
            thumbnail: Thumbnail reference, or ``None``.
        """
        now = datetime.now(timezone.utc).isoformat()
        self.conn.execute("""
            INSERT INTO songs (video_id, title, artists, album, duration,
                               thumbnail, play_count, first_played,
                               last_played)
            VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?)
            ON CONFLICT(video_id) DO UPDATE SET
                title       = excluded.title,
                artists     = excluded.artists,
                album       = excluded.album,
                duration    = excluded.duration,
                thumbnail   = excluded.thumbnail,
                last_played = excluded.last_played,
                play_count  = songs.play_count + 1
        """, (video_id, title, artists, album, duration, thumbnail, now, now))

    def _get_song_row(self, video_id: str) -> Optional[sqlite3.Row]:
        """Return the ``songs`` row for *video_id*, or ``None`` if absent.

        The caller must hold ``self._lock``.
        """
        cur = self.conn.execute(
            "SELECT * FROM songs WHERE video_id = ?", (video_id,))
        return cur.fetchone()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def record_play(self, video_id: str, title: str,
                    artists: list[dict], album: Optional[dict] = None,
                    duration: int = 0,
                    thumbnail: Optional[str] = None) -> None:
        """Record that a song was played (upsert + history row).

        Both writes happen in one transaction: if either fails, nothing is
        persisted and the exception propagates.

        Args:
            video_id: Identifier of the song.
            title: Song title.
            artists: List of artist dicts; stored as JSON.
            album: Album dict; stored as JSON, or NULL when falsy.
            duration: Duration value stored with the song.
            thumbnail: Thumbnail reference, if any.

        Raises:
            sqlite3.Error: If a statement fails (e.g. a NOT NULL violation).
        """
        import json

        artists_json = json.dumps(artists, ensure_ascii=False)
        # Same encoding options as artists so non-ASCII names are stored
        # consistently in both columns.
        album_json = json.dumps(album, ensure_ascii=False) if album else None
        # ``with self.conn`` commits on success and rolls back on error, so
        # a failed write never leaves a half-open transaction on the shared
        # connection.
        with self._lock, self.conn:
            self._upsert_song(video_id, title, artists_json,
                              album_json, duration, thumbnail)
            self.conn.execute(
                "INSERT INTO play_history (video_id) VALUES (?)", (video_id,))

    def toggle_favourite(self, video_id: str) -> bool:
        """Toggle the favourite flag for a song.

        Returns:
            The new favourite state, or ``False`` if the song is unknown.
        """
        with self._lock, self.conn:
            row = self._get_song_row(video_id)
            if row is None:
                return False
            new_val = 0 if row["is_favourite"] else 1
            self.conn.execute(
                "UPDATE songs SET is_favourite = ? WHERE video_id = ?",
                (new_val, video_id))
            return bool(new_val)

    def set_favourite(self, video_id: str, value: bool) -> bool:
        """Explicitly set the favourite state.

        Returns:
            ``True`` if the song exists (and was updated), else ``False``.
        """
        with self._lock, self.conn:
            cur = self.conn.execute(
                "UPDATE songs SET is_favourite = ? WHERE video_id = ?",
                (1 if value else 0, video_id))
            # rowcount is the number of rows matched by the UPDATE, which
            # tells us whether the song exists without a separate SELECT.
            return cur.rowcount > 0

    def get_favourites(self) -> list[sqlite3.Row]:
        """Return all favourited songs ordered by last played, newest first."""
        with self._lock:
            cur = self.conn.execute("""
                SELECT * FROM songs
                WHERE is_favourite = 1
                ORDER BY last_played DESC
            """)
            return cur.fetchall()

    def is_favourite(self, video_id: str) -> bool:
        """Return ``True`` if the song exists and is favourited."""
        with self._lock:
            row = self._get_song_row(video_id)
        return row is not None and bool(row["is_favourite"])

    def get_history(self, limit: int = 50) -> list[sqlite3.Row]:
        """Return recent play history with song details.

        Each video_id appears at most once (keeps the most recent play).

        Args:
            limit: Maximum number of rows to return.

        Returns:
            Rows ordered from most to least recently played.
        """
        with self._lock:
            cur = self.conn.execute("""
                SELECT h.played_at,
                       s.video_id, s.title, s.artists, s.album,
                       s.duration, s.thumbnail, s.is_favourite
                FROM play_history h
                JOIN songs s ON s.video_id = h.video_id
                WHERE h.id IN (
                    SELECT MAX(id) FROM play_history GROUP BY video_id
                )
                ORDER BY h.id DESC
                LIMIT ?
            """, (limit,))
            return cur.fetchall()

    def get_stats(self) -> dict:
        """Return aggregate listening statistics.

        Returns:
            A dict with the keys ``unique_songs``, ``total_plays`` (0 when
            nothing has been played), ``favourites``, ``top_played`` (up to
            10 songs by play count), ``recent`` (the 5 latest plays) and
            ``plays_by_day`` (play counts for up to 30 most recent days).
            The ``artists`` values are the raw JSON strings as stored.
        """
        with self._lock:
            # SUM() over zero rows yields NULL; COALESCE keeps total_plays
            # an integer on an empty database.
            totals = self.conn.execute("""
                SELECT COUNT(DISTINCT video_id) AS unique_songs,
                       COALESCE(SUM(play_count), 0) AS total_plays
                FROM songs
            """).fetchone()

            favs = self.conn.execute("""
                SELECT COUNT(*) AS fav_count FROM songs WHERE is_favourite = 1
            """).fetchone()

            top = self.conn.execute("""
                SELECT video_id, title, artists, thumbnail, play_count
                FROM songs ORDER BY play_count DESC LIMIT 10
            """).fetchall()

            # played_at only has one-second resolution, so break ties on
            # the autoincrement id to keep the order deterministic.
            recent = self.conn.execute("""
                SELECT s.title, s.artists, h.played_at
                FROM play_history h
                JOIN songs s ON s.video_id = h.video_id
                ORDER BY h.played_at DESC, h.id DESC LIMIT 5
            """).fetchall()

            by_day = self.conn.execute("""
                SELECT date(played_at) AS day, COUNT(*) AS plays
                FROM play_history
                GROUP BY day ORDER BY day DESC LIMIT 30
            """).fetchall()

        return {
            "unique_songs": totals["unique_songs"],
            "total_plays": totals["total_plays"],
            "favourites": favs["fav_count"],
            "top_played": [
                {
                    "video_id": r["video_id"],
                    "title": r["title"],
                    "artists": r["artists"],
                    "thumbnail": r["thumbnail"] or "",
                    "play_count": r["play_count"],
                }
                for r in top
            ],
            "recent": [
                {
                    "title": r["title"],
                    "artists": r["artists"],
                    "played_at": r["played_at"],
                }
                for r in recent
            ],
            "plays_by_day": [
                {"day": r["day"], "plays": r["plays"]} for r in by_day
            ],
        }

    def close(self) -> None:
        """Close the underlying connection.

        Takes the lock so the connection is not closed while another thread
        is in the middle of a query.
        """
        with self._lock:
            self.conn.close()
|