from datetime import datetime, timezone
from pathlib import Path

from fastapi import APIRouter, Depends, Query, Request
from fastapi.responses import HTMLResponse
from jinja2 import Environment, FileSystemLoader
from sqlalchemy import func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession

from haunt_fm.api.status import status as get_status_data
from haunt_fm.db import get_session
from haunt_fm.models.track import (
    FeedbackEvent,
    ListenEvent,
    Playlist,
    Profile,
    SpeakerProfileMapping,
    TasteProfile,
    Track,
)

router = APIRouter()

_template_dir = Path(__file__).parent.parent / "templates"
_jinja_env = Environment(loader=FileSystemLoader(str(_template_dir)), autoescape=True)

# Name of the profile that also owns rows carrying no profile reference.
_DEFAULT_PROFILE_NAME = "default"

# Row limits for the individual dashboard sections.
_RECENT_LISTENS_LIMIT = 10
_RECENT_FEEDBACK_LIMIT = 15
_VIBE_INFLUENCE_TRACK_LIMIT = 10
_RECENT_PLAYLISTS_LIMIT = 5

# (label, entity id) pairs offered in the speaker dropdowns of the template.
_SPEAKER_ENTITIES: tuple[tuple[str, str], ...] = (
    ("Living Room", "media_player.living_room_speaker_2"),
    ("Dining Room", "media_player.dining_room_speaker_2"),
    ("Basement", "media_player.basement_mini_2"),
    ("Kitchen", "media_player.kitchen_stereo_2"),
    ("Study", "media_player.study_speaker_2"),
    ("Butler's Pantry", "media_player.butlers_pantry_speaker_2"),
    ("Master Bathroom", "media_player.master_bathroom_speaker_2"),
    ("Kids Room", "media_player.kids_room_speaker_2"),
    ("Guest Bedroom", "media_player.guest_bedroom_speaker_2_2"),
    ("Garage", "media_player.garage_wifi_2"),
    ("Whole House", "media_player.whole_house_2"),
    ("Downstairs", "media_player.downstairs_2"),
    ("Upstairs", "media_player.upstairs_2"),
)


def _timeago(dt: datetime | str | None) -> str:
    """Return a human-readable relative time string like '2 min ago'.

    Args:
        dt: A datetime, an ISO-8601 string, or ``None``. Naive datetimes are
            treated as UTC.

    Returns:
        ``"never"`` for ``None``; the input string unchanged when it cannot
        be parsed by ``datetime.fromisoformat``; otherwise ``"just now"``
        (under a minute, which also covers timestamps in the future),
        ``"N min ago"``, ``"N hr ago"``, ``"Nd ago"`` (under 30 days), or an
        abbreviated month and day such as ``"Jan 05"`` for anything older.
    """
    if dt is None:
        return "never"
    if isinstance(dt, str):
        try:
            dt = datetime.fromisoformat(dt)
        except ValueError:
            # Not an ISO timestamp: show the raw text rather than failing the render.
            return dt

    now = datetime.now(timezone.utc)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)

    seconds = int((now - dt).total_seconds())
    if seconds < 60:
        return "just now"
    minutes = seconds // 60
    if minutes < 60:
        return f"{minutes} min ago"
    hours = minutes // 60
    if hours < 24:
        return f"{hours} hr ago"
    days = hours // 24
    if days < 30:
        return f"{days}d ago"
    return dt.strftime("%b %d")


_jinja_env.filters["timeago"] = _timeago


def _filtered(query, condition):
    """Return *query* restricted by *condition*, or unchanged when it is ``None``."""
    return query if condition is None else query.where(condition)


def _listen_profile_filter(selected_profile: Profile | None):
    """Build the ListenEvent condition for the selected profile (``None`` = no filter)."""
    if selected_profile is None:
        return None
    if selected_profile.name == _DEFAULT_PROFILE_NAME:
        # Listens without a profile reference are attributed to the default profile.
        return or_(
            ListenEvent.profile_id == selected_profile.id,
            ListenEvent.profile_id.is_(None),
        )
    return ListenEvent.profile_id == selected_profile.id


def _feedback_profile_filter(selected_profile: Profile | None):
    """Build the FeedbackEvent condition for the selected profile (``None`` = no filter)."""
    if selected_profile is None:
        return None
    if selected_profile.name == _DEFAULT_PROFILE_NAME:
        # Feedback without a profile name is attributed to the default profile.
        return or_(
            FeedbackEvent.profile_name.is_(None),
            FeedbackEvent.profile_name == _DEFAULT_PROFILE_NAME,
        )
    return FeedbackEvent.profile_name == selected_profile.name


