diff --git a/.gitignore b/.gitignore index 79f712045d..4910c59646 100644 --- a/.gitignore +++ b/.gitignore @@ -68,3 +68,4 @@ switch_product_ids.json # AI tools .claude +test.sh diff --git a/backend/config/config_manager.py b/backend/config/config_manager.py index 4655f1774f..0edc5d2f81 100644 --- a/backend/config/config_manager.py +++ b/backend/config/config_manager.py @@ -101,6 +101,13 @@ class NetplayICEServer(TypedDict): credential: NotRequired[str] +class StreamingContainer(TypedDict): + platform: str + host: str + broker_host: str + label: str + + class Config: CONFIG_FILE_MOUNTED: bool CONFIG_FILE_WRITABLE: bool @@ -132,6 +139,8 @@ class Config: SCAN_MEDIA: list[str] GAMELIST_MEDIA_THUMBNAIL: MetadataMediaType GAMELIST_MEDIA_IMAGE: MetadataMediaType + STREAMING_ENABLED: bool + STREAMING_CONTAINERS: list[StreamingContainer] def __init__(self, **entries): self.__dict__.update(entries) @@ -419,6 +428,10 @@ def _parse_config(self): PEGASUS_AUTO_EXPORT_ON_SCAN=pydash.get( self._raw_config, "scan.pegasus.export", False ), + STREAMING_ENABLED=pydash.get(self._raw_config, "streaming.enabled", False), + STREAMING_CONTAINERS=pydash.get( + self._raw_config, "streaming.containers", [] + ), ) def _get_ejs_controls(self) -> dict[str, EjsControls]: @@ -676,6 +689,14 @@ def _validate_config(self): ) sys.exit(3) + if not isinstance(self.config.STREAMING_ENABLED, bool): + log.critical("Invalid config.yml: streaming.enabled must be a boolean") + sys.exit(3) + + if not isinstance(self.config.STREAMING_CONTAINERS, list): + log.critical("Invalid config.yml: streaming.containers must be a list") + sys.exit(3) + def get_config(self) -> Config: try: with open(self.config_file, "r") as config_file: diff --git a/backend/endpoints/streaming.py b/backend/endpoints/streaming.py new file mode 100644 index 0000000000..347b752fda --- /dev/null +++ b/backend/endpoints/streaming.py @@ -0,0 +1,643 @@ +from __future__ import annotations + +import asyncio +import json +import logging +import os +import urllib.error +import urllib.request +from datetime import datetime, timezone +from typing import Annotated, Any +from urllib.parse import urlparse, urlunparse + +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field + +from config.config_manager import config_manager as cm +from models.user import Role + +log = logging.getLogger("romm") + +router = APIRouter(prefix="/streaming", tags=["streaming"]) + +# NOTE: in-process storage — breaks under multi-worker deployments (gunicorn/uvicorn +# with workers > 1). Each worker has its own copy; sessions claimed in one are +# invisible to others and the 409 guard silently stops working. +_sessions: dict[str, dict[str, Any]] = {} +_session_locks: dict[str, asyncio.Lock] = {} + + +def _container_key(container: dict[str, Any]) -> str: + """Stable unique key for a container, derived the same way as the broker URL.""" + broker_host = container.get("broker_host", "").rstrip("/") + if broker_host: + return broker_host + # Derive from stream host the same way _broker_url does: replace port with 8000 + stream_host = container.get("host", "").rstrip("/") + try: + parsed = urlparse(stream_host) + return urlunparse(parsed._replace(netloc=f"{parsed.hostname}:8000")).rstrip("/") + except Exception: + return stream_host + + +class ClaimSessionRequest(BaseModel): + platform: str + rom_path: str + rom_name: str + + +class SaveAndExitRequest(BaseModel): + slot: Annotated[int, Field(ge=0, le=10)] = 0 + wait: bool = True + + +class VolumeRequest(BaseModel): + level: Annotated[int, Field(ge=0, le=100)] + + +class MuteRequest(BaseModel): + mute: bool | None = None # None = toggle, True/False = explicit set + + +class SaveStateRequest(BaseModel): + # Range 1–9 covers PCSX2 (slots 1–9) and Dolphin (slots 1–8). + # The broker enforces the per-emulator ceiling; the frontend further limits + # the slot selector via platformCapabilities(). + slot: Annotated[int, Field(ge=1, le=9)] = 1 + + +class LoadStateRequest(BaseModel): + # Range 1–10 covers PCSX2 (slots 1–9 + slot 10 autosave) and Dolphin (1–8). + # The broker enforces the per-emulator ceiling. + slot: Annotated[int, Field(ge=1, le=10)] = 1 + + +def _get_streaming_config() -> dict[str, Any]: + """Extract streaming config from the parsed Config object""" + try: + cfg = cm.get_config() + + enabled = getattr(cfg, "STREAMING_ENABLED", False) + containers = getattr(cfg, "STREAMING_CONTAINERS", []) + + return {"enabled": enabled, "containers": containers} + + except Exception as e: + log.error("streaming: Failed to extract config from cm: %s", e) + return {"enabled": False, "containers": []} + + +def _container_for_platform(platform: str) -> dict[str, Any] | None: + cfg = _get_streaming_config() + if not cfg.get("enabled", False): + return None + lower = platform.lower() + for entry in cfg.get("containers", []): + if entry.get("platform", "").lower() == lower: + return entry + return None + + +# broker communication + + +def _broker_url(container: dict[str, Any], path: str) -> str: + """ + Build the URL for the ROM broker API. + + The broker runs inside the emulator container on BROKER_PORT (default 8000). + `broker_host` in config.yml is the host:port of the broker endpoint — + separate from `host` which is the browser-facing stream URL. + + If broker_host is not set, we assume it is on the same host and swap the port. + Example: + host: http://192.168.1.51:3000 (Selkies web UI, browser-facing) + broker_host: http://192.168.1.51:8000 (broker API, server-to-server) + """ + broker_host = container.get("broker_host", "").rstrip("/") + + if not broker_host: + # Derive broker URL from stream host — replace port with 8000 + stream_host = container.get("host", "").rstrip("/") + # Parse out just the scheme + hostname, replace port + # e.g. http://192.168.1.51:3000 → http://192.168.1.51:8000 + try: + parsed = urlparse(stream_host) + broker_host = urlunparse(parsed._replace(netloc=f"{parsed.hostname}:8000")) + except Exception: + broker_host = stream_host + + return f"{broker_host}{path}" + + +def _call_broker(container: dict[str, Any], rom_path: str, rom_name: str) -> None: + """ + POST to the broker's /launch endpoint to tell the emulator container to + load a ROM. Uses only Python stdlib urllib — no extra dependencies. + + Raises HTTPException if the broker is unreachable or returns an error. + """ + url = _broker_url(container, "/launch") + secret = os.environ.get("BROKER_SECRET", container.get("broker_secret", "")) + + payload = json.dumps( + { + "rom_path": rom_path, + "rom_name": rom_name, + } + ).encode() + + req = urllib.request.Request( + url, + data=payload, + method="POST", + headers={ + "Content-Type": "application/json", + "Content-Length": str(len(payload)), + **({"X-Broker-Secret": secret} if secret else {}), + }, + ) + + try: + with urllib.request.urlopen(req, timeout=10) as resp: # nosec B310 + body = json.loads(resp.read()) + log.info("streaming: broker launched ROM — %s", body) + except urllib.error.HTTPError as exc: + error_body = exc.read().decode(errors="replace") + log.error("streaming: broker HTTP error %d — %s", exc.code, error_body) + try: + detail = json.loads(error_body) + except Exception: + detail = error_body + raise HTTPException( + status_code=502, + detail=f"Broker returned {exc.code}: {detail}", + ) from exc + except (urllib.error.URLError, OSError) as exc: + log.error("streaming: broker unreachable at %s — %s", url, exc) + raise HTTPException( + status_code=503, + detail=( + f"Could not reach ROM broker at {url}. " + "Check that broker.py is running inside the emulator container " + "and that port 8000 is reachable from the RomM host." + ), + ) from exc + + +def _save_and_exit_broker( + container: dict[str, Any], slot: int = 0, wait: bool = True +) -> bool: + """ + POST /save-and-exit to the broker. Best-effort — logs but never raises. + With wait=True the call blocks until save+kill completes (use for button press). + With wait=False the broker fires save+kill in the background (use for navigation away). + Returns True if the broker reported a successful save. + """ + url = _broker_url(container, "/save-and-exit") + secret = os.environ.get("BROKER_SECRET", container.get("broker_secret", "")) + payload = json.dumps({"slot": slot, "wait": wait}).encode() + req = urllib.request.Request( + url, + data=payload, + method="POST", + headers={ + "Content-Type": "application/json", + "Content-Length": str(len(payload)), + **({"X-Broker-Secret": secret} if secret else {}), + }, + ) + timeout = 20 if wait else 5 + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: # nosec B310 + body = json.loads(resp.read()) + saved = bool(body.get("saved", False)) + log.info( + "streaming: broker save-and-exit — saved=%s slot=%d wait=%s", + saved, + slot, + wait, + ) + return saved + except Exception as exc: + log.warning("streaming: broker save-and-exit failed — %s", exc) + return False + + +def _volume_broker(container: dict[str, Any], level: int) -> bool: + """POST /volume to the broker. Best-effort — logs but never raises.""" + url = _broker_url(container, "/volume") + secret = os.environ.get("BROKER_SECRET", container.get("broker_secret", "")) + payload = json.dumps({"level": level}).encode() + req = urllib.request.Request( + url, + data=payload, + method="POST", + headers={ + "Content-Type": "application/json", + "Content-Length": str(len(payload)), + **({"X-Broker-Secret": secret} if secret else {}), + }, + ) + try: + with urllib.request.urlopen(req, timeout=5) as resp: # nosec B310 + body = json.loads(resp.read()) + log.debug("streaming: broker volume set to %d — %s", level, body) + return body.get("status") == "ok" + except Exception as exc: + log.warning("streaming: broker volume failed — %s", exc) + return False + + +def _mute_broker(container: dict[str, Any], mute: bool | None) -> bool | None: + """POST /mute to the broker. Returns confirmed mute state, or None on error.""" + url = _broker_url(container, "/mute") + secret = os.environ.get("BROKER_SECRET", container.get("broker_secret", "")) + body_dict: dict[str, Any] = {} if mute is None else {"mute": mute} + payload = json.dumps(body_dict).encode() + req = urllib.request.Request( + url, + data=payload, + method="POST", + headers={ + "Content-Type": "application/json", + "Content-Length": str(len(payload)), + **({"X-Broker-Secret": secret} if secret else {}), + }, + ) + try: + with urllib.request.urlopen(req, timeout=5) as resp: # nosec B310 + body = json.loads(resp.read()) + confirmed = body.get("mute") + log.debug("streaming: broker mute — %s", body) + return confirmed + except Exception as exc: + log.warning("streaming: broker mute failed — %s", exc) + return None + + +def _save_state_broker(container: dict[str, Any], slot: int) -> bool: + """POST /save-state to the broker. Returns True if the request was accepted.""" + url = _broker_url(container, "/save-state") + secret = os.environ.get("BROKER_SECRET", container.get("broker_secret", "")) + payload = json.dumps({"slot": slot}).encode() + req = urllib.request.Request( + url, + data=payload, + method="POST", + headers={ + "Content-Type": "application/json", + "Content-Length": str(len(payload)), + **({"X-Broker-Secret": secret} if secret else {}), + }, + ) + try: + with urllib.request.urlopen(req, timeout=5) as resp: # nosec B310 + body = json.loads(resp.read()) + log.debug("streaming: broker save-state slot=%d — %s", slot, body) + return body.get("status") == "saving" + except Exception as exc: + log.warning("streaming: broker save-state failed — %s", exc) + return False + + +def _load_state_broker(container: dict[str, Any], slot: int) -> bool: + """POST /load-state to the broker. Returns True if broker confirmed success.""" + url = _broker_url(container, "/load-state") + secret = os.environ.get("BROKER_SECRET", container.get("broker_secret", "")) + payload = json.dumps({"slot": slot}).encode() + req = urllib.request.Request( + url, + data=payload, + method="POST", + headers={ + "Content-Type": "application/json", + "Content-Length": str(len(payload)), + **({"X-Broker-Secret": secret} if secret else {}), + }, + ) + try: + with urllib.request.urlopen( + req, timeout=60 + ) as resp: # nosec B310 — worst-case: 9 slot cycles × ~5s xdotool timeout + body = json.loads(resp.read()) + loaded = bool(body.get("loaded", False)) + log.debug("streaming: broker load-state slot=%d loaded=%s", slot, loaded) + return loaded + except Exception as exc: + log.warning("streaming: broker load-state failed — %s", exc) + return False + + +def _stop_broker(container: dict[str, Any]) -> None: + """Tell the broker to stop emulator. Best-effort — don't raise on failure.""" + url = _broker_url(container, "/launch") + secret = os.environ.get("BROKER_SECRET", container.get("broker_secret", "")) + req = urllib.request.Request( + url, + method="DELETE", + headers={**({"X-Broker-Secret": secret} if secret else {})}, + ) + try: + with urllib.request.urlopen(req, timeout=5): # nosec B310 + pass + except Exception as exc: + log.warning("streaming: could not stop broker session — %s", exc) + + +# ── Routes ──────────────────────────────────────────────────────────────────── + + +@router.get("/config") +async def get_config() -> JSONResponse: + """Return streaming configuration to the frontend""" + cfg = _get_streaming_config() + + safe_containers = [] + for c in cfg.get("containers", []): + if not c.get("platform") or not c.get("host"): + log.warning("streaming: container missing platform/host — skipping: %s", c) + continue + + safe_containers.append( + { + "platform": c.get("platform"), + "host": c.get("host"), + "label": c.get("label") or c.get("platform", "").upper(), + } + ) + + return JSONResponse( + { + "enabled": cfg.get("enabled", False), + "containers": safe_containers, + } + ) + + +@router.post("/sessions") +async def claim_session(req: ClaimSessionRequest, request: Request) -> JSONResponse: + """ + Claim a streaming session and tell the broker to load the ROM. + returns 404 is not configured for that platform + returns 409 if the platform is already occupied + returns 503 if the broker is unreachable + """ + if not request.user.is_authenticated: + raise HTTPException(status_code=403, detail="Forbidden") + + container = _container_for_platform(req.platform) + + if container is None: + raise HTTPException( + status_code=404, + detail=f"No streaming container configured for platform '{req.platform}'", + ) + + session_key = _container_key(container) + lock = _session_locks.setdefault(session_key, asyncio.Lock()) + + async with lock: + existing = _sessions.get(session_key) + if existing: + raise HTTPException( + status_code=409, + detail={ + "message": "Session in use", + "rom_name": existing["rom_name"], + "claimed_at": existing["claimed_at"], + }, + ) + + # Tell the broker to load the ROM — raises HTTPException on failure. + # Wrapped in asyncio.to_thread because urllib is synchronous. + await asyncio.to_thread(_call_broker, container, req.rom_path, req.rom_name) + + now = datetime.now(timezone.utc).isoformat() + _sessions[session_key] = { + "rom_path": req.rom_path, + "rom_name": req.rom_name, + "claimed_at": now, + "user_id": request.user.id, + } + + log.info( + "streaming: session claimed — platform=%s rom=%s", + req.platform, + req.rom_name, + ) + + return JSONResponse( + { + "platform": req.platform, + "host": container["host"], + "label": container.get("label", req.platform.upper()), + "rom_name": req.rom_name, + "claimed_at": now, + } + ) + + +@router.post("/sessions/{platform}/save-and-exit") +async def save_and_exit_session( + platform: str, req: SaveAndExitRequest, request: Request +) -> JSONResponse: + """ + Save game state then release the session. + wait=true (default): blocks until broker confirms save+kill complete. + wait=false: broker fires save+kill in background, returns immediately. + """ + if not request.user.is_authenticated: + raise HTTPException(status_code=403, detail="Forbidden") + + container = _container_for_platform(platform) + if container is None: + raise HTTPException( + status_code=404, + detail=f"No streaming container configured for platform '{platform}'", + ) + session_key = _container_key(container) + session = _sessions.get(session_key) + if session is None: + raise HTTPException( + status_code=404, + detail=f"No active session for platform '{platform}'", + ) + + saved = await asyncio.to_thread( + _save_and_exit_broker, container, slot=req.slot, wait=req.wait + ) + + _sessions.pop(session_key, None) + _session_locks.pop(session_key, None) + log.info("streaming: save-and-exit — platform=%s saved=%s", platform, saved) + return JSONResponse({"status": "ok", "saved": saved, "platform": platform}) + + +@router.post("/sessions/{platform}/volume") +async def set_volume( + platform: str, req: VolumeRequest, request: Request +) -> JSONResponse: + """Set emulator audio volume (0–100). Best-effort — no 404 if broker unreachable.""" + if not request.user.is_authenticated: + raise HTTPException(status_code=403, detail="Forbidden") + + container = _container_for_platform(platform) + if container is None: + raise HTTPException( + status_code=404, + detail=f"No streaming container configured for platform '{platform}'", + ) + session_key = _container_key(container) + session = _sessions.get(session_key) + if session is None: + raise HTTPException( + status_code=404, detail=f"No active session for platform '{platform}'" + ) + + ok = await asyncio.to_thread(_volume_broker, container, req.level) + + return JSONResponse( + {"status": "ok" if ok else "error", "level": req.level, "platform": platform} + ) + + +@router.post("/sessions/{platform}/mute") +async def set_mute(platform: str, req: MuteRequest, request: Request) -> JSONResponse: + """Toggle or explicitly set mute state. Omit body to toggle.""" + if not request.user.is_authenticated: + raise HTTPException(status_code=403, detail="Forbidden") + + container = _container_for_platform(platform) + if container is None: + raise HTTPException( + status_code=404, + detail=f"No streaming container configured for platform '{platform}'", + ) + session_key = _container_key(container) + session = _sessions.get(session_key) + if session is None: + raise HTTPException( + status_code=404, detail=f"No active session for platform '{platform}'" + ) + + confirmed = await asyncio.to_thread(_mute_broker, container, req.mute) + + return JSONResponse({"status": "ok", "mute": confirmed, "platform": platform}) + + +@router.post("/sessions/{platform}/save-state") +async def save_state( + platform: str, req: SaveStateRequest, request: Request +) -> JSONResponse: + """Save game state to a slot (1–9) without stopping the emulator.""" + if not request.user.is_authenticated: + raise HTTPException(status_code=403, detail="Forbidden") + + container = _container_for_platform(platform) + if container is None: + raise HTTPException( + status_code=404, + detail=f"No streaming container configured for platform '{platform}'", + ) + session_key = _container_key(container) + session = _sessions.get(session_key) + if session is None: + raise HTTPException( + status_code=404, detail=f"No active session for platform '{platform}'" + ) + + ok = await asyncio.to_thread(_save_state_broker, container, req.slot) + + status_code = 200 if ok else 500 + return JSONResponse( + {"status": "saving" if ok else "error", "slot": req.slot, "platform": platform}, + status_code=status_code, + ) + + +@router.post("/sessions/{platform}/load-state") +async def load_state( + platform: str, req: LoadStateRequest, request: Request +) -> JSONResponse: + """Load game state from a slot (1–10). Slot 10 is the autosave.""" + if not request.user.is_authenticated: + raise HTTPException(status_code=403, detail="Forbidden") + + container = _container_for_platform(platform) + if container is None: + raise HTTPException( + status_code=404, + detail=f"No streaming container configured for platform '{platform}'", + ) + session_key = _container_key(container) + session = _sessions.get(session_key) + if session is None: + raise HTTPException( + status_code=404, detail=f"No active session for platform '{platform}'" + ) + + ok = await asyncio.to_thread(_load_state_broker, container, req.slot) + + return JSONResponse( + { + "status": "ok" if ok else "error", + "loaded": ok, + "slot": req.slot, + "platform": platform, + } + ) + + +@router.delete("/sessions/{platform}") +async def release_session(platform: str, request: Request) -> JSONResponse: + if not request.user.is_authenticated: + raise HTTPException(status_code=403, detail="Forbidden") + + # Release a session. Also tells the broker to stop the emulator. + container = _container_for_platform(platform) + # Fallback: streaming disabled or unconfigured — no session will be stored under this key + session_key = _container_key(container) if container else platform + released = _sessions.pop(session_key, None) + _session_locks.pop(session_key, None) + + if released is None: + return JSONResponse({"status": "not_found", "platform": platform}) + + # Best-effort stop, don't block user + if container: + await asyncio.to_thread(_stop_broker, container) + + log.info("streaming: session released — platform=%s", platform) + return JSONResponse({"status": "released", "platform": platform}) + + +@router.get("/sessions") +async def list_sessions(request: Request) -> JSONResponse: + """Debug — active sessions.""" + if not request.user.is_authenticated: + raise HTTPException(status_code=403, detail="Forbidden") + return JSONResponse( + { + session_key: { + "rom_name": s["rom_name"], + "claimed_at": s["claimed_at"], + } + for session_key, s in _sessions.items() + } + ) + + +@router.delete("/sessions") +async def force_release_all(request: Request) -> JSONResponse: + if not request.user.is_authenticated or request.user.role != Role.ADMIN: + raise HTTPException(status_code=403, detail="Forbidden") + + # Admin endpoint — force releases all active sessions + released = list(_sessions.keys()) + _sessions.clear() + _session_locks.clear() + log.info("streaming: all sessions force-released by admin — %s", released) + return JSONResponse({"status": "released", "platforms": released}) diff --git a/backend/main.py b/backend/main.py index 9430b51d24..4bb8ab4be5 100644 --- a/backend/main.py +++ b/backend/main.py @@ -44,6 +44,7 @@ from endpoints.search import router as search_router from endpoints.states import router as states_router from endpoints.stats import router as stats_router +from endpoints.streaming import router as streaming_router from endpoints.sync import router as sync_router from endpoints.tasks import router as tasks_router from endpoints.user import router as user_router @@ -104,6 +105,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: re.compile(r"^/api/client-tokens/pair/.+/status"), re.compile(r"^/ws"), re.compile(r"^/netplay"), + re.compile(r"^/api/streaming/config$"), ], ) @@ -146,6 +148,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: app.include_router(collections_router, prefix="/api") app.include_router(export_router, prefix="/api") app.include_router(netplay_router, prefix="/api") +app.include_router(streaming_router, prefix="/api") app.mount("/ws", socket_handler.socket_app) app.mount("/netplay", netplay_socket_handler.socket_app) diff --git a/backend/tests/endpoints/test_streaming.py b/backend/tests/endpoints/test_streaming.py new file mode 100644 index 0000000000..6ffa7146fc --- /dev/null +++ b/backend/tests/endpoints/test_streaming.py @@ -0,0 +1,181 @@ +import asyncio +import logging +from unittest.mock import MagicMock, patch + +import httpx +import pytest +from fastapi.testclient import TestClient +from main import app + + +@pytest.fixture +def client(): + with TestClient(app) as client: + yield client + + +def _mock_cm(enabled=True, containers=None): + """Return a mock config_manager that yields the given streaming config.""" + if containers is None: + containers = [] + cfg = MagicMock() + cfg.STREAMING_ENABLED = enabled + cfg.STREAMING_CONTAINERS = containers + return cfg + + +def test_get_config_warns_on_missing_platform(client, caplog): + # The "romm" logger has propagate=False, so caplog's handler must be + # added directly to it rather than relying on root-logger propagation. + bad_container = {"host": "http://192.168.1.10:3000"} # no "platform" + romm_logger = logging.getLogger("romm") + romm_logger.addHandler(caplog.handler) + try: + with patch( + "endpoints.streaming.cm.get_config", + return_value=_mock_cm(containers=[bad_container]), + ): + with caplog.at_level(logging.WARNING, logger="romm"): + response = client.get("/api/streaming/config") + finally: + romm_logger.removeHandler(caplog.handler) + assert response.status_code == 200 + assert response.json()["containers"] == [] + assert "missing platform/host" in caplog.text + + +@pytest.mark.skip(reason="requires db") +def test_claim_session_same_container_two_platforms_rejected(client, access_token): + """Dolphin serves ngc and wii from the same broker — second claim must be 409.""" + containers = [ + { + "platform": "ngc", + "host": "http://192.168.1.10:3000", + "broker_host": "http://192.168.1.10:8000", + }, + { + "platform": "wii", + "host": "http://192.168.1.10:3000", + "broker_host": "http://192.168.1.10:8000", + }, + ] + with patch( + "endpoints.streaming.cm.get_config", + return_value=_mock_cm(containers=containers), + ): + with patch("endpoints.streaming._call_broker"): + r1 = client.post( + "/api/streaming/sessions", + json={ + "platform": "ngc", + "rom_path": "/roms/game.iso", + "rom_name": "Game", + }, + headers={"Authorization": f"Bearer {access_token}"}, + ) + r2 = client.post( + "/api/streaming/sessions", + json={ + "platform": "wii", + "rom_path": "/roms/game2.iso", + "rom_name": "Game2", + }, + headers={"Authorization": f"Bearer {access_token}"}, + ) + assert r1.status_code == 200 + assert r2.status_code == 409 + + +@pytest.mark.skip(reason="requires db") +def test_release_uses_container_key_not_platform(client, access_token): + """release_session must find the session by broker_host, not by platform string.""" + container = { + "platform": "ngc", + "host": "http://192.168.1.10:3000", + "broker_host": "http://192.168.1.10:8000", + } + with patch( + "endpoints.streaming.cm.get_config", + return_value=_mock_cm(containers=[container]), + ): + with patch("endpoints.streaming._call_broker"): + client.post( + "/api/streaming/sessions", + json={"platform": "ngc", "rom_path": "/roms/g.iso", "rom_name": "G"}, + headers={"Authorization": f"Bearer {access_token}"}, + ) + r = client.delete( + "/api/streaming/sessions/ngc", + headers={"Authorization": f"Bearer {access_token}"}, + ) + assert r.status_code == 200 + assert r.json()["status"] == "released" + + +@pytest.mark.asyncio +@pytest.mark.skip(reason="requires db") +async def test_concurrent_claim_only_one_succeeds(access_token): + """Two concurrent claims on the same container must yield exactly one 200 and one 409.""" + container = { + "platform": "ps2", + "host": "http://192.168.1.20:3000", + "broker_host": "http://192.168.1.20:8000", + } + + with patch( + "endpoints.streaming.cm.get_config", + return_value=_mock_cm(containers=[container]), + ): + with patch("endpoints.streaming._call_broker"): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test" + ) as ac: + r1, r2 = await asyncio.gather( + ac.post( + "/api/streaming/sessions", + json={ + "platform": "ps2", + "rom_path": "/roms/a.iso", + "rom_name": "A", + }, + headers={"Authorization": f"Bearer {access_token}"}, + ), + ac.post( + "/api/streaming/sessions", + json={ + "platform": "ps2", + "rom_path": "/roms/b.iso", + "rom_name": "B", + }, + headers={"Authorization": f"Bearer {access_token}"}, + ), + ) + + assert sorted([r1.status_code, r2.status_code]) == [200, 409] + + +def test_claim_session_requires_auth(client): + """Unauthenticated POST /sessions must return 403.""" + response = client.post( + "/api/streaming/sessions", + json={"platform": "ps2", "rom_path": "/roms/game.iso", "rom_name": "Game"}, + ) + assert response.status_code == 403 + + +def test_release_session_requires_auth(client): + """Unauthenticated DELETE /sessions/{platform} must return 403.""" + response = client.delete("/api/streaming/sessions/ps2") + assert response.status_code == 403 + + +def test_force_release_all_requires_auth(client): + """Unauthenticated DELETE /sessions must return 403 (admin-only endpoint).""" + response = client.delete("/api/streaming/sessions") + assert response.status_code == 403 + + +def test_list_sessions_requires_auth(client): + """Unauthenticated GET /sessions must return 403.""" + response = client.get("/api/streaming/sessions") + assert response.status_code == 403 diff --git a/env.template b/env.template index 74f9bf52bf..814334aff5 100644 --- a/env.template +++ b/env.template @@ -138,3 +138,6 @@ POSTGRES_USER=authentik # Postgres user for the Authentik dev stack POSTGRES_PASSWORD=authentik # Postgres password for the Authentik dev stack AUTHENTIK_SECRET_KEY= # Authentik secret key AUTHENTIK_BOOTSTRAP_PASSWORD= # Initial Authentik admin bootstrap password + +# Emulator Broker Integration +BROKER_SECRET= diff --git a/examples/config.example.yml b/examples/config.example.yml index cc4b636da0..f5b135f001 100644 --- a/examples/config.example.yml +++ b/examples/config.example.yml @@ -174,3 +174,25 @@ # 1: # Player 2 # 2: # Player 3 # 3: # Player 4 + +# streaming: +# enabled: true +# containers: +# - platform: ps2 +# # Browser-facing Selkies web UI, MUST be serverd over https +# # (use a reverse proxy or the built in self signed cert for linuxserver) +# # can also be a FQDN eg. https://your.example.com +# host: https://192.168.1.51:3001 +# +# # server-to-broker URL does NOT need to be served over https +# # If pcsx2 emulator is on a different host, use its LAN IP. +# # If on the same Docker network, use the container name: http://pcsx2:8000 +# broker_host: http://192.168.1.51:8000 +# # what shows up on the play button +# label: PCSX2 +# +# # Add more emulator containers here as needed: +# # - platform: psx +# # host: http://192.168.1.51:3002 +# # broker_host: http://192.168.1.51:8001 +# # label: DuckStation diff --git a/frontend/src/components/common/Game/PlayBtn.vue b/frontend/src/components/common/Game/PlayBtn.vue index 368f5ec183..e311490145 100644 --- a/frontend/src/components/common/Game/PlayBtn.vue +++ b/frontend/src/components/common/Game/PlayBtn.vue @@ -11,6 +11,7 @@ import { ROUTES } from "@/plugins/router"; import storeConfig from "@/stores/config"; import storeHeartbeat from "@/stores/heartbeat"; import storeRoms, { type SimpleRom } from "@/stores/roms"; +import { useStreamingStore } from "@/stores/streaming"; import type { Events } from "@/types/emitter"; import { isEJSEmulationSupported, isRuffleEmulationSupported } from "@/utils"; @@ -23,6 +24,10 @@ const router = useRouter(); const { config } = storeToRefs(configStore); const { value: heartbeat } = storeToRefs(heartbeatStore); const emitter = inject>("emitter"); +const streamingStore = useStreamingStore(); +const streamingContainer = computed(() => + streamingStore.containerForPlatform(props.rom.platform_slug), +); const isAprilFools = computed(() => { const today = new Date(); @@ -98,4 +103,16 @@ async function goToPlayer(rom: SimpleRom) { mdi-play + + + mdi-cast + diff --git a/frontend/src/layouts/Main.vue b/frontend/src/layouts/Main.vue index 49d1f2f301..d719a2acdc 100644 --- a/frontend/src/layouts/Main.vue +++ b/frontend/src/layouts/Main.vue @@ -26,11 +26,13 @@ import UploadProgress from "@/components/common/Notifications/UploadProgress.vue import storeCollections from "@/stores/collections"; import storeNavigation from "@/stores/navigation"; import storePlatforms from "@/stores/platforms"; +import { useStreamingStore } from "@/stores/streaming"; import type { Events } from "@/types/emitter"; const navigationStore = storeNavigation(); const platformsStore = storePlatforms(); const collectionsStore = storeCollections(); +const streamingStore = useStreamingStore(); const emitter = inject>("emitter"); emitter?.on("refreshDrawer", async () => { @@ -63,6 +65,7 @@ onBeforeMount(async () => { if (showVirtualCollections) { collectionsStore.fetchVirtualCollections(virtualCollectionTypeRef.value); } + streamingStore.fetchConfig(); navigationStore.reset(); }); diff --git a/frontend/src/plugins/router.ts b/frontend/src/plugins/router.ts index fef66afd77..46433605be 100644 --- a/frontend/src/plugins/router.ts +++ b/frontend/src/plugins/router.ts @@ -27,6 +27,7 @@ export const ROUTES = { ROM: "rom", EMULATORJS: "emulatorjs", RUFFLE: "ruffle", + STREAM: "stream", SCAN: "scan", PATCHER: "patcher", USER_PROFILE: "user-profile", @@ -189,6 +190,11 @@ const routes = [ name: ROUTES.APRIL_FOOLS, component: () => import("@/views/Player/AprilFools.vue"), }, + { + path: "rom/:rom/stream", + name: ROUTES.STREAM, + component: () => import("@/views/Player/Stream/Player.vue"), + }, { path: "scan", name: ROUTES.SCAN, diff --git a/frontend/src/stores/streaming.ts b/frontend/src/stores/streaming.ts new file mode 100644 index 0000000000..869a413f0d --- /dev/null +++ b/frontend/src/stores/streaming.ts @@ -0,0 +1,264 @@ +import { defineStore } from "pinia"; +import { ref, computed } from "vue"; +import api from "@/services/api"; + +// ── Types ───────────────────────────────────────────────────────────────────── + +export interface StreamingContainer { + platform: string; // e.g. "ps2" + host: string; // browser-facing URL, e.g. "http://192.168.1.50:3000" + label: string; // e.g. "PCSX2" +} + +export interface StreamingConfig { + enabled: boolean; + containers: StreamingContainer[]; +} + +export interface ActiveSession { + platform: string; + host: string; + label: string; + rom_name: string; + claimed_at: string; +} + +// ── Store ───────────────────────────────────────────────────────────────────── + +export const useStreamingStore = defineStore("streaming", () => { + const config = ref({ enabled: false, containers: [] }); + const activeSession = ref(null); + const loading = ref(false); + const error = ref(null); + + const isEnabled = computed(() => config.value.enabled); + + // ── Actions ──────────────────────────────────────────────────────────────── + + /** + * Returns the streaming container for a given platform slug, or null if + * streaming is disabled or no container is configured for that platform. + * Case-insensitive so "PS2" and "ps2" both match. + */ + function containerForPlatform( + slug: string | null | undefined, + ): StreamingContainer | null { + if (!slug || !config.value.enabled) return null; + const lower = slug.toLowerCase(); + return ( + config.value.containers.find((c) => c.platform.toLowerCase() === lower) ?? + null + ); + } + + /** + * Returns per-platform save-state capabilities for the streaming player UI. + * + * maxSlots — number of user-accessible save slots (slot selector range) + * hasAutosave — whether a dedicated "load autosave" action is available + * + * Dolphin (ngc, wii, wiiu): slots 1–7 user-accessible; slot 8 reserved for auto-save. + * PCSX2 (ps2), xemu (xbox), and default: 9 slots + slot 10 autosave. + * Eden (switch): no save state support. + */ + function platformCapabilities(slug: string | null | undefined): { + maxSlots: number; + hasAutosave: boolean; + autosaveSlot: number; + } { + const lower = (slug ?? "").toLowerCase(); + if (lower === "ngc" || lower === "wii" || lower === "wiiu") { + return { maxSlots: 7, hasAutosave: true, autosaveSlot: 8 }; + } + if (lower === "switch") { + return { maxSlots: 0, hasAutosave: false, autosaveSlot: 0 }; + } + if (lower === "xbox") { + return { maxSlots: 9, hasAutosave: true, autosaveSlot: 10 }; + } + return { maxSlots: 9, hasAutosave: true, autosaveSlot: 10 }; + } + + /** + * Fetch streaming config from the backend once on app load. + * Non-fatal — if it fails, streaming stays disabled and no buttons appear. + */ + async function fetchConfig(): Promise { + loading.value = true; + error.value = null; + try { + const res = await fetch("/api/streaming/config", { + cache: "no-store", + headers: { "Cache-Control": "no-cache" }, + }); + + if (!res.ok) throw new Error(`HTTP ${res.status}`); + + const data: StreamingConfig = await res.json(); + config.value = { + enabled: data.enabled ?? false, + containers: data.containers ?? [], + }; + } catch (err) { + error.value = String(err); + console.warn("[streaming] Could not fetch config:", err); + } finally { + loading.value = false; + } + } + + /** + * Claim a streaming session for a platform + ROM. + * Returns the session data (including the container host URL) on success. + * Throws an error with a `status` property on failure: + * 409 session in use - error has who/what is playing + * 404 — platform not configured + * 503 — broker unreachable + */ + async function claimSession( + platform: string, + romPath: string, + romName: string, + ): Promise { + try { + const { data } = await api.post("/streaming/sessions", { + platform, + rom_path: romPath, + rom_name: romName, + }); + activeSession.value = data; + return data; + } catch (e: any) { + const detail = e.response?.data?.detail; + const err = Object.assign( + new Error(detail?.message ?? `HTTP ${e.response?.status}`), + { status: e.response?.status, detail }, + ); + throw err; + } + } + + /** + * Release the active session when the user leaves the player page. + * Best-effort — never throws. + */ + async function releaseSession(platform: string): Promise { + if (!platform) return; + activeSession.value = null; + try { + await api.delete(`/streaming/sessions/${platform}`); + } catch (err) { + console.warn("[streaming] Could not release session:", err); + } + } + + /** + * Save game state then release the session. + * wait=true (default): blocks until broker confirms save+kill — use for explicit button press. + * wait=false: broker fires save+kill in background, returns immediately — use for navigation away. + * Best-effort — never throws. + */ + async function saveAndExit( + platform: string, + slot = 0, + wait = true, + ): Promise { + if (!platform) return false; + activeSession.value = null; + try { + const { data } = await api.post( + `/streaming/sessions/${platform}/save-and-exit`, + { slot, wait }, + ); + return data.saved ?? false; + } catch (err) { + console.warn("[streaming] Could not save-and-exit:", err); + return false; + } + } + + /** + * Set emulator volume (0–100). Best-effort — never throws. + */ + async function setVolume(platform: string, level: number): Promise { + if (!platform) return; + try { + await api.post(`/streaming/sessions/${platform}/volume`, { + level: Math.round(level), + }); + } catch (err) { + console.warn("[streaming] Could not set volume:", err); + } + } + + /** + * Toggle or explicitly set mute. Pass true/false to set, omit for toggle. + * Best-effort — never throws. + */ + async function setMute(platform: string, mute?: boolean): Promise { + if (!platform) return; + try { + await api.post( + `/streaming/sessions/${platform}/mute`, + mute !== undefined ? { mute } : {}, + ); + } catch (err) { + console.warn("[streaming] Could not set mute:", err); + } + } + + /** + * Save game state to a slot (1–9) without stopping the emulator. + * The broker fires the save in the background and returns immediately. + * Best-effort — never throws. + */ + async function saveState(platform: string, slot = 1): Promise { + if (!platform) return false; + try { + const { data } = await api.post( + `/streaming/sessions/${platform}/save-state`, + { slot }, + ); + return data.status === "saving"; + } catch (err) { + console.warn("[streaming] Could not save state:", err); + return false; + } + } + + /** + * Load game state from a slot (1–10). Slot 10 is the autosave slot on xemu and rpcs3. + * Best-effort — never throws. + */ + async function loadState(platform: string, slot = 1): Promise { + if (!platform) return false; + try { + const { data } = await api.post( + `/streaming/sessions/${platform}/load-state`, + { slot }, + ); + return data.loaded ?? false; + } catch (err) { + console.warn("[streaming] Could not load state:", err); + return false; + } + } + + return { + config, + activeSession, + loading, + error, + isEnabled, + containerForPlatform, + platformCapabilities, + fetchConfig, + claimSession, + releaseSession, + saveAndExit, + setVolume, + setMute, + saveState, + loadState, + }; +}); diff --git a/frontend/src/views/Player/Stream/Player.vue b/frontend/src/views/Player/Stream/Player.vue new file mode 100644 index 0000000000..b17dfdec6b --- /dev/null +++ b/frontend/src/views/Player/Stream/Player.vue @@ -0,0 +1,861 @@ +