Files
haunt-fm/src/haunt_fm/api/status_page.py

149 lines
4.5 KiB
Python
Raw Normal View History

from datetime import datetime, timezone
from pathlib import Path
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse
from jinja2 import Environment, FileSystemLoader
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from haunt_fm.api.status import status as get_status_data
from haunt_fm.db import get_session
from haunt_fm.models.track import (
ListenEvent,
Playlist,
Profile,
SpeakerProfileMapping,
TasteProfile,
Track,
)
# Router that the status page route below is attached to.
router = APIRouter()

# Templates are loaded from the "templates" directory located two levels
# above this file.
_template_dir = Path(__file__).parent.parent.joinpath("templates")

# Shared Jinja2 environment; autoescaping is switched on for every template.
_jinja_env = Environment(
    autoescape=True,
    loader=FileSystemLoader(str(_template_dir)),
)
def _timeago(dt: datetime | str | None) -> str:
"""Return a human-readable relative time string like '2 min ago'."""
if dt is None:
return "never"
if isinstance(dt, str):
try:
dt = datetime.fromisoformat(dt)
except ValueError:
return dt
now = datetime.now(timezone.utc)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
diff = now - dt
seconds = int(diff.total_seconds())
if seconds < 60:
return "just now"
minutes = seconds // 60
if minutes < 60:
return f"{minutes} min ago"
hours = minutes // 60
if hours < 24:
return f"{hours} hr ago"
days = hours // 24
if days < 30:
return f"{days}d ago"
return dt.strftime("%b %d")
# Make the helper available to templates as the "timeago" filter.
_jinja_env.filters.update({"timeago": _timeago})
@router.get("/", response_class=HTMLResponse)
async def status_page(request: Request, session: AsyncSession = Depends(get_session)):
    """Render the HTML status page.

    Collects the status payload from ``get_status_data`` together with the
    ten most recent listens, per-profile listen statistics, speaker-to-profile
    mappings, taste profiles and the five newest playlists, then renders
    everything through the ``status.html`` template.

    Args:
        request: The incoming request; not read by the body.
        session: Async database session provided by ``get_session``.

    Returns:
        An ``HTMLResponse`` wrapping the rendered template.
    """
    data = await get_status_data(session)

    # Ten newest listen events, each paired with its track.
    listens_stmt = (
        select(ListenEvent, Track)
        .join(Track, ListenEvent.track_id == Track.id)
        .order_by(ListenEvent.listened_at.desc())
        .limit(10)
    )
    listens_result = await session.execute(listens_stmt)
    recent_listens = []
    for listen, listened_track in listens_result.all():
        recent_listens.append(
            {
                "title": listened_track.title,
                "artist": listened_track.artist,
                "speaker": listen.speaker_name or "Unknown",
                "listened_at": listen.listened_at,
            }
        )

    # One row per profile with listen count, distinct-track count and the
    # latest listen time; the outer join keeps profiles without any listens.
    profile_stats_stmt = (
        select(
            Profile,
            func.count(ListenEvent.id).label("event_count"),
            func.count(func.distinct(ListenEvent.track_id)).label("track_count"),
            func.max(ListenEvent.listened_at).label("last_listen"),
        )
        .outerjoin(ListenEvent, ListenEvent.profile_id == Profile.id)
        .group_by(Profile.id)
        .order_by(Profile.created_at)
    )
    profile_stats = (await session.execute(profile_stats_stmt)).all()

    # Group speaker names under the profile they are mapped to.
    mapping_result = await session.execute(select(SpeakerProfileMapping))
    speakers_by_profile: dict[int, list[str]] = {}
    for mapping in mapping_result.scalars().all():
        owner_id = mapping.profile_id
        if owner_id not in speakers_by_profile:
            speakers_by_profile[owner_id] = []
        speakers_by_profile[owner_id].append(mapping.speaker_name)

    profiles = []
    for profile, event_count, track_count, last_listen in profile_stats:
        profiles.append(
            {
                "id": profile.id,
                # Fall back to the plain name when no display name is set.
                "name": profile.display_name or profile.name,
                "event_count": event_count,
                "track_count": track_count,
                "last_listen": last_listen,
                "speakers": speakers_by_profile.get(profile.id, []),
            }
        )

    # Taste profile summary keyed by profile_id.
    taste_result = await session.execute(select(TasteProfile))
    taste_by_profile_id: dict[int | None, dict] = {
        taste.profile_id: {
            "track_count": taste.track_count,
            "updated_at": taste.updated_at,
        }
        for taste in taste_result.scalars().all()
    }

    # Five most recently created playlists.
    playlists_stmt = select(Playlist).order_by(Playlist.created_at.desc()).limit(5)
    playlists_result = await session.execute(playlists_stmt)
    recent_playlists = []
    for playlist in playlists_result.scalars().all():
        recent_playlists.append(
            {
                "name": playlist.name or f"Playlist #{playlist.id}",
                "tracks": playlist.total_tracks,
                "known_pct": playlist.known_pct,
                "created_at": playlist.created_at,
            }
        )

    template = _jinja_env.get_template("status.html")
    rendered_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    html = template.render(
        data=data,
        recent_listens=recent_listens,
        profiles=profiles,
        taste_profiles=taste_by_profile_id,
        recent_playlists=recent_playlists,
        now=rendered_at,
    )
    return HTMLResponse(html)