From be28a7fac27558b22ed67887da3762e7401d2b03 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 7 Apr 2026 21:44:21 +0000 Subject: [PATCH 1/2] feat(dlna_receiver): add dlna_receiver provider v1.0.0 --- .../providers/dlna_receiver/__init__.py | 113 +++ .../providers/dlna_receiver/constants.py | 51 ++ .../providers/dlna_receiver/eventing.py | 251 +++++++ .../providers/dlna_receiver/manifest.json | 15 + .../providers/dlna_receiver/provider.py | 637 ++++++++++++++++ .../providers/dlna_receiver/renderer.py | 689 ++++++++++++++++++ .../dlna_receiver/scpd/AVTransport.xml | 339 +++++++++ .../dlna_receiver/scpd/ConnectionManager.xml | 133 ++++ .../dlna_receiver/scpd/RenderingControl.xml | 121 +++ .../providers/dlna_receiver/ssdp.py | 217 ++++++ tests/providers/dlna_receiver/__init__.py | 1 + .../providers/dlna_receiver/test_eventing.py | 105 +++ .../providers/dlna_receiver/test_provider.py | 68 ++ .../providers/dlna_receiver/test_renderer.py | 251 +++++++ tests/providers/dlna_receiver/test_ssdp.py | 26 + 15 files changed, 3017 insertions(+) create mode 100644 music_assistant/providers/dlna_receiver/__init__.py create mode 100644 music_assistant/providers/dlna_receiver/constants.py create mode 100644 music_assistant/providers/dlna_receiver/eventing.py create mode 100644 music_assistant/providers/dlna_receiver/manifest.json create mode 100644 music_assistant/providers/dlna_receiver/provider.py create mode 100644 music_assistant/providers/dlna_receiver/renderer.py create mode 100644 music_assistant/providers/dlna_receiver/scpd/AVTransport.xml create mode 100644 music_assistant/providers/dlna_receiver/scpd/ConnectionManager.xml create mode 100644 music_assistant/providers/dlna_receiver/scpd/RenderingControl.xml create mode 100644 music_assistant/providers/dlna_receiver/ssdp.py create mode 100644 tests/providers/dlna_receiver/__init__.py create mode 100644 tests/providers/dlna_receiver/test_eventing.py create mode 100644 tests/providers/dlna_receiver/test_provider.py create mode 100644 tests/providers/dlna_receiver/test_renderer.py create mode 100644 tests/providers/dlna_receiver/test_ssdp.py diff --git a/music_assistant/providers/dlna_receiver/__init__.py b/music_assistant/providers/dlna_receiver/__init__.py new file mode 100644 index 0000000000..e7ddcb7e05 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/__init__.py @@ -0,0 +1,113 @@ +""" +DLNA Receiver — Music Assistant Plugin Provider. + +Exposes Music Assistant as a UPnP/DLNA MediaRenderer so that external +applications (Qobuz, BubbleUPnP, foobar2000, mconnect, etc.) can discover +and cast audio streams to any MA player. + +Architecture +~~~~~~~~~~~~ +1. SSDP advertisement — announces virtual MediaRenderers on the LAN +2. UPnP HTTP server — serves device/service XML descriptions and + accepts SOAP control actions (AVTransport, + RenderingControl, ConnectionManager) +3. PluginSource bridge — received audio URL is fed into the MA streaming + pipeline as a PluginSource, routed to the + corresponding target player + +Multi-player mode +~~~~~~~~~~~~~~~~~ +When ``target_players`` contains multiple comma-separated player_id values +(or the special value ``*``), the provider creates one virtual DLNA +renderer per player, each with a unique UDN and HTTP port. DLNA control +points see each renderer as a separate device — e.g. +"Music Assistant — Kitchen", "Music Assistant — Living Room". +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from music_assistant_models.config_entries import ConfigEntry, ConfigValueType +from music_assistant_models.enums import ConfigEntryType + +from .constants import ( + CONF_BIND_IP, + CONF_FRIENDLY_NAME, + CONF_HTTP_PORT, + CONF_TARGET_PLAYERS, + DEFAULT_FRIENDLY_NAME, + DEFAULT_HTTP_PORT, +) + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ProviderConfig + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + from music_assistant.models import ProviderInstanceType + + +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, # noqa: ARG001 +) -> tuple[ConfigEntry, ...]: + """Return Config entries to setup this provider.""" + return ( + ConfigEntry( + key=CONF_FRIENDLY_NAME, + type=ConfigEntryType.STRING, + label="Friendly name prefix", + description=( + "Prefix for DLNA renderer names shown on the network. " + "Player name is appended automatically in multi-player mode." + ), + default_value=DEFAULT_FRIENDLY_NAME, + required=True, + ), + ConfigEntry( + key=CONF_TARGET_PLAYERS, + type=ConfigEntryType.STRING, + label="Target players", + description=( + "Comma-separated MA player_ids to expose as DLNA renderers. " + 'Use "*" to auto-create a renderer for every MA player. ' + "Leave empty for a single renderer without a fixed target." + ), + required=False, + ), + ConfigEntry( + key=CONF_BIND_IP, + type=ConfigEntryType.STRING, + label="Bind IP address", + description=( + "IP address to bind the UPnP HTTP server and SSDP listener. " + "Leave empty to auto-detect." + ), + required=False, + ), + ConfigEntry( + key=CONF_HTTP_PORT, + type=ConfigEntryType.INTEGER, + label="HTTP base port", + description=( + "Base port for UPnP HTTP servers. In multi-player mode, " + "each renderer uses an incrementing port (8298, 8299, …)." + ), + default_value=DEFAULT_HTTP_PORT, + required=True, + ), + ) + + +async def setup( + mass: MusicAssistant, + manifest: ProviderManifest, + config: ProviderConfig, +) -> ProviderInstanceType: + """Set up the DLNA Receiver provider.""" + from .provider import DLNAReceiverProvider + + return DLNAReceiverProvider(mass, manifest, config) diff --git a/music_assistant/providers/dlna_receiver/constants.py b/music_assistant/providers/dlna_receiver/constants.py new file mode 100644 index 0000000000..0c2cf210b7 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/constants.py @@ -0,0 +1,51 @@ +"""Constants for the DLNA Receiver plugin provider.""" + +from __future__ import annotations + +# UPnP device and service identifiers +UPNP_DEVICE_TYPE = "urn:schemas-upnp-org:device:MediaRenderer:1" +UPNP_SERVICE_AV_TRANSPORT = "urn:schemas-upnp-org:service:AVTransport:1" +UPNP_SERVICE_RENDERING_CONTROL = "urn:schemas-upnp-org:service:RenderingControl:1" +UPNP_SERVICE_CONNECTION_MANAGER = "urn:schemas-upnp-org:service:ConnectionManager:1" + +# SSDP +SSDP_MULTICAST_ADDR = "239.255.255.250" +SSDP_PORT = 1900 +SSDP_MAX_AGE = 1800 # seconds + +# Config entry keys +CONF_FRIENDLY_NAME = "friendly_name" +CONF_TARGET_PLAYER = "target_player" +CONF_TARGET_PLAYERS = "target_players" +CONF_BIND_IP = "bind_ip" +CONF_HTTP_PORT = "http_port" + +# Defaults +DEFAULT_FRIENDLY_NAME = "Music Assistant" +DEFAULT_HTTP_PORT = 8298 # UPnP renderer HTTP port (separate from MA stream port) + +# Supported MIME types for incoming streams +SUPPORTED_MIME_TYPES = [ + "audio/flac", + "audio/x-flac", + "audio/wav", + "audio/x-wav", + "audio/mpeg", + "audio/mp3", + "audio/mp4", + "audio/aac", + "audio/ogg", + "audio/vorbis", + "audio/L16", + "audio/*", +] + +# UPnP transport states +TRANSPORT_STATE_STOPPED = "STOPPED" +TRANSPORT_STATE_PLAYING = "PLAYING" +TRANSPORT_STATE_PAUSED = "PAUSED_PLAYBACK" +TRANSPORT_STATE_TRANSITIONING = "TRANSITIONING" +TRANSPORT_STATE_NO_MEDIA = "NO_MEDIA_PRESENT" + +# UUID namespace for deterministic UDN generation +UDN_NAMESPACE = "ma-dlna-receiver" diff --git a/music_assistant/providers/dlna_receiver/eventing.py b/music_assistant/providers/dlna_receiver/eventing.py new file mode 100644 index 0000000000..7e10d0844f --- /dev/null +++ b/music_assistant/providers/dlna_receiver/eventing.py @@ -0,0 +1,251 @@ +"""UPnP GENA Eventing — subscription management and NOTIFY dispatch. + +Implements the General Event Notification Architecture (GENA) protocol +for UPnP service state variable change notifications. +""" + +from __future__ import annotations + +import asyncio +import logging +import uuid +from dataclasses import dataclass, field +from time import monotonic +from xml.sax.saxutils import escape + +import aiohttp + +LOGGER = logging.getLogger(__name__) + +DEFAULT_SUBSCRIPTION_TIMEOUT = 1800 # seconds + + +@dataclass +class Subscription: + """Represents a single GENA event subscription.""" + + sid: str + callback_urls: list[str] + timeout: int + created_at: float = field(default_factory=monotonic) + seq: int = 0 + + @property + def is_expired(self) -> bool: + """Check if this subscription has expired.""" + return (monotonic() - self.created_at) > self.timeout + + +class EventingManager: + """Manages GENA subscriptions and sends NOTIFY events. + + Each UPnP service has its own EventingManager instance keyed by + service name (e.g., "AVTransport", "RenderingControl"). + """ + + def __init__(self) -> None: + self._subscriptions: dict[str, Subscription] = {} + self._cleanup_task: asyncio.Task[None] | None = None + + async def start(self) -> None: + """Start the periodic subscription cleanup task.""" + self._cleanup_task = asyncio.create_task(self._cleanup_loop()) + + async def stop(self) -> None: + """Stop the cleanup task and clear all subscriptions.""" + if self._cleanup_task: + self._cleanup_task.cancel() + try: + await self._cleanup_task + except asyncio.CancelledError: + pass + self._cleanup_task = None + self._subscriptions.clear() + + def subscribe( + self, + callback_header: str, + timeout_header: str | None = None, + ) -> tuple[str, int]: + """Register a new subscription. + + Args: + callback_header: CALLBACK header value, e.g., '' + timeout_header: TIMEOUT header value, e.g., 'Second-1800' + + Returns: + Tuple of (SID, actual_timeout_seconds). + """ + callback_urls = self._parse_callback_header(callback_header) + if not callback_urls: + raise ValueError("No valid callback URLs in CALLBACK header") + + timeout = self._parse_timeout(timeout_header) + sid = f"uuid:{uuid.uuid4()}" + + self._subscriptions[sid] = Subscription( + sid=sid, + callback_urls=callback_urls, + timeout=timeout, + ) + LOGGER.info( + "New GENA subscription: SID=%s, callbacks=%s, timeout=%ds", + sid, + callback_urls, + timeout, + ) + return sid, timeout + + def renew( + self, + sid: str, + timeout_header: str | None = None, + ) -> int: + """Renew an existing subscription. + + Args: + sid: Subscription ID to renew. + timeout_header: New TIMEOUT header value. + + Returns: + Actual timeout seconds. + + Raises: + KeyError: If SID not found. + """ + sub = self._subscriptions.get(sid) + if sub is None: + raise KeyError(f"Unknown SID: {sid}") + + timeout = self._parse_timeout(timeout_header) + sub.timeout = timeout + sub.created_at = monotonic() + LOGGER.debug("Renewed subscription %s, timeout=%ds", sid, timeout) + return timeout + + def unsubscribe(self, sid: str) -> None: + """Remove a subscription.""" + removed = self._subscriptions.pop(sid, None) + if removed: + LOGGER.info("Removed GENA subscription: %s", sid) + else: + LOGGER.debug("Unsubscribe for unknown SID: %s", sid) + + async def notify(self, changed_vars: dict[str, str]) -> None: + """Send NOTIFY to all active subscribers with changed state variables. + + Args: + changed_vars: Mapping of state variable name to new value. + """ + if not changed_vars or not self._subscriptions: + return + + xml_body = self._build_propertyset(changed_vars) + + expired_sids: list[str] = [] + for sid, sub in self._subscriptions.items(): + if sub.is_expired: + expired_sids.append(sid) + continue + asyncio.create_task(self._send_notify(sub, xml_body)) + + for sid in expired_sids: + self._subscriptions.pop(sid, None) + + async def notify_initial(self, sid: str, all_vars: dict[str, str]) -> None: + """Send initial event (SEQ=0) to a newly subscribed control point.""" + sub = self._subscriptions.get(sid) + if sub is None: + return + xml_body = self._build_propertyset(all_vars) + await self._send_notify(sub, xml_body) + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + async def _send_notify(self, sub: Subscription, xml_body: str) -> None: + """Send a GENA NOTIFY to a single subscriber.""" + headers = { + "Content-Type": 'text/xml; charset="utf-8"', + "NT": "upnp:event", + "NTS": "upnp:propchange", + "SID": sub.sid, + "SEQ": str(sub.seq), + } + sub.seq += 1 + + for url in sub.callback_urls: + try: + async with ( + aiohttp.ClientSession() as session, + session.request( + "NOTIFY", + url, + headers=headers, + data=xml_body, + timeout=aiohttp.ClientTimeout(total=5), + ) as resp, + ): + if resp.status >= 300: + LOGGER.warning( + "NOTIFY to %s returned %s", + url, + resp.status, + ) + else: + LOGGER.debug("NOTIFY sent to %s (SEQ=%d)", url, sub.seq - 1) + return # success on first working callback + except Exception: + LOGGER.debug("NOTIFY to %s failed, trying next callback", url) + + LOGGER.warning("All NOTIFY callbacks failed for SID %s", sub.sid) + + @staticmethod + def _build_propertyset(variables: dict[str, str]) -> str: + """Build a GENA propertyset XML body.""" + props = "" + for name, value in variables.items(): + props += f"<{name}>{escape(value)}" + return ( + '' + '' + f"{props}" + "" + ) + + @staticmethod + def _parse_callback_header(header: str) -> list[str]: + """Parse CALLBACK header: '' -> ['url1', 'url2'].""" + urls: list[str] = [] + for part in header.split(">"): + part = part.strip() + if part.startswith("<"): + url = part[1:] + if url.startswith("http"): + urls.append(url) + return urls + + @staticmethod + def _parse_timeout(header: str | None) -> int: + """Parse TIMEOUT header: 'Second-1800' -> 1800.""" + if not header: + return DEFAULT_SUBSCRIPTION_TIMEOUT + header = header.strip() + if header.lower() == "infinite": + return DEFAULT_SUBSCRIPTION_TIMEOUT + if header.lower().startswith("second-"): + try: + return int(header.split("-", 1)[1]) + except (ValueError, IndexError): + pass + return DEFAULT_SUBSCRIPTION_TIMEOUT + + async def _cleanup_loop(self) -> None: + """Periodically remove expired subscriptions.""" + while True: + await asyncio.sleep(60) + expired = [sid for sid, sub in self._subscriptions.items() if sub.is_expired] + for sid in expired: + self._subscriptions.pop(sid, None) + LOGGER.debug("Cleaned up expired subscription: %s", sid) diff --git a/music_assistant/providers/dlna_receiver/manifest.json b/music_assistant/providers/dlna_receiver/manifest.json new file mode 100644 index 0000000000..7c315b10bf --- /dev/null +++ b/music_assistant/providers/dlna_receiver/manifest.json @@ -0,0 +1,15 @@ +{ + "type": "plugin", + "domain": "dlna_receiver", + "name": "DLNA Receiver", + "description": "Expose Music Assistant as a UPnP/DLNA MediaRenderer so external apps can cast audio to any MA player.", + "codeowners": ["@trudenboy"], + "credits": [ + "[async-upnp-client](https://github.com/StevenLooman/async_upnp_client)" + ], + "requirements": ["defusedxml>=0.7.0"], + "documentation": "https://trudenboy.github.io/ma-provider-dlna-receiver/", + "stage": "experimental", + "multi_instance": false, + "builtin": false +} diff --git a/music_assistant/providers/dlna_receiver/provider.py b/music_assistant/providers/dlna_receiver/provider.py new file mode 100644 index 0000000000..bf3f1cc404 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/provider.py @@ -0,0 +1,637 @@ +"""DLNA Receiver — Main provider implementation. + +Registers as a PluginProvider with AUDIO_SOURCE feature so that audio +received from external DLNA control points is routed through the MA +streaming pipeline to any configured player. + +Supports multi-player mode: one virtual DLNA renderer per MA player, +each with a unique UDN, HTTP port, and SSDP advertisement. +""" + +from __future__ import annotations + +import asyncio +import logging +import socket +import time +import uuid +from collections.abc import AsyncGenerator +from dataclasses import dataclass +from html import unescape +from typing import TYPE_CHECKING + +import defusedxml.ElementTree as DefusedET +from music_assistant_models.config_entries import ConfigValueType # noqa: F401 +from music_assistant_models.enums import MediaType, ProviderFeature +from music_assistant_models.streamdetails import StreamMetadata + +from music_assistant.models.player import PlayerMedia +from music_assistant.models.plugin import PluginProvider, PluginSource + +from .constants import ( + CONF_BIND_IP, + CONF_FRIENDLY_NAME, + CONF_HTTP_PORT, + CONF_TARGET_PLAYER, + CONF_TARGET_PLAYERS, + DEFAULT_FRIENDLY_NAME, + DEFAULT_HTTP_PORT, + UDN_NAMESPACE, +) +from .renderer import UPnPRenderer +from .ssdp import SSDPAdvertiser + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ProviderConfig + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + +LOGGER = logging.getLogger(__name__) + + +@dataclass +class RendererInstance: + """Per-player renderer state: UPnP renderer + SSDP + streaming context.""" + + player_id: str + player_name: str + renderer: UPnPRenderer + ssdp: SSDPAdvertiser + current_stream_url: str | None = None + current_metadata: dict[str, str | None] | None = None + plugin_source: PluginSource | None = None + + +class DLNAReceiverProvider(PluginProvider): + """DLNA Receiver plugin provider for Music Assistant. + + Exposes MA as one or more UPnP MediaRenderers on the local network + so that external apps can send audio streams which are then played + on the corresponding MA player. + """ + + SUPPORTED_FEATURES = {ProviderFeature.AUDIO_SOURCE} + + def __init__( + self, + mass: MusicAssistant, + manifest: ProviderManifest, + config: ProviderConfig, + ) -> None: + super().__init__(mass, manifest, config, self.SUPPORTED_FEATURES) + self._instances: dict[str, RendererInstance] = {} + self._plugin_source: PluginSource | None = None + # Playback state for elapsed time tracking + self._active_player_id: str | None = None + self._play_start_time: float | None = None + self._elapsed_offset: int = 0 + self._metadata_task: asyncio.Task[None] | None = None + + @property + def supported_features(self) -> set[ProviderFeature]: + """Return supported features.""" + return {ProviderFeature.AUDIO_SOURCE} + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def loaded_in_mass(self) -> None: + """Initialize renderer instances when loaded in Music Assistant.""" + self._friendly_prefix = str( + self.config.get_value(CONF_FRIENDLY_NAME) or DEFAULT_FRIENDLY_NAME, + ) + self._bind_ip = str(self.config.get_value(CONF_BIND_IP) or "") or self._detect_ip() + self._base_port = int( + self.config.get_value(CONF_HTTP_PORT) or DEFAULT_HTTP_PORT # type: ignore[arg-type] + ) + + raw_target = str(self.config.get_value(CONF_TARGET_PLAYERS) or "").strip() + + player_specs = self._resolve_player_specs() + + # When target_players=* but no players registered yet, retry after delay + if raw_target == "*" and not player_specs: + LOGGER.info("target_players=* but no players yet, waiting for registration...") + for attempt in range(12): + await asyncio.sleep(5) + player_specs = self._resolve_player_specs() + if player_specs: + LOGGER.info("Found %d players on attempt %d", len(player_specs), attempt + 1) + break + + # When target_players=*, wait a bit more for late-registering players + if raw_target == "*" and player_specs: + prev_count = len(player_specs) + for _ in range(4): + await asyncio.sleep(3) + player_specs = self._resolve_player_specs() + if len(player_specs) == prev_count: + break + LOGGER.info( + "Player count changed %d → %d, waiting for more...", + prev_count, + len(player_specs), + ) + prev_count = len(player_specs) + + if not player_specs: + # Fallback: single renderer with no fixed target + await self._create_instance( + player_id="", + player_name="", + friendly_prefix=self._friendly_prefix, + bind_ip=self._bind_ip, + http_port=self._base_port, + ) + else: + for idx, (pid, pname) in enumerate(player_specs): + await self._create_instance( + player_id=pid, + player_name=pname, + friendly_prefix=self._friendly_prefix, + bind_ip=self._bind_ip, + http_port=self._base_port + idx, + ) + + count = len(self._instances) + LOGGER.info( + "DLNA Receiver started: %d renderer(s) on %s (base port %s)", + count, + self._bind_ip, + self._base_port, + ) + + async def unload(self, is_removed: bool = False) -> None: + """Unload the provider — stop all renderer instances.""" + if self._metadata_task and not self._metadata_task.done(): + self._metadata_task.cancel() + for inst in self._instances.values(): + await inst.ssdp.stop() + await inst.renderer.stop() + self._instances.clear() + LOGGER.info("DLNA Receiver provider unloaded") + + # ------------------------------------------------------------------ + # Instance management + # ------------------------------------------------------------------ + + async def _create_instance( + self, + player_id: str, + player_name: str, + friendly_prefix: str, + bind_ip: str, + http_port: int, + ) -> RendererInstance: + """Create and start a single renderer instance for a player.""" + if player_name: + friendly_name = f"{friendly_prefix} — {player_name}" + else: + friendly_name = friendly_prefix + + udn = self._deterministic_udn(player_id) + + renderer = UPnPRenderer( + friendly_name=friendly_name, + bind_ip=bind_ip, + http_port=http_port, + udn=udn, + ) + + inst = RendererInstance( + player_id=player_id, + player_name=player_name, + renderer=renderer, + ssdp=SSDPAdvertiser( + udn=udn, + description_url=renderer.description_url, + bind_ip=bind_ip, + ), + ) + + # Wire SOAP callbacks bound to this instance + renderer.on_set_av_transport_uri = lambda uri, meta: self._on_set_transport_uri( + inst, uri, meta + ) + renderer.on_play = lambda: self._on_play(inst) + renderer.on_pause = lambda: self._on_pause(inst) + renderer.on_stop = lambda: self._on_stop(inst) + renderer.on_seek = lambda unit, target: self._on_seek(inst, unit, target) + renderer.on_set_volume = lambda vol: self._on_set_volume(inst, vol) + renderer.on_set_mute = lambda m: self._on_set_mute(inst, m) + + await renderer.start() + await inst.ssdp.start() + + key = player_id or "__default__" + self._instances[key] = inst + + LOGGER.info( + "Renderer '%s' → player '%s' on port %d (UDN: %s)", + friendly_name, + player_id or "(none)", + http_port, + udn, + ) + return inst + + def _resolve_player_specs(self) -> list[tuple[str, str]]: + """Resolve configured player targets to (player_id, display_name) pairs. + + Supports: + - Empty / not set → empty list (single unbound renderer) + - "*" → all currently known MA players + - Comma-separated player_ids + - Legacy CONF_TARGET_PLAYER (single player_id, backward compat) + """ + raw = str(self.config.get_value(CONF_TARGET_PLAYERS) or "").strip() + + # Backward compat: check old single-player key + if not raw: + raw = str(self.config.get_value(CONF_TARGET_PLAYER) or "").strip() + + if not raw: + return [] + + if raw == "*": + return self._get_all_players() + + # Comma-separated list + specs: list[tuple[str, str]] = [] + for pid in raw.split(","): + pid = pid.strip() + if pid: + name = self._get_player_name(pid) + specs.append((pid, name)) + return specs + + def _get_all_players(self) -> list[tuple[str, str]]: + """Get all MA players as (player_id, display_name) pairs. + + Includes unavailable players (they may come online later). + Filters out protocol players and our own DLNA Receiver renderers + (which the UPnP player provider discovers as "up" players). + """ + try: + players = self.mass.players.all_players( + return_unavailable=True, + return_protocol_players=False, + ) + all_pids = {p.player_id for p in players} + + # For every player_id, compute the UPnP player_id that would + # result from discovering our deterministic-UDN renderer. + # This catches ALL recursion levels without needing _instances. + own_renderer_pids: set[str] = set() + for pid in all_pids: + udn = self._deterministic_udn(pid) + upnp_pid = "up" + udn.replace("uuid:", "").replace("-", "") + own_renderer_pids.add(upnp_pid) + + # Also filter already-created instances (belt-and-suspenders) + own_renderer_pids.update( + inst.player_id for inst in self._instances.values() if inst.player_id + ) + + result = [] + for p in players: + if p.player_id in own_renderer_pids: + LOGGER.debug( + "Filtering out own renderer player: %s (%s)", + p.player_id, + p.display_name or p.name, + ) + continue + result.append((p.player_id, p.display_name or p.name or p.player_id)) + return result + except Exception: + LOGGER.warning("Could not enumerate MA players", exc_info=True) + return [] + + def _get_player_name(self, player_id: str) -> str: + """Get the display name for a player, falling back to the id.""" + try: + player = self.mass.players.get_player(player_id) + if player: + return player.display_name or player.name or player_id + return player_id + except Exception: + return player_id + + # ------------------------------------------------------------------ + # PluginProvider audio source interface + # ------------------------------------------------------------------ + + def get_source(self) -> PluginSource: + """Return the plugin source descriptor for this DLNA receiver. + + Returns a persistent instance so metadata updates are visible + to the MA controller between calls. + """ + if self._plugin_source is None: + self._plugin_source = PluginSource( + id=self.instance_id, + name=self.name or "DLNA Receiver", + passive=True, + can_play_pause=True, + ) + return self._plugin_source + + async def get_audio_stream( + self, + player_id: str, + ) -> AsyncGenerator[bytes, None]: + """Yield audio bytes from the received DLNA stream. + + MA calls this when the plugin source is activated on a player. + We proxy the external URL through aiohttp and yield raw bytes. + """ + import aiohttp + + inst = self._instances.get(player_id) or self._instances.get("__default__") + stream_url = inst.current_stream_url if inst else None + + if not stream_url: + LOGGER.warning( + "get_audio_stream(%s) called but no stream URL set", + player_id, + ) + return + + LOGGER.debug("Proxying DLNA stream for %s: %s", player_id, stream_url) + async with aiohttp.ClientSession() as session, session.get(stream_url) as resp: + async for chunk in resp.content.iter_any(): + yield chunk + LOGGER.debug("DLNA stream ended for %s", player_id) + + # ------------------------------------------------------------------ + # DIDL-Lite metadata parsing + # ------------------------------------------------------------------ + + @staticmethod + def _parse_didl_metadata(metadata: str | None) -> dict[str, str | None]: + """Parse DIDL-Lite XML and extract title, artist, album, image_url, duration.""" + result: dict[str, str | None] = { + "title": None, + "artist": None, + "album": None, + "image_url": None, + "duration": None, + } + if not metadata: + return result + + # SOAP bodies may contain XML-escaped DIDL-Lite content + metadata = unescape(metadata) + + try: + root = DefusedET.fromstring(metadata) + except Exception: + LOGGER.info("Failed to parse DIDL-Lite metadata: %s", metadata[:300]) + return result + + ns = { + "dc": "http://purl.org/dc/elements/1.1/", + "upnp": "urn:schemas-upnp-org:metadata-1-0/upnp/", + "didl": "urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/", + } + + item = root.find("didl:item", ns) + if item is None: + item = root + + title_el = item.find("dc:title", ns) + if title_el is not None and title_el.text: + result["title"] = title_el.text + + artist_el = item.find("upnp:artist", ns) + if artist_el is None: + artist_el = item.find("dc:creator", ns) + if artist_el is not None and artist_el.text: + result["artist"] = artist_el.text + + album_el = item.find("upnp:album", ns) + if album_el is not None and album_el.text: + result["album"] = album_el.text + + art_el = item.find("upnp:albumArtURI", ns) + if art_el is not None and art_el.text: + result["image_url"] = art_el.text + + # Extract duration from element + res_el = item.find("didl:res", ns) + if res_el is not None: + dur = res_el.get("duration") + if dur: + result["duration"] = dur + + return result + + # ------------------------------------------------------------------ + # Renderer callbacks (per-instance) + # ------------------------------------------------------------------ + + async def _on_set_transport_uri( + self, + inst: RendererInstance, + uri: str, + metadata: str | None, + ) -> None: + """Handle SetAVTransportURI for a specific renderer instance.""" + LOGGER.info( + "Received transport URI for '%s': %s", + inst.player_name or "(default)", + uri, + ) + inst.current_stream_url = uri + inst.current_metadata = self._parse_didl_metadata(metadata) + + async def _on_play(self, inst: RendererInstance) -> None: + """Handle Play — start streaming to this instance's player.""" + target = inst.player_id + if not target: + LOGGER.warning("No target player bound — ignoring Play") + return + + if not inst.current_stream_url: + LOGGER.warning("Play received but no stream URL for %s", target) + return + + LOGGER.info("Starting playback on player %s", target) + meta = inst.current_metadata or {} + LOGGER.debug("DIDL metadata for %s: %s", target, meta) + duration = self._parse_duration(meta.get("duration")) + + # Update plugin source metadata for MA UI display + source = self.get_source() + source.in_use_by = target + source.metadata = StreamMetadata( + title=meta.get("title") or "DLNA Stream", + artist=meta.get("artist"), + album=meta.get("album"), + image_url=meta.get("image_url"), + duration=duration, + uri=inst.current_stream_url, + elapsed_time=0, + elapsed_time_last_updated=time.time(), + ) + + # Track playback state for elapsed time + self._active_player_id = target + self._play_start_time = time.time() + self._elapsed_offset = 0 + self._ensure_metadata_task() + + media = PlayerMedia( + uri=inst.current_stream_url, + media_type=MediaType.FLOW_STREAM, + title=meta.get("title"), + artist=meta.get("artist"), + album=meta.get("album"), + image_url=meta.get("image_url"), + duration=duration, + source_id=self.instance_id, + ) + await self.mass.players.play_media(target, media) + + async def _on_pause(self, inst: RendererInstance) -> None: + """Handle Pause for this instance's player.""" + if inst.player_id: + self._freeze_elapsed() + await self.mass.players.cmd_pause(inst.player_id) + + async def _on_stop(self, inst: RendererInstance) -> None: + """Handle Stop for this instance's player.""" + if inst.player_id: + await self.mass.players.cmd_stop(inst.player_id) + inst.current_stream_url = None + self._clear_playback_state() + + async def _on_seek(self, inst: RendererInstance, unit: str, target: str) -> None: + """Handle Seek for this instance's player.""" + if not inst.player_id: + return + position = self._parse_duration(target) + if position is not None: + LOGGER.info("Seeking player %s to %ds", inst.player_id, position) + await self.mass.players.cmd_seek(inst.player_id, position) + else: + LOGGER.warning("Could not parse seek target: %s", target) + + async def _on_set_volume(self, inst: RendererInstance, volume: int) -> None: + """Handle volume change for this instance's player.""" + if inst.player_id: + await self.mass.players.cmd_volume_set(inst.player_id, volume) + + async def _on_set_mute(self, inst: RendererInstance, mute: bool) -> None: + """Handle mute change for this instance's player.""" + if inst.player_id: + await self.mass.players.cmd_volume_mute(inst.player_id, mute) + + # ------------------------------------------------------------------ + # Playback state & metadata helpers + # ------------------------------------------------------------------ + + def _freeze_elapsed(self) -> None: + """Freeze elapsed time at the current playback position.""" + if self._play_start_time: + self._elapsed_offset += int(time.time() - self._play_start_time) + self._play_start_time = None + + def _clear_playback_state(self) -> None: + """Clear all playback state and metadata.""" + self._play_start_time = None + self._elapsed_offset = 0 + self._active_player_id = None + if self._plugin_source: + self._plugin_source.metadata = None + self._plugin_source.in_use_by = None + if self._metadata_task and not self._metadata_task.done(): + self._metadata_task.cancel() + + def _ensure_metadata_task(self) -> None: + """Start the metadata update loop if not already running.""" + if self._metadata_task and not self._metadata_task.done(): + return + self._metadata_task = asyncio.create_task(self._metadata_update_loop()) + + async def _metadata_update_loop(self) -> None: + """Periodically update elapsed time and trigger UI refresh.""" + try: + while True: + await asyncio.sleep(2) + source = self._plugin_source + if not source or not source.metadata: + break + + now = time.time() + + if self._play_start_time: + elapsed = self._elapsed_offset + int(now - self._play_start_time) + source.metadata.elapsed_time = elapsed + source.metadata.elapsed_time_last_updated = now + else: + # Paused — keep last_updated fresh to freeze UI display + source.metadata.elapsed_time = self._elapsed_offset + source.metadata.elapsed_time_last_updated = now + + # Check if player still exists and source is still active + if self._active_player_id: + player = self.mass.players.get_player(self._active_player_id) + if not player: + LOGGER.debug("Metadata loop: player %s gone", self._active_player_id) + self._clear_playback_state() + break + + # Trigger player update so UI reflects metadata changes + if self._active_player_id: + self.mass.players.trigger_player_update(self._active_player_id) + except asyncio.CancelledError: + pass + except Exception: + LOGGER.debug("Metadata update loop error", exc_info=True) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @staticmethod + def _parse_duration(value: str | None) -> int | None: + """Parse UPnP duration string (H:MM:SS or H:MM:SS.xxx) to seconds.""" + if not value: + return None + try: + parts = value.split(":") + if len(parts) == 3: + h, m, s = parts + return int(h) * 3600 + int(m) * 60 + int(float(s)) + if len(parts) == 2: + m, s = parts + return int(m) * 60 + int(float(s)) + return int(float(value)) + except (ValueError, TypeError): + return None + + @staticmethod + def _deterministic_udn(player_id: str) -> str: + """Generate a deterministic UDN from the player_id. + + Uses UUID5 so the same player always gets the same UDN, + keeping DLNA control point bookmarks stable across restarts. + """ + namespace = uuid.uuid5(uuid.NAMESPACE_URL, UDN_NAMESPACE) + return f"uuid:{uuid.uuid5(namespace, player_id or '__default__')}" + + @staticmethod + def _detect_ip() -> str: + """Detect the primary LAN IP address.""" + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.connect(("8.8.8.8", 80)) + ip: str = sock.getsockname()[0] + sock.close() + return ip + except Exception: + return "0.0.0.0" diff --git a/music_assistant/providers/dlna_receiver/renderer.py b/music_assistant/providers/dlna_receiver/renderer.py new file mode 100644 index 0000000000..6fdbf4ea1e --- /dev/null +++ b/music_assistant/providers/dlna_receiver/renderer.py @@ -0,0 +1,689 @@ +"""DLNA Receiver — UPnP MediaRenderer implementation. + +This module contains the HTTP server that serves UPnP device/service XML +descriptions and processes incoming SOAP control actions from DLNA +control points. Includes GENA eventing for state change notifications. +""" + +from __future__ import annotations + +import logging +import uuid +from pathlib import Path +from typing import Any +from xml.etree.ElementTree import Element, SubElement, tostring + +from aiohttp import web + +from .constants import ( + DEFAULT_HTTP_PORT, + SUPPORTED_MIME_TYPES, + TRANSPORT_STATE_NO_MEDIA, + TRANSPORT_STATE_PAUSED, + TRANSPORT_STATE_PLAYING, + TRANSPORT_STATE_STOPPED, + UPNP_DEVICE_TYPE, + UPNP_SERVICE_AV_TRANSPORT, + UPNP_SERVICE_CONNECTION_MANAGER, + UPNP_SERVICE_RENDERING_CONTROL, +) +from .eventing import EventingManager + +LOGGER = logging.getLogger(__name__) + +SCPD_DIR = Path(__file__).parent / "scpd" + + +class UPnPRenderer: + """Virtual UPnP MediaRenderer with SOAP action handling.""" + + def __init__( + self, + friendly_name: str, + bind_ip: str, + http_port: int = DEFAULT_HTTP_PORT, + udn: str | None = None, + ) -> None: + self.friendly_name = friendly_name + self.bind_ip = bind_ip + self.http_port = http_port + self.udn = udn or f"uuid:{uuid.uuid4()}" + + # Transport state + self.transport_state: str = TRANSPORT_STATE_NO_MEDIA + self.current_uri: str = "" + self.current_uri_metadata: str = "" + self.volume: int = 50 + self.mute: bool = False + + # HTTP server + self._app = web.Application() + self._runner: web.AppRunner | None = None + self._setup_routes() + + # GENA eventing managers (one per service) + self._evt_av_transport = EventingManager() + self._evt_rendering_control = EventingManager() + self._evt_connection_manager = EventingManager() + + # Callbacks (set by provider) + self.on_set_av_transport_uri: Any = None + self.on_play: Any = None + self.on_pause: Any = None + self.on_stop: Any = None + self.on_seek: Any = None + self.on_set_volume: Any = None + self.on_set_mute: Any = None + + def _setup_routes(self) -> None: + """Register HTTP routes for UPnP description, control, and eventing.""" + self._app.router.add_get("/description.xml", self._handle_description) + # SCPD routes + self._app.router.add_get( + "/AVTransport/description.xml", + self._handle_av_transport_scpd, + ) + self._app.router.add_get( + "/RenderingControl/description.xml", + self._handle_rendering_control_scpd, + ) + self._app.router.add_get( + "/ConnectionManager/description.xml", + self._handle_connection_manager_scpd, + ) + # SOAP control routes + self._app.router.add_post("/AVTransport/control", self._handle_av_transport) + self._app.router.add_post( + "/RenderingControl/control", + self._handle_rendering_control, + ) + self._app.router.add_post( + "/ConnectionManager/control", + self._handle_connection_manager, + ) + # GENA event subscription routes + self._app.router.add_route( + "SUBSCRIBE", + "/AVTransport/event", + self._handle_subscribe_av_transport, + ) + self._app.router.add_route( + "UNSUBSCRIBE", + "/AVTransport/event", + self._handle_unsubscribe_av_transport, + ) + self._app.router.add_route( + "SUBSCRIBE", + "/RenderingControl/event", + self._handle_subscribe_rendering_control, + ) + self._app.router.add_route( + "UNSUBSCRIBE", + "/RenderingControl/event", + self._handle_unsubscribe_rendering_control, + ) + self._app.router.add_route( + "SUBSCRIBE", + "/ConnectionManager/event", + self._handle_subscribe_connection_manager, + ) + self._app.router.add_route( + "UNSUBSCRIBE", + "/ConnectionManager/event", + self._handle_unsubscribe_connection_manager, + ) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self) -> None: + """Start the UPnP HTTP server and eventing managers.""" + self._runner = web.AppRunner(self._app) + await self._runner.setup() + site = web.TCPSite(self._runner, self.bind_ip, self.http_port) + await site.start() + await self._evt_av_transport.start() + await self._evt_rendering_control.start() + await self._evt_connection_manager.start() + LOGGER.info( + "UPnP renderer HTTP server listening on %s:%s", + self.bind_ip, + self.http_port, + ) + + async def stop(self) -> None: + """Stop the UPnP HTTP server and eventing managers.""" + await self._evt_av_transport.stop() + await self._evt_rendering_control.stop() + await self._evt_connection_manager.stop() + if self._runner: + await self._runner.cleanup() + self._runner = None + LOGGER.info("UPnP renderer HTTP server stopped") + + @property + def description_url(self) -> str: + """Return the device description URL.""" + return f"http://{self.bind_ip}:{self.http_port}/description.xml" + + # ------------------------------------------------------------------ + # UPnP Device Description + # ------------------------------------------------------------------ + + async def _handle_description(self, _request: web.Request) -> web.Response: + """Return the root UPnP device description XML.""" + root = Element("root", xmlns="urn:schemas-upnp-org:device-1-0") + spec = SubElement(root, "specVersion") + SubElement(spec, "major").text = "1" + SubElement(spec, "minor").text = "0" + + device = SubElement(root, "device") + SubElement(device, "deviceType").text = UPNP_DEVICE_TYPE + SubElement(device, "friendlyName").text = self.friendly_name + SubElement(device, "manufacturer").text = "Music Assistant" + SubElement(device, "modelName").text = "DLNA Receiver" + SubElement(device, "modelDescription").text = "Music Assistant DLNA Receiver Bridge" + SubElement(device, "UDN").text = self.udn + + service_list = SubElement(device, "serviceList") + for svc_type, svc_id, scpd_url, ctrl_url, event_url in [ + ( + UPNP_SERVICE_AV_TRANSPORT, + "urn:upnp-org:serviceId:AVTransport", + "/AVTransport/description.xml", + "/AVTransport/control", + "/AVTransport/event", + ), + ( + UPNP_SERVICE_RENDERING_CONTROL, + "urn:upnp-org:serviceId:RenderingControl", + "/RenderingControl/description.xml", + "/RenderingControl/control", + "/RenderingControl/event", + ), + ( + UPNP_SERVICE_CONNECTION_MANAGER, + "urn:upnp-org:serviceId:ConnectionManager", + "/ConnectionManager/description.xml", + "/ConnectionManager/control", + "/ConnectionManager/event", + ), + ]: + svc = SubElement(service_list, "service") + SubElement(svc, "serviceType").text = svc_type + SubElement(svc, "serviceId").text = svc_id + SubElement(svc, "SCPDURL").text = scpd_url + SubElement(svc, "controlURL").text = ctrl_url + SubElement(svc, "eventSubURL").text = event_url + + xml_bytes = b'' + tostring(root, encoding="unicode").encode() + return web.Response(body=xml_bytes, content_type="text/xml") + + # ------------------------------------------------------------------ + # Service SCPDs (served from static XML files) + # ------------------------------------------------------------------ + + async def _handle_av_transport_scpd(self, _request: web.Request) -> web.Response: + """Return AVTransport service description.""" + return self._serve_scpd("AVTransport.xml") + + async def _handle_rendering_control_scpd( + self, + _request: web.Request, + ) -> web.Response: + """Return RenderingControl service description.""" + return self._serve_scpd("RenderingControl.xml") + + async def _handle_connection_manager_scpd( + self, + _request: web.Request, + ) -> web.Response: + """Return ConnectionManager service description.""" + return self._serve_scpd("ConnectionManager.xml") + + @staticmethod + def _serve_scpd(filename: str) -> web.Response: + """Read and serve a SCPD XML file.""" + xml_bytes = (SCPD_DIR / filename).read_bytes() + return web.Response(body=xml_bytes, content_type="text/xml") + + # ------------------------------------------------------------------ + # SOAP Action Handlers + # ------------------------------------------------------------------ + + async def _handle_av_transport(self, request: web.Request) -> web.Response: + """Handle AVTransport SOAP actions.""" + body = await request.text() + soap_action = request.headers.get("SOAPACTION", "").strip('"') + action_name = soap_action.rsplit("#", 1)[-1] if "#" in soap_action else "" + LOGGER.debug("AVTransport action: %s", action_name) + + if action_name == "SetAVTransportURI": + uri = self._extract_xml_value(body, "CurrentURI") + metadata = self._extract_xml_value(body, "CurrentURIMetaData") + LOGGER.debug("SetAVTransportURI raw metadata (first 500): %s", (metadata or "")[:500]) + self.current_uri = uri or "" + self.current_uri_metadata = metadata or "" + self.transport_state = TRANSPORT_STATE_STOPPED + if self.on_set_av_transport_uri: + await self.on_set_av_transport_uri(self.current_uri, metadata) + await self._notify_av_transport_change() + return self._soap_response(action_name, UPNP_SERVICE_AV_TRANSPORT) + + if action_name == "Play": + self.transport_state = TRANSPORT_STATE_PLAYING + if self.on_play: + await self.on_play() + await self._notify_av_transport_change() + return self._soap_response(action_name, UPNP_SERVICE_AV_TRANSPORT) + + if action_name == "Pause": + self.transport_state = TRANSPORT_STATE_PAUSED + if self.on_pause: + await self.on_pause() + await self._notify_av_transport_change() + return self._soap_response(action_name, UPNP_SERVICE_AV_TRANSPORT) + + if action_name == "Stop": + self.transport_state = TRANSPORT_STATE_STOPPED + if self.on_stop: + await self.on_stop() + await self._notify_av_transport_change() + return self._soap_response(action_name, UPNP_SERVICE_AV_TRANSPORT) + + if action_name == "Seek": + unit = self._extract_xml_value(body, "Unit") or "" + target = self._extract_xml_value(body, "Target") or "" + LOGGER.info("Seek requested: Unit=%s, Target=%s", unit, target) + if self.on_seek: + await self.on_seek(unit, target) + return self._soap_response(action_name, UPNP_SERVICE_AV_TRANSPORT) + + if action_name == "GetTransportInfo": + return self._soap_response( + action_name, + UPNP_SERVICE_AV_TRANSPORT, + { + "CurrentTransportState": self.transport_state, + "CurrentTransportStatus": "OK", + "CurrentSpeed": "1", + }, + ) + + if action_name == "GetPositionInfo": + return self._soap_response( + action_name, + UPNP_SERVICE_AV_TRANSPORT, + { + "Track": "1", + "TrackDuration": "00:00:00", + "TrackMetaData": self.current_uri_metadata, + "TrackURI": self.current_uri, + "RelTime": "00:00:00", + "AbsTime": "00:00:00", + "RelCount": "0", + "AbsCount": "0", + }, + ) + + if action_name == "GetMediaInfo": + return self._soap_response( + action_name, + UPNP_SERVICE_AV_TRANSPORT, + { + "NrTracks": "1", + "MediaDuration": "00:00:00", + "CurrentURI": self.current_uri, + "CurrentURIMetaData": self.current_uri_metadata, + "NextURI": "", + "NextURIMetaData": "", + "PlayMedium": "NETWORK", + "RecordMedium": "NOT_IMPLEMENTED", + "WriteStatus": "NOT_IMPLEMENTED", + }, + ) + + LOGGER.warning("Unhandled AVTransport action: %s", action_name) + return self._soap_error(401, "Invalid Action") + + async def _handle_rendering_control(self, request: web.Request) -> web.Response: + """Handle RenderingControl SOAP actions.""" + body = await request.text() + soap_action = request.headers.get("SOAPACTION", "").strip('"') + action_name = soap_action.rsplit("#", 1)[-1] if "#" in soap_action else "" + LOGGER.debug("RenderingControl action: %s", action_name) + + if action_name == "GetVolume": + return self._soap_response( + action_name, + UPNP_SERVICE_RENDERING_CONTROL, + {"CurrentVolume": str(self.volume)}, + ) + + if action_name == "SetVolume": + vol_str = self._extract_xml_value(body, "DesiredVolume") + if vol_str is not None: + self.volume = max(0, min(100, int(vol_str))) + if self.on_set_volume: + await self.on_set_volume(self.volume) + await self._notify_rendering_control_change() + return self._soap_response( + action_name, + UPNP_SERVICE_RENDERING_CONTROL, + ) + + if action_name == "GetMute": + return self._soap_response( + action_name, + UPNP_SERVICE_RENDERING_CONTROL, + {"CurrentMute": "1" if self.mute else "0"}, + ) + + if action_name == "SetMute": + mute_str = self._extract_xml_value(body, "DesiredMute") + if mute_str is not None: + self.mute = mute_str in ("1", "true", "True") + if self.on_set_mute: + await self.on_set_mute(self.mute) + await self._notify_rendering_control_change() + return self._soap_response( + action_name, + UPNP_SERVICE_RENDERING_CONTROL, + ) + + LOGGER.warning("Unhandled RenderingControl action: %s", action_name) + return self._soap_error(401, "Invalid Action") + + async def _handle_connection_manager(self, request: web.Request) -> web.Response: + """Handle ConnectionManager SOAP actions.""" + soap_action = request.headers.get("SOAPACTION", "").strip('"') + action_name = soap_action.rsplit("#", 1)[-1] if "#" in soap_action else "" + LOGGER.debug("ConnectionManager action: %s", action_name) + + if action_name == "GetProtocolInfo": + sink_protocols = ",".join(f"http-get:*:{mime}:*" for mime in SUPPORTED_MIME_TYPES) + return self._soap_response( + action_name, + UPNP_SERVICE_CONNECTION_MANAGER, + {"Source": "", "Sink": sink_protocols}, + ) + + if action_name == "GetCurrentConnectionIDs": + return self._soap_response( + action_name, + UPNP_SERVICE_CONNECTION_MANAGER, + {"ConnectionIDs": "0"}, + ) + + if action_name == "GetCurrentConnectionInfo": + sink_protocols = ",".join(f"http-get:*:{mime}:*" for mime in SUPPORTED_MIME_TYPES) + return self._soap_response( + action_name, + UPNP_SERVICE_CONNECTION_MANAGER, + { + "RcsID": "0", + "AVTransportID": "0", + "ProtocolInfo": sink_protocols, + "PeerConnectionManager": "", + "PeerConnectionID": "-1", + "Direction": "Input", + "Status": "OK", + }, + ) + + LOGGER.warning("Unhandled ConnectionManager action: %s", action_name) + return self._soap_error(401, "Invalid Action") + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @staticmethod + def _extract_xml_value(xml_str: str, tag: str) -> str | None: + """Extract a value from a SOAP XML body by tag name (naive parser).""" + import re + + pattern = rf"<[^>]*{tag}[^>]*>(.*?)]*{tag}>" + match = re.search(pattern, xml_str, re.DOTALL) + return match.group(1) if match else None + + @staticmethod + def _soap_response( + action_name: str, + service_type: str, + values: dict[str, str] | None = None, + ) -> web.Response: + """Build a UPnP SOAP response envelope.""" + body = f""" + + + """ + if values: + for key, val in values.items(): + body += f"\n <{key}>{val}" + body += f""" + + +""" + return web.Response(body=body, content_type="text/xml", charset="utf-8") + + @staticmethod + def _soap_error(code: int, description: str) -> web.Response: + """Build a UPnP SOAP error response.""" + body = f""" + + + + s:Client + UPnPError + + + {code} + {description} + + + + +""" + return web.Response( + body=body, + status=500, + content_type="text/xml", + charset="utf-8", + ) + + # ------------------------------------------------------------------ + # GENA Event Subscription Handlers + # ------------------------------------------------------------------ + + async def _handle_subscribe_av_transport( + self, + request: web.Request, + ) -> web.Response: + """Handle SUBSCRIBE for AVTransport events.""" + return await self._handle_subscribe( + request, + self._evt_av_transport, + self._get_av_transport_vars(), + ) + + async def _handle_unsubscribe_av_transport( + self, + request: web.Request, + ) -> web.Response: + """Handle UNSUBSCRIBE for AVTransport events.""" + return self._handle_unsubscribe(request, self._evt_av_transport) + + async def _handle_subscribe_rendering_control( + self, + request: web.Request, + ) -> web.Response: + """Handle SUBSCRIBE for RenderingControl events.""" + return await self._handle_subscribe( + request, + self._evt_rendering_control, + self._get_rendering_control_vars(), + ) + + async def _handle_unsubscribe_rendering_control( + self, + request: web.Request, + ) -> web.Response: + """Handle UNSUBSCRIBE for RenderingControl events.""" + return self._handle_unsubscribe(request, self._evt_rendering_control) + + async def _handle_subscribe_connection_manager( + self, + request: web.Request, + ) -> web.Response: + """Handle SUBSCRIBE for ConnectionManager events.""" + sink_protocols = ",".join(f"http-get:*:{mime}:*" for mime in SUPPORTED_MIME_TYPES) + initial_vars = { + "SourceProtocolInfo": "", + "SinkProtocolInfo": sink_protocols, + "CurrentConnectionIDs": "0", + } + return await self._handle_subscribe( + request, + self._evt_connection_manager, + initial_vars, + ) + + async def _handle_unsubscribe_connection_manager( + self, + request: web.Request, + ) -> web.Response: + """Handle UNSUBSCRIBE for ConnectionManager events.""" + return self._handle_unsubscribe(request, self._evt_connection_manager) + + async def _handle_subscribe( + self, + request: web.Request, + manager: EventingManager, + initial_vars: dict[str, str], + ) -> web.Response: + """Handle SUBSCRIBE requests for a UPnP service.""" + sid = request.headers.get("SID") + + if sid: + # Renewal + try: + timeout = manager.renew( + sid, + request.headers.get("TIMEOUT"), + ) + except KeyError: + return web.Response(status=412, text="Invalid SID") + return web.Response( + status=200, + headers={ + "SID": sid, + "TIMEOUT": f"Second-{timeout}", + }, + ) + + # New subscription + callback = request.headers.get("CALLBACK") + if not callback: + return web.Response(status=412, text="Missing CALLBACK header") + + try: + sid, timeout = manager.subscribe( + callback, + request.headers.get("TIMEOUT"), + ) + except ValueError as exc: + return web.Response(status=412, text=str(exc)) + + # Send initial event with current state + await manager.notify_initial(sid, initial_vars) + + return web.Response( + status=200, + headers={ + "SID": sid, + "TIMEOUT": f"Second-{timeout}", + "Server": "UPnP/1.0 MusicAssistant/1.0", + }, + ) + + @staticmethod + def _handle_unsubscribe( + request: web.Request, + manager: EventingManager, + ) -> web.Response: + """Handle UNSUBSCRIBE requests for a UPnP service.""" + sid = request.headers.get("SID") + if not sid: + return web.Response(status=412, text="Missing SID header") + manager.unsubscribe(sid) + return web.Response(status=200) + + # ------------------------------------------------------------------ + # Event Notification Helpers + # ------------------------------------------------------------------ + + def _get_av_transport_vars(self) -> dict[str, str]: + """Get current AVTransport state as a LastChange XML fragment.""" + last_change = self._build_last_change( + "urn:schemas-upnp-org:service:AVTransport:1", + { + "TransportState": self.transport_state, + "TransportStatus": "OK", + "TransportPlaySpeed": "1", + "CurrentTrackURI": self.current_uri, + "AVTransportURI": self.current_uri, + "AVTransportURIMetaData": self.current_uri_metadata, + "CurrentTrackMetaData": self.current_uri_metadata, + }, + ) + return {"LastChange": last_change} + + def _get_rendering_control_vars(self) -> dict[str, str]: + """Get current RenderingControl state as a LastChange XML fragment.""" + last_change = self._build_last_change( + "urn:schemas-upnp-org:service:RenderingControl:1", + { + "Volume": str(self.volume), + "Mute": "1" if self.mute else "0", + }, + channel="Master", + ) + return {"LastChange": last_change} + + async def _notify_av_transport_change(self) -> None: + """Notify AVTransport subscribers of state changes.""" + await self._evt_av_transport.notify(self._get_av_transport_vars()) + + async def _notify_rendering_control_change(self) -> None: + """Notify RenderingControl subscribers of state changes.""" + await self._evt_rendering_control.notify( + self._get_rendering_control_vars(), + ) + + @staticmethod + def _build_last_change( + namespace: str, + variables: dict[str, str], + channel: str | None = None, + ) -> str: + """Build a LastChange XML value for GENA eventing. + + The LastChange event wraps state variable changes in an + structure as required by UPnP spec. + """ + from xml.sax.saxutils import escape + + parts: list[str] = [] + for name, value in variables.items(): + attrs = f'val="{escape(value)}"' + if channel: + attrs += f' channel="{channel}"' + parts.append(f"<{name} {attrs}/>") + + return ( + f'{"".join(parts)}' + ) diff --git a/music_assistant/providers/dlna_receiver/scpd/AVTransport.xml b/music_assistant/providers/dlna_receiver/scpd/AVTransport.xml new file mode 100644 index 0000000000..3cc047d0a5 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/scpd/AVTransport.xml @@ -0,0 +1,339 @@ + + + + 1 + 0 + + + + SetAVTransportURI + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + CurrentURI + in + AVTransportURI + + + CurrentURIMetaData + in + AVTransportURIMetaData + + + + + GetTransportInfo + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + CurrentTransportState + out + TransportState + + + CurrentTransportStatus + out + TransportStatus + + + CurrentSpeed + out + TransportPlaySpeed + + + + + Play + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Speed + in + TransportPlaySpeed + + + + + Pause + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + + + Stop + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + + + Seek + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Unit + in + A_ARG_TYPE_SeekMode + + + Target + in + A_ARG_TYPE_SeekTarget + + + + + GetPositionInfo + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Track + out + CurrentTrack + + + TrackDuration + out + CurrentTrackDuration + + + TrackMetaData + out + CurrentTrackMetaData + + + TrackURI + out + CurrentTrackURI + + + RelTime + out + RelativeTimePosition + + + AbsTime + out + AbsoluteTimePosition + + + RelCount + out + RelativeCounterPosition + + + AbsCount + out + AbsoluteCounterPosition + + + + + GetMediaInfo + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + NrTracks + out + NumberOfTracks + + + MediaDuration + out + CurrentMediaDuration + + + CurrentURI + out + AVTransportURI + + + CurrentURIMetaData + out + AVTransportURIMetaData + + + NextURI + out + NextAVTransportURI + + + NextURIMetaData + out + NextAVTransportURIMetaData + + + PlayMedium + out + PlaybackStorageMedium + + + RecordMedium + out + RecordStorageMedium + + + WriteStatus + out + RecordMediumWriteStatus + + + + + + + A_ARG_TYPE_InstanceID + ui4 + + + A_ARG_TYPE_SeekMode + string + + REL_TIME + TRACK_NR + + + + A_ARG_TYPE_SeekTarget + string + + + TransportState + string + + STOPPED + PLAYING + TRANSITIONING + PAUSED_PLAYBACK + NO_MEDIA_PRESENT + + + + TransportStatus + string + + OK + ERROR_OCCURRED + + + + TransportPlaySpeed + string + 1 + + + CurrentTrack + ui4 + 0 + + + CurrentTrackDuration + string + 00:00:00 + + + CurrentTrackMetaData + string + + + CurrentTrackURI + string + + + AVTransportURI + string + + + AVTransportURIMetaData + string + + + NextAVTransportURI + string + + + NextAVTransportURIMetaData + string + + + RelativeTimePosition + string + 00:00:00 + + + AbsoluteTimePosition + string + 00:00:00 + + + RelativeCounterPosition + i4 + 0 + + + AbsoluteCounterPosition + i4 + 0 + + + NumberOfTracks + ui4 + 0 + + + CurrentMediaDuration + string + 00:00:00 + + + PlaybackStorageMedium + string + NETWORK + + + RecordStorageMedium + string + NOT_IMPLEMENTED + + + RecordMediumWriteStatus + string + NOT_IMPLEMENTED + + + LastChange + string + + + diff --git a/music_assistant/providers/dlna_receiver/scpd/ConnectionManager.xml b/music_assistant/providers/dlna_receiver/scpd/ConnectionManager.xml new file mode 100644 index 0000000000..8f1fd80ca7 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/scpd/ConnectionManager.xml @@ -0,0 +1,133 @@ + + + + 1 + 0 + + + + GetProtocolInfo + + + Source + out + SourceProtocolInfo + + + Sink + out + SinkProtocolInfo + + + + + GetCurrentConnectionIDs + + + ConnectionIDs + out + CurrentConnectionIDs + + + + + GetCurrentConnectionInfo + + + ConnectionID + in + A_ARG_TYPE_ConnectionID + + + RcsID + out + A_ARG_TYPE_RcsID + + + AVTransportID + out + A_ARG_TYPE_AVTransportID + + + ProtocolInfo + out + A_ARG_TYPE_ProtocolInfo + + + PeerConnectionManager + out + A_ARG_TYPE_ConnectionManager + + + PeerConnectionID + out + A_ARG_TYPE_ConnectionID + + + Direction + out + A_ARG_TYPE_Direction + + + Status + out + A_ARG_TYPE_ConnectionStatus + + + + + + + SourceProtocolInfo + string + + + SinkProtocolInfo + string + + + CurrentConnectionIDs + string + 0 + + + A_ARG_TYPE_ConnectionID + i4 + + + A_ARG_TYPE_RcsID + i4 + + + A_ARG_TYPE_AVTransportID + i4 + + + A_ARG_TYPE_ProtocolInfo + string + + + A_ARG_TYPE_ConnectionManager + string + + + A_ARG_TYPE_Direction + string + + Input + Output + + + + A_ARG_TYPE_ConnectionStatus + string + + OK + ContentFormatMismatch + InsufficientBandwidth + UnreliableChannel + Unknown + + + + diff --git a/music_assistant/providers/dlna_receiver/scpd/RenderingControl.xml b/music_assistant/providers/dlna_receiver/scpd/RenderingControl.xml new file mode 100644 index 0000000000..7f470ba7d2 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/scpd/RenderingControl.xml @@ -0,0 +1,121 @@ + + + + 1 + 0 + + + + GetVolume + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Channel + in + A_ARG_TYPE_Channel + + + CurrentVolume + out + Volume + + + + + SetVolume + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Channel + in + A_ARG_TYPE_Channel + + + DesiredVolume + in + Volume + + + + + GetMute + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Channel + in + A_ARG_TYPE_Channel + + + CurrentMute + out + Mute + + + + + SetMute + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Channel + in + A_ARG_TYPE_Channel + + + DesiredMute + in + Mute + + + + + + + A_ARG_TYPE_InstanceID + ui4 + + + A_ARG_TYPE_Channel + string + + Master + + + + Volume + ui2 + + 0 + 100 + 1 + + 50 + + + Mute + boolean + 0 + + + LastChange + string + + + diff --git a/music_assistant/providers/dlna_receiver/ssdp.py b/music_assistant/providers/dlna_receiver/ssdp.py new file mode 100644 index 0000000000..e178e75fb7 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/ssdp.py @@ -0,0 +1,217 @@ +"""DLNA Receiver — SSDP advertisement for the virtual MediaRenderer.""" + +from __future__ import annotations + +import asyncio +import logging +import socket + +from .constants import ( + SSDP_MAX_AGE, + SSDP_MULTICAST_ADDR, + SSDP_PORT, + UPNP_DEVICE_TYPE, + UPNP_SERVICE_AV_TRANSPORT, + UPNP_SERVICE_CONNECTION_MANAGER, + UPNP_SERVICE_RENDERING_CONTROL, +) + +LOGGER = logging.getLogger(__name__) + + +class SSDPAdvertiser: + """Advertise a UPnP MediaRenderer via SSDP on the local network.""" + + def __init__( + self, + udn: str, + description_url: str, + bind_ip: str, + ) -> None: + self.udn = udn + self.description_url = description_url + self.bind_ip = bind_ip + self._transport: asyncio.DatagramTransport | None = None + self._recv_transport: asyncio.DatagramTransport | None = None + self._advertise_task: asyncio.Task[None] | None = None + self._running = False + + async def start(self) -> None: + """Start SSDP advertisement.""" + self._running = True + loop = asyncio.get_running_loop() + + # Sending socket — for NOTIFY alive/byebye to multicast group + send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + send_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + send_sock.setsockopt( + socket.IPPROTO_IP, + socket.IP_MULTICAST_IF, + socket.inet_aton(self.bind_ip), + ) + send_sock.setblocking(False) + self._transport, _ = await loop.create_datagram_endpoint( + lambda: _SSDPSendProtocol(), + sock=send_sock, + ) + + # Receiving socket — join multicast group on port 1900 for M-SEARCH + recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + recv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + recv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + except (AttributeError, OSError): + pass + recv_sock.bind(("", SSDP_PORT)) + mreq = socket.inet_aton(SSDP_MULTICAST_ADDR) + socket.inet_aton(self.bind_ip) + recv_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + recv_sock.setblocking(False) + self._recv_transport, _ = await loop.create_datagram_endpoint( + lambda: _SSDPRecvProtocol(self), + sock=recv_sock, + ) + + # Send initial alive notifications + await self._send_alive() + + # Periodic re-advertisement + self._advertise_task = asyncio.create_task(self._periodic_alive()) + LOGGER.info("SSDP advertiser started for %s", self.udn) + + async def stop(self) -> None: + """Stop SSDP advertisement and send byebye.""" + self._running = False + if self._advertise_task: + self._advertise_task.cancel() + self._advertise_task = None + await self._send_byebye() + if self._recv_transport: + self._recv_transport.close() + self._recv_transport = None + if self._transport: + self._transport.close() + self._transport = None + LOGGER.info("SSDP advertiser stopped") + + async def _periodic_alive(self) -> None: + """Re-send alive notifications periodically.""" + while self._running: + await asyncio.sleep(SSDP_MAX_AGE // 2) + if self._running: + await self._send_alive() + + async def _send_alive(self) -> None: + """Send ssdp:alive NOTIFY messages for all service types.""" + notification_types = [ + "upnp:rootdevice", + self.udn, + UPNP_DEVICE_TYPE, + UPNP_SERVICE_AV_TRANSPORT, + UPNP_SERVICE_RENDERING_CONTROL, + UPNP_SERVICE_CONNECTION_MANAGER, + ] + for nt in notification_types: + usn = f"{self.udn}::{nt}" if nt != self.udn else self.udn + message = ( + "NOTIFY * HTTP/1.1\r\n" + f"HOST: {SSDP_MULTICAST_ADDR}:{SSDP_PORT}\r\n" + f"CACHE-CONTROL: max-age={SSDP_MAX_AGE}\r\n" + f"LOCATION: {self.description_url}\r\n" + f"NT: {nt}\r\n" + "NTS: ssdp:alive\r\n" + f"SERVER: Music Assistant DLNA Receiver/1.0 UPnP/1.0\r\n" + f"USN: {usn}\r\n" + "\r\n" + ) + self._send_datagram(message) + LOGGER.debug("Sent SSDP alive notifications") + + async def _send_byebye(self) -> None: + """Send ssdp:byebye NOTIFY messages.""" + notification_types = [ + "upnp:rootdevice", + self.udn, + UPNP_DEVICE_TYPE, + ] + for nt in notification_types: + usn = f"{self.udn}::{nt}" if nt != self.udn else self.udn + message = ( + "NOTIFY * HTTP/1.1\r\n" + f"HOST: {SSDP_MULTICAST_ADDR}:{SSDP_PORT}\r\n" + f"NT: {nt}\r\n" + "NTS: ssdp:byebye\r\n" + f"USN: {usn}\r\n" + "\r\n" + ) + self._send_datagram(message) + LOGGER.debug("Sent SSDP byebye notifications") + + def _send_datagram(self, message: str) -> None: + """Send a UDP datagram to the SSDP multicast address.""" + if self._transport and not self._transport.is_closing(): + self._transport.sendto( + message.encode("utf-8"), + (SSDP_MULTICAST_ADDR, SSDP_PORT), + ) + + def handle_search(self, data: bytes, addr: tuple[str, int]) -> None: + """Respond to M-SEARCH requests matching our device/service types.""" + text = data.decode("utf-8", errors="ignore") + if "M-SEARCH" not in text: + return + + st = "" + for line in text.splitlines(): + if line.upper().startswith("ST:"): + st = line.split(":", 1)[1].strip() + break + + # Check if the search target matches any of our types + match_targets = { + "ssdp:all", + "upnp:rootdevice", + UPNP_DEVICE_TYPE, + UPNP_SERVICE_AV_TRANSPORT, + UPNP_SERVICE_RENDERING_CONTROL, + UPNP_SERVICE_CONNECTION_MANAGER, + self.udn, + } + if st not in match_targets: + return + + usn = f"{self.udn}::{st}" if st != self.udn else self.udn + response = ( + "HTTP/1.1 200 OK\r\n" + f"CACHE-CONTROL: max-age={SSDP_MAX_AGE}\r\n" + f"LOCATION: {self.description_url}\r\n" + f"ST: {st}\r\n" + f"USN: {usn}\r\n" + f"SERVER: Music Assistant DLNA Receiver/1.0 UPnP/1.0\r\n" + "\r\n" + ) + if self._recv_transport and not self._recv_transport.is_closing(): + self._recv_transport.sendto(response.encode("utf-8"), addr) + LOGGER.debug("Responded to M-SEARCH from %s for %s", addr, st) + + +class _SSDPSendProtocol(asyncio.DatagramProtocol): + """Asyncio UDP protocol for the SSDP sending socket (NOTIFY).""" + + def error_received(self, exc: Exception) -> None: + """Handle transport errors.""" + LOGGER.warning("SSDP send transport error: %s", exc) + + +class _SSDPRecvProtocol(asyncio.DatagramProtocol): + """Asyncio UDP protocol for receiving SSDP M-SEARCH requests.""" + + def __init__(self, advertiser: SSDPAdvertiser) -> None: + self._advertiser = advertiser + + def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: + """Handle incoming UDP datagrams.""" + self._advertiser.handle_search(data, addr) + + def error_received(self, exc: Exception) -> None: + """Handle transport errors.""" + LOGGER.warning("SSDP recv transport error: %s", exc) diff --git a/tests/providers/dlna_receiver/__init__.py b/tests/providers/dlna_receiver/__init__.py new file mode 100644 index 0000000000..944595c8b9 --- /dev/null +++ b/tests/providers/dlna_receiver/__init__.py @@ -0,0 +1 @@ +"""Tests for the DLNA Receiver provider.""" diff --git a/tests/providers/dlna_receiver/test_eventing.py b/tests/providers/dlna_receiver/test_eventing.py new file mode 100644 index 0000000000..03364be617 --- /dev/null +++ b/tests/providers/dlna_receiver/test_eventing.py @@ -0,0 +1,105 @@ +"""Tests for the GENA eventing module.""" + +from __future__ import annotations + +import pytest + +from provider.eventing import EventingManager + + +@pytest.fixture +def manager() -> EventingManager: + """Create a fresh eventing manager.""" + return EventingManager() + + +def test_subscribe_returns_sid_and_timeout(manager: EventingManager) -> None: + sid, timeout = manager.subscribe("") + assert sid.startswith("uuid:") + assert timeout == 1800 + + +def test_subscribe_custom_timeout(manager: EventingManager) -> None: + sid, timeout = manager.subscribe( + "", + "Second-300", + ) + assert timeout == 300 + + +def test_subscribe_multiple_callbacks(manager: EventingManager) -> None: + sid, _ = manager.subscribe( + "", + ) + sub = manager._subscriptions[sid] + assert len(sub.callback_urls) == 2 + + +def test_subscribe_no_callback_raises(manager: EventingManager) -> None: + with pytest.raises(ValueError): + manager.subscribe("") + + +def test_unsubscribe(manager: EventingManager) -> None: + sid, _ = manager.subscribe("") + assert sid in manager._subscriptions + manager.unsubscribe(sid) + assert sid not in manager._subscriptions + + +def test_unsubscribe_unknown_is_noop(manager: EventingManager) -> None: + manager.unsubscribe("uuid:nonexistent") # should not raise + + +def test_renew(manager: EventingManager) -> None: + sid, _ = manager.subscribe("", "Second-100") + new_timeout = manager.renew(sid, "Second-600") + assert new_timeout == 600 + + +def test_renew_unknown_raises(manager: EventingManager) -> None: + with pytest.raises(KeyError): + manager.renew("uuid:nonexistent") + + +def test_parse_callback_header() -> None: + urls = EventingManager._parse_callback_header( + "", + ) + assert urls == ["http://192.168.1.5:8080/event", "http://10.0.0.1:9000/ev"] + + +def test_parse_callback_header_single() -> None: + urls = EventingManager._parse_callback_header("") + assert urls == ["http://host:1234/cb"] + + +def test_parse_timeout_default() -> None: + assert EventingManager._parse_timeout(None) == 1800 + assert EventingManager._parse_timeout("") == 1800 + + +def test_parse_timeout_infinite() -> None: + assert EventingManager._parse_timeout("infinite") == 1800 + + +def test_parse_timeout_seconds() -> None: + assert EventingManager._parse_timeout("Second-300") == 300 + assert EventingManager._parse_timeout("Second-7200") == 7200 + + +def test_build_propertyset() -> None: + xml = EventingManager._build_propertyset({"Volume": "75", "Mute": "0"}) + assert "e:propertyset" in xml + assert "75" in xml + assert "0" in xml + + +def test_build_propertyset_escapes_values() -> None: + xml = EventingManager._build_propertyset({"Title": "Tom & Jerry"}) + assert "Tom & Jerry" in xml + + +async def test_notify_no_subscribers(manager: EventingManager) -> None: + """Notify with no subscribers should be a no-op.""" + await manager.notify({"TransportState": "PLAYING"}) diff --git a/tests/providers/dlna_receiver/test_provider.py b/tests/providers/dlna_receiver/test_provider.py new file mode 100644 index 0000000000..8ee621fe9a --- /dev/null +++ b/tests/providers/dlna_receiver/test_provider.py @@ -0,0 +1,68 @@ +"""Tests for the multi-player provider logic. + +Provider module imports music_assistant.models which is only available +when running inside MA. We test the pure utility functions directly +and guard the full-provider import with pytest.importorskip. +""" + +from __future__ import annotations + +import uuid + +from provider.constants import UDN_NAMESPACE + + +def _deterministic_udn(player_id: str) -> str: + """Replicate the UDN generation logic for standalone testing.""" + namespace = uuid.uuid5(uuid.NAMESPACE_URL, UDN_NAMESPACE) + return f"uuid:{uuid.uuid5(namespace, player_id or '__default__')}" + + +def test_deterministic_udn_same_input() -> None: + """Same player_id always produces the same UDN.""" + udn1 = _deterministic_udn("player_kitchen") + udn2 = _deterministic_udn("player_kitchen") + assert udn1 == udn2 + assert udn1.startswith("uuid:") + + +def test_deterministic_udn_different_inputs() -> None: + """Different player_ids produce different UDNs.""" + udn1 = _deterministic_udn("player_kitchen") + udn2 = _deterministic_udn("player_bedroom") + assert udn1 != udn2 + + +def test_deterministic_udn_default() -> None: + """Empty player_id produces a stable UDN for the default instance.""" + udn1 = _deterministic_udn("") + udn2 = _deterministic_udn("") + assert udn1 == udn2 + assert udn1 != _deterministic_udn("some_player") + + +def test_deterministic_udn_is_valid_uuid() -> None: + """Generated UDN contains a valid UUID5.""" + udn = _deterministic_udn("test_player") + uuid_str = udn.replace("uuid:", "") + parsed = uuid.UUID(uuid_str) + assert parsed.version == 5 + + +def test_multiple_renderers_different_ports() -> None: + """Verify multiple renderers can bind to different ports.""" + from provider.renderer import UPnPRenderer + + r1 = UPnPRenderer("Player 1", "127.0.0.1", 9001) + r2 = UPnPRenderer("Player 2", "127.0.0.1", 9002) + assert r1.http_port != r2.http_port + assert r1.udn != r2.udn + + +def test_renderer_with_explicit_udn() -> None: + """Renderer uses provided UDN instead of generating one.""" + from provider.renderer import UPnPRenderer + + udn = _deterministic_udn("test_player") + r = UPnPRenderer("Test", "127.0.0.1", 9999, udn=udn) + assert r.udn == udn diff --git a/tests/providers/dlna_receiver/test_renderer.py b/tests/providers/dlna_receiver/test_renderer.py new file mode 100644 index 0000000000..9f8c295cf8 --- /dev/null +++ b/tests/providers/dlna_receiver/test_renderer.py @@ -0,0 +1,251 @@ +"""Tests for the UPnP renderer SOAP handling.""" + +from __future__ import annotations + +import pytest +from aiohttp.test_utils import TestClient, TestServer + +from provider.renderer import UPnPRenderer + + +@pytest.fixture +def renderer() -> UPnPRenderer: + """Create a test renderer instance.""" + return UPnPRenderer( + friendly_name="Test Renderer", + bind_ip="127.0.0.1", + http_port=0, + ) + + +@pytest.fixture +async def client(renderer: UPnPRenderer) -> TestClient: + """Create an aiohttp test client for the renderer.""" + server = TestServer(renderer._app) + _client = TestClient(server) + await _client.start_server() + yield _client + await _client.close() + + +async def test_device_description(client: TestClient) -> None: + resp = await client.get("/description.xml") + assert resp.status == 200 + text = await resp.text() + assert "MediaRenderer" in text + assert "Test Renderer" in text + + +async def test_get_transport_info(client: TestClient) -> None: + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#GetTransportInfo"', + }, + data="", + ) + assert resp.status == 200 + text = await resp.text() + assert "NO_MEDIA_PRESENT" in text + + +async def test_set_volume(client: TestClient, renderer: UPnPRenderer) -> None: + volume_received: list[int] = [] + + async def _on_volume(v: int) -> None: + volume_received.append(v) + + renderer.on_set_volume = _on_volume + + resp = await client.post( + "/RenderingControl/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:RenderingControl:1#SetVolume"', + }, + data="75", + ) + assert resp.status == 200 + assert renderer.volume == 75 + assert volume_received == [75] + + +async def test_get_protocol_info(client: TestClient) -> None: + resp = await client.post( + "/ConnectionManager/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:ConnectionManager:1#GetProtocolInfo"', + }, + data="", + ) + assert resp.status == 200 + text = await resp.text() + assert "audio/flac" in text + + +# ------------------------------------------------------------------ +# SCPD tests — verify full service descriptions are served +# ------------------------------------------------------------------ + + +async def test_av_transport_scpd(client: TestClient) -> None: + resp = await client.get("/AVTransport/description.xml") + assert resp.status == 200 + text = await resp.text() + assert "SetAVTransportURI" in text + assert "Play" in text + assert "Seek" in text + # Verify state variables are present (not just action names) + assert "TransportState" in text + assert "serviceStateTable" in text + assert "argumentList" in text + + +async def test_rendering_control_scpd(client: TestClient) -> None: + resp = await client.get("/RenderingControl/description.xml") + assert resp.status == 200 + text = await resp.text() + assert "GetVolume" in text + assert "SetMute" in text + assert "Volume" in text + assert "allowedValueRange" in text + + +async def test_connection_manager_scpd(client: TestClient) -> None: + resp = await client.get("/ConnectionManager/description.xml") + assert resp.status == 200 + text = await resp.text() + assert "GetProtocolInfo" in text + assert "GetCurrentConnectionInfo" in text + assert "SinkProtocolInfo" in text + + +# ------------------------------------------------------------------ +# SOAP action tests +# ------------------------------------------------------------------ + + +async def test_play_pause_stop(client: TestClient, renderer: UPnPRenderer) -> None: + """Test transport state transitions via SOAP actions.""" + # SetAVTransportURI + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#SetAVTransportURI"', + }, + data="http://example.com/stream.flac", + ) + assert resp.status == 200 + assert renderer.current_uri == "http://example.com/stream.flac" + assert renderer.transport_state == "STOPPED" + + # Play + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#Play"', + }, + data="", + ) + assert resp.status == 200 + assert renderer.transport_state == "PLAYING" + + # Pause + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#Pause"', + }, + data="", + ) + assert resp.status == 200 + assert renderer.transport_state == "PAUSED_PLAYBACK" + + # Stop + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#Stop"', + }, + data="", + ) + assert resp.status == 200 + assert renderer.transport_state == "STOPPED" + + +async def test_seek_action(client: TestClient) -> None: + """Test that Seek action returns success (no-op).""" + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#Seek"', + }, + data="REL_TIME00:01:30", + ) + assert resp.status == 200 + + +async def test_get_position_info(client: TestClient) -> None: + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#GetPositionInfo"', + }, + data="", + ) + assert resp.status == 200 + text = await resp.text() + assert "RelTime" in text + + +async def test_get_connection_info(client: TestClient) -> None: + """Test GetCurrentConnectionInfo action.""" + resp = await client.post( + "/ConnectionManager/control", + headers={ + "SOAPACTION": ( + '"urn:schemas-upnp-org:service:ConnectionManager:1#GetCurrentConnectionInfo"' + ), + }, + data="0", + ) + assert resp.status == 200 + text = await resp.text() + assert "Direction" in text + assert "Input" in text + + +async def test_invalid_action(client: TestClient) -> None: + """Test that unknown actions return SOAP error.""" + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#NonExistentAction"', + }, + data="", + ) + assert resp.status == 500 + text = await resp.text() + assert "Invalid Action" in text + + +async def test_set_mute(client: TestClient, renderer: UPnPRenderer) -> None: + resp = await client.post( + "/RenderingControl/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:RenderingControl:1#SetMute"', + }, + data="1", + ) + assert resp.status == 200 + assert renderer.mute is True + + resp = await client.post( + "/RenderingControl/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:RenderingControl:1#GetMute"', + }, + data="", + ) + assert resp.status == 200 + text = await resp.text() + assert "1" in text diff --git a/tests/providers/dlna_receiver/test_ssdp.py b/tests/providers/dlna_receiver/test_ssdp.py new file mode 100644 index 0000000000..fb6a8e0629 --- /dev/null +++ b/tests/providers/dlna_receiver/test_ssdp.py @@ -0,0 +1,26 @@ +"""Tests for SSDP advertiser.""" + +from __future__ import annotations + +from provider.ssdp import SSDPAdvertiser + + +def test_ssdp_advertiser_init() -> None: + adv = SSDPAdvertiser( + udn="uuid:test-1234", + description_url="http://192.168.1.100:8298/description.xml", + bind_ip="192.168.1.100", + ) + assert adv.udn == "uuid:test-1234" + assert adv.bind_ip == "192.168.1.100" + assert "8298" in adv.description_url + + +def test_handle_search_ignores_non_matching() -> None: + adv = SSDPAdvertiser( + udn="uuid:test-1234", + description_url="http://192.168.1.100:8298/description.xml", + bind_ip="192.168.1.100", + ) + # Non-M-SEARCH data should be silently ignored (no exception) + adv.handle_search(b"NOTIFY * HTTP/1.1\r\n", ("192.168.1.1", 1900)) From 3a678973c5d61ba8a0dd3339b7c0e04d8c515b5e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 7 Apr 2026 21:51:24 +0000 Subject: [PATCH 2/2] feat(dlna_receiver): sync provider from ma-provider-dlna-receiver v1.0.0 --- music_assistant/providers/dlna_receiver/manifest.json | 2 +- music_assistant/providers/dlna_receiver/provider.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/music_assistant/providers/dlna_receiver/manifest.json b/music_assistant/providers/dlna_receiver/manifest.json index 7c315b10bf..842806785b 100644 --- a/music_assistant/providers/dlna_receiver/manifest.json +++ b/music_assistant/providers/dlna_receiver/manifest.json @@ -7,7 +7,7 @@ "credits": [ "[async-upnp-client](https://github.com/StevenLooman/async_upnp_client)" ], - "requirements": ["defusedxml>=0.7.0"], + "requirements": [], "documentation": "https://trudenboy.github.io/ma-provider-dlna-receiver/", "stage": "experimental", "multi_instance": false, diff --git a/music_assistant/providers/dlna_receiver/provider.py b/music_assistant/providers/dlna_receiver/provider.py index bf3f1cc404..dcd72ab504 100644 --- a/music_assistant/providers/dlna_receiver/provider.py +++ b/music_assistant/providers/dlna_receiver/provider.py @@ -15,12 +15,12 @@ import socket import time import uuid +import xml.etree.ElementTree as ET from collections.abc import AsyncGenerator from dataclasses import dataclass from html import unescape from typing import TYPE_CHECKING -import defusedxml.ElementTree as DefusedET from music_assistant_models.config_entries import ConfigValueType # noqa: F401 from music_assistant_models.enums import MediaType, ProviderFeature from music_assistant_models.streamdetails import StreamMetadata @@ -387,7 +387,7 @@ def _parse_didl_metadata(metadata: str | None) -> dict[str, str | None]: metadata = unescape(metadata) try: - root = DefusedET.fromstring(metadata) + root = ET.fromstring(metadata) # noqa: S314 except Exception: LOGGER.info("Failed to parse DIDL-Lite metadata: %s", metadata[:300]) return result