async def _resolve_profile(session: AsyncSession, name: str | None) -> Profile | None:
    """Look up the profile called *name*; ``None`` when no name is given or none matches."""
    if not name:
        return None
    result = await session.execute(select(Profile).where(Profile.name == name))
    return result.scalar_one_or_none()


async def _load_recent_listens(
    session: AsyncSession, selected_profile: Profile | None
) -> list[dict]:
    """Return the most recent listens joined with their track, newest first."""
    query = select(ListenEvent, Track).join(Track, ListenEvent.track_id == Track.id)
    query = _filtered(query, _listen_profile_filter(selected_profile))
    query = query.order_by(ListenEvent.listened_at.desc()).limit(_RECENT_LISTENS_LIMIT)
    rows = (await session.execute(query)).all()
    return [
        {
            "track_id": track.id,
            "title": track.title,
            "artist": track.artist,
            "speaker": event.speaker_name or "Unknown",
            "listened_at": event.listened_at,
        }
        for event, track in rows
    ]


async def _load_profiles(session: AsyncSession) -> list[dict]:
    """Return every profile with listen statistics and its mapped speaker names.

    Profiles are ordered by creation time. The outer join keeps profiles that
    have no listens at all (their counts are 0 and ``last_listen`` is ``None``).
    """
    # NOTE(review): counts only include listens whose profile_id matches; listens
    # with a NULL profile_id (which the recent-listens filter attributes to the
    # "default" profile) are not counted here -- confirm this is intended.
    stats_query = (
        select(
            Profile,
            func.count(ListenEvent.id).label("event_count"),
            func.count(func.distinct(ListenEvent.track_id)).label("track_count"),
            func.max(ListenEvent.listened_at).label("last_listen"),
        )
        .outerjoin(ListenEvent, ListenEvent.profile_id == Profile.id)
        .group_by(Profile.id)
        .order_by(Profile.created_at)
    )
    stats_rows = (await session.execute(stats_query)).all()

    # Group speaker names by the profile they are mapped to.
    mappings = (await session.execute(select(SpeakerProfileMapping))).scalars().all()
    speakers_by_profile: dict[int, list[str]] = {}
    for mapping in mappings:
        speakers_by_profile.setdefault(mapping.profile_id, []).append(mapping.speaker_name)

    return [
        {
            "id": row_profile.id,
            "raw_name": row_profile.name,
            "name": row_profile.display_name or row_profile.name,
            "event_count": event_count,
            "track_count": track_count,
            "last_listen": last_listen,
            "speakers": speakers_by_profile.get(row_profile.id, []),
        }
        for row_profile, event_count, track_count, last_listen in stats_rows
    ]


async def _load_taste_profiles(session: AsyncSession) -> dict[int | None, dict]:
    """Return taste-profile track count and update time keyed by ``profile_id``."""
    taste_rows = (await session.execute(select(TasteProfile))).scalars().all()
    return {
        taste.profile_id: {
            "track_count": taste.track_count,
            "updated_at": taste.updated_at,
        }
        for taste in taste_rows
    }


async def _load_feedback_summary(session: AsyncSession, fb_filter) -> dict[str, int]:
    """Return total, per-signal and distinct-track feedback counts.

    The per-signal and distinct-track queries are skipped when there is no
    feedback at all, in which case every value is 0.
    """
    total_query = _filtered(select(func.count(FeedbackEvent.id)), fb_filter)
    total = (await session.execute(total_query)).scalar() or 0

    by_signal: dict[str, int] = {}
    distinct_tracks = 0
    if total > 0:
        signal_query = _filtered(
            select(FeedbackEvent.signal, func.count(FeedbackEvent.id)),
            fb_filter,
        ).group_by(FeedbackEvent.signal)
        signal_rows = (await session.execute(signal_query)).all()
        by_signal = {signal: count for signal, count in signal_rows}

        distinct_query = _filtered(
            select(func.count(func.distinct(FeedbackEvent.track_id))),
            fb_filter,
        )
        distinct_tracks = (await session.execute(distinct_query)).scalar() or 0

    return {
        "total": total,
        "up": by_signal.get("up", 0),
        "down": by_signal.get("down", 0),
        "skip": by_signal.get("skip", 0),
        "tracks": distinct_tracks,
    }


async def _load_recent_feedback(session: AsyncSession, fb_filter) -> list[dict]:
    """Return the most recent feedback events joined with their track, newest first."""
    query = select(FeedbackEvent, Track).join(Track, FeedbackEvent.track_id == Track.id)
    query = _filtered(query, fb_filter)
    query = query.order_by(FeedbackEvent.created_at.desc()).limit(_RECENT_FEEDBACK_LIMIT)
    rows = (await session.execute(query)).all()
    return [
        {
            "id": event.id,
            "signal": event.signal,
            "signal_weight": event.signal_weight,
            "title": track.title,
            "artist": track.artist,
            "profile_name": event.profile_name,
            "vibe_text": event.vibe_text or "no vibe",
            "created_at": event.created_at,
        }
        for event, track in rows
    ]


