diff --git a/music_assistant/providers/tagplayer/__init__.py b/music_assistant/providers/tagplayer/__init__.py new file mode 100644 index 0000000000..bdc746847e --- /dev/null +++ b/music_assistant/providers/tagplayer/__init__.py @@ -0,0 +1,230 @@ +""" +Tag Player plugin for Music Assistant. + +Links arbitrary identifiers (NFC tags, QR codes, etc.) to media items for quick playback. +Uses provider mappings on existing library items — no custom database tables. + +Usage: +- Link tags via API: tagplayer/link with tag_id and target (e.g., "playlist/42") +- Play via API: tagplayer/play with tag_id and player_id +- Play via URI: tagplayer:/// (media type must match the linked item) +""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import TYPE_CHECKING, Any + +from music_assistant_models.enums import MediaType, QueueOption +from music_assistant_models.errors import MediaNotFoundError +from music_assistant_models.media_items import ProviderMapping + +from music_assistant.models.plugin import PluginProvider + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig + from music_assistant_models.media_items import MediaItemType + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + from music_assistant.models import ProviderInstanceType + +# Media types that support tag mappings +TAGGABLE_MEDIA_TYPES = ( + MediaType.TRACK, + MediaType.ALBUM, + MediaType.PLAYLIST, + MediaType.ARTIST, + MediaType.RADIO, + MediaType.AUDIOBOOK, + MediaType.PODCAST, +) + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider instance with given configuration.""" + return TagPlayerProvider(mass, manifest, config, set()) + + +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, # noqa: ARG001 +) -> tuple[ConfigEntry, ...]: + """Return Config entries to setup this provider.""" + return () + + +class TagPlayerProvider(PluginProvider): + """Tag Player plugin that links identifiers to existing library items.""" + + _unregister_commands: list[Callable[[], None]] + + async def loaded_in_mass(self) -> None: + """Register API commands after the provider is loaded.""" + self._unregister_commands = [ + self.mass.register_api_command("tagplayer/link", self.link_tag), + self.mass.register_api_command("tagplayer/unlink", self.unlink_tag), + self.mass.register_api_command("tagplayer/get", self.get_tag), + self.mass.register_api_command("tagplayer/list", self.list_tags), + self.mass.register_api_command("tagplayer/play", self.play_tag), + ] + + async def unload(self, is_removed: bool = False) -> None: + """Unregister API commands and clean up.""" + for unregister in self._unregister_commands: + unregister() + self._unregister_commands.clear() + + async def link_tag(self, tag_id: str, target: str) -> dict[str, Any]: + """Link a tag identifier to a library media item. + + Adds a provider mapping with available=False on the target item so the tag + resolves to it via URI (tagplayer:///) without interfering + with streaming through the item's real providers. + + :param tag_id: The tag identifier (NFC serial, QR code content, etc.). + :param target: Media path like "track/42" or "playlist/5". + """ + if not tag_id or not tag_id.strip(): + raise ValueError("tag_id cannot be empty") + + media_type, library_id = self._parse_target(target) + controller = self.mass.music.get_controller(media_type) + + # verify the library item exists + await controller.get_library_item(library_id) + + # check if this tag is already linked to a different item and unlink it + existing = await self._find_tagged_item(tag_id) + if existing is not None: + old_type, old_item = existing + old_ctrl = self.mass.music.get_controller(old_type) + await old_ctrl.remove_provider_mapping(int(old_item.item_id), self.instance_id, tag_id) + + mapping = ProviderMapping( + item_id=tag_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + available=False, + ) + await controller.add_provider_mapping(library_id, mapping) + + uri = f"{self.domain}://{media_type.value}/{tag_id}" + self.logger.debug( + "Linked tag '%s' to %s/%d (URI: %s)", tag_id, media_type.value, library_id, uri + ) + return {"tag_id": tag_id, "target": target, "uri": uri} + + async def unlink_tag(self, tag_id: str) -> dict[str, str]: + """Remove a tag mapping. + + :param tag_id: The tag identifier to unlink. + """ + result = await self._find_tagged_item(tag_id) + if result is None: + raise MediaNotFoundError(f"Unknown tag: {tag_id}") + + media_type, library_item = result + controller = self.mass.music.get_controller(media_type) + await controller.remove_provider_mapping( + int(library_item.item_id), self.instance_id, tag_id + ) + + self.logger.debug("Unlinked tag '%s'", tag_id) + return {"tag_id": tag_id} + + async def get_tag(self, tag_id: str) -> dict[str, Any]: + """Get the mapping for a single tag. + + :param tag_id: The tag identifier to look up. + """ + result = await self._find_tagged_item(tag_id) + if result is None: + raise MediaNotFoundError(f"Unknown tag: {tag_id}") + + media_type, library_item = result + uri = f"{self.domain}://{media_type.value}/{tag_id}" + return { + "tag_id": tag_id, + "media_type": media_type.value, + "item_id": int(library_item.item_id), + "name": library_item.name, + "uri": uri, + } + + async def list_tags(self) -> list[dict[str, Any]]: + """List all tag mappings.""" + tags: list[dict[str, Any]] = [] + for media_type in TAGGABLE_MEDIA_TYPES: + controller = self.mass.music.get_controller(media_type) + items = await controller.library_items(provider=self.instance_id) + for item in items: + for mapping in item.provider_mappings: + if mapping.provider_instance == self.instance_id: + tags.append( + { + "tag_id": mapping.item_id, + "media_type": media_type.value, + "item_id": int(item.item_id), + "name": item.name, + "uri": f"{self.domain}://{media_type.value}/{mapping.item_id}", + } + ) + return tags + + async def play_tag( + self, + tag_id: str, + player_id: str, + queue_option: QueueOption = QueueOption.PLAY, + ) -> None: + """Resolve a tag and play the linked media on a player. + + :param tag_id: The tag identifier to play. + :param player_id: The player to play on. + :param queue_option: How to add to queue (default: PLAY). + """ + result = await self._find_tagged_item(tag_id) + if result is None: + raise MediaNotFoundError(f"Unknown tag: {tag_id}") + + media_type, library_item = result + self.logger.debug( + "Playing tag '%s' -> %s/%s on %s", + tag_id, + media_type.value, + library_item.item_id, + player_id, + ) + await self.mass.player_queues.play_media(player_id, library_item, queue_option) + + async def _find_tagged_item(self, tag_id: str) -> tuple[MediaType, MediaItemType] | None: + """Search all media types for a library item with the given tag mapping. + + :param tag_id: The tag identifier to search for. + """ + for media_type in TAGGABLE_MEDIA_TYPES: + controller = self.mass.music.get_controller(media_type) + if item := await controller.get_library_item_by_prov_id(tag_id, self.instance_id): + return (media_type, item) + return None + + @staticmethod + def _parse_target(target: str) -> tuple[MediaType, int]: + """Parse a target string like 'track/42' into (MediaType, library_id). + + :param target: Target string in the format "media_type/library_id". + """ + try: + media_type_str, item_id_str = target.strip("/").split("/", 1) + media_type = MediaType(media_type_str) + if media_type not in TAGGABLE_MEDIA_TYPES: + raise ValueError + return (media_type, int(item_id_str)) + except (ValueError, KeyError) as err: + msg = f"Invalid target format: {target}. Expected: type/id (e.g., track/42)" + raise ValueError(msg) from err diff --git a/music_assistant/providers/tagplayer/manifest.json b/music_assistant/providers/tagplayer/manifest.json new file mode 100644 index 0000000000..eb059911b7 --- /dev/null +++ b/music_assistant/providers/tagplayer/manifest.json @@ -0,0 +1,11 @@ +{ + "type": "plugin", + "domain": "tagplayer", + "stage": "alpha", + "name": "Tag Player", + "description": "Link NFC tags, QR codes, or any identifier to media items for quick playback.", + "codeowners": ["@ztripez"], + "requirements": [], + "documentation": "https://music-assistant.io/music-providers/tagplayer/", + "icon": "nfc" +} diff --git a/tests/providers/test_tagplayer.py b/tests/providers/test_tagplayer.py new file mode 100644 index 0000000000..a9793ab70e --- /dev/null +++ b/tests/providers/test_tagplayer.py @@ -0,0 +1,421 @@ +"""Tests for Tag Player plugin.""" + +from __future__ import annotations + +from typing import cast +from unittest.mock import AsyncMock, MagicMock + +import pytest +from music_assistant_models.enums import MediaType, QueueOption +from music_assistant_models.errors import MediaNotFoundError +from music_assistant_models.media_items import ProviderMapping + +from music_assistant.providers.tagplayer import TagPlayerProvider + + +def _mock_library_item( + item_id: int, + name: str, + media_type: MediaType, + provider_mappings: set[ProviderMapping] | None = None, +) -> MagicMock: + """Create a mock library item with the given properties.""" + item = MagicMock() + item.item_id = str(item_id) + item.name = name + item.media_type = media_type + item.provider_mappings = provider_mappings or set() + return item + + +def _mock_controller( + media_type: MediaType, + items_by_prov_id: dict[str, MagicMock] | None = None, + library_items: list[MagicMock] | None = None, +) -> MagicMock: + """Create a mock media type controller. + + :param media_type: The media type this controller handles. + :param items_by_prov_id: Map of prov_item_id -> library item for get_library_item_by_prov_id. + :param library_items: Items returned by library_items(). + """ + ctrl = MagicMock() + ctrl.media_type = media_type + + prov_id_map = items_by_prov_id or {} + + async def _get_by_prov_id( + item_id: str, + provider_instance_id_or_domain: str, # noqa: ARG001 + ) -> MagicMock | None: + return prov_id_map.get(item_id) + + ctrl.get_library_item_by_prov_id = AsyncMock(side_effect=_get_by_prov_id) + ctrl.get_library_item = AsyncMock() + ctrl.add_provider_mapping = AsyncMock() + ctrl.remove_provider_mapping = AsyncMock() + ctrl.library_items = AsyncMock(return_value=library_items or []) + return ctrl + + +@pytest.fixture +def mock_mass() -> MagicMock: + """Create a mock MusicAssistant instance.""" + mass = MagicMock() + mass.register_api_command = MagicMock(side_effect=lambda *_a, **_kw: MagicMock()) + + # default empty controllers for all taggable types + controllers: dict[MediaType, MagicMock] = {} + for mt in ( + MediaType.TRACK, + MediaType.ALBUM, + MediaType.PLAYLIST, + MediaType.ARTIST, + MediaType.RADIO, + MediaType.AUDIOBOOK, + MediaType.PODCAST, + ): + controllers[mt] = _mock_controller(mt) + + mass.music = MagicMock() + mass.music.get_controller = MagicMock(side_effect=lambda mt: controllers[mt]) + + mass.player_queues = MagicMock() + mass.player_queues.play_media = AsyncMock() + + # expose controllers dict for per-test customization + mass._test_controllers = controllers + return mass + + +@pytest.fixture +def mock_manifest() -> MagicMock: + """Create a mock manifest.""" + manifest = MagicMock() + manifest.domain = "tagplayer" + manifest.name = "Tag Player" + return manifest + + +@pytest.fixture +def mock_config() -> MagicMock: + """Create a mock config.""" + config = MagicMock() + config.instance_id = "tagplayer" + config.get_value.return_value = "GLOBAL" + return config + + +@pytest.fixture +async def provider( + mock_mass: MagicMock, mock_manifest: MagicMock, mock_config: MagicMock +) -> TagPlayerProvider: + """Create a TagPlayerProvider instance with API commands registered.""" + prov = TagPlayerProvider(mock_mass, mock_manifest, mock_config, set()) + await prov.loaded_in_mass() + return prov + + +class TestLifecycle: + """Tests for provider lifecycle.""" + + async def test_registers_five_commands( + self, provider: TagPlayerProvider, mock_mass: MagicMock + ) -> None: + """All five API commands should be registered on load.""" + assert mock_mass.register_api_command.call_count == 5 + commands = [c[0][0] for c in mock_mass.register_api_command.call_args_list] + assert sorted(commands) == [ + "tagplayer/get", + "tagplayer/link", + "tagplayer/list", + "tagplayer/play", + "tagplayer/unlink", + ] + + async def test_unload_deregisters_commands(self, provider: TagPlayerProvider) -> None: + """Unload should call every unregister handle and clear the list.""" + handles = [cast("MagicMock", h) for h in provider._unregister_commands] + assert len(handles) == 5 + await provider.unload() + for handle in handles: + handle.assert_called_once() + assert len(provider._unregister_commands) == 0 + + +class TestLinkTag: + """Tests for the tagplayer/link command.""" + + async def test_link_creates_provider_mapping( + self, provider: TagPlayerProvider, mock_mass: MagicMock + ) -> None: + """Linking a tag should add a provider mapping with available=False.""" + ctrl = mock_mass._test_controllers[MediaType.TRACK] + mock_item = _mock_library_item(42, "Test Track", MediaType.TRACK) + ctrl.get_library_item.return_value = mock_item + + result = await provider.link_tag("nfc-001", "track/42") + + assert result["tag_id"] == "nfc-001" + assert result["uri"] == "tagplayer://track/nfc-001" + + ctrl.add_provider_mapping.assert_called_once() + call_args = ctrl.add_provider_mapping.call_args + assert call_args[0][0] == 42 # library_id + mapping: ProviderMapping = call_args[0][1] + assert mapping.item_id == "nfc-001" + assert mapping.provider_domain == "tagplayer" + assert mapping.provider_instance == "tagplayer" + assert mapping.available is False + + async def test_link_replaces_existing_mapping( + self, provider: TagPlayerProvider, mock_mass: MagicMock + ) -> None: + """Re-linking a tag should remove the old mapping first.""" + # existing tag is on a playlist + old_item = _mock_library_item(10, "Old Playlist", MediaType.PLAYLIST) + playlist_ctrl = mock_mass._test_controllers[MediaType.PLAYLIST] + playlist_ctrl.get_library_item_by_prov_id = AsyncMock(return_value=old_item) + + # new target is a track + track_ctrl = mock_mass._test_controllers[MediaType.TRACK] + new_item = _mock_library_item(42, "New Track", MediaType.TRACK) + track_ctrl.get_library_item.return_value = new_item + + await provider.link_tag("nfc-001", "track/42") + + # old mapping should be removed + playlist_ctrl.remove_provider_mapping.assert_called_once_with(10, "tagplayer", "nfc-001") + # new mapping should be added + track_ctrl.add_provider_mapping.assert_called_once() + + async def test_link_empty_tag_id_raises(self, provider: TagPlayerProvider) -> None: + """Empty or whitespace-only tag_id should raise ValueError.""" + with pytest.raises(ValueError, match="tag_id cannot be empty"): + await provider.link_tag("", "track/42") + with pytest.raises(ValueError, match="tag_id cannot be empty"): + await provider.link_tag(" ", "track/42") + + async def test_link_invalid_target_raises(self, provider: TagPlayerProvider) -> None: + """Invalid target format should raise ValueError.""" + with pytest.raises(ValueError, match="Invalid target format"): + await provider.link_tag("nfc-001", "invalid") + with pytest.raises(ValueError, match="Invalid target format"): + await provider.link_tag("nfc-001", "track/not-a-number") + + +class TestUnlinkTag: + """Tests for the tagplayer/unlink command.""" + + async def test_unlink_removes_mapping( + self, provider: TagPlayerProvider, mock_mass: MagicMock + ) -> None: + """Unlinking should remove the provider mapping from the library item.""" + mock_item = _mock_library_item(42, "Test Track", MediaType.TRACK) + ctrl = mock_mass._test_controllers[MediaType.TRACK] + ctrl.get_library_item_by_prov_id = AsyncMock(return_value=mock_item) + + result = await provider.unlink_tag("nfc-001") + + assert result["tag_id"] == "nfc-001" + ctrl.remove_provider_mapping.assert_called_once_with(42, "tagplayer", "nfc-001") + + async def test_unlink_unknown_tag_raises(self, provider: TagPlayerProvider) -> None: + """Unlinking a tag that doesn't exist should raise MediaNotFoundError.""" + with pytest.raises(MediaNotFoundError, match="Unknown tag"): + await provider.unlink_tag("nonexistent") + + +class TestGetTag: + """Tests for the tagplayer/get command.""" + + async def test_get_returns_tag_info( + self, provider: TagPlayerProvider, mock_mass: MagicMock + ) -> None: + """Getting a tag should return its mapping details.""" + mock_item = _mock_library_item(5, "Party Mix", MediaType.PLAYLIST) + ctrl = mock_mass._test_controllers[MediaType.PLAYLIST] + ctrl.get_library_item_by_prov_id = AsyncMock(return_value=mock_item) + + result = await provider.get_tag("party-mix") + + assert result["tag_id"] == "party-mix" + assert result["media_type"] == "playlist" + assert result["item_id"] == 5 + assert result["name"] == "Party Mix" + assert result["uri"] == "tagplayer://playlist/party-mix" + + async def test_get_unknown_tag_raises(self, provider: TagPlayerProvider) -> None: + """Getting a tag that doesn't exist should raise MediaNotFoundError.""" + with pytest.raises(MediaNotFoundError, match="Unknown tag"): + await provider.get_tag("nonexistent") + + +class TestListTags: + """Tests for the tagplayer/list command.""" + + async def test_list_returns_all_tags( + self, provider: TagPlayerProvider, mock_mass: MagicMock + ) -> None: + """Listing should return tags across all media types.""" + tag_mapping = ProviderMapping( + item_id="nfc-001", + provider_domain="tagplayer", + provider_instance="tagplayer", + available=False, + ) + other_mapping = ProviderMapping( + item_id="spotify:abc", + provider_domain="spotify", + provider_instance="spotify--xyz", + available=True, + ) + track = _mock_library_item( + 42, + "Tagged Track", + MediaType.TRACK, + provider_mappings={tag_mapping, other_mapping}, + ) + mock_mass._test_controllers[MediaType.TRACK].library_items = AsyncMock(return_value=[track]) + + result = await provider.list_tags() + + assert len(result) == 1 + assert result[0]["tag_id"] == "nfc-001" + assert result[0]["media_type"] == "track" + assert result[0]["item_id"] == 42 + assert result[0]["name"] == "Tagged Track" + + async def test_list_empty(self, provider: TagPlayerProvider) -> None: + """Listing with no tags should return an empty list.""" + result = await provider.list_tags() + assert result == [] + + async def test_list_across_media_types( + self, provider: TagPlayerProvider, mock_mass: MagicMock + ) -> None: + """Tags from different media types should all appear.""" + track_tag = ProviderMapping( + item_id="tag-a", + provider_domain="tagplayer", + provider_instance="tagplayer", + available=False, + ) + playlist_tag = ProviderMapping( + item_id="tag-b", + provider_domain="tagplayer", + provider_instance="tagplayer", + available=False, + ) + track = _mock_library_item(1, "Track", MediaType.TRACK, {track_tag}) + playlist = _mock_library_item(2, "Playlist", MediaType.PLAYLIST, {playlist_tag}) + + mock_mass._test_controllers[MediaType.TRACK].library_items = AsyncMock(return_value=[track]) + mock_mass._test_controllers[MediaType.PLAYLIST].library_items = AsyncMock( + return_value=[playlist] + ) + + result = await provider.list_tags() + + assert len(result) == 2 + tag_ids = {t["tag_id"] for t in result} + assert tag_ids == {"tag-a", "tag-b"} + + +class TestPlayTag: + """Tests for the tagplayer/play command.""" + + async def test_play_resolves_and_plays( + self, provider: TagPlayerProvider, mock_mass: MagicMock + ) -> None: + """Playing a tag should resolve it and call play_media.""" + mock_item = _mock_library_item(42, "Test Track", MediaType.TRACK) + ctrl = mock_mass._test_controllers[MediaType.TRACK] + ctrl.get_library_item_by_prov_id = AsyncMock(return_value=mock_item) + + await provider.play_tag("nfc-001", "living_room") + + mock_mass.player_queues.play_media.assert_called_once_with( + "living_room", mock_item, QueueOption.PLAY + ) + + async def test_play_with_queue_option( + self, provider: TagPlayerProvider, mock_mass: MagicMock + ) -> None: + """Playing with a custom queue option should pass it through.""" + mock_item = _mock_library_item(42, "Test Track", MediaType.TRACK) + ctrl = mock_mass._test_controllers[MediaType.TRACK] + ctrl.get_library_item_by_prov_id = AsyncMock(return_value=mock_item) + + await provider.play_tag("nfc-001", "living_room", QueueOption.ADD) + + mock_mass.player_queues.play_media.assert_called_once_with( + "living_room", mock_item, QueueOption.ADD + ) + + async def test_play_unknown_tag_raises(self, provider: TagPlayerProvider) -> None: + """Playing a tag that doesn't exist should raise MediaNotFoundError.""" + with pytest.raises(MediaNotFoundError, match="Unknown tag"): + await provider.play_tag("nonexistent", "living_room") + + +class TestFindTaggedItem: + """Tests for the _find_tagged_item helper.""" + + async def test_finds_item_in_first_matching_type( + self, provider: TagPlayerProvider, mock_mass: MagicMock + ) -> None: + """Should return the first matching item across media types.""" + mock_item = _mock_library_item(7, "An Album", MediaType.ALBUM) + mock_mass._test_controllers[MediaType.ALBUM].get_library_item_by_prov_id = AsyncMock( + return_value=mock_item + ) + + result = await provider._find_tagged_item("album-tag") + + assert result is not None + media_type, item = result + assert media_type == MediaType.ALBUM + assert item.name == "An Album" + + async def test_returns_none_when_not_found(self, provider: TagPlayerProvider) -> None: + """Should return None when no media type has the tag.""" + result = await provider._find_tagged_item("nonexistent") + assert result is None + + +class TestParseTarget: + """Tests for the _parse_target static method.""" + + def test_track_target(self) -> None: + """Should parse track targets.""" + media_type, item_id = TagPlayerProvider._parse_target("track/42") + assert media_type == MediaType.TRACK + assert item_id == 42 + + def test_playlist_target(self) -> None: + """Should parse playlist targets.""" + media_type, item_id = TagPlayerProvider._parse_target("playlist/5") + assert media_type == MediaType.PLAYLIST + assert item_id == 5 + + def test_strips_leading_slash(self) -> None: + """Should handle leading slashes.""" + media_type, item_id = TagPlayerProvider._parse_target("/album/10") + assert media_type == MediaType.ALBUM + assert item_id == 10 + + def test_invalid_format_raises(self) -> None: + """Should raise ValueError for invalid formats.""" + with pytest.raises(ValueError, match="Invalid target format"): + TagPlayerProvider._parse_target("invalid") + + def test_non_integer_id_raises(self) -> None: + """Should raise ValueError for non-integer IDs.""" + with pytest.raises(ValueError, match="Invalid target format"): + TagPlayerProvider._parse_target("track/abc") + + def test_invalid_media_type_raises(self) -> None: + """Should raise ValueError for unknown media types.""" + with pytest.raises(ValueError, match="Invalid target format"): + TagPlayerProvider._parse_target("widget/42")