Full music recommendation pipeline: listening history capture via webhook, Last.fm candidate discovery, iTunes preview download, CLAP audio embeddings (512-dim), pgvector cosine similarity recommendations, playlist generation with known/new track interleaving, and Music Assistant playback via HA. Includes: FastAPI app, SQLAlchemy models, Alembic migrations, Docker Compose with pgvector/pg17, status dashboard, and all API endpoints. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
71 lines
2.2 KiB
Python
71 lines
2.2 KiB
Python
#!/usr/bin/env python3
|
|
"""One-time backfill: pull recently played tracks from Music Assistant via HA REST API."""
|
|
import asyncio
|
|
import os
|
|
|
|
import httpx
|
|
|
|
# Base URL of the Home Assistant instance whose REST API is queried.
# The fallback is a private-network address; override via HAUNTFM_HA_URL.
HA_URL = os.getenv("HAUNTFM_HA_URL", "http://192.168.86.51:8123")

# Token sent as the Bearer credential to Home Assistant. Empty when unset.
HA_TOKEN = os.getenv("HAUNTFM_HA_TOKEN", "")

# Base URL of the service that exposes the /api/history/webhook endpoint.
HAUNTFM_URL = os.getenv("HAUNTFM_URL", "http://localhost:8321")
|
|
|
|
|
|
async def get_recently_played() -> list[dict]:
    """Collect the media each Home Assistant media player currently reports.

    Requests every entity state from the HA REST API (``/api/states``),
    keeps the ``media_player.*`` entities, and builds one track dict for
    each entity that exposes both a media title and a media artist.

    Note that this reads the current state snapshot only; it does not query
    a play history, and it does not restrict the result to Music Assistant
    players beyond the ``media_player.`` entity prefix.

    Returns:
        A list of dicts with the keys ``title``, ``artist``, ``album``,
        ``speaker_name`` and ``source``.

    Raises:
        httpx.HTTPStatusError: If Home Assistant answers with an error
            status (raised by ``raise_for_status``).
    """
    auth_headers = {
        "Authorization": f"Bearer {HA_TOKEN}",
        "Content-Type": "application/json",
    }

    # One request returns the state of every entity known to HA.
    async with httpx.AsyncClient(timeout=30) as http:
        response = await http.get(f"{HA_URL}/api/states", headers=auth_headers)
        response.raise_for_status()
        entities = response.json()

    found = []
    for entity in entities:
        if entity["entity_id"].startswith("media_player."):
            attributes = entity.get("attributes", {})
            song = attributes.get("media_title")
            performer = attributes.get("media_artist")
            # Players without both fields have nothing usable to report.
            if not (song and performer):
                continue
            found.append(
                {
                    "title": song,
                    "artist": performer,
                    "album": attributes.get("media_album_name"),
                    "speaker_name": attributes.get("friendly_name"),
                    "source": "music_assistant_backfill",
                }
            )
    return found
|
|
|
|
|
|
async def send_to_webhook(track: dict):
    """POST a single track to the history webhook and return the JSON reply.

    Args:
        track: Track payload; it is sent unchanged as the JSON request body.

    Returns:
        The decoded JSON body of the webhook response.

    Raises:
        httpx.HTTPStatusError: If the webhook answers with an error status
            (raised by ``raise_for_status``).
    """
    endpoint = f"{HAUNTFM_URL}/api/history/webhook"
    async with httpx.AsyncClient(timeout=10) as http:
        reply = await http.post(endpoint, json=track)
        reply.raise_for_status()
        payload = reply.json()
    return payload
|
|
|
|
|
|
async def main():
    """Fetch tracks from Home Assistant and forward each to the webhook.

    Prints a hint and returns early when ``HA_TOKEN`` is empty. Otherwise
    fetches the track list, posts every track individually, and prints one
    ``OK``/``FAIL`` line per track. A failure for one track does not stop
    the remaining tracks from being sent.
    """
    if not HA_TOKEN:
        print("Set HAUNTFM_HA_TOKEN environment variable")
        return

    print(f"Fetching from {HA_URL}...")
    tracks = await get_recently_played()
    print(f"Found {len(tracks)} tracks with media info")

    for track in tracks:
        label = f"{track['artist']} - {track['title']}"
        try:
            # Keep only the network call inside the try: a problem while
            # formatting the success message must not be reported as a
            # failed delivery.
            result = await send_to_webhook(track)
        except Exception as e:
            # Deliberately broad: this is a best-effort backfill, so any
            # error for one track is reported and the loop continues.
            print(f" FAIL: {label}: {e}")
        else:
            # The POST succeeded; tolerate a reply without "track_id"
            # (or a non-dict reply) instead of raising on the lookup.
            track_id = result.get("track_id") if isinstance(result, dict) else None
            print(f" OK: {label} -> track_id={track_id}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the backfill only when executed as a script, not when imported.
    asyncio.run(main())
|