Files
Thomas Hallock 95de2470b8 Implement provider plugin architecture (#9) with local directory provider (#10)
Plugin framework for photo source backends:
- PhotoProvider ABC with lifecycle hooks, auth flow, cache control
- @register_provider decorator + registry for auto-discovery
- ProviderManager handles instance lifecycle, config persistence,
  aggregated photo pool with weighted random selection
- ProviderCache: in-memory list cache (per-provider TTL), disk-based
  thumbnail cache, optional full image cache for remote providers
- Per-image settings migrated from bare filenames to composite keys
  (provider_id:photo_id) with automatic one-time migration + backup

Local directory provider included as reference implementation — wraps
the existing filesystem logic into the provider interface with upload
and delete support.

All existing endpoints preserved with composite key routing. ESP32
firmware unchanged — still hits GET /photo, gets a JPEG.

New API: /api/providers/* for managing provider instances, auth flows,
and cache control.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 23:40:15 -05:00

506 lines
18 KiB
Python

"""
Photo source provider plugin framework.
Provides the base class, registry, caching layer, and instance manager
for photo source plugins. Concrete providers (local, SFTP, Google Photos,
etc.) implement PhotoProvider and register via @register_provider.
Usage:
from providers import PhotoProvider, PhotoRef, register_provider, ProviderManager
"""
from __future__ import annotations
import abc
import enum
import hashlib
import io
import json
import random
import shutil
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from PIL import Image
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class PhotoRef:
    """Immutable handle for a single photo: identifying metadata only, no pixels."""

    # ID of the provider instance the photo belongs to.
    provider_id: str
    # Identifier of the photo within that provider.
    photo_id: str
    # Name to show for the photo (presumably user-facing; not read in this module).
    display_name: str
    # Optional ordering key; defaults to the empty string.
    sort_key: str = ""

    @property
    def composite_key(self) -> str:
        """Return the cross-provider unique key '{provider_id}:{photo_id}'."""
        return "{}:{}".format(self.provider_id, self.photo_id)
@enum.unique
class AuthType(enum.Enum):
    """Authentication mechanism a provider type declares via ``auth_type``.

    The member values are the plain strings reported to the web UI by
    ``available_provider_types()``.
    """

    NONE = "none"
    CREDENTIALS = "credentials"
    OAUTH2 = "oauth2"
    CREDENTIALS_2FA = "credentials_2fa"
@dataclass
class ProviderHealth:
    """Result of a provider health check; a default instance means healthy."""

    # One of: "ok", "degraded", "error", "auth_required".
    status: str = "ok"
    # Free-form detail accompanying the status; empty when there is none.
    message: str = ""
    # When the provider last worked, or None if unknown
    # (presumably a timestamp string -- confirm against the providers).
    last_successful: str | None = None
# ---------------------------------------------------------------------------
# Base class
# ---------------------------------------------------------------------------
class PhotoProvider(abc.ABC):
    """Abstract base for photo source plugins.

    A concrete provider overrides the class attributes below and implements
    ``list_photos`` and ``get_photo``. Everything else has a default that a
    subclass may override as needed.
    """

    # Class-level metadata that each subclass is expected to override.
    provider_type: str = ""
    display_name: str = ""
    auth_type: AuthType = AuthType.NONE
    config_schema: dict = {}

    def __init__(self, instance_id: str, config: dict):
        """Store the instance ID and configuration dict; no I/O happens here."""
        self.config = config
        self.instance_id = instance_id

    # -- Lifecycle --

    def startup(self) -> None:
        """Hook run once after the instance is created.

        Intended for connection setup, session restoration and similar work.
        The default does nothing.
        """

    def shutdown(self) -> None:
        """Hook run on server shutdown or when the provider is removed.

        Intended for closing connections, persisting sessions and similar
        work. The default does nothing.
        """

    def health_check(self) -> ProviderHealth:
        """Report connectivity/auth status; the default reports a healthy state."""
        return ProviderHealth()

    # -- Required abstract methods --

    @abc.abstractmethod
    def list_photos(self) -> list[PhotoRef]:
        """Return references to every photo available from this source."""

    @abc.abstractmethod
    def get_photo(self, photo_id: str) -> Image.Image:
        """Fetch the full-resolution photo and return it as an RGB PIL Image."""

    # -- Optional capabilities --

    def supports_upload(self) -> bool:
        """Tell whether ``upload_photo`` is implemented; False by default."""
        return False

    def upload_photo(self, filename: str, data: bytes) -> PhotoRef:
        """Store a new photo and return its reference (unsupported by default)."""
        raise NotImplementedError()

    def supports_delete(self) -> bool:
        """Tell whether ``delete_photo`` is implemented; False by default."""
        return False

    def delete_photo(self, photo_id: str) -> None:
        """Delete the given photo (unsupported by default)."""
        raise NotImplementedError()

    def get_photo_size(self, photo_id: str) -> int | None:
        """Return the file size in bytes, or None when it is unknown."""
        return None

    # -- Cache control (override for provider-specific TTLs) --

    def list_cache_ttl(self) -> int:
        """Return how many seconds the photo list may be cached (default 300)."""
        return 300

    def photo_cache_ttl(self) -> int | None:
        """Return how many seconds full images may be cached on disk.

        None (the default) disables full-image caching, which suits local
        providers that read directly. Remote providers should return a
        positive value so images are not downloaded again on every request.
        """
        return None

    # -- Auth hooks (override for providers that need auth) --

    def auth_start(self, callback_url: str) -> dict:
        """Begin the OAuth flow and return {"redirect_url": "..."}."""
        raise NotImplementedError()

    def auth_callback(self, params: dict) -> dict:
        """Handle the OAuth callback; return {"status": "ok"} or an error."""
        raise NotImplementedError()

    def auth_submit_code(self, code: str) -> dict:
        """Submit a 2FA/verification code; return {"status": "ok"} or an error."""
        raise NotImplementedError()

    def get_auth_state(self) -> dict:
        """Return the auth status for UI display; authenticated by default."""
        return {"authenticated": True}
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
# Maps each provider_type string to the class registered for it.
_PROVIDER_REGISTRY: dict[str, type[PhotoProvider]] = {}


def register_provider(cls: type[PhotoProvider]) -> type[PhotoProvider]:
    """Class decorator that files *cls* in the registry under its provider_type.

    Returns the class unchanged so it can be used as ``@register_provider``.
    A later registration with the same provider_type replaces the earlier one.

    Raises:
        ValueError: if the class leaves ``provider_type`` empty.
    """
    type_key = cls.provider_type
    if type_key:
        _PROVIDER_REGISTRY[type_key] = cls
        return cls
    raise ValueError(f"{cls.__name__} must set provider_type")
def get_provider_class(provider_type: str) -> type[PhotoProvider]:
    """Look up the provider class registered under *provider_type*.

    Raises:
        ValueError: if no class is registered for that type; the message
            lists the types that are available.
    """
    provider_cls = _PROVIDER_REGISTRY.get(provider_type)
    if provider_cls is None:
        raise ValueError(f"Unknown provider type: {provider_type!r}. "
                         f"Available: {list(_PROVIDER_REGISTRY.keys())}")
    return provider_cls
def available_provider_types() -> list[dict]:
    """Describe every registered provider type for the web UI.

    Each entry carries the type's ``type``, ``display_name``, ``auth_type``
    (as its string value) and ``config_schema``, in registration order.
    """
    described: list[dict] = []
    for provider_cls in _PROVIDER_REGISTRY.values():
        described.append({
            "type": provider_cls.provider_type,
            "display_name": provider_cls.display_name,
            "auth_type": provider_cls.auth_type.value,
            "config_schema": provider_cls.config_schema,
        })
    return described
# ---------------------------------------------------------------------------
# Caching layer
# ---------------------------------------------------------------------------
def _make_thumbnail(img: Image.Image, size: int = 300) -> bytes:
    """Return a JPEG-encoded thumbnail of *img* fitting a size x size box.

    The input image is never modified. Aspect ratio is preserved and the
    image is only ever shrunk (``Image.thumbnail`` semantics).

    Args:
        img: Source image in any Pillow mode.
        size: Maximum width and height of the thumbnail in pixels.

    Returns:
        The thumbnail as JPEG bytes (quality 75).
    """
    # Pillow's JPEG writer raises OSError for modes such as RGBA, LA and P
    # (typical for PNG/GIF sources). Convert only those, so images in a mode
    # that could already be written are encoded exactly as before.
    if img.mode in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
        thumb = img.copy()
    else:
        thumb = img.convert("RGB")  # convert() returns a new image
    thumb.thumbnail((size, size), Image.LANCZOS)
    buf = io.BytesIO()
    thumb.save(buf, format="JPEG", quality=75)
    return buf.getvalue()
def _safe_filename(s: str) -> str:
    """Convert an arbitrary string to a safe filename for cache keys.

    Returns the first 24 hex digits of the SHA-256 digest of *s*, so the
    result contains only ``[0-9a-f]`` whatever characters *s* holds.
    """
    # "surrogatepass" keeps this total over all str values: names read from
    # the filesystem may carry lone surrogates (surrogateescape decoding),
    # which a strict UTF-8 encode rejects with UnicodeEncodeError. The bytes
    # produced for well-formed strings, and so their digests, are unchanged.
    return hashlib.sha256(s.encode("utf-8", "surrogatepass")).hexdigest()[:24]
class ProviderCache:
    """Caching layer for photo lists, thumbnails, and full images.

    * Photo lists are cached in memory per provider instance, for the number
      of seconds the provider returns from ``list_cache_ttl()``.
    * Thumbnails are cached on disk under ``<cache_dir>/thumbs``.
    * Full images are cached on disk under ``<cache_dir>/photos``, but only
      for providers whose ``photo_cache_ttl()`` is not None.

    On-disk files are named ``<instance digest>_<photo digest>[_<size>].jpg``.
    The leading instance digest is what lets ``clear()`` find every file of
    one provider instance. Files written under an older naming scheme are
    never matched and are simply left in place.
    """

    # Hex digits of the instance-ID digest shared by all files of an instance.
    _INSTANCE_DIGEST_LEN = 12
    # Hex digits of the photo-ID digest that follows the instance digest.
    _PHOTO_DIGEST_LEN = 24

    def __init__(self, cache_dir: Path):
        """Set up cache locations below *cache_dir*; nothing is created yet."""
        self._cache_dir = cache_dir
        self._thumb_dir = cache_dir / "thumbs"
        self._photo_dir = cache_dir / "photos"
        # In-memory: instance_id -> (expiry_timestamp, photos)
        self._list_cache: dict[str, tuple[float, list[PhotoRef]]] = {}

    # -- File naming --

    @staticmethod
    def _digest(text: str, length: int) -> str:
        """Return the first *length* hex digits of the SHA-256 of *text*."""
        # surrogatepass: IDs derived from file names may hold lone surrogates.
        raw = text.encode("utf-8", "surrogatepass")
        return hashlib.sha256(raw).hexdigest()[:length]

    @classmethod
    def _instance_prefix(cls, instance_id: str) -> str:
        """Return the filename prefix shared by all cache files of an instance."""
        return cls._digest(instance_id, cls._INSTANCE_DIGEST_LEN) + "_"

    @classmethod
    def _cache_stem(cls, instance_id: str, photo_id: str) -> str:
        """Return the filename stem identifying one photo of one instance."""
        # The two IDs are hashed separately (not as one combined string) so
        # the stem starts with a part that depends on the instance alone.
        return (cls._instance_prefix(instance_id)
                + cls._digest(photo_id, cls._PHOTO_DIGEST_LEN))

    def _thumb_path(self, instance_id: str, photo_id: str, size: int) -> Path:
        """Return where the thumbnail of the given size is cached."""
        stem = self._cache_stem(instance_id, photo_id)
        return self._thumb_dir / f"{stem}_{size}.jpg"

    def _photo_path(self, instance_id: str, photo_id: str) -> Path:
        """Return where the full image is cached."""
        return self._photo_dir / f"{self._cache_stem(instance_id, photo_id)}.jpg"

    # -- Photo lists --

    def get_photo_list(self, provider: PhotoProvider) -> list[PhotoRef]:
        """Return the provider's photo list, served from memory while fresh."""
        iid = provider.instance_id
        now = time.time()
        cached = self._list_cache.get(iid)
        if cached and now < cached[0]:
            return cached[1]
        photos = provider.list_photos()
        ttl = provider.list_cache_ttl()
        self._list_cache[iid] = (now + ttl, photos)
        return photos

    def invalidate_list(self, instance_id: str) -> None:
        """Forget the cached photo list of one provider instance."""
        self._list_cache.pop(instance_id, None)

    # -- Images --

    def get_photo(self, provider: PhotoProvider, photo_id: str) -> Image.Image:
        """Return the full image, using the disk cache when the provider allows it."""
        ttl = provider.photo_cache_ttl()
        if ttl is not None:
            cached = self._read_photo_cache(provider.instance_id, photo_id, ttl)
            if cached is not None:
                return cached
        img = provider.get_photo(photo_id)
        if ttl is not None:
            self._write_photo_cache(provider.instance_id, photo_id, img)
        return img

    def get_thumbnail(self, provider: PhotoProvider, photo_id: str,
                      size: int = 300) -> bytes:
        """Return JPEG thumbnail bytes, generating and caching them on a miss.

        Cached thumbnails have no expiry; they are only removed by ``clear()``.
        """
        thumb_path = self._thumb_path(provider.instance_id, photo_id, size)
        if thumb_path.exists():
            return thumb_path.read_bytes()
        img = self.get_photo(provider, photo_id)
        data = _make_thumbnail(img, size)
        thumb_path.parent.mkdir(parents=True, exist_ok=True)
        thumb_path.write_bytes(data)
        return data

    def clear(self, instance_id: str) -> None:
        """Clear all caches for a provider instance."""
        self.invalidate_list(instance_id)
        prefix = self._instance_prefix(instance_id)
        for directory in (self._thumb_dir, self._photo_dir):
            if not directory.is_dir():
                continue
            for entry in directory.iterdir():
                if entry.name.startswith(prefix):
                    entry.unlink(missing_ok=True)

    def _read_photo_cache(self, instance_id: str, photo_id: str,
                          ttl: int) -> Image.Image | None:
        """Return the cached full image, or None on a miss.

        A file older than *ttl* seconds, or one that cannot be decoded, is
        deleted and reported as a miss so the caller fetches it again.
        """
        path = self._photo_path(instance_id, photo_id)
        try:
            age = time.time() - path.stat().st_mtime
        except FileNotFoundError:
            return None
        if age > ttl:
            path.unlink(missing_ok=True)
            return None
        try:
            # The context manager closes the underlying file; convert()
            # returns an independent, fully loaded copy.
            with Image.open(path) as cached:
                return cached.convert("RGB")
        except OSError:
            # Truncated or corrupt cache entry (Pillow's decode errors are
            # OSError subclasses): drop it rather than fail the request.
            path.unlink(missing_ok=True)
            return None

    def _write_photo_cache(self, instance_id: str, photo_id: str,
                           img: Image.Image) -> None:
        """Store *img* in the full-image cache as a quality-90 JPEG."""
        path = self._photo_path(instance_id, photo_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        img.save(str(path), format="JPEG", quality=90)
# ---------------------------------------------------------------------------
# Provider instance manager
# ---------------------------------------------------------------------------
def _load_json(path: Path, default_factory):
    """Read and parse the JSON file at *path*, falling back to a default.

    Args:
        path: File to read.
        default_factory: Zero-argument callable producing the fallback value.

    Returns:
        The parsed JSON, or ``default_factory()`` when the file is missing,
        unreadable, or not valid JSON.
    """
    try:
        if path.exists():
            return json.loads(path.read_text())
    except Exception as exc:
        # Deliberately best-effort, but never silent: callers may go on to
        # overwrite the file with defaults, so the failure must be visible.
        print(f"Warning: could not load {path}: {exc}; using defaults")
    return default_factory()
def _save_json(path: Path, data):
    """Write *data* to *path* as indented JSON, creating parent directories.

    The content goes to a sibling ``<name>.tmp`` file that is then renamed
    over *path*, so an interrupted write cannot leave a truncated file
    behind.

    Raises:
        TypeError: if *data* is not JSON-serializable (nothing is written).
        OSError: if the directory or file cannot be written.
    """
    # Serialize first so a bad payload fails before any file is touched.
    payload = json.dumps(data, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        tmp_path.write_text(payload)
        tmp_path.replace(path)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        tmp_path.unlink(missing_ok=True)
class ProviderManager:
    """Manages all provider instances, their configs, and the cache layer.

    Configs are persisted in ``<data_dir>/providers.json`` as a mapping
    ``{instance_id: {"type": ..., "enabled": ..., "config": {...}}}``. A
    provider is instantiated only while it is enabled and its type is
    registered; its config entry is kept either way.
    """

    PROVIDERS_FILE = "providers.json"

    def __init__(self, data_dir: Path, default_photos_dir: str = "/photos"):
        """Prepare an empty manager; call ``load()`` to read the config file.

        Args:
            data_dir: Directory holding ``providers.json`` and the cache.
            default_photos_dir: Path used for the auto-created local provider.
        """
        self._data_dir = data_dir
        self._config_file = data_dir / self.PROVIDERS_FILE
        self._default_photos_dir = default_photos_dir
        # Running providers only.
        self._instances: dict[str, PhotoProvider] = {}
        # Every known config entry, including disabled or unstartable ones.
        self._configs: dict[str, dict] = {}
        self.cache = ProviderCache(data_dir / "cache")

    def load(self) -> None:
        """Load provider configs from disk and instantiate all enabled providers.

        A provider that is disabled, of unknown type, or fails to start is
        not instantiated, but its config is retained.
        """
        raw = _load_json(self._config_file, dict)
        if not raw:
            # First run or empty config — auto-create local-default if the
            # local provider type is registered
            if "local" in _PROVIDER_REGISTRY:
                raw = {
                    "local-default": {
                        "type": "local",
                        "enabled": True,
                        "config": {
                            "path": self._default_photos_dir,
                            "recursive": True,
                        },
                    }
                }
                _save_json(self._config_file, raw)
        for instance_id, cfg in raw.items():
            if not cfg.get("enabled", True):
                self._configs[instance_id] = cfg
                continue
            provider_type = cfg.get("type", "")
            if provider_type not in _PROVIDER_REGISTRY:
                print(f"Warning: unknown provider type {provider_type!r} "
                      f"for instance {instance_id!r}, skipping")
                self._configs[instance_id] = cfg
                continue
            try:
                self._create_instance(instance_id, cfg)
            except Exception as e:
                print(f"Warning: failed to start provider {instance_id!r}: {e}")
                self._configs[instance_id] = cfg

    def _create_instance(self, instance_id: str, cfg: dict) -> PhotoProvider:
        """Instantiate, start and register the provider described by *cfg*.

        Nothing is registered if construction or ``startup()`` raises.
        """
        cls = get_provider_class(cfg["type"])
        provider = cls(instance_id=instance_id, config=cfg.get("config", {}))
        provider.startup()
        self._instances[instance_id] = provider
        self._configs[instance_id] = cfg
        return provider

    def _persist(self) -> None:
        """Write all known configs back to the providers file."""
        _save_json(self._config_file, self._configs)

    def add_provider(self, instance_id: str, provider_type: str,
                     config: dict, enabled: bool = True) -> PhotoProvider:
        """Register a new provider instance and persist its config.

        Args:
            instance_id: New unique ID; must not contain ':' because that
                character separates the parts of a composite photo key.
            provider_type: A registered provider type.
            config: Provider-specific configuration.
            enabled: Whether the provider should be started right away.

        Returns:
            The provider object. With ``enabled=False`` it is a detached
            instance: not started and not part of the photo pool.

        Raises:
            ValueError: for an invalid or duplicate ID, or an unknown type.
        """
        if ":" in instance_id:
            raise ValueError("Provider instance ID must not contain ':'")
        if instance_id in self._configs:
            raise ValueError(f"Provider {instance_id!r} already exists")
        cfg = {"type": provider_type, "enabled": enabled, "config": config}
        if enabled:
            provider = self._create_instance(instance_id, cfg)
        else:
            # Match load() and set_enabled(): a disabled provider is recorded
            # but must not be started or contribute photos. The type is
            # still validated so a bad config is rejected up front.
            provider = get_provider_class(provider_type)(
                instance_id=instance_id, config=config)
            self._configs[instance_id] = cfg
        self._persist()
        return provider

    def remove_provider(self, instance_id: str) -> None:
        """Shut down and forget a provider, clearing its caches and config."""
        provider = self._instances.pop(instance_id, None)
        if provider:
            provider.shutdown()
        self._configs.pop(instance_id, None)
        self.cache.clear(instance_id)
        self._persist()

    def update_provider_config(self, instance_id: str, config: dict) -> None:
        """Replace a provider's config and restart it if it should be running.

        Raises:
            ValueError: if the instance ID is unknown.
        """
        if instance_id not in self._configs:
            raise ValueError(f"Provider {instance_id!r} not found")
        # Shutdown old instance
        old = self._instances.pop(instance_id, None)
        if old:
            old.shutdown()
        # Update config and restart
        self._configs[instance_id]["config"] = config
        cfg = self._configs[instance_id]
        if cfg.get("enabled", True) and cfg["type"] in _PROVIDER_REGISTRY:
            self._create_instance(instance_id, cfg)
        # The new config may point at different photos.
        self.cache.invalidate_list(instance_id)
        self._persist()

    def set_enabled(self, instance_id: str, enabled: bool) -> None:
        """Enable or disable a provider, starting or stopping it accordingly.

        Raises:
            ValueError: if the instance ID is unknown.
        """
        if instance_id not in self._configs:
            raise ValueError(f"Provider {instance_id!r} not found")
        self._configs[instance_id]["enabled"] = enabled
        if not enabled:
            provider = self._instances.pop(instance_id, None)
            if provider:
                provider.shutdown()
        elif instance_id not in self._instances:
            cfg = self._configs[instance_id]
            if cfg["type"] in _PROVIDER_REGISTRY:
                self._create_instance(instance_id, cfg)
        self._persist()

    def get_instance(self, instance_id: str) -> PhotoProvider | None:
        """Return the running provider with this ID, or None."""
        return self._instances.get(instance_id)

    def all_instances(self) -> dict[str, PhotoProvider]:
        """Return a shallow copy of the running providers, keyed by ID."""
        return dict(self._instances)

    def all_configs(self) -> dict[str, dict]:
        """Return a shallow copy of all config entries, keyed by ID."""
        return dict(self._configs)

    def shutdown_all(self) -> None:
        """Shut down every running provider; one failure does not stop the rest."""
        for instance_id, provider in list(self._instances.items()):
            try:
                provider.shutdown()
            except Exception as e:
                # Still best-effort, but report it instead of hiding it.
                print(f"Warning: failed to shut down provider "
                      f"{instance_id!r}: {e}")
        self._instances.clear()

    # -- Aggregated photo operations --

    def get_all_photos(self) -> list[PhotoRef]:
        """Return the photos of all running providers, skipping failing ones."""
        all_photos: list[PhotoRef] = []
        for provider in self._instances.values():
            try:
                photos = self.cache.get_photo_list(provider)
                all_photos.extend(photos)
            except Exception as e:
                print(f"Warning: {provider.instance_id} list_photos failed: {e}")
        return all_photos

    def get_photo_image(self, ref: PhotoRef) -> Image.Image:
        """Return the full image for *ref* through the cache layer.

        Raises:
            ValueError: if the owning provider is not running.
        """
        provider = self._instances.get(ref.provider_id)
        if not provider:
            raise ValueError(f"Provider {ref.provider_id!r} not found or not enabled")
        return self.cache.get_photo(provider, ref.photo_id)

    def get_photo_thumbnail(self, ref: PhotoRef, size: int = 300) -> bytes:
        """Return JPEG thumbnail bytes for *ref* through the cache layer.

        Raises:
            ValueError: if the owning provider is not running.
        """
        provider = self._instances.get(ref.provider_id)
        if not provider:
            raise ValueError(f"Provider {ref.provider_id!r} not found or not enabled")
        return self.cache.get_thumbnail(provider, ref.photo_id, size)

    def pick_random_photo(self) -> PhotoRef | None:
        """Return a uniformly random photo from the pool, or None if it is empty."""
        all_photos = self.get_all_photos()
        if not all_photos:
            return None
        return random.choice(all_photos)

    def find_photo_ref(self, composite_key: str) -> PhotoRef | None:
        """Look up a PhotoRef by its composite key.

        Returns None when the key has no ':', the provider is not running,
        or the provider does not list that photo.
        """
        if ":" not in composite_key:
            return None
        # Split on the first ':' only: photo IDs may themselves contain ':'.
        provider_id, photo_id = composite_key.split(":", 1)
        provider = self._instances.get(provider_id)
        if not provider:
            return None
        for ref in self.cache.get_photo_list(provider):
            if ref.photo_id == photo_id:
                return ref
        return None

    def save_provider_auth_state(self, instance_id: str, auth_data: dict) -> None:
        """Persist auth tokens/sessions into the provider's config.

        Called by providers after OAuth/2FA completion. Unknown instance IDs
        are ignored.
        """
        if instance_id not in self._configs:
            return
        self._configs[instance_id].setdefault("config", {}).update(auth_data)
        self._persist()
# ---------------------------------------------------------------------------
# Migration helper
# ---------------------------------------------------------------------------
def migrate_image_settings(data_dir: Path, default_provider_id: str = "local-default"):
    """Convert bare-filename keys in image_settings.json to composite keys.

    Every key ``name`` becomes ``{default_provider_id}:name``. The original
    file is first copied to ``image_settings.json.bak``. Nothing happens when
    the file is missing or empty, or when any key already contains ':',
    which is taken to mean the migration has already run.
    """
    settings_path = data_dir / "image_settings.json"
    if not settings_path.exists():
        return
    settings = json.loads(settings_path.read_text())
    # Migrate only non-empty settings in which no key is composite yet.
    needs_migration = bool(settings) and all(":" not in key for key in settings)
    if not needs_migration:
        return
    # Keep a copy of the untouched file before rewriting it.
    backup_path = data_dir / "image_settings.json.bak"
    shutil.copy2(settings_path, backup_path)
    rekeyed = {}
    for old_key, value in settings.items():
        rekeyed[f"{default_provider_id}:{old_key}"] = value
    _save_json(settings_path, rekeyed)
    print(f"Migrated {len(rekeyed)} image settings to composite keys "
          f"(backup at {backup_path})")