""" E-Ink Photo Frame Server Serves photos for the ESP32 photo frame and provides a web UI for management. Tracks frame status via heartbeat reports from the ESP32. Endpoints: GET / Web UI — gallery, upload, frame status GET /photo Random JPEG resized to 800x480 (called by ESP32) POST /heartbeat ESP32 reports what it displayed GET /api/status Frame status (for Home Assistant REST sensor) GET /api/photos List all photos as JSON POST /api/upload Upload new photos DELETE /api/photos/ Delete a photo GET /health Health check """ import argparse import io import json import random import sys import time from datetime import datetime, timezone from pathlib import Path from flask import Flask, Response, jsonify, request, render_template_string from werkzeug.utils import secure_filename from PIL import Image app = Flask(__name__) PHOTOS_DIR: Path = Path(".") TARGET_WIDTH = 800 TARGET_HEIGHT = 480 SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".bmp", ".tiff"} MAX_UPLOAD_SIZE = 20 * 1024 * 1024 # 20MB # Frame state — persisted to disk so it survives restarts STATE_FILE = Path("/data/frame_state.json") def load_state() -> dict: try: if STATE_FILE.exists(): return json.loads(STATE_FILE.read_text()) except Exception: pass return {"last_update": None, "current_photo": None, "ip": None, "updates": 0} def save_state(state: dict): try: STATE_FILE.parent.mkdir(parents=True, exist_ok=True) STATE_FILE.write_text(json.dumps(state)) except Exception as e: print(f"Warning: could not save state: {e}", file=sys.stderr) frame_state = load_state() def get_photo_list() -> list[Path]: photos = [] for f in PHOTOS_DIR.rglob("*"): if f.suffix.lower() in SUPPORTED_EXTENSIONS and f.is_file(): photos.append(f) return sorted(photos, key=lambda p: p.name) def resize_and_crop(img: Image.Image) -> Image.Image: target_ratio = TARGET_WIDTH / TARGET_HEIGHT img_ratio = img.width / img.height if img_ratio > target_ratio: new_height = TARGET_HEIGHT new_width = int(img.width * (TARGET_HEIGHT / img.height)) else: new_width = TARGET_WIDTH new_height = int(img.height * (TARGET_WIDTH / img.width)) img = img.resize((new_width, new_height), Image.LANCZOS) left = (new_width - TARGET_WIDTH) // 2 top = (new_height - TARGET_HEIGHT) // 2 img = img.crop((left, top, left + TARGET_WIDTH, top + TARGET_HEIGHT)) return img def make_thumbnail(img: Image.Image, size: int = 300) -> bytes: img.thumbnail((size, size), Image.LANCZOS) buf = io.BytesIO() img.save(buf, format="JPEG", quality=75) return buf.getvalue() # --- ESP32 endpoints --- @app.route("/photo") def random_photo(): photos = get_photo_list() if not photos: return jsonify({"error": "No photos found"}), 404 chosen = random.choice(photos) try: img = Image.open(chosen) img = img.convert("RGB") img = resize_and_crop(img) buf = io.BytesIO() img.save(buf, format="JPEG", quality=85) buf.seek(0) return Response(buf.getvalue(), mimetype="image/jpeg", headers={"X-Photo-Name": chosen.name}) except Exception as e: print(f"Error processing {chosen}: {e}", file=sys.stderr) return jsonify({"error": str(e)}), 500 @app.route("/heartbeat", methods=["POST"]) def heartbeat(): """ESP32 reports after displaying a photo.""" global frame_state data = request.get_json(silent=True) or {} frame_state["last_update"] = datetime.now(timezone.utc).isoformat() frame_state["current_photo"] = data.get("photo", None) frame_state["ip"] = request.remote_addr frame_state["free_heap"] = data.get("free_heap", None) frame_state["updates"] = frame_state.get("updates", 0) + 1 save_state(frame_state) return jsonify({"ok": True}) # --- API endpoints --- @app.route("/api/status") def api_status(): """Frame status for Home Assistant REST sensor.""" photos = get_photo_list() return jsonify({ "state": "online" if frame_state.get("last_update") else "waiting", "last_update": frame_state.get("last_update"), "current_photo": frame_state.get("current_photo"), "frame_ip": frame_state.get("ip"), "total_photos": len(photos), "total_updates": frame_state.get("updates", 0), }) @app.route("/api/photos") def api_photos(): photos = get_photo_list() return jsonify([{"name": p.name, "size": p.stat().st_size} for p in photos]) @app.route("/api/upload", methods=["POST"]) def api_upload(): uploaded = [] for f in request.files.getlist("photos"): if not f.filename: continue fname = secure_filename(f.filename) ext = Path(fname).suffix.lower() if ext not in SUPPORTED_EXTENSIONS: continue dest = PHOTOS_DIR / fname f.save(str(dest)) uploaded.append(fname) return jsonify({"uploaded": uploaded, "count": len(uploaded)}) @app.route("/api/photos/", methods=["DELETE"]) def api_delete(name: str): fname = secure_filename(name) path = PHOTOS_DIR / fname if not path.exists(): return jsonify({"error": "not found"}), 404 path.unlink() return jsonify({"deleted": fname}) @app.route("/thumb/") def thumbnail(name: str): fname = secure_filename(name) path = PHOTOS_DIR / fname if not path.exists(): return "", 404 try: img = Image.open(path).convert("RGB") data = make_thumbnail(img) return Response(data, mimetype="image/jpeg") except Exception: return "", 500 # --- Web UI --- @app.route("/") def index(): photos = get_photo_list() return render_template_string(WEB_UI, photos=photos, state=frame_state, photo_count=len(photos)) @app.route("/health") def health(): photos = get_photo_list() return jsonify({"status": "ok", "photo_count": len(photos)}) WEB_UI = """ Photo Frame

Photo Frame

Frame Status {{ 'Online' if state.get('last_update') else 'Waiting for first update' }}
{% if state.get('last_update') %}
Last Update {{ state.last_update[:19] }}
Showing {{ state.get('current_photo', 'Unknown') }}
{% endif %}
Photos {{ photo_count }}
Total Refreshes {{ state.get('updates', 0) }}

Drop photos here or click to upload

{% if photos %} {% else %}
No photos yet. Upload some!
{% endif %}
""" if __name__ == "__main__": parser = argparse.ArgumentParser(description="E-Ink Photo Frame Server") parser.add_argument("--port", type=int, default=8473) parser.add_argument("--photos-dir", type=str, default="./photos") args = parser.parse_args() PHOTOS_DIR = Path(args.photos_dir) if not PHOTOS_DIR.exists(): PHOTOS_DIR.mkdir(parents=True, exist_ok=True) print(f"Serving photos from: {PHOTOS_DIR.resolve()}") print(f"Found {len(get_photo_list())} photos") print(f"Listening on port {args.port}") app.run(host="0.0.0.0", port=args.port)