diff --git a/backend/endpoints/activity.py b/backend/endpoints/activity.py new file mode 100644 index 0000000000..9f9619ed4d --- /dev/null +++ b/backend/endpoints/activity.py @@ -0,0 +1,169 @@ +from datetime import datetime, timezone + +import socketio # type: ignore +from fastapi import HTTPException, Request, status +from pydantic import BaseModel, Field + +from config import REDIS_URL +from decorators.auth import protected_route +from endpoints.responses.activity import ActivityClearSchema, ActivityEntrySchema +from handler.activity_handler import ActivityEntry, activity_handler +from handler.auth.constants import Scope +from handler.database import db_device_handler, db_rom_handler +from logger.logger import log +from utils.router import APIRouter + +router = APIRouter( + prefix="/activity", + tags=["activity"], +) + + +class DeviceHeartbeatPayload(BaseModel): + rom_id: int = Field(ge=1) + device_id: str = Field(min_length=1, max_length=255) + + +def _get_socket_manager() -> socketio.AsyncRedisManager: + """Create a write-only Redis manager for emitting from REST endpoints.""" + return socketio.AsyncRedisManager(REDIS_URL, write_only=True) + + +def _build_activity_entry( + *, + user_id: int, + username: str, + avatar_path: str, + rom_id: int, + rom_name: str, + rom_cover_path: str, + platform_slug: str, + platform_name: str, + device_id: str, + device_type: str, + started_at: str, +) -> ActivityEntry: + return ActivityEntry( + user_id=user_id, + username=username, + avatar_path=avatar_path, + rom_id=rom_id, + rom_name=rom_name, + rom_cover_path=rom_cover_path, + platform_slug=platform_slug, + platform_name=platform_name, + device_id=device_id, + device_type=device_type, + started_at=started_at, + ) + + +@protected_route(router.get, "", [Scope.ROMS_USER_READ]) +async def get_all_activity(request: Request) -> list[ActivityEntrySchema]: + """Return every currently active play session across all users.""" + entries = await activity_handler.get_all_active() + return [ActivityEntrySchema(**e) for e in entries] + + +@protected_route(router.get, "/rom/{rom_id}", [Scope.ROMS_USER_READ]) +async def get_rom_activity( + request: Request, rom_id: int +) -> list[ActivityEntrySchema]: + """Return all active play sessions for a specific ROM.""" + entries = await activity_handler.get_active_for_rom(rom_id) + return [ActivityEntrySchema(**e) for e in entries] + + +@protected_route(router.post, "/heartbeat", [Scope.ROMS_USER_WRITE]) +async def device_heartbeat( + request: Request, payload: DeviceHeartbeatPayload +) -> ActivityEntrySchema: + """Heartbeat endpoint for external devices (muOS, Android, etc.). + + Called periodically by devices while the user is playing a game. Writes + activity state to Redis and broadcasts an ``activity:update`` event over + the main Socket.IO namespace. + """ + rom = db_rom_handler.get_rom(payload.rom_id) + if rom is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"ROM {payload.rom_id} not found", + ) + + device = db_device_handler.get_device( + device_id=payload.device_id, user_id=request.user.id + ) + if device is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Device {payload.device_id} not found for this user", + ) + + # Preserve the started_at from the existing entry if we are refreshing. + existing = await activity_handler.get_active(request.user.id, device.id) + started_at = ( + existing["started_at"] + if existing + else datetime.now(timezone.utc).isoformat() + ) + + platform = rom.platform + entry = _build_activity_entry( + user_id=request.user.id, + username=request.user.username, + avatar_path=request.user.avatar_path or "", + rom_id=rom.id, + rom_name=rom.name or rom.fs_name, + rom_cover_path=rom.path_cover_s or "", + platform_slug=platform.slug if platform else "", + platform_name=(platform.custom_name or platform.name) if platform else "", + device_id=device.id, + device_type=device.client or "unknown", + started_at=started_at, + ) + + await activity_handler.set_active(entry) + + # Update the device last_seen as a side-effect (mirrors play session ingest). + db_device_handler.update_last_seen(device_id=device.id, user_id=request.user.id) + + # Broadcast to all connected sockets. + try: + sm = _get_socket_manager() + await sm.emit("activity:update", dict(entry)) + except Exception as e: # noqa: BLE001 + log.warning(f"Failed to broadcast activity:update for user {request.user.id}: {e}") + + return ActivityEntrySchema(**entry) + + +@protected_route( + router.delete, + "/heartbeat", + [Scope.ROMS_USER_WRITE], + status_code=status.HTTP_204_NO_CONTENT, +) +async def clear_device_activity( + request: Request, device_id: str +) -> None: + """Immediately clear an active session for a device (e.g. on graceful exit).""" + rom_id = await activity_handler.clear_active(request.user.id, device_id) + if rom_id is None: + return None + + try: + sm = _get_socket_manager() + await sm.emit( + "activity:clear", + ActivityClearSchema( + user_id=request.user.id, + device_id=device_id, + rom_id=rom_id, + ).model_dump(), + ) + except Exception as e: # noqa: BLE001 + log.warning( + f"Failed to broadcast activity:clear for user {request.user.id}: {e}" + ) + return None diff --git a/backend/endpoints/responses/activity.py b/backend/endpoints/responses/activity.py new file mode 100644 index 0000000000..6ee4a4a9db --- /dev/null +++ b/backend/endpoints/responses/activity.py @@ -0,0 +1,21 @@ +from .base import BaseModel + + +class ActivityEntrySchema(BaseModel): + user_id: int + username: str + avatar_path: str + rom_id: int + rom_name: str + rom_cover_path: str = "" + platform_slug: str + platform_name: str + device_id: str + device_type: str + started_at: str + + +class ActivityClearSchema(BaseModel): + user_id: int + device_id: str + rom_id: int diff --git a/backend/endpoints/sockets/activity.py b/backend/endpoints/sockets/activity.py new file mode 100644 index 0000000000..be1058b9de --- /dev/null +++ b/backend/endpoints/sockets/activity.py @@ -0,0 +1,206 @@ +"""Socket.IO events for real-time user game activity. + +Handles: +- activity:start - client reports starting a game (emits activity:update) +- activity:heartbeat - client refreshes TTL while playing (emits activity:update) +- activity:stop - client reports stopping (emits activity:clear) +- disconnect - safety net: clears any activity registered for the socket + +All events broadcast to every connected client on the main `/ws` namespace. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import TypedDict + +from endpoints.responses.activity import ActivityClearSchema +from handler.activity_handler import ActivityEntry, activity_handler +from handler.database import db_rom_handler, db_user_handler +from handler.socket_handler import socket_handler +from logger.logger import log + + +class ActivityEventPayload(TypedDict, total=False): + rom_id: int + user_id: int + device_id: str + + +def _empty_string(value: object) -> str: + if value is None: + return "" + return str(value) + + +async def _store_session(sid: str, user_id: int, device_id: str) -> None: + """Remember the user/device associated with a socket for disconnect cleanup.""" + try: + existing = await socket_handler.socket_server.get_session(sid) or {} + except KeyError: + existing = {} + existing["activity_user_id"] = user_id + existing["activity_device_id"] = device_id + await socket_handler.socket_server.save_session(sid, existing) + + +async def _build_entry( + *, user_id: int, device_id: str, rom_id: int, preserve_started_at: bool +) -> ActivityEntry | None: + """Look up DB info and assemble an ActivityEntry. Returns None if invalid.""" + user = db_user_handler.get_user(user_id) + if user is None: + log.debug(f"activity: unknown user_id {user_id}") + return None + + rom = db_rom_handler.get_rom(rom_id) + if rom is None: + log.debug(f"activity: unknown rom_id {rom_id}") + return None + + platform = rom.platform + started_at = datetime.now(timezone.utc).isoformat() + + if preserve_started_at: + existing = await activity_handler.get_active(user_id, device_id) + if existing: + started_at = existing["started_at"] + + # Infer device_type: web is the default for browser-emitted events. + device_type = "web" + if device_id != "web": + # The browser may pass its device_id (a UUID) - we still treat it as "web" + # because Socket.IO events are only emitted from browser clients. + device_type = "web" + + return ActivityEntry( + user_id=user.id, + username=user.username, + avatar_path=_empty_string(user.avatar_path), + rom_id=rom.id, + rom_name=rom.name or rom.fs_name, + rom_cover_path=_empty_string(rom.path_cover_s), + platform_slug=_empty_string(platform.slug) if platform else "", + platform_name=_empty_string( + (platform.custom_name or platform.name) if platform else "" + ), + device_id=device_id, + device_type=device_type, + started_at=started_at, + ) + + +def _extract_payload(data: object) -> tuple[int | None, str | None, int | None]: + """Return ``(user_id, device_id, rom_id)`` parsed from an event payload.""" + if not isinstance(data, dict): + return None, None, None + try: + user_id = int(data.get("user_id")) if data.get("user_id") is not None else None + except (TypeError, ValueError): + user_id = None + device_id = data.get("device_id") + if not isinstance(device_id, str) or not device_id: + device_id = None + try: + rom_id = int(data.get("rom_id")) if data.get("rom_id") is not None else None + except (TypeError, ValueError): + rom_id = None + return user_id, device_id, rom_id + + +@socket_handler.socket_server.on("activity:start") # type: ignore +async def activity_start(sid: str, data: ActivityEventPayload) -> None: + user_id, device_id, rom_id = _extract_payload(data) + if user_id is None or device_id is None or rom_id is None: + log.debug(f"activity:start ignored (invalid payload): {data}") + return + + entry = await _build_entry( + user_id=user_id, + device_id=device_id, + rom_id=rom_id, + preserve_started_at=False, + ) + if entry is None: + return + + await activity_handler.set_active(entry) + await _store_session(sid, user_id, device_id) + await socket_handler.socket_server.emit("activity:update", dict(entry)) + + +@socket_handler.socket_server.on("activity:heartbeat") # type: ignore +async def activity_heartbeat(sid: str, data: ActivityEventPayload) -> None: + user_id, device_id, rom_id = _extract_payload(data) + if user_id is None or device_id is None or rom_id is None: + return + + entry = await _build_entry( + user_id=user_id, + device_id=device_id, + rom_id=rom_id, + preserve_started_at=True, + ) + if entry is None: + return + + await activity_handler.set_active(entry) + await _store_session(sid, user_id, device_id) + await socket_handler.socket_server.emit("activity:update", dict(entry)) + + +@socket_handler.socket_server.on("activity:stop") # type: ignore +async def activity_stop(sid: str, data: ActivityEventPayload | None = None) -> None: + user_id: int | None = None + device_id: str | None = None + + if data: + user_id, device_id, _ = _extract_payload(data) + + # Fall back to the stored session if the payload is missing fields. + if user_id is None or device_id is None: + try: + session = await socket_handler.socket_server.get_session(sid) or {} + except KeyError: + session = {} + user_id = user_id if user_id is not None else session.get("activity_user_id") + device_id = device_id if device_id else session.get("activity_device_id") + + if user_id is None or not device_id: + return + + rom_id = await activity_handler.clear_active(int(user_id), device_id) + if rom_id is None: + return + + await socket_handler.socket_server.emit( + "activity:clear", + ActivityClearSchema( + user_id=int(user_id), device_id=device_id, rom_id=rom_id + ).model_dump(), + ) + + +@socket_handler.socket_server.on("disconnect") # type: ignore +async def activity_on_disconnect(sid: str) -> None: + """Safety net: clear any activity tied to a disconnecting socket.""" + try: + session = await socket_handler.socket_server.get_session(sid) or {} + except KeyError: + return + + user_id = session.get("activity_user_id") + device_id = session.get("activity_device_id") + if user_id is None or not device_id: + return + + rom_id = await activity_handler.clear_active(int(user_id), device_id) + if rom_id is None: + return + + await socket_handler.socket_server.emit( + "activity:clear", + ActivityClearSchema( + user_id=int(user_id), device_id=device_id, rom_id=rom_id + ).model_dump(), + ) diff --git a/backend/handler/activity_handler.py b/backend/handler/activity_handler.py new file mode 100644 index 0000000000..e057b43fd7 --- /dev/null +++ b/backend/handler/activity_handler.py @@ -0,0 +1,139 @@ +"""Real-time user game activity tracking. + +Stores ephemeral "currently playing" state in Redis. Each active session is a +Redis key with a short TTL, refreshed by periodic heartbeats from the client +(browser) or the device. When the TTL expires (no heartbeat received), the +session is considered ended automatically. +""" + +from __future__ import annotations + +import json +from typing import TypedDict + +from handler.redis_handler import async_cache +from logger.logger import log + + +class ActivityEntry(TypedDict): + user_id: int + username: str + avatar_path: str + rom_id: int + rom_name: str + rom_cover_path: str # small cover path, may be empty + platform_slug: str + platform_name: str + device_id: str + device_type: str # "web", "grout", "argosy-launcher", etc. + started_at: str # ISO 8601 timestamp + + +class ActivityHandler: + """Redis-backed store for currently active game play sessions.""" + + ACTIVITY_TTL = 90 # seconds; refreshed by heartbeats + ROM_INDEX_TTL = 120 # slightly longer than ACTIVITY_TTL + KEY_PREFIX = "activity:user:" + ROM_INDEX_PREFIX = "activity:rom:" + + def _activity_key(self, user_id: int, device_id: str) -> str: + return f"{self.KEY_PREFIX}{user_id}:{device_id}" + + def _rom_index_key(self, rom_id: int) -> str: + return f"{self.ROM_INDEX_PREFIX}{rom_id}" + + def _member(self, user_id: int, device_id: str) -> str: + return f"{user_id}:{device_id}" + + async def set_active(self, entry: ActivityEntry) -> None: + """Store or refresh a user's active play session.""" + key = self._activity_key(entry["user_id"], entry["device_id"]) + rom_key = self._rom_index_key(entry["rom_id"]) + member = self._member(entry["user_id"], entry["device_id"]) + + async with async_cache.pipeline() as pipe: + await pipe.set(key, json.dumps(entry), ex=self.ACTIVITY_TTL) + await pipe.sadd(rom_key, member) + await pipe.expire(rom_key, self.ROM_INDEX_TTL) + await pipe.execute() + + async def clear_active(self, user_id: int, device_id: str) -> int | None: + """Clear a user's active play session. Returns the rom_id that was cleared, or None.""" + key = self._activity_key(user_id, device_id) + raw = await async_cache.get(key) + if not raw: + return None + + try: + entry = json.loads(raw) + rom_id = int(entry["rom_id"]) + except (ValueError, KeyError, TypeError) as e: + log.warning(f"Failed to parse activity entry for cleanup: {e}") + await async_cache.delete(key) + return None + + member = self._member(user_id, device_id) + async with async_cache.pipeline() as pipe: + await pipe.delete(key) + await pipe.srem(self._rom_index_key(rom_id), member) + await pipe.execute() + return rom_id + + async def get_active(self, user_id: int, device_id: str) -> ActivityEntry | None: + """Get a single active session by user and device.""" + key = self._activity_key(user_id, device_id) + raw = await async_cache.get(key) + if not raw: + return None + try: + return json.loads(raw) + except ValueError: + return None + + async def get_all_active(self) -> list[ActivityEntry]: + """Get all currently active play sessions across all users.""" + entries: list[ActivityEntry] = [] + pattern = f"{self.KEY_PREFIX}*" + async for key in async_cache.scan_iter(match=pattern): + raw = await async_cache.get(key) + if not raw: + continue + try: + entries.append(json.loads(raw)) + except ValueError: + continue + return entries + + async def get_active_for_rom(self, rom_id: int) -> list[ActivityEntry]: + """Get all active play sessions for a specific ROM.""" + rom_key = self._rom_index_key(rom_id) + members = await async_cache.smembers(rom_key) + entries: list[ActivityEntry] = [] + stale_members: list[str] = [] + + for member in members: + try: + user_id_str, device_id = member.rsplit(":", 1) + user_id = int(user_id_str) + except (ValueError, AttributeError): + stale_members.append(member) + continue + + raw = await async_cache.get(self._activity_key(user_id, device_id)) + if not raw: + # Key expired; clean up the stale set member. + stale_members.append(member) + continue + try: + entries.append(json.loads(raw)) + except ValueError: + stale_members.append(member) + + if stale_members: + await async_cache.srem(rom_key, *stale_members) + + return entries + + +activity_handler = ActivityHandler() diff --git a/backend/main.py b/backend/main.py index 9430b51d24..57480b5168 100644 --- a/backend/main.py +++ b/backend/main.py @@ -13,6 +13,7 @@ from starlette.middleware.authentication import AuthenticationMiddleware from startup import main +import endpoints.sockets.activity # noqa import endpoints.sockets.netplay # noqa import endpoints.sockets.scan # noqa import endpoints.sockets.sync # noqa @@ -25,6 +26,7 @@ ROMM_AUTH_SECRET_KEY, SENTRY_DSN, ) +from endpoints.activity import router as activity_router from endpoints.auth import router as auth_router from endpoints.client_tokens import router as client_tokens_router from endpoints.collections import router as collections_router @@ -126,6 +128,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: app.include_router(heartbeat_router, prefix="/api") app.include_router(auth_router, prefix="/api") +app.include_router(activity_router, prefix="/api") app.include_router(user_router, prefix="/api") app.include_router(client_tokens_router, prefix="/api") app.include_router(device_router, prefix="/api") diff --git a/frontend/src/components/Details/ActivePlayers.vue b/frontend/src/components/Details/ActivePlayers.vue new file mode 100644 index 0000000000..6bccc6e9c6 --- /dev/null +++ b/frontend/src/components/Details/ActivePlayers.vue @@ -0,0 +1,77 @@ + + + diff --git a/frontend/src/components/common/Navigation/ActivityBtn.vue b/frontend/src/components/common/Navigation/ActivityBtn.vue new file mode 100644 index 0000000000..9283a727df --- /dev/null +++ b/frontend/src/components/common/Navigation/ActivityBtn.vue @@ -0,0 +1,68 @@ + + + diff --git a/frontend/src/components/common/Navigation/MainAppBar.vue b/frontend/src/components/common/Navigation/MainAppBar.vue index 57aa468011..4d5ac44e7c 100644 --- a/frontend/src/components/common/Navigation/MainAppBar.vue +++ b/frontend/src/components/common/Navigation/MainAppBar.vue @@ -4,6 +4,7 @@ import { storeToRefs } from "pinia"; import { useDisplay } from "vuetify"; import RandomBtn from "@/components/Gallery/AppBar/common/RandomBtn.vue"; import UploadRomDialog from "@/components/common/Game/Dialog/UploadRom.vue"; +import ActivityBtn from "@/components/common/Navigation/ActivityBtn.vue"; import CollectionsBtn from "@/components/common/Navigation/CollectionsBtn.vue"; import CollectionsDrawer from "@/components/common/Navigation/CollectionsDrawer.vue"; import ConsoleModeBtn from "@/components/common/Navigation/ConsoleModeBtn.vue"; @@ -47,6 +48,7 @@ function collapse() {