from datetime import datetime, timezone from pathlib import Path from fastapi import APIRouter, Depends, Request from fastapi.responses import HTMLResponse from jinja2 import Environment, FileSystemLoader from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from haunt_fm.api.status import status as get_status_data from haunt_fm.db import get_session from haunt_fm.models.track import ( FeedbackEvent, ListenEvent, Playlist, Profile, SpeakerProfileMapping, TasteProfile, Track, ) router = APIRouter() _template_dir = Path(__file__).parent.parent / "templates" _jinja_env = Environment(loader=FileSystemLoader(str(_template_dir)), autoescape=True) def _timeago(dt: datetime | str | None) -> str: """Return a human-readable relative time string like '2 min ago'.""" if dt is None: return "never" if isinstance(dt, str): try: dt = datetime.fromisoformat(dt) except ValueError: return dt now = datetime.now(timezone.utc) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) diff = now - dt seconds = int(diff.total_seconds()) if seconds < 60: return "just now" minutes = seconds // 60 if minutes < 60: return f"{minutes} min ago" hours = minutes // 60 if hours < 24: return f"{hours} hr ago" days = hours // 24 if days < 30: return f"{days}d ago" return dt.strftime("%b %d") _jinja_env.filters["timeago"] = _timeago @router.get("/", response_class=HTMLResponse) async def status_page(request: Request, session: AsyncSession = Depends(get_session)): data = await get_status_data(session) # Recent listens (last 10) with track info recent_rows = ( await session.execute( select(ListenEvent, Track) .join(Track, ListenEvent.track_id == Track.id) .order_by(ListenEvent.listened_at.desc()) .limit(10) ) ).all() recent_listens = [ { "title": track.title, "artist": track.artist, "speaker": event.speaker_name or "Unknown", "listened_at": event.listened_at, } for event, track in recent_rows ] # Profiles with event/track counts and last listen profile_rows = ( await session.execute( select( Profile, func.count(ListenEvent.id).label("event_count"), func.count(func.distinct(ListenEvent.track_id)).label("track_count"), func.max(ListenEvent.listened_at).label("last_listen"), ) .outerjoin(ListenEvent, ListenEvent.profile_id == Profile.id) .group_by(Profile.id) .order_by(Profile.created_at) ) ).all() # Speaker mappings keyed by profile_id mapping_rows = (await session.execute(select(SpeakerProfileMapping))).scalars().all() speakers_by_profile: dict[int, list[str]] = {} for m in mapping_rows: speakers_by_profile.setdefault(m.profile_id, []).append(m.speaker_name) profiles = [ { "id": profile.id, "name": profile.display_name or profile.name, "event_count": event_count, "track_count": track_count, "last_listen": last_listen, "speakers": speakers_by_profile.get(profile.id, []), } for profile, event_count, track_count, last_listen in profile_rows ] # Taste profiles keyed by profile_id taste_rows = (await session.execute(select(TasteProfile))).scalars().all() taste_by_profile_id: dict[int | None, dict] = {} for tp in taste_rows: taste_by_profile_id[tp.profile_id] = { "track_count": tp.track_count, "updated_at": tp.updated_at, } # Feedback summary stats feedback_total = ( await session.execute(select(func.count(FeedbackEvent.id))) ).scalar() or 0 feedback_by_signal: dict[str, int] = {} if feedback_total > 0: signal_rows = ( await session.execute( select(FeedbackEvent.signal, func.count(FeedbackEvent.id)) .group_by(FeedbackEvent.signal) ) ).all() feedback_by_signal = {signal: count for signal, count in signal_rows} feedback_distinct_tracks = 0 if feedback_total > 0: feedback_distinct_tracks = ( await session.execute( select(func.count(func.distinct(FeedbackEvent.track_id))) ) ).scalar() or 0 feedback_summary = { "total": feedback_total, "up": feedback_by_signal.get("up", 0), "down": feedback_by_signal.get("down", 0), "skip": feedback_by_signal.get("skip", 0), "tracks": feedback_distinct_tracks, } # Recent feedback events (last 15) recent_feedback: list[dict] = [] if feedback_total > 0: feedback_rows = ( await session.execute( select(FeedbackEvent, Track) .join(Track, FeedbackEvent.track_id == Track.id) .order_by(FeedbackEvent.created_at.desc()) .limit(15) ) ).all() recent_feedback = [ { "signal": event.signal, "signal_weight": event.signal_weight, "title": track.title, "artist": track.artist, "vibe_text": event.vibe_text or "no vibe", "created_at": event.created_at, } for event, track in feedback_rows ] # Vibe influence data — top 10 tracks by feedback count vibe_influence: list[dict] = [] if feedback_total > 0: top_track_ids_result = ( await session.execute( select(FeedbackEvent.track_id, func.count(FeedbackEvent.id).label("cnt")) .group_by(FeedbackEvent.track_id) .order_by(func.count(FeedbackEvent.id).desc()) .limit(10) ) ).all() top_track_ids = [row[0] for row in top_track_ids_result] if top_track_ids: influence_rows = ( await session.execute( select(FeedbackEvent, Track) .join(Track, FeedbackEvent.track_id == Track.id) .where(FeedbackEvent.track_id.in_(top_track_ids)) .order_by(FeedbackEvent.created_at.desc()) ) ).all() tracks_map: dict[int, dict] = {} for event, track in influence_rows: if track.id not in tracks_map: tracks_map[track.id] = { "title": track.title, "artist": track.artist, "vibes": [], } tracks_map[track.id]["vibes"].append({ "vibe_text": event.vibe_text or "no vibe", "signal": event.signal, "created_at": event.created_at, }) # Preserve the top-by-count ordering for tid in top_track_ids: if tid in tracks_map: vibe_influence.append(tracks_map[tid]) # Recent playlists (last 5) playlist_rows = ( await session.execute( select(Playlist).order_by(Playlist.created_at.desc()).limit(5) ) ).scalars().all() recent_playlists = [ { "name": p.name or f"Playlist #{p.id}", "tracks": p.total_tracks, "known_pct": p.known_pct, "created_at": p.created_at, } for p in playlist_rows ] template = _jinja_env.get_template("status.html") html = template.render( data=data, recent_listens=recent_listens, profiles=profiles, taste_profiles=taste_by_profile_id, recent_playlists=recent_playlists, feedback_summary=feedback_summary, recent_feedback=recent_feedback, vibe_influence=vibe_influence, now=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC"), ) return HTMLResponse(html)