From b4ad388b131e5e6789c8c28e8258f3702ec19b33 Mon Sep 17 00:00:00 2001 From: Gav Date: Wed, 1 Apr 2026 02:26:30 +1000 Subject: [PATCH 1/7] Add radio stream artwork lookup from ICY/HLS metadata MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Looks up artist/track artwork for radio streams using ICY and HLS metadata. Uses MusicBrainz to find release groups, then queries fanart.tv and TheAudioDB for artwork. Prefers single artwork first, falls back to album, then artist. Includes artist name normalization for radio metadata (handles "Last, First" format, business suffixes, known band names), swap detection for stations that send "Track - Artist" format, and "The " prefix handling for searches. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- music_assistant/controllers/metadata.py | 556 +++++++++++++++++- music_assistant/controllers/streams/audio.py | 67 ++- music_assistant/helpers/tags.py | 8 + music_assistant/helpers/util.py | 74 +++ .../providers/theaudiodb/__init__.py | 2 + 5 files changed, 702 insertions(+), 5 deletions(-) diff --git a/music_assistant/controllers/metadata.py b/music_assistant/controllers/metadata.py index a623bbad9f..5da4caa5da 100644 --- a/music_assistant/controllers/metadata.py +++ b/music_assistant/controllers/metadata.py @@ -11,7 +11,7 @@ from base64 import b64encode from contextlib import suppress from time import time -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Any, cast from uuid import NAMESPACE_URL, uuid4, uuid5 import aiofiles @@ -21,12 +21,19 @@ from music_assistant_models.enums import ( AlbumType, ConfigEntryType, + ExternalID, ImageType, MediaType, ProviderFeature, ProviderType, ) -from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError +from music_assistant_models.errors import ( + InvalidDataError, + MediaNotFoundError, + MusicAssistantError, + ProviderUnavailableError, + ResourceTemporarilyUnavailable, +) from music_assistant_models.helpers import get_global_cache_value from music_assistant_models.media_items import ( Album, @@ -35,11 +42,13 @@ BrowseFolder, ItemMapping, MediaItemImage, + MediaItemMetadata, MediaItemType, Playlist, Podcast, Track, ) +from music_assistant_models.streamdetails import StreamMetadata from music_assistant_models.unique_list import UniqueList from music_assistant.constants import ( @@ -66,17 +75,19 @@ get_image_thumb, ) from music_assistant.helpers.security import is_safe_path +from music_assistant.helpers.tags import split_artists from music_assistant.helpers.throttle_retry import Throttler -from music_assistant.helpers.util import try_parse_int +from music_assistant.helpers.util import clean_title_for_search, try_parse_int from music_assistant.models.core_controller import CoreController from music_assistant.models.music_provider import MusicProvider if TYPE_CHECKING: from music_assistant_models.config_entries import CoreConfig + from music_assistant_models.streamdetails import StreamDetails from music_assistant import MusicAssistant from music_assistant.models.metadata_provider import MetadataProvider - from music_assistant.providers.musicbrainz import MusicbrainzProvider + from music_assistant.providers.musicbrainz import MusicbrainzProvider, MusicBrainzReleaseGroup def _detect_image_format(path: str) -> str: @@ -131,6 +142,20 @@ def _detect_image_format(path: str) -> str: } DEFAULT_LANGUAGE = "en_US" +REFRESH_INTERVAL_ARTISTS = 60 * 60 * 24 * 90 # 90 days +REFRESH_INTERVAL_ALBUMS = 60 * 60 * 24 * 90 # 90 days +REFRESH_INTERVAL_TRACKS = 60 * 60 * 24 * 90 # 90 days +REFRESH_INTERVAL_AUDIOBOOKS = 60 * 60 * 24 * 90 # 90 days +REFRESH_INTERVAL_PODCASTS = 60 * 60 * 24 * 90 # 90 days +REFRESH_INTERVAL_PLAYLISTS = 60 * 60 * 24 * 14 # 14 days + +# Radio stream artwork cache settings +CACHE_CATEGORY_RADIO_ARTWORK = 101 +CACHE_EXPIRATION_RADIO_ARTWORK = 86400 * 90 # 90 days +CACHE_EXPIRATION_RADIO_ARTWORK_MISS = 86400 * 7 # 7 days +AD_DETECTION_PHRASES = ("asset link", "asset stop", "asset spot", "advert") + +PERIODIC_SCAN_INTERVAL = 60 * 60 * 6 # 6 hours REFRESH_INTERVAL = 60 * 60 * 24 * 90 # 90 days CONF_ENABLE_ONLINE_METADATA = "enable_online_metadata" MISSING_ARTIST_METADATA_SCAN_TASK_ID = "metadata_missing_artist_metadata_scan" @@ -610,6 +635,529 @@ async def get_track_lyrics( return metadata.lyrics, metadata.lrc_lyrics return None, None + # ========== Radio Stream Artwork Methods ========== + + async def _get_release_group_artwork( + self, mb_release_group: MusicBrainzReleaseGroup + ) -> MediaItemMetadata | None: + """Try to get thumb artwork for a release group from metadata providers. + + :param mb_release_group: MusicBrainz release group to look up. + """ + self.logger.debug( + "Looking up artwork for release group '%s' (mbid: %s)", + mb_release_group.title, + mb_release_group.id, + ) + temp_album = Album( + item_id="temp", + provider="temp", + name=mb_release_group.title, + provider_mappings=set(), + ) + temp_album.add_external_id(ExternalID.MB_RELEASEGROUP, mb_release_group.id) + for provider in self.providers: + if ProviderFeature.ALBUM_METADATA not in provider.supported_features: + continue + try: + if metadata := await provider.get_album_metadata(temp_album): + if thumb := self._get_thumb_image(metadata): + return thumb + except ( + ProviderUnavailableError, + ResourceTemporarilyUnavailable, + InvalidDataError, + ): + pass + return None + + async def _search_musicbrainz_with_variants( + self, + musicbrainz: MusicbrainzProvider, + artist_name: str, + track_name: str, + ) -> tuple[Any, bool]: + """Search MusicBrainz with fallback variants (swapped, without 'The'). + + :param musicbrainz: MusicBrainz provider instance. + :param artist_name: Artist name to search for. + :param track_name: Track name to search for. + :returns: Tuple of (mb_result, swapped) where swapped indicates artist/track were reversed. + """ + # Try original order + mb_result = await musicbrainz.get_release_group_by_track_name(artist_name, track_name) + if mb_result: + return mb_result, False + + # Try swapped (some stations send "Track - Artist") + self.logger.debug( + "No MusicBrainz match for '%s - %s', trying swapped", + artist_name, + track_name, + ) + mb_result = await musicbrainz.get_release_group_by_track_name(track_name, artist_name) + if mb_result: + return mb_result, True + + # Try without "The " prefix + artist_no_the = artist_name[4:] if artist_name.lower().startswith("the ") else None + track_no_the = track_name[4:] if track_name.lower().startswith("the ") else None + + if artist_no_the: + self.logger.debug( + "No match, trying without 'The': '%s - %s'", artist_no_the, track_name + ) + mb_result = await musicbrainz.get_release_group_by_track_name(artist_no_the, track_name) + if mb_result: + return mb_result, False + + if track_no_the: + self.logger.debug( + "No match, trying swapped without 'The': '%s - %s'", track_no_the, artist_name + ) + mb_result = await musicbrainz.get_release_group_by_track_name(track_no_the, artist_name) + if mb_result: + return mb_result, True + + return None, False + + async def get_track_metadata_by_name( + self, + artist_name: str, + track_name: str, + ) -> tuple[MediaItemMetadata | None, str | None, str | None, str | None]: + """Search for track/artist metadata by name. + + Checks library first for immediate results, then falls back to + MusicBrainz for external metadata lookups. + + :param artist_name: Artist name to search for. + :param track_name: Track title to search for. + :returns: Tuple of (metadata, source_description, corrected_artist, corrected_track). + """ + # Clean track name by stripping version suffixes and featuring credits + clean_track_name = clean_title_for_search(track_name) + + # Check library track first - fast, no API calls, respects user-curated images + if metadata := await self._get_library_track_metadata(artist_name, clean_track_name): + return metadata, "library track", artist_name, clean_track_name + + # Use MusicBrainz to get IDs for accurate external metadata lookups + musicbrainz_provider = self.mass.get_provider("musicbrainz") + if not musicbrainz_provider: + # No MusicBrainz, try library artist as fallback + if metadata := await self._get_library_artist_metadata(artist_name): + return metadata, f"library artist '{artist_name}'", artist_name, clean_track_name + return None, None, None, None + musicbrainz: MusicbrainzProvider = cast("MusicbrainzProvider", musicbrainz_provider) + + mb_result, swapped = await self._search_musicbrainz_with_variants( + musicbrainz, artist_name, clean_track_name + ) + + if not mb_result: + self.logger.debug("No MusicBrainz match for '%s - %s'", artist_name, clean_track_name) + # No MB match, try library artist as fallback + if metadata := await self._get_library_artist_metadata(artist_name): + return metadata, f"library artist '{artist_name}'", artist_name, clean_track_name + return None, None, None, None + + mb_artist, mb_release_groups = mb_result + if swapped: + # Swap the variables so subsequent lookups use the correct order + artist_name, clean_track_name = clean_track_name, artist_name + self.logger.debug( + "MusicBrainz matched with swapped artist/track: '%s - %s'", + artist_name, + clean_track_name, + ) + + # Prefer single artwork (exact track art), then fall back to album artwork + singles = [rg for rg in mb_release_groups if rg.primary_type == "Single"] + albums = [rg for rg in mb_release_groups if rg.primary_type == "Album"] + + for mb_release_group in singles: + if thumb := await self._get_release_group_artwork(mb_release_group): + return thumb, f"single '{mb_release_group.title}'", artist_name, clean_track_name + + if singles: + self.logger.debug( + "No artwork found for single release of '%s - %s', trying album artwork", + artist_name, + clean_track_name, + ) + + for mb_release_group in albums: + if thumb := await self._get_release_group_artwork(mb_release_group): + return thumb, f"album '{mb_release_group.title}'", artist_name, clean_track_name + + # Log when falling back to artist artwork + self.logger.debug( + "No album artwork for '%s - %s', trying artist artwork", + artist_name, + clean_track_name, + ) + + # Check library for artist before external lookup + if metadata := await self._get_library_artist_metadata(mb_artist.name): + return metadata, f"library artist '{mb_artist.name}'", artist_name, clean_track_name + + # Fall back to external artist artwork + temp_artist = Artist( + item_id="temp", + provider="temp", + name=mb_artist.name, + provider_mappings=set(), + ) + temp_artist.mbid = mb_artist.id + for provider in self.providers: + if ProviderFeature.ARTIST_METADATA not in provider.supported_features: + continue + try: + if metadata := await provider.get_artist_metadata(temp_artist): + if thumb := self._get_thumb_image(metadata): + return ( + thumb, + f"artist '{mb_artist.name}' via {provider.name}", + artist_name, + clean_track_name, + ) + except ( + ProviderUnavailableError, + ResourceTemporarilyUnavailable, + InvalidDataError, + ): + pass + + return None, None, None, None + + def _get_thumb_image(self, metadata: MediaItemMetadata) -> MediaItemMetadata | None: + """Extract only THUMB type image from metadata. + + Returns new metadata with only the thumb image, or None if no thumb found. + Used for radio artwork where we specifically need artist/album thumbnails, + not logos or banners. + + :param metadata: Metadata to extract thumb from. + """ + if not metadata.images: + return None + for img in metadata.images: + if img.type == ImageType.THUMB: + return MediaItemMetadata(images=UniqueList([img])) + return None + + async def _get_library_track_metadata( + self, artist_name: str, track_name: str + ) -> MediaItemMetadata | None: + """Search library for matching track and return its metadata. + + :param artist_name: Artist name to match. + :param track_name: Track title to match. + """ + try: + search_query = f"{artist_name} {track_name}" + library_tracks = await self.mass.music.tracks.search(search_query, "library", limit=5) + for track in library_tracks: + if not self._match_artist_name(artist_name, track.artists): + continue + if not compare_strings(track_name, track.name, strict=False): + continue + if image_url := await self._get_library_item_image(track): + return MediaItemMetadata( + images=UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider="library", + remotely_accessible=True, + ) + ] + ) + ) + except InvalidDataError: + pass + return None + + async def _get_library_artist_metadata(self, artist_name: str) -> MediaItemMetadata | None: + """Search library for matching artist and return its metadata. + + :param artist_name: Artist name to match. + """ + try: + library_artists = await self.mass.music.artists.search(artist_name, "library", limit=5) + for artist in library_artists: + if not compare_strings(artist_name, artist.name, strict=False): + continue + if artist.metadata and artist.metadata.images: + for img in artist.metadata.images: + if img.type == ImageType.THUMB: + return MediaItemMetadata( + images=UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=self.get_image_url(img, prefer_proxy=True), + provider="library", + remotely_accessible=True, + ) + ] + ) + ) + except InvalidDataError: + pass + return None + + def _match_artist_name(self, search_name: str, artists: list[Artist | ItemMapping]) -> bool: + """Check if any artist matches the search name. + + :param search_name: Artist name to search for. + :param artists: List of artists to check against. + """ + for artist in artists: + if compare_strings(search_name, artist.name, strict=False): + return True + # Handle "The" prefix variations + if compare_strings(f"The {search_name}", artist.name, strict=False): + return True + if artist.name.lower().startswith("the "): + if compare_strings(search_name, artist.name[4:], strict=False): + return True + return False + + async def _get_library_item_image(self, track: Track) -> str | None: + """Get image URL for library track with fallback: track -> album -> artist. + + :param track: Track to get image for. + """ + # Try track image + if track.metadata and track.metadata.images: + for img in track.metadata.images: + if img.type == ImageType.THUMB: + return self.get_image_url(img, prefer_proxy=True) + + # Try album image + if track.album: + album = track.album + if isinstance(album, ItemMapping): + try: + full_album = await self.mass.music.albums.get_library_item(album.item_id) + if full_album and full_album.metadata and full_album.metadata.images: + for img in full_album.metadata.images: + if img.type == ImageType.THUMB: + return self.get_image_url(img, prefer_proxy=True) + except MediaNotFoundError: + pass + elif isinstance(album, Album) and album.metadata and album.metadata.images: + for img in album.metadata.images: + if img.type == ImageType.THUMB: + return self.get_image_url(img, prefer_proxy=True) + + # Try artist image + for artist in track.artists: + if isinstance(artist, ItemMapping): + try: + full_artist = await self.mass.music.artists.get_library_item(artist.item_id) + if full_artist and full_artist.metadata and full_artist.metadata.images: + for img in full_artist.metadata.images: + if img.type == ImageType.THUMB: + return self.get_image_url(img, prefer_proxy=True) + except MediaNotFoundError: + pass + elif isinstance(artist, Artist) and artist.metadata and artist.metadata.images: + for img in artist.metadata.images: + if img.type == ImageType.THUMB: + return self.get_image_url(img, prefer_proxy=True) + + return None + + def get_radio_stream_station_image(self, streamdetails: StreamDetails) -> str | None: + """Get station image URL from queue current item. + + :param streamdetails: StreamDetails for the radio stream. + """ + if streamdetails.queue_id and ( + queue := self.mass.player_queues.get(streamdetails.queue_id) + ): + if queue.current_item and queue.current_item.media_item: + if station_image := queue.current_item.media_item.image: + return station_image.path + return None + + @staticmethod + def normalize_radio_artist_name(artist_name: str) -> str: + """Normalize artist name from radio stream metadata. + + Handles common formats like "Squier, Billy" -> "Billy Squier" while + avoiding mangling of names like "Lipps, Inc." or "Portugal. The Man". + + :param artist_name: Raw artist name to normalize. + """ + # Business/title suffixes that should not be flipped + no_flip_suffixes = ("inc", "inc.", "ltd", "ltd.", "llc", "corp") + # Specific known bands that are 2 words total and split by a comma + valid_artist_names = { + "hello, goodbye", + "wait, what", + "goodnight, sunrise", + "slaughter beach, dog", + "mount, eerie", + "american, native", + } + + normalized = artist_name.replace("_", " ") + + if "," not in normalized: + return normalized + + # Check against known artist exceptions first + if normalized.lower() in valid_artist_names: + return normalized + + # Don't flip if contains "and" or "&" (e.g., "Crosby, Stills & Nash") + if " and " in normalized.lower() or " & " in normalized: + return normalized + + parts = normalized.split(",", 1) + if len(parts) != 2: + return normalized + + before_comma = parts[0].strip() + after_comma = parts[1].strip() + after_comma_lower = after_comma.lower() + + # Don't flip if suffix is a business/title term + if after_comma_lower in no_flip_suffixes: + return normalized + + # Flip if suffix is exactly "The" (e.g., "Beatles, The" -> "The Beatles") + if after_comma_lower == "the": + return f"{after_comma} {before_comma}" + + # Don't flip if 2+ words after comma (e.g., "Portugal, The Man") + if len(after_comma.split()) >= 2: + return normalized + + # Standard flip (e.g., "Squier, Billy" -> "Billy Squier") + return f"{after_comma} {before_comma}" + + async def get_radio_stream_artwork( + self, + artist_name: str, + track_name: str, + fallback_image_url: str | None = None, + ) -> str | None: + """Fetch artwork for radio stream based on current track metadata. + + :param artist_name: Artist name (already normalized). + :param track_name: Track title. + :param fallback_image_url: Fallback image URL (e.g., station logo). + """ + if " / " in artist_name: + artist_name = artist_name.split(" / ")[0].strip() + else: + artists_tuple = split_artists(artist_name) + artist_name = artists_tuple[0] if artists_tuple else artist_name + + if any(phrase in artist_name.lower() for phrase in AD_DETECTION_PHRASES): + return fallback_image_url + + cache_key = f"{artist_name.lower()}|{track_name.lower()}" + cached_result = await self.mass.cache.get( + key=cache_key, + category=CACHE_CATEGORY_RADIO_ARTWORK, + ) + if cached_result is not None: + if cached_result != "": + self.logger.debug( + "Radio artwork for '%s - %s': cached", + artist_name, + track_name, + ) + return str(cached_result) + self.logger.debug( + "Radio artwork for '%s - %s': cached miss", + artist_name, + track_name, + ) + return fallback_image_url + + image_url = None + try: + ( + metadata, + source, + corrected_artist, + corrected_track, + ) = await self.get_track_metadata_by_name( + artist_name=artist_name, + track_name=track_name, + ) + # Use corrected artist/track for logging if available (handles swapped metadata) + log_artist = corrected_artist or artist_name + log_track = corrected_track or track_name + if metadata and metadata.images: + image_url = metadata.images[0].path + self.logger.debug( + "Radio artwork found for '%s - %s': %s", + log_artist, + log_track, + source, + ) + if "imageproxy" not in image_url: + await self.mass.cache.set( + key=cache_key, + data=image_url, + expiration=CACHE_EXPIRATION_RADIO_ARTWORK, + category=CACHE_CATEGORY_RADIO_ARTWORK, + ) + else: + self.logger.debug( + "Radio artwork for '%s - %s': not found", + log_artist, + log_track, + ) + await self.mass.cache.set( + key=cache_key, + data="", + expiration=CACHE_EXPIRATION_RADIO_ARTWORK_MISS, + category=CACHE_CATEGORY_RADIO_ARTWORK, + ) + except (ProviderUnavailableError, ResourceTemporarilyUnavailable, InvalidDataError): + pass + + return image_url or fallback_image_url + + async def update_radio_stream_artwork(self, streamdetails: StreamDetails) -> None: + """Fetch and update radio stream artwork. + + :param streamdetails: StreamDetails to update with artwork. + """ + if not streamdetails.stream_metadata: + return + if not streamdetails.stream_metadata.artist or not streamdetails.stream_metadata.title: + return + + try: + fallback_url = streamdetails.stream_metadata.image_url + image_url = await self.get_radio_stream_artwork( + artist_name=streamdetails.stream_metadata.artist, + track_name=streamdetails.stream_metadata.title, + fallback_image_url=fallback_url, + ) + if image_url and image_url != fallback_url: + streamdetails.stream_metadata = StreamMetadata( + title=streamdetails.stream_metadata.title, + artist=streamdetails.stream_metadata.artist, + image_url=image_url, + ) + streamdetails.stream_metadata_last_updated = time() + if streamdetails.queue_id: + self.mass.player_queues.signal_update(streamdetails.queue_id) + except MusicAssistantError: + pass + async def _update_artist_metadata(self, artist: Artist, force_refresh: bool = False) -> None: """Get/update rich metadata for an artist.""" # collect metadata from all (online) music + metadata providers diff --git a/music_assistant/controllers/streams/audio.py b/music_assistant/controllers/streams/audio.py index dc23eed21a..d53a696d19 100644 --- a/music_assistant/controllers/streams/audio.py +++ b/music_assistant/controllers/streams/audio.py @@ -600,7 +600,7 @@ async def get_icy_radio_stream( # fallback to iso-8859-1 stream_title = stream_title_re.group(1).decode("iso-8859-1", errors="replace") cleaned_stream_title = clean_stream_title(stream_title) - if cleaned_stream_title != streamdetails.stream_title: + if cleaned_stream_title and cleaned_stream_title != streamdetails.stream_title: self.logger.log( VERBOSE_LOG_LEVEL, "ICY Radio streamtitle original: %s", stream_title ) @@ -609,6 +609,40 @@ async def get_icy_radio_stream( ) streamdetails.stream_title = cleaned_stream_title + if " - " in cleaned_stream_title: + parts = cleaned_stream_title.split(" - ", 1) + artist_name_raw = parts[0].strip() + track_name = parts[1].strip() + + if artist_name_raw and track_name: + self.logger.debug( + "ICY metadata: artist='%s', track='%s'", + artist_name_raw, + track_name, + ) + # Set metadata with station image initially + station_image_url = self.mass.metadata.get_radio_stream_station_image( + streamdetails + ) + artist_normalized = self.mass.metadata.normalize_radio_artist_name( + artist_name_raw + ) + streamdetails.stream_metadata = StreamMetadata( + title=track_name, + artist=artist_normalized, + image_url=station_image_url, + ) + streamdetails.stream_metadata_last_updated = time.time() + if streamdetails.queue_id: + self.mass.player_queues.signal_update(streamdetails.queue_id) + # Fetch artist artwork in background + self.mass.call_later( + 0.2, + self.mass.metadata.update_radio_stream_artwork, + streamdetails, + task_id=f"update_radio_artwork_{streamdetails.queue_id}", + ) + async def get_reconnecting_radio_stream(self, url: str) -> AsyncGenerator[bytes, None]: """ Yield continuous radio stream data, automatically reconnecting on disconnect. @@ -2177,6 +2211,9 @@ async def _update_hls_radio_metadata( # Build stream title from title and artist title = metadata.get("title", "") artist = metadata.get("artist", "") + image_url = ( + metadata.get("image") or metadata.get("artwork") or metadata.get("cover") + ) if title or artist: # Format as "Artist - Title" @@ -2197,6 +2234,34 @@ async def _update_hls_radio_metadata( ) streamdetails.stream_title = cleaned_title + # Set metadata immediately with provided/station image + station_image_url = ( + image_url + or mass.metadata.get_radio_stream_station_image(streamdetails) + ) + artist_normalized = ( + mass.metadata.normalize_radio_artist_name(artist) + if artist + else None + ) + streamdetails.stream_metadata = StreamMetadata( + title=title or cleaned_title, + artist=artist_normalized, + image_url=station_image_url, + ) + streamdetails.stream_metadata_last_updated = time.time() + if streamdetails.queue_id: + mass.player_queues.signal_update(streamdetails.queue_id) + + # Fetch artist artwork if not provided in stream metadata + if artist and title and not image_url: + mass.call_later( + 0.2, + mass.metadata.update_radio_stream_artwork, + streamdetails, + task_id=f"update_radio_artwork_{streamdetails.queue_id}", + ) + # Only check the most recent EXTINF break diff --git a/music_assistant/helpers/tags.py b/music_assistant/helpers/tags.py index b50633f008..2a98864657 100644 --- a/music_assistant/helpers/tags.py +++ b/music_assistant/helpers/tags.py @@ -95,6 +95,14 @@ def split_items( " Ft. ", " vs. ", " Vs. ", + " (feat. ", + " (Feat. ", + " (ft. ", + " (Ft. ", + "(feat. ", + "(Feat. ", + "(ft. ", + "(Ft. ", ] # Extra splitters - only use these when we have MB ID evidence of multiple artists diff --git a/music_assistant/helpers/util.py b/music_assistant/helpers/util.py index 23f431eb0f..c1bf8b03b1 100644 --- a/music_assistant/helpers/util.py +++ b/music_assistant/helpers/util.py @@ -151,11 +151,21 @@ def get_total_system_memory() -> float: "instrumental", "karaoke", "remaster", + "remastered", "versie", "unplugged", "disco", "akoestisch", "deluxe", + "video", + "radio", + "extended", + "single", + "edition", + "anniversary", + "stereo", + "album", + "bonus", ) IGNORE_TITLE_PARTS = ( # strings that may be stripped off a title part @@ -176,6 +186,70 @@ def get_total_system_memory() -> float: "no", ) +# Keywords for aggressive search cleaning (includes featuring). +_VERSION_PATTERN = "|".join(re.escape(v) for v in VERSION_PARTS) +_FEAT_PATTERN = r"feat(?:uring)?|ft" +_SEARCH_PATTERN = rf"{_VERSION_PATTERN}|{_FEAT_PATTERN}" + +_SEARCH_PAREN_PATTERN = re.compile( + rf"[\(\[][^\)\]]*\b({_SEARCH_PATTERN})\b[^\)\]]*[\)\]]", + re.IGNORECASE, +) +_SEARCH_HYPHEN_PATTERN = re.compile( + rf"(\s*-\s*(\d{{4}}|{_SEARCH_PATTERN}).*)$", + re.IGNORECASE, +) + +_DISPLAY_STRIP_PATTERN = re.compile( + r"\s*[\(\[](official\s+)?(lyric\s+|music\s+)?(video|audio)[\)\]]$", + re.IGNORECASE, +) + +# Featuring patterns for stripping from titles (not in parentheses). +_FEATURING_PATTERNS = ( + " featuring ", + " feat. ", + " feat ", + " ft. ", + " ft ", +) + + +def clean_title_for_search(title: str) -> str: + """Remove version info and featuring credits from a song title for search matching. + + Performs aggressive cleaning to maximize search API matching accuracy. + Removes parenthetical/bracketed metadata (remastered, live, featuring, etc.), + hyphen-separated suffixes, and standalone featuring credits. + + TODO: Refactor genius_lyrics provider to use this function instead of its + own clean_song_title helper (providers/genius_lyrics/helpers.py). + + :param title: The song title to clean. + """ + # Strip parentheses/brackets containing keywords (including feat) + cleaned = _SEARCH_PAREN_PATTERN.sub("", title) + + # Strip hyphen suffixes like "- Remastered 2019" or "- 2019" + cleaned = _SEARCH_HYPHEN_PATTERN.sub("", cleaned) + + # Strip bare featuring credits (not in parentheses) + cleaned_lower = cleaned.lower() + for pattern in _FEATURING_PATTERNS: + if pattern in cleaned_lower: + idx = cleaned_lower.find(pattern) + cleaned = cleaned[:idx] + break + + # Clean up dangling hyphens and extra spaces + cleaned = re.sub(r"\s*-\s*$", "", cleaned) + return re.sub(r"\s+", " ", cleaned).strip() + + +def clean_title_for_display(title: str) -> str: + """Remove video-related suffixes from a song title for display.""" + return _DISPLAY_STRIP_PATTERN.sub("", title).strip() + def filename_from_string(string: str) -> str: """Create filename from unsafe string.""" diff --git a/music_assistant/providers/theaudiodb/__init__.py b/music_assistant/providers/theaudiodb/__init__.py index 13b117fe21..61b0b28fbf 100644 --- a/music_assistant/providers/theaudiodb/__init__.py +++ b/music_assistant/providers/theaudiodb/__init__.py @@ -155,6 +155,7 @@ async def get_artist_metadata(self, artist: Artist) -> MediaItemMetadata | None: if not artist.mbid: # for 100% accuracy we require the musicbrainz id for all lookups return None + self.logger.debug("Fetching metadata for Artist %s on The Audio DB", artist.name) if data := await self._get_data("artist-mb.php", i=artist.mbid): if data.get("artists"): return self.__parse_artist(data["artists"][0]) @@ -164,6 +165,7 @@ async def get_album_metadata(self, album: Album) -> MediaItemMetadata | None: """Retrieve metadata for album on theaudiodb.""" if not self.config.get_value(CONF_ENABLE_ALBUM_METADATA): return None + self.logger.debug("Fetching metadata for Album %s on The Audio DB", album.name) if mbid := album.get_external_id(ExternalID.MB_RELEASEGROUP): result = await self._get_data("album-mb.php", i=mbid) if result and result.get("album"): From b07d6e025a7c5bb732b6115cff54233d7b15fe85 Mon Sep 17 00:00:00 2001 From: Gav Date: Wed, 1 Apr 2026 02:31:47 +1000 Subject: [PATCH 2/7] Fix missing maxsplit arg in split call MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- music_assistant/controllers/metadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/music_assistant/controllers/metadata.py b/music_assistant/controllers/metadata.py index 5da4caa5da..76e94ec60b 100644 --- a/music_assistant/controllers/metadata.py +++ b/music_assistant/controllers/metadata.py @@ -1055,7 +1055,7 @@ async def get_radio_stream_artwork( :param fallback_image_url: Fallback image URL (e.g., station logo). """ if " / " in artist_name: - artist_name = artist_name.split(" / ")[0].strip() + artist_name = artist_name.split(" / ", 1)[0].strip() else: artists_tuple = split_artists(artist_name) artist_name = artists_tuple[0] if artists_tuple else artist_name From 714109435dd09b0d1b6fb7ab44f176a92f85d9c7 Mon Sep 17 00:00:00 2001 From: Gav Date: Wed, 1 Apr 2026 10:10:47 +1000 Subject: [PATCH 3/7] Better handling of unexpected metadata --- music_assistant/controllers/metadata.py | 67 +++++++++++++------ .../providers/musicbrainz/__init__.py | 5 ++ 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/music_assistant/controllers/metadata.py b/music_assistant/controllers/metadata.py index 76e94ec60b..bd5849a45c 100644 --- a/music_assistant/controllers/metadata.py +++ b/music_assistant/controllers/metadata.py @@ -153,7 +153,7 @@ def _detect_image_format(path: str) -> str: CACHE_CATEGORY_RADIO_ARTWORK = 101 CACHE_EXPIRATION_RADIO_ARTWORK = 86400 * 90 # 90 days CACHE_EXPIRATION_RADIO_ARTWORK_MISS = 86400 * 7 # 7 days -AD_DETECTION_PHRASES = ("asset link", "asset stop", "asset spot", "advert") +AD_DETECTION_PHRASES = ("asset link", "asset stop", "asset spot", "advert", "promo") PERIODIC_SCAN_INTERVAL = 60 * 60 * 6 # 6 hours REFRESH_INTERVAL = 60 * 60 * 24 * 90 # 90 days @@ -639,10 +639,11 @@ async def get_track_lyrics( async def _get_release_group_artwork( self, mb_release_group: MusicBrainzReleaseGroup - ) -> MediaItemMetadata | None: + ) -> tuple[MediaItemMetadata, str] | None: """Try to get thumb artwork for a release group from metadata providers. :param mb_release_group: MusicBrainz release group to look up. + :returns: Tuple of (metadata, provider_name) or None if not found. """ self.logger.debug( "Looking up artwork for release group '%s' (mbid: %s)", @@ -662,7 +663,7 @@ async def _get_release_group_artwork( try: if metadata := await provider.get_album_metadata(temp_album): if thumb := self._get_thumb_image(metadata): - return thumb + return thumb, provider.name except ( ProviderUnavailableError, ResourceTemporarilyUnavailable, @@ -777,8 +778,14 @@ async def get_track_metadata_by_name( albums = [rg for rg in mb_release_groups if rg.primary_type == "Album"] for mb_release_group in singles: - if thumb := await self._get_release_group_artwork(mb_release_group): - return thumb, f"single '{mb_release_group.title}'", artist_name, clean_track_name + if result := await self._get_release_group_artwork(mb_release_group): + thumb, provider_name = result + return ( + thumb, + f"single '{mb_release_group.title}' via {provider_name}", + artist_name, + clean_track_name, + ) if singles: self.logger.debug( @@ -788,8 +795,14 @@ async def get_track_metadata_by_name( ) for mb_release_group in albums: - if thumb := await self._get_release_group_artwork(mb_release_group): - return thumb, f"album '{mb_release_group.title}'", artist_name, clean_track_name + if result := await self._get_release_group_artwork(mb_release_group): + thumb, provider_name = result + return ( + thumb, + f"album '{mb_release_group.title}' via {provider_name}", + artist_name, + clean_track_name, + ) # Log when falling back to artist artwork self.logger.debug( @@ -814,10 +827,10 @@ async def get_track_metadata_by_name( if ProviderFeature.ARTIST_METADATA not in provider.supported_features: continue try: - if metadata := await provider.get_artist_metadata(temp_artist): - if thumb := self._get_thumb_image(metadata): + if artist_metadata := await provider.get_artist_metadata(temp_artist): + if artist_thumb := self._get_thumb_image(artist_metadata): return ( - thumb, + artist_thumb, f"artist '{mb_artist.name}' via {provider.name}", artist_name, clean_track_name, @@ -1047,12 +1060,13 @@ async def get_radio_stream_artwork( artist_name: str, track_name: str, fallback_image_url: str | None = None, - ) -> str | None: + ) -> tuple[str | None, str | None, str | None]: """Fetch artwork for radio stream based on current track metadata. :param artist_name: Artist name (already normalized). :param track_name: Track title. :param fallback_image_url: Fallback image URL (e.g., station logo). + :returns: Tuple of (image_url, corrected_artist, corrected_track). """ if " / " in artist_name: artist_name = artist_name.split(" / ", 1)[0].strip() @@ -1061,7 +1075,7 @@ async def get_radio_stream_artwork( artist_name = artists_tuple[0] if artists_tuple else artist_name if any(phrase in artist_name.lower() for phrase in AD_DETECTION_PHRASES): - return fallback_image_url + return fallback_image_url, None, None cache_key = f"{artist_name.lower()}|{track_name.lower()}" cached_result = await self.mass.cache.get( @@ -1075,15 +1089,17 @@ async def get_radio_stream_artwork( artist_name, track_name, ) - return str(cached_result) + return str(cached_result), None, None self.logger.debug( "Radio artwork for '%s - %s': cached miss", artist_name, track_name, ) - return fallback_image_url + return fallback_image_url, None, None image_url = None + corrected_artist = None + corrected_track = None try: ( metadata, @@ -1127,7 +1143,7 @@ async def get_radio_stream_artwork( except (ProviderUnavailableError, ResourceTemporarilyUnavailable, InvalidDataError): pass - return image_url or fallback_image_url + return image_url or fallback_image_url, corrected_artist, corrected_track async def update_radio_stream_artwork(self, streamdetails: StreamDetails) -> None: """Fetch and update radio stream artwork. @@ -1141,15 +1157,24 @@ async def update_radio_stream_artwork(self, streamdetails: StreamDetails) -> Non try: fallback_url = streamdetails.stream_metadata.image_url - image_url = await self.get_radio_stream_artwork( - artist_name=streamdetails.stream_metadata.artist, - track_name=streamdetails.stream_metadata.title, + original_artist = streamdetails.stream_metadata.artist + original_title = streamdetails.stream_metadata.title + image_url, corrected_artist, corrected_track = await self.get_radio_stream_artwork( + artist_name=original_artist, + track_name=original_title, fallback_image_url=fallback_url, ) - if image_url and image_url != fallback_url: + # Use corrected artist/track if metadata was swapped + final_artist = corrected_artist or original_artist + final_title = corrected_track or original_title + if ( + image_url != fallback_url + or final_artist != original_artist + or final_title != original_title + ): streamdetails.stream_metadata = StreamMetadata( - title=streamdetails.stream_metadata.title, - artist=streamdetails.stream_metadata.artist, + title=final_title, + artist=final_artist, image_url=image_url, ) streamdetails.stream_metadata_last_updated = time() diff --git a/music_assistant/providers/musicbrainz/__init__.py b/music_assistant/providers/musicbrainz/__init__.py index f685697662..9d0566ee38 100644 --- a/music_assistant/providers/musicbrainz/__init__.py +++ b/music_assistant/providers/musicbrainz/__init__.py @@ -522,6 +522,11 @@ def _get_release_groups_with_dates( seen: dict[str, tuple[MusicBrainzReleaseGroup, str]] = {} for release in releases: + # Skip bootleg and pseudo-releases + release_status = release.get("status", "") + if release_status in ("Bootleg", "Pseudo-Release"): + continue + rg = release.get("release-group", {}) rg_id = rg.get("id") if not rg_id: From 84aae391652a01cf61461d2e2ed43e4f94da31dd Mon Sep 17 00:00:00 2001 From: Gav Date: Mon, 6 Apr 2026 13:55:06 +1000 Subject: [PATCH 4/7] Rebase --- music_assistant/controllers/metadata.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/music_assistant/controllers/metadata.py b/music_assistant/controllers/metadata.py index bd5849a45c..d42aebac84 100644 --- a/music_assistant/controllers/metadata.py +++ b/music_assistant/controllers/metadata.py @@ -142,12 +142,6 @@ def _detect_image_format(path: str) -> str: } DEFAULT_LANGUAGE = "en_US" -REFRESH_INTERVAL_ARTISTS = 60 * 60 * 24 * 90 # 90 days -REFRESH_INTERVAL_ALBUMS = 60 * 60 * 24 * 90 # 90 days -REFRESH_INTERVAL_TRACKS = 60 * 60 * 24 * 90 # 90 days -REFRESH_INTERVAL_AUDIOBOOKS = 60 * 60 * 24 * 90 # 90 days -REFRESH_INTERVAL_PODCASTS = 60 * 60 * 24 * 90 # 90 days -REFRESH_INTERVAL_PLAYLISTS = 60 * 60 * 24 * 14 # 14 days # Radio stream artwork cache settings CACHE_CATEGORY_RADIO_ARTWORK = 101 @@ -155,7 +149,6 @@ def _detect_image_format(path: str) -> str: CACHE_EXPIRATION_RADIO_ARTWORK_MISS = 86400 * 7 # 7 days AD_DETECTION_PHRASES = ("asset link", "asset stop", "asset spot", "advert", "promo") -PERIODIC_SCAN_INTERVAL = 60 * 60 * 6 # 6 hours REFRESH_INTERVAL = 60 * 60 * 24 * 90 # 90 days CONF_ENABLE_ONLINE_METADATA = "enable_online_metadata" MISSING_ARTIST_METADATA_SCAN_TASK_ID = "metadata_missing_artist_metadata_scan" From c8ba96dde2a15b97614c317cf06d742c21adae00 Mon Sep 17 00:00:00 2001 From: Gav Date: Mon, 6 Apr 2026 15:54:03 +1000 Subject: [PATCH 5/7] PR Review comments --- music_assistant/controllers/metadata.py | 32 ++++-- music_assistant/controllers/streams/audio.py | 111 +++++++++++-------- music_assistant/helpers/tags.py | 85 ++++++++------ music_assistant/helpers/util.py | 81 +++++++------- 4 files changed, 175 insertions(+), 134 deletions(-) diff --git a/music_assistant/controllers/metadata.py b/music_assistant/controllers/metadata.py index d42aebac84..dcb14e06d4 100644 --- a/music_assistant/controllers/metadata.py +++ b/music_assistant/controllers/metadata.py @@ -1,4 +1,9 @@ -"""All logic for metadata retrieval.""" +"""All logic for metadata retrieval. + +TODO: This controller is getting large. Refactor into a dedicated subfolder +with split files (controller.py, helpers.py, etc.) following the pattern +of other controllers. +""" from __future__ import annotations @@ -77,7 +82,7 @@ from music_assistant.helpers.security import is_safe_path from music_assistant.helpers.tags import split_artists from music_assistant.helpers.throttle_retry import Throttler -from music_assistant.helpers.util import clean_title_for_search, try_parse_int +from music_assistant.helpers.util import parse_title_and_version, try_parse_int from music_assistant.models.core_controller import CoreController from music_assistant.models.music_provider import MusicProvider @@ -730,7 +735,7 @@ async def get_track_metadata_by_name( :returns: Tuple of (metadata, source_description, corrected_artist, corrected_track). """ # Clean track name by stripping version suffixes and featuring credits - clean_track_name = clean_title_for_search(track_name) + clean_track_name, _ = parse_title_and_version(track_name, strip_for_search=True) # Check library track first - fast, no API calls, respects user-curated images if metadata := await self._get_library_track_metadata(artist_name, clean_track_name): @@ -869,7 +874,7 @@ async def _get_library_track_metadata( continue if not compare_strings(track_name, track.name, strict=False): continue - if image_url := await self._get_library_item_image(track): + if image_url := await self._get_library_item_thumb(track): return MediaItemMetadata( images=UniqueList( [ @@ -932,7 +937,7 @@ def _match_artist_name(self, search_name: str, artists: list[Artist | ItemMappin return True return False - async def _get_library_item_image(self, track: Track) -> str | None: + async def _get_library_item_thumb(self, track: Track) -> str | None: """Get image URL for library track with fallback: track -> album -> artist. :param track: Track to get image for. @@ -1048,17 +1053,22 @@ def normalize_radio_artist_name(artist_name: str) -> str: # Standard flip (e.g., "Squier, Billy" -> "Billy Squier") return f"{after_comma} {before_comma}" - async def get_radio_stream_artwork( + async def get_image_url_by_name( self, artist_name: str, track_name: str, fallback_image_url: str | None = None, ) -> tuple[str | None, str | None, str | None]: - """Fetch artwork for radio stream based on current track metadata. + """ + Look up artwork by artist and track name. + + Searches library and external providers for matching artwork. + Also returns corrected artist/track names if the search detects + swapped metadata (e.g., "Track - Artist" instead of "Artist - Track"). - :param artist_name: Artist name (already normalized). - :param track_name: Track title. - :param fallback_image_url: Fallback image URL (e.g., station logo). + :param artist_name: Artist name to search for. + :param track_name: Track title to search for. + :param fallback_image_url: Fallback image URL if no artwork found. :returns: Tuple of (image_url, corrected_artist, corrected_track). """ if " / " in artist_name: @@ -1152,7 +1162,7 @@ async def update_radio_stream_artwork(self, streamdetails: StreamDetails) -> Non fallback_url = streamdetails.stream_metadata.image_url original_artist = streamdetails.stream_metadata.artist original_title = streamdetails.stream_metadata.title - image_url, corrected_artist, corrected_track = await self.get_radio_stream_artwork( + image_url, corrected_artist, corrected_track = await self.get_image_url_by_name( artist_name=original_artist, track_name=original_title, fallback_image_url=fallback_url, diff --git a/music_assistant/controllers/streams/audio.py b/music_assistant/controllers/streams/audio.py index d53a696d19..53241c1203 100644 --- a/music_assistant/controllers/streams/audio.py +++ b/music_assistant/controllers/streams/audio.py @@ -90,7 +90,12 @@ from music_assistant.helpers.ffmpeg import FFMpeg, get_ffmpeg_stream from music_assistant.helpers.playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER -from music_assistant.helpers.util import clean_stream_title, detect_charset, remove_file +from music_assistant.helpers.util import ( + clean_stream_title, + detect_charset, + parse_title_and_version, + remove_file, +) from music_assistant.models.smart_fades import SmartFadesMode from music_assistant.providers.sync_group.constants import SGP_PREFIX from music_assistant.providers.universal_group.constants import UGP_PREFIX @@ -143,6 +148,50 @@ def smart_fades_mixer(self) -> SmartFadesMixer: assert self._smart_fades_mixer is not None, "StreamsAudio.setup() not called" return self._smart_fades_mixer + def _update_radio_stream_metadata( + self, + streamdetails: StreamDetails, + artist: str | None, + title: str, + image_url: str | None = None, + album: str | None = None, + ) -> None: + """ + Update radio stream metadata and trigger artwork lookup. + + :param streamdetails: The stream details to update. + :param artist: Artist name (will be normalized). + :param title: Track title (will be cleaned for display). + :param image_url: Optional image URL from stream metadata. + :param album: Optional album name. + """ + station_image_url = image_url or self.mass.metadata.get_radio_stream_station_image( + streamdetails + ) + artist_normalized = ( + self.mass.metadata.normalize_radio_artist_name(artist) if artist else None + ) + display_title, _ = parse_title_and_version(title, strip_for_display=True) + + streamdetails.stream_metadata = StreamMetadata( + title=display_title, + artist=artist_normalized, + album=album, + image_url=station_image_url, + ) + streamdetails.stream_metadata_last_updated = time.time() + if streamdetails.queue_id: + self.mass.player_queues.signal_update(streamdetails.queue_id) + + # Fetch artwork in background (track, album then artist) + if artist and title and not image_url: + self.mass.call_later( + 0.2, + self.mass.metadata.update_radio_stream_artwork, + streamdetails, + task_id=f"update_radio_artwork_{streamdetails.queue_id}", + ) + # --- Public methods --- async def get_stream_details( @@ -367,8 +416,11 @@ def _on_inband_metadata(metadata: dict[str, str]) -> None: if cleaned_title and cleaned_title != streamdetails.stream_title: self.logger.log(VERBOSE_LOG_LEVEL, "In-band metadata: %s", cleaned_title) streamdetails.stream_title = cleaned_title - streamdetails.stream_metadata = StreamMetadata( - title=title or cleaned_title, artist=artist or None, album=album or None + self._update_radio_stream_metadata( + streamdetails, + artist=artist or None, + title=title or cleaned_title, + album=album or None, ) audio_source = get_chained_ogg_stream( @@ -620,27 +672,10 @@ async def get_icy_radio_stream( artist_name_raw, track_name, ) - # Set metadata with station image initially - station_image_url = self.mass.metadata.get_radio_stream_station_image( - streamdetails - ) - artist_normalized = self.mass.metadata.normalize_radio_artist_name( - artist_name_raw - ) - streamdetails.stream_metadata = StreamMetadata( - title=track_name, - artist=artist_normalized, - image_url=station_image_url, - ) - streamdetails.stream_metadata_last_updated = time.time() - if streamdetails.queue_id: - self.mass.player_queues.signal_update(streamdetails.queue_id) - # Fetch artist artwork in background - self.mass.call_later( - 0.2, - self.mass.metadata.update_radio_stream_artwork, + self._update_radio_stream_metadata( streamdetails, - task_id=f"update_radio_artwork_{streamdetails.queue_id}", + artist=artist_name_raw, + title=track_name, ) async def get_reconnecting_radio_stream(self, url: str) -> AsyncGenerator[bytes, None]: @@ -2233,34 +2268,12 @@ async def _update_hls_radio_metadata( VERBOSE_LOG_LEVEL, "HLS Radio metadata updated: %s", cleaned_title ) streamdetails.stream_title = cleaned_title - - # Set metadata immediately with provided/station image - station_image_url = ( - image_url - or mass.metadata.get_radio_stream_station_image(streamdetails) - ) - artist_normalized = ( - mass.metadata.normalize_radio_artist_name(artist) - if artist - else None - ) - streamdetails.stream_metadata = StreamMetadata( + self._update_radio_stream_metadata( + streamdetails, + artist=artist or None, title=title or cleaned_title, - artist=artist_normalized, - image_url=station_image_url, + image_url=image_url, ) - streamdetails.stream_metadata_last_updated = time.time() - if streamdetails.queue_id: - mass.player_queues.signal_update(streamdetails.queue_id) - - # Fetch artist artwork if not provided in stream metadata - if artist and title and not image_url: - mass.call_later( - 0.2, - mass.metadata.update_radio_stream_artwork, - streamdetails, - task_id=f"update_radio_artwork_{streamdetails.queue_id}", - ) # Only check the most recent EXTINF break diff --git a/music_assistant/helpers/tags.py b/music_assistant/helpers/tags.py index 2a98864657..3e5d88449d 100644 --- a/music_assistant/helpers/tags.py +++ b/music_assistant/helpers/tags.py @@ -43,7 +43,8 @@ def clean_tuple(values: Iterable[str]) -> tuple[str, ...]: def split_items( org_str: str | list[str] | tuple[str, ...] | None, allow_unsafe_splitters: bool = False ) -> tuple[str, ...]: - """Split a tag string into multiple values. + """ + Split a tag string into multiple values. Splits on semicolon (;) first as the standard multi-value delimiter. @@ -82,43 +83,40 @@ def split_items( # ARTISTS tag parsing or ARTIST tag splitting entirely. # # Featuring splitters - always split on these to capture featuring artists in the database +# Featuring splitters - case-insensitive patterns (searched with lower()) +# These always split to capture featuring artists in the database FEATURING_SPLITTERS = [ " featuring ", - " Featuring ", " feat. ", - " Feat. ", " feat ", - " Feat ", " duet with ", - " Duet With ", " ft. ", - " Ft. ", " vs. ", - " Vs. ", + " vs ", " (feat. ", - " (Feat. ", " (ft. ", - " (Ft. ", "(feat. ", - "(Feat. ", "(ft. ", - "(Ft. ", ] # Extra splitters - only use these when we have MB ID evidence of multiple artists -EXTRA_SPLITTERS = [" & ", ", ", " + ", " with ", " With "] +EXTRA_SPLITTERS = [" & ", ", ", " + ", " with "] def _split_on_featuring(item: str) -> list[str]: """Split a string on featuring splitters, returns list of parts.""" + item_lower = item.lower() for splitter in FEATURING_SPLITTERS: - if splitter in item: + if splitter in item_lower: + # Find the position in original string (case-insensitive) + pos = item_lower.find(splitter) parts = [] - for subitem in item.split(splitter): - clean_item = subitem.strip() - if clean_item: - # Recursively process each part for nested featuring splitters - parts.extend(_split_on_featuring(clean_item)) + before = item[:pos].strip() + after = item[pos + len(splitter) :].strip() + if before: + parts.extend(_split_on_featuring(before)) + if after: + parts.extend(_split_on_featuring(after)) return parts return [item] @@ -128,7 +126,8 @@ def _split_to_target_count( expected_count: int, org_artists: str | tuple[str, ...], ) -> list[str]: - """Split artists on extra splitters to reach expected count. + """ + Split artists on extra splitters to reach expected count. :param artists: List of artists after featuring splits. :param expected_count: Target number of artists. @@ -204,7 +203,8 @@ def split_artists( org_artists: str | tuple[str, ...], expected_count: int | None = None, ) -> tuple[str, ...]: - """Parse artists from a string, guided by expected artist count. + """ + Parse artists from a string, guided by expected artist count. :param org_artists: The artist string or tuple of strings to parse. :param expected_count: Expected number of artists (typically from MB artist IDs). @@ -770,7 +770,8 @@ def get_file_duration(input_file: str) -> float: def _decode_mp4_freeform_single(values: list[Any]) -> str: - """Decode a single-value MP4 freeform tag (bytes to string). + """ + Decode a single-value MP4 freeform tag (bytes to string). :param values: List of MP4FreeForm values (typically contains one item). """ @@ -783,7 +784,8 @@ def _decode_mp4_freeform_single(values: list[Any]) -> str: def _decode_mp4_freeform_list(values: list[Any]) -> list[str]: - """Decode a multi-value MP4 freeform tag (bytes to strings). + """ + Decode a multi-value MP4 freeform tag (bytes to strings). :param values: List of MP4FreeForm values. """ @@ -797,7 +799,8 @@ def _decode_mp4_freeform_list(values: list[Any]) -> list[str]: def _parse_mp4_tags(tags: MP4Tags) -> dict[str, Any]: # noqa: PLR0915 - """Parse MP4/M4A/AAC tags from mutagen MP4Tags object. + """ + Parse MP4/M4A/AAC tags from mutagen MP4Tags object. See: https://mutagen.readthedocs.io/en/latest/api/mp4.html @@ -914,7 +917,8 @@ def _parse_mp4_tags(tags: MP4Tags) -> dict[str, Any]: # noqa: PLR0915 def _parse_id3_tags(tags: dict[str, Any]) -> dict[str, Any]: - """Parse ID3 tags (MP3 files) from mutagen tags dict. + """ + Parse ID3 tags (MP3 files) from mutagen tags dict. See: https://mutagen-specs.readthedocs.io/en/latest/id3/id3v2.4.0-frames.html See: https://picard-docs.musicbrainz.org/en/appendices/tag_mapping.html @@ -989,7 +993,8 @@ def _parse_id3_tags(tags: dict[str, Any]) -> dict[str, Any]: def _vorbis_get_single(tags: VCommentDict, key: str) -> str | None: - """Get single value from Vorbis comments (first item if multiple exist). + """ + Get single value from Vorbis comments (first item if multiple exist). :param tags: VCommentDict from mutagen. :param key: Tag name (case insensitive). @@ -999,7 +1004,8 @@ def _vorbis_get_single(tags: VCommentDict, key: str) -> str | None: def _vorbis_get_multi(tags: VCommentDict, key: str) -> list[str] | None: - """Get all values from Vorbis comments as a list. + """ + Get all values from Vorbis comments as a list. :param tags: VCommentDict from mutagen. :param key: Tag name (case insensitive). @@ -1009,7 +1015,8 @@ def _vorbis_get_multi(tags: VCommentDict, key: str) -> list[str] | None: def _parse_vorbis_artist_tags(tags: VCommentDict, result: dict[str, Any]) -> None: - """Parse artist-related tags from Vorbis comments into result dict. + """ + Parse artist-related tags from Vorbis comments into result dict. Handles multiple ARTIST/ALBUMARTIST fields per Vorbis spec, as well as explicit ARTISTS tag which take precedence. @@ -1048,7 +1055,8 @@ def _parse_vorbis_artist_tags(tags: VCommentDict, result: dict[str, Any]) -> Non def _parse_vorbis_tags(tags: VCommentDict) -> dict[str, Any]: - """Parse Vorbis comment tags (FLAC, OGG Vorbis, OGG Opus, etc.). + """ + Parse Vorbis comment tags (FLAC, OGG Vorbis, OGG Opus, etc.). Vorbis comments support multiple values for the same field name per the spec. For example, multiple ARTIST fields can be used instead of a single ARTISTS field. @@ -1133,7 +1141,8 @@ def _parse_vorbis_tags(tags: VCommentDict) -> dict[str, Any]: def _apev2_get_values(tags: APEv2, key: str) -> list[str]: - """Get values from an APEv2 tag, splitting on null bytes for multi-value fields. + """ + Get values from an APEv2 tag, splitting on null bytes for multi-value fields. :param tags: APEv2 tags object. :param key: Tag key (case-insensitive in APEv2). @@ -1148,7 +1157,8 @@ def _apev2_get_values(tags: APEv2, key: str) -> list[str]: def _apev2_get_single(tags: APEv2, key: str) -> str | None: - """Get a single value from an APEv2 tag. + """ + Get a single value from an APEv2 tag. :param tags: APEv2 tags object. :param key: Tag key. @@ -1158,7 +1168,8 @@ def _apev2_get_single(tags: APEv2, key: str) -> str | None: def _apev2_get_multi(tags: APEv2, key: str) -> list[str] | None: - """Get multiple values from an APEv2 tag. + """ + Get multiple values from an APEv2 tag. :param tags: APEv2 tags object. :param key: Tag key. @@ -1168,7 +1179,8 @@ def _apev2_get_multi(tags: APEv2, key: str) -> list[str] | None: def _parse_apev2_tags(tags: APEv2) -> dict[str, Any]: # noqa: PLR0915 - r"""Parse APEv2 tags into a normalized dictionary. + r""" + Parse APEv2 tags into a normalized dictionary. APEv2 tags are used by WavPack, Musepack, Monkey's Audio, OptimFROG, and TAK. Multi-value fields use null byte (\x00) as separator. @@ -1270,7 +1282,8 @@ def _parse_apev2_tags(tags: APEv2) -> dict[str, Any]: # noqa: PLR0915 def parse_tags_mutagen(input_file: str) -> dict[str, Any]: - """Parse tags from an audio file using Mutagen. + """ + Parse tags from an audio file using Mutagen. Supports Vorbis comments (FLAC, OGG), ID3 tags (MP3), MP4 tags (AAC/M4A/ALAC), and APEv2 tags (WavPack, Musepack, Monkey's Audio). @@ -1304,7 +1317,8 @@ def parse_tags_mutagen(input_file: str) -> dict[str, Any]: def _format_uses_apev2(format_name: str) -> bool: - """Check if an audio format exclusively uses APEv2 tags. + """ + Check if an audio format exclusively uses APEv2 tags. These formats ONLY use APEv2 tags and cannot have cover art detected by ffprobe's video stream detection (unlike ID3's APIC which shows as mjpeg/png stream). @@ -1323,7 +1337,8 @@ def _format_uses_apev2(format_name: str) -> bool: def get_apev2_image(input_file: str) -> bytes | None: - """Extract cover art from APEv2 tags using mutagen. + """ + Extract cover art from APEv2 tags using mutagen. APEv2 tags (used by WavPack, Musepack, etc.) store cover art differently than ID3 tags. FFmpeg does not expose these as video streams, so we use diff --git a/music_assistant/helpers/util.py b/music_assistant/helpers/util.py index c1bf8b03b1..a1fd8e5b70 100644 --- a/music_assistant/helpers/util.py +++ b/music_assistant/helpers/util.py @@ -200,8 +200,11 @@ def get_total_system_memory() -> float: re.IGNORECASE, ) +# Superfluous suffixes to strip for display (video/audio markers, etc.) _DISPLAY_STRIP_PATTERN = re.compile( - r"\s*[\(\[](official\s+)?(lyric\s+|music\s+)?(video|audio)[\)\]]$", + r"\s*[\(\[]" + r"(official\s+)?(lyric\s+|music\s+)?(video|audio|visualizer|clip)" + r"[\)\]]$", re.IGNORECASE, ) @@ -215,42 +218,6 @@ def get_total_system_memory() -> float: ) -def clean_title_for_search(title: str) -> str: - """Remove version info and featuring credits from a song title for search matching. - - Performs aggressive cleaning to maximize search API matching accuracy. - Removes parenthetical/bracketed metadata (remastered, live, featuring, etc.), - hyphen-separated suffixes, and standalone featuring credits. - - TODO: Refactor genius_lyrics provider to use this function instead of its - own clean_song_title helper (providers/genius_lyrics/helpers.py). - - :param title: The song title to clean. - """ - # Strip parentheses/brackets containing keywords (including feat) - cleaned = _SEARCH_PAREN_PATTERN.sub("", title) - - # Strip hyphen suffixes like "- Remastered 2019" or "- 2019" - cleaned = _SEARCH_HYPHEN_PATTERN.sub("", cleaned) - - # Strip bare featuring credits (not in parentheses) - cleaned_lower = cleaned.lower() - for pattern in _FEATURING_PATTERNS: - if pattern in cleaned_lower: - idx = cleaned_lower.find(pattern) - cleaned = cleaned[:idx] - break - - # Clean up dangling hyphens and extra spaces - cleaned = re.sub(r"\s*-\s*$", "", cleaned) - return re.sub(r"\s+", " ", cleaned).strip() - - -def clean_title_for_display(title: str) -> str: - """Remove video-related suffixes from a song title for display.""" - return _DISPLAY_STRIP_PATTERN.sub("", title).strip() - - def filename_from_string(string: str) -> str: """Create filename from unsafe string.""" keepcharacters = (" ", ".", "_") @@ -309,9 +276,45 @@ def normalize_unicode(value: str | None) -> str | None: return unicodedata.normalize("NFC", value) -def parse_title_and_version(title: str, track_version: str | None = None) -> tuple[str, str]: - """Try to parse version from the title.""" +def parse_title_and_version( + title: str, + track_version: str | None = None, + strip_for_search: bool = False, + strip_for_display: bool = False, +) -> tuple[str, str]: + """ + Parse version from the title and optionally clean for search or display. + + :param title: The title to parse. + :param track_version: Optional existing version string. + :param strip_for_search: Aggressively strip for search matching (removes featuring, + version info in brackets, hyphen suffixes like "- Remastered 2019"). + :param strip_for_display: Strip superfluous suffixes like "(Official Video)". + """ version = track_version or "" + + # Aggressive search cleaning - strip parentheses/brackets with version/feat keywords + if strip_for_search: + title = _SEARCH_PAREN_PATTERN.sub("", title) + title = _SEARCH_HYPHEN_PATTERN.sub("", title) + # Strip bare featuring credits (not in parentheses) + title_lower = title.lower() + for pattern in _FEATURING_PATTERNS: + if pattern in title_lower: + idx = title_lower.find(pattern) + title = title[:idx] + break + # Clean up dangling hyphens and extra spaces + title = re.sub(r"\s*-\s*$", "", title) + title = re.sub(r"\s+", " ", title).strip() + return title, version + + # Display cleaning - just strip video-related suffixes + if strip_for_display: + title = _DISPLAY_STRIP_PATTERN.sub("", title).strip() + return title, version + + # Standard version parsing for regex in (r"\(.*?\)", r"\[.*?\]", r" - .*"): for title_part in re.findall(regex, title): # Extract the content without brackets/dashes for checking From d2a6976f604d565f002cedf490e5d6f5fdb00530 Mon Sep 17 00:00:00 2001 From: Gav Date: Mon, 6 Apr 2026 15:55:24 +1000 Subject: [PATCH 6/7] Formatting --- music_assistant/controllers/metadata.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/music_assistant/controllers/metadata.py b/music_assistant/controllers/metadata.py index dcb14e06d4..c6dd07a090 100644 --- a/music_assistant/controllers/metadata.py +++ b/music_assistant/controllers/metadata.py @@ -1,4 +1,5 @@ -"""All logic for metadata retrieval. +""" +All logic for metadata retrieval. TODO: This controller is getting large. Refactor into a dedicated subfolder with split files (controller.py, helpers.py, etc.) following the pattern @@ -1059,8 +1060,7 @@ async def get_image_url_by_name( track_name: str, fallback_image_url: str | None = None, ) -> tuple[str | None, str | None, str | None]: - """ - Look up artwork by artist and track name. + """Look up artwork by artist and track name. Searches library and external providers for matching artwork. Also returns corrected artist/track names if the search detects From d1e8c62c04b47f75a6e41b46312db51aa839c5b1 Mon Sep 17 00:00:00 2001 From: Gav Date: Mon, 6 Apr 2026 16:18:14 +1000 Subject: [PATCH 7/7] Docstring cleaning --- music_assistant/controllers/metadata.py | 36 ++++++++++++++++--------- music_assistant/helpers/util.py | 9 +++---- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/music_assistant/controllers/metadata.py b/music_assistant/controllers/metadata.py index c6dd07a090..32ea530ad3 100644 --- a/music_assistant/controllers/metadata.py +++ b/music_assistant/controllers/metadata.py @@ -639,7 +639,8 @@ async def get_track_lyrics( async def _get_release_group_artwork( self, mb_release_group: MusicBrainzReleaseGroup ) -> tuple[MediaItemMetadata, str] | None: - """Try to get thumb artwork for a release group from metadata providers. + """ + Try to get thumb artwork for a release group from metadata providers. :param mb_release_group: MusicBrainz release group to look up. :returns: Tuple of (metadata, provider_name) or None if not found. @@ -677,7 +678,8 @@ async def _search_musicbrainz_with_variants( artist_name: str, track_name: str, ) -> tuple[Any, bool]: - """Search MusicBrainz with fallback variants (swapped, without 'The'). + """ + Search MusicBrainz with fallback variants (swapped, without 'The'). :param musicbrainz: MusicBrainz provider instance. :param artist_name: Artist name to search for. @@ -726,7 +728,8 @@ async def get_track_metadata_by_name( artist_name: str, track_name: str, ) -> tuple[MediaItemMetadata | None, str | None, str | None, str | None]: - """Search for track/artist metadata by name. + """ + Search for track/artist metadata by name. Checks library first for immediate results, then falls back to MusicBrainz for external metadata lookups. @@ -844,7 +847,8 @@ async def get_track_metadata_by_name( return None, None, None, None def _get_thumb_image(self, metadata: MediaItemMetadata) -> MediaItemMetadata | None: - """Extract only THUMB type image from metadata. + """ + Extract only THUMB type image from metadata. Returns new metadata with only the thumb image, or None if no thumb found. Used for radio artwork where we specifically need artist/album thumbnails, @@ -862,7 +866,8 @@ def _get_thumb_image(self, metadata: MediaItemMetadata) -> MediaItemMetadata | N async def _get_library_track_metadata( self, artist_name: str, track_name: str ) -> MediaItemMetadata | None: - """Search library for matching track and return its metadata. + """ + Search library for matching track and return its metadata. :param artist_name: Artist name to match. :param track_name: Track title to match. @@ -893,7 +898,8 @@ async def _get_library_track_metadata( return None async def _get_library_artist_metadata(self, artist_name: str) -> MediaItemMetadata | None: - """Search library for matching artist and return its metadata. + """ + Search library for matching artist and return its metadata. :param artist_name: Artist name to match. """ @@ -922,7 +928,8 @@ async def _get_library_artist_metadata(self, artist_name: str) -> MediaItemMetad return None def _match_artist_name(self, search_name: str, artists: list[Artist | ItemMapping]) -> bool: - """Check if any artist matches the search name. + """ + Check if any artist matches the search name. :param search_name: Artist name to search for. :param artists: List of artists to check against. @@ -939,7 +946,8 @@ def _match_artist_name(self, search_name: str, artists: list[Artist | ItemMappin return False async def _get_library_item_thumb(self, track: Track) -> str | None: - """Get image URL for library track with fallback: track -> album -> artist. + """ + Get image URL for library track with fallback: track -> album -> artist. :param track: Track to get image for. """ @@ -985,7 +993,8 @@ async def _get_library_item_thumb(self, track: Track) -> str | None: return None def get_radio_stream_station_image(self, streamdetails: StreamDetails) -> str | None: - """Get station image URL from queue current item. + """ + Get station image URL from queue current item. :param streamdetails: StreamDetails for the radio stream. """ @@ -999,7 +1008,8 @@ def get_radio_stream_station_image(self, streamdetails: StreamDetails) -> str | @staticmethod def normalize_radio_artist_name(artist_name: str) -> str: - """Normalize artist name from radio stream metadata. + """ + Normalize artist name from radio stream metadata. Handles common formats like "Squier, Billy" -> "Billy Squier" while avoiding mangling of names like "Lipps, Inc." or "Portugal. The Man". @@ -1060,7 +1070,8 @@ async def get_image_url_by_name( track_name: str, fallback_image_url: str | None = None, ) -> tuple[str | None, str | None, str | None]: - """Look up artwork by artist and track name. + """ + Look up artwork by artist and track name. Searches library and external providers for matching artwork. Also returns corrected artist/track names if the search detects @@ -1149,7 +1160,8 @@ async def get_image_url_by_name( return image_url or fallback_image_url, corrected_artist, corrected_track async def update_radio_stream_artwork(self, streamdetails: StreamDetails) -> None: - """Fetch and update radio stream artwork. + """ + Fetch and update radio stream artwork. :param streamdetails: StreamDetails to update with artwork. """ diff --git a/music_assistant/helpers/util.py b/music_assistant/helpers/util.py index a1fd8e5b70..b63fc283f9 100644 --- a/music_assistant/helpers/util.py +++ b/music_assistant/helpers/util.py @@ -287,13 +287,12 @@ def parse_title_and_version( :param title: The title to parse. :param track_version: Optional existing version string. - :param strip_for_search: Aggressively strip for search matching (removes featuring, - version info in brackets, hyphen suffixes like "- Remastered 2019"). - :param strip_for_display: Strip superfluous suffixes like "(Official Video)". + :param strip_for_search: Aggressively strip for search matching. + :param strip_for_display: Strip superfluous suffixes for display. """ version = track_version or "" - # Aggressive search cleaning - strip parentheses/brackets with version/feat keywords + # Strip featuring, bracketed version info, and hyphen suffixes (e.g. "- Remastered 2019") if strip_for_search: title = _SEARCH_PAREN_PATTERN.sub("", title) title = _SEARCH_HYPHEN_PATTERN.sub("", title) @@ -309,7 +308,7 @@ def parse_title_and_version( title = re.sub(r"\s+", " ", title).strip() return title, version - # Display cleaning - just strip video-related suffixes + # Strip video/audio suffixes like "(Official Video)" if strip_for_display: title = _DISPLAY_STRIP_PATTERN.sub("", title).strip() return title, version