async def _load_vibe_influence(session: AsyncSession, fb_filter) -> list[dict]:
    """Return the tracks with the most feedback, each with its feedback events.

    Tracks are ordered by feedback count (highest first); within a track the
    events are newest first.
    """
    feedback_count = func.count(FeedbackEvent.id)
    top_query = _filtered(
        select(FeedbackEvent.track_id, feedback_count.label("cnt")),
        fb_filter,
    )
    top_query = (
        top_query.group_by(FeedbackEvent.track_id)
        # Tie-break on track_id so tracks with equal counts do not reshuffle
        # (or swap in and out of the top list) between page loads.
        .order_by(feedback_count.desc(), FeedbackEvent.track_id)
        .limit(_VIBE_INFLUENCE_TRACK_LIMIT)
    )
    top_track_ids = [row[0] for row in (await session.execute(top_query)).all()]
    if not top_track_ids:
        return []

    events_query = (
        select(FeedbackEvent, Track)
        .join(Track, FeedbackEvent.track_id == Track.id)
        .where(FeedbackEvent.track_id.in_(top_track_ids))
    )
    events_query = _filtered(events_query, fb_filter)
    events_query = events_query.order_by(FeedbackEvent.created_at.desc())
    event_rows = (await session.execute(events_query)).all()

    tracks_map: dict[int, dict] = {}
    for event, track in event_rows:
        entry = tracks_map.setdefault(
            track.id,
            {
                "track_id": track.id,
                "title": track.title,
                "artist": track.artist,
                "vibes": [],
            },
        )
        entry["vibes"].append(
            {
                "vibe_text": event.vibe_text or "no vibe",
                "signal": event.signal,
                "created_at": event.created_at,
            }
        )

    # Preserve the top-by-count ordering; ids whose track row did not come back
    # from the inner join are skipped.
    return [tracks_map[track_id] for track_id in top_track_ids if track_id in tracks_map]


async def _load_recent_playlists(session: AsyncSession) -> list[dict]:
    """Return the most recently created playlists, newest first."""
    query = select(Playlist).order_by(Playlist.created_at.desc()).limit(_RECENT_PLAYLISTS_LIMIT)
    playlists = (await session.execute(query)).scalars().all()
    return [
        {
            "name": playlist.name or f"Playlist #{playlist.id}",
            "tracks": playlist.total_tracks,
            "known_pct": playlist.known_pct,
            "vibe": playlist.vibe,
            "created_at": playlist.created_at,
        }
        for playlist in playlists
    ]


@router.get("/", response_class=HTMLResponse)
async def status_page(
    request: Request,
    profile: str | None = Query(default=None),
    session: AsyncSession = Depends(get_session),
):
    """Render the HTML status dashboard.

    Args:
        request: The incoming request (not read by this handler).
        profile: Optional ``Profile.name`` used to filter the listen and
            feedback sections. When omitted, or when no profile has that
            name, those sections are unfiltered.
        session: Database session supplied by the ``get_session`` dependency.

    Returns:
        An ``HTMLResponse`` containing the rendered ``status.html`` template.
    """
    # Queries run one after another on the single request-scoped session.
    data = await get_status_data(session)

    selected_profile = await _resolve_profile(session, profile)

    recent_listens = await _load_recent_listens(session, selected_profile)
    profiles = await _load_profiles(session)
    taste_by_profile_id = await _load_taste_profiles(session)

    fb_filter = _feedback_profile_filter(selected_profile)
    feedback_summary = await _load_feedback_summary(session, fb_filter)

    # The remaining feedback sections are only queried when any feedback exists.
    recent_feedback: list[dict] = []
    vibe_influence: list[dict] = []
    if feedback_summary["total"] > 0:
        recent_feedback = await _load_recent_feedback(session, fb_filter)
        vibe_influence = await _load_vibe_influence(session, fb_filter)

    recent_playlists = await _load_recent_playlists(session)

    # Profile names for selector (use raw Profile.name, not display_name)
    all_profile_names = [entry["raw_name"] for entry in profiles]

    template = _jinja_env.get_template("status.html")
    html = template.render(
        data=data,
        recent_listens=recent_listens,
        profiles=profiles,
        taste_profiles=taste_by_profile_id,
        recent_playlists=recent_playlists,
        feedback_summary=feedback_summary,
        recent_feedback=recent_feedback,
        vibe_influence=vibe_influence,
        selected_profile=selected_profile.name if selected_profile else None,
        all_profile_names=all_profile_names,
        speaker_entities=list(_SPEAKER_ENTITIES),
        now=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC"),
    )
    return HTMLResponse(html)