diff --git a/music_assistant/controllers/music.py b/music_assistant/controllers/music.py index 2158758aa3..3e314eecb0 100644 --- a/music_assistant/controllers/music.py +++ b/music_assistant/controllers/music.py @@ -801,8 +801,13 @@ async def get_item_by_uri( @api_command("music/recommendations") async def recommendations(self) -> list[RecommendationFolder]: """Get all recommendations.""" + user = get_current_user() + user_provider_filter = user.provider_filter if user else None recommendation_providers = [ - x for x in self.providers if ProviderFeature.RECOMMENDATIONS in x.supported_features + x + for x in self.mass.providers + if ProviderFeature.RECOMMENDATIONS in x.supported_features + and (not user_provider_filter or x.instance_id in user_provider_filter) ] results_per_provider: list[list[RecommendationFolder]] = await asyncio.gather( self._get_default_recommendations(), diff --git a/music_assistant/models/metadata_provider.py b/music_assistant/models/metadata_provider.py index f7cb8c2ada..1045df6d35 100644 --- a/music_assistant/models/metadata_provider.py +++ b/music_assistant/models/metadata_provider.py @@ -9,7 +9,13 @@ from .provider import Provider if TYPE_CHECKING: - from music_assistant_models.media_items import Album, Artist, MediaItemMetadata, Track + from music_assistant_models.media_items import ( + Album, + Artist, + MediaItemMetadata, + RecommendationFolder, + Track, + ) class MetadataProvider(Provider): @@ -49,3 +55,14 @@ async def resolve_image(self, path: str) -> str | bytes: a string with an http(s) URL or local path that is accessible from the server. """ return path + + async def recommendations(self) -> list[RecommendationFolder]: + """ + Get this provider's recommendations. + + Returns an actual (and often personalised) list of recommendations + from this provider for the user/account. + """ + if ProviderFeature.RECOMMENDATIONS in self.supported_features: + raise NotImplementedError + return [] diff --git a/music_assistant/providers/lastfm_recommendations/__init__.py b/music_assistant/providers/lastfm_recommendations/__init__.py new file mode 100644 index 0000000000..742728f095 --- /dev/null +++ b/music_assistant/providers/lastfm_recommendations/__init__.py @@ -0,0 +1,303 @@ +"""Last.fm Recommendations music provider for Music Assistant.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from music_assistant_models.config_entries import ( + ConfigEntry, + ConfigValueOption, + ConfigValueType, +) +from music_assistant_models.enums import ConfigEntryType, ProviderFeature +from music_assistant_models.errors import MusicAssistantError + +from music_assistant.models.metadata_provider import MetadataProvider +from music_assistant.providers.lastfm_recommendations.api_client import LastFMAPIClient +from music_assistant.providers.lastfm_recommendations.mbid_resolver import MBIDResolver +from music_assistant.providers.lastfm_recommendations.recommendations import ( + LastFMRecommendationManager, +) + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ProviderConfig + from music_assistant_models.media_items import RecommendationFolder + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + + +SUPPORTED_FEATURES = { + ProviderFeature.RECOMMENDATIONS, +} + +# Config action constants +CONF_ACTION_CLEAR_CACHE = "clear_cache" + +# Curated list of popular countries for Last.fm geo charts +# Last.fm API expects full country names (not ISO codes) +# This list covers major music markets and can be expanded based on user requests +GEO_COUNTRIES = [ + "Argentina", + "Australia", + "Austria", + "Belgium", + "Brazil", + "Canada", + "China", + "Czech Republic", + "Denmark", + "Finland", + "France", + "Germany", + "Greece", + "Hungary", + "Iceland", + "India", + "Ireland", + "Israel", + "Italy", + "Japan", + "Lithuania", + "Mexico", + "Netherlands", + "New Zealand", + "Norway", + "Philippines", + "Poland", + "Portugal", + "Serbia", + "Singapore", + "Slovenia", + "South Africa", + "South Korea", + "Spain", + "Sweden", + "Switzerland", + "Thailand", + "Turkey", + "Ukraine", + "United Arab Emirates", + "United Kingdom", + "United States", +] + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> LastFMRecommendationsProvider: + """Initialize provider(instance) with given configuration.""" + return LastFMRecommendationsProvider(mass, manifest, config, SUPPORTED_FEATURES) + + +async def get_config_entries( + mass: MusicAssistant, + instance_id: str | None = None, + action: str | None = None, + values: dict[str, ConfigValueType] | None = None, +) -> tuple[ConfigEntry, ...]: + """Return Config entries to setup this provider.""" + if action == CONF_ACTION_CLEAR_CACHE and instance_id: + provider = mass.get_provider(instance_id) + if isinstance(provider, LastFMRecommendationsProvider): + await provider.recommendations_manager.clear_cache() + mass.create_task(provider._populate_recommendations()) + + return ( + ConfigEntry( + key="api_key", + type=ConfigEntryType.SECURE_STRING, + label="Last.fm API Key", + required=True, + description="Get your API key from https://www.last.fm/api/account/create", + value=values.get("api_key") if values else None, + ), + ConfigEntry( + key="username", + type=ConfigEntryType.STRING, + label="Last.fm Username", + required=False, + description="Your Last.fm username for genre-based recommendations (optional)", + value=values.get("username") if values else None, + ), + ConfigEntry( + key="refresh_interval", + type=ConfigEntryType.INTEGER, + label="Refresh Interval (hours)", + default_value=6, + description="How often to refresh recommendations (0 to disable automatic refresh)", + category="Recommendations", + range=(0, 168), # 0 to 1 week + ), + ConfigEntry( + key="enable_personalized", + type=ConfigEntryType.BOOLEAN, + label="Enable Personalized Recommendations", + default_value=False, + description=( + "Provide 'Similar Artists' and 'Similar Tracks' rows based on your " + "listening history" + ), + category="Recommendations", + ), + ConfigEntry( + key="enable_global_charts", + type=ConfigEntryType.BOOLEAN, + label="Enable Global Charts", + default_value=False, + description=( + "Provide 'Global Top Artists' and 'Global Top Tracks' rows from " + "Last.fm's worldwide charts" + ), + category="Recommendations", + ), + ConfigEntry( + key="enable_genre", + type=ConfigEntryType.BOOLEAN, + label="Enable Genre Recommendations", + default_value=False, + description=( + "Provide 'Top Artists', 'Top Albums' and 'Top Tracks' rows for your " + "most played genre (requires username)" + ), + category="Recommendations", + ), + ConfigEntry( + key="geo_country", + type=ConfigEntryType.STRING, + label="Country for Geographic Charts", + default_value="Argentina", + description="Select country for geography-based top artists and tracks", + options=[ConfigValueOption(country, country) for country in GEO_COUNTRIES], + category="Recommendations", + ), + ConfigEntry( + key="enable_geo", + type=ConfigEntryType.BOOLEAN, + label="Enable Geographic Charts", + default_value=False, + description=("Provide 'Top Artists' and 'Top Tracks' rows for the selected country"), + category="Recommendations", + ), + ConfigEntry( + key=CONF_ACTION_CLEAR_CACHE, + type=ConfigEntryType.ACTION, + label="Refresh Recommendations", + description=( + "Rebuild recommendations immediately instead of waiting for the next " + "scheduled refresh." + ), + action=CONF_ACTION_CLEAR_CACHE, + action_label="Refresh Now", + category="Recommendations", + advanced=True, + required=False, + ), + ) + + +class LastFMRecommendationsProvider(MetadataProvider): + """Last.fm Recommendations Provider for Music Assistant.""" + + async def handle_async_init(self) -> None: + """Handle async initialization of the provider.""" + self.api = LastFMAPIClient(self) + self.mbid_resolver = MBIDResolver(self) + self.recommendations_manager = LastFMRecommendationManager(self) + + self._recommendation_folders: list[RecommendationFolder] = [] + self._recommendations_populated = False + + # Delay the initial populate so other providers (e.g. Spotify) finish loading first; + # without this, resolution fails when no streaming providers are available yet. + self.mass.call_later( + 20, + self._populate_recommendations, + task_id=f"lastfm_recommendations_initial_populate_{self.instance_id}", + ) + + self._schedule_refresh() + + async def _populate_recommendations(self) -> None: + """Populate recommendation folders in the background.""" + try: + self.logger.info("Building Last.fm recommendations") + + # Build folders incrementally so each category appears as soon as it's ready. + personalized_folders = ( + await self.recommendations_manager._get_personalized_recommendations() + ) + if personalized_folders: + self._recommendation_folders.extend(personalized_folders) + self.logger.debug( + "Added %d personalized recommendation folder(s)", len(personalized_folders) + ) + + global_folders = await self.recommendations_manager._get_global_recommendations() + if global_folders: + self._recommendation_folders.extend(global_folders) + self.logger.debug("Added %d global recommendation folder(s)", len(global_folders)) + + genre_folders = await self.recommendations_manager._get_genre_based_recommendations() + if genre_folders: + self._recommendation_folders.extend(genre_folders) + self.logger.debug( + "Added %d genre-based recommendation folder(s)", len(genre_folders) + ) + + geo_folders = await self.recommendations_manager._get_geo_based_recommendations() + if geo_folders: + self._recommendation_folders.extend(geo_folders) + self.logger.debug( + "Added %d geography-based recommendation folder(s)", len(geo_folders) + ) + + self._recommendations_populated = True + self.logger.info( + "Last.fm recommendations built (%d folders)", + len(self._recommendation_folders), + ) + # TODO: signal the frontend to refresh the Discover view once a suitable + # event exists upstream (e.g. a RECOMMENDATIONS_UPDATED EventType). + except MusicAssistantError as err: + self.logger.warning("Failed to populate recommendations: %s", err) + + def _schedule_refresh(self) -> None: + """Schedule the next periodic refresh of recommendations.""" + refresh_interval_value = self.config.get_value("refresh_interval") + if isinstance(refresh_interval_value, (int, float)): + refresh_interval_hours = int(refresh_interval_value) + else: + refresh_interval_hours = 6 + if refresh_interval_hours <= 0: + self.logger.debug("Automatic refresh disabled (interval set to 0)") + return + + refresh_interval_seconds = float(refresh_interval_hours * 3600) + + self.mass.call_later( + refresh_interval_seconds, + self._refresh_recommendations, + task_id=f"lastfm_recommendations_refresh_{self.instance_id}", + ) + self.logger.debug( + "Scheduled next recommendations refresh in %d hours", refresh_interval_hours + ) + + async def _refresh_recommendations(self) -> None: + """Re-populate recommendations and reschedule the next refresh.""" + try: + self.logger.debug("Refreshing Last.fm recommendations (scheduled)") + await self._populate_recommendations() + except MusicAssistantError as err: + self.logger.warning("Failed to refresh recommendations: %s", err) + finally: + self._schedule_refresh() + + async def recommendations(self) -> list[RecommendationFolder]: + """Return this provider's recommendation folders. + + On first call (before background population completes) this returns an empty list. + Subsequent calls return progressively more populated folders. + """ + return self._recommendation_folders diff --git a/music_assistant/providers/lastfm_recommendations/api_client.py b/music_assistant/providers/lastfm_recommendations/api_client.py new file mode 100644 index 0000000000..4705304869 --- /dev/null +++ b/music_assistant/providers/lastfm_recommendations/api_client.py @@ -0,0 +1,330 @@ +"""Last.fm API client for recommendations.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from aiohttp import ClientError +from music_assistant_models.errors import InvalidDataError + +from music_assistant.helpers.throttle_retry import ThrottlerManager + +if TYPE_CHECKING: + from aiohttp import ClientSession + + from music_assistant.providers.lastfm_recommendations import LastFMRecommendationsProvider + + +class LastFMAPIClient: + """Last.fm API client for fetching recommendations.""" + + BASE_URL = "https://ws.audioscrobbler.com/2.0/" + throttler = ThrottlerManager(rate_limit=5, period=1) # 5 requests per second + + # Last.fm error codes that should be logged as warnings + CRITICAL_ERROR_CODES = { + 4, # Authentication Failed + 10, # Invalid API key + 26, # Suspended API key + 29, # Rate limit exceeded + } + + def __init__(self, provider: LastFMRecommendationsProvider) -> None: + """Initialize Last.fm API client. + + :param provider: The Last.fm recommendations provider instance. + """ + self.provider = provider + self.logger = provider.logger + self.http_session: ClientSession = provider.mass.http_session + + async def _get_data(self, method: str, **params: Any) -> dict[str, Any]: + """Make a request to the Last.fm API. + + :param method: The Last.fm API method to call. + :param params: Additional query parameters. + """ + async with self.throttler.acquire(): + params.update( + { + "method": method, + "api_key": self.provider.config.get_value("api_key"), + "format": "json", + } + ) + + async with self.http_session.get(self.BASE_URL, params=params) as response: + response.raise_for_status() + data: dict[str, Any] = await response.json() + + # Last.fm returns errors in the response body rather than as an HTTP status. + if "error" in data: + error_code = data.get("error", 0) + error_msg = data.get("message", "Unknown error") + + if error_code in self.CRITICAL_ERROR_CODES: + self.logger.warning( + "Last.fm API error %s: %s (method: %s)", + error_code, + error_msg, + method, + ) + else: + self.logger.debug( + "Last.fm API error %s: %s (method: %s)", + error_code, + error_msg, + method, + ) + + msg = f"Last.fm API error {error_code}: {error_msg}" + raise InvalidDataError(msg) + + return data + + async def get_similar_artists( + self, artist_name: str, artist_mbid: str | None = None, limit: int = 10 + ) -> list[dict[str, Any]]: + """Get similar artists from Last.fm. + + :param artist_name: Name of the artist. + :param artist_mbid: Optional MusicBrainz ID for more accurate matching. + :param limit: Maximum number of similar artists to return. + """ + params: dict[str, Any] = {"limit": limit} + + # Prefer MBID for more accurate matching; fall back to name with autocorrect. + if artist_mbid: + params["mbid"] = artist_mbid + else: + params["artist"] = artist_name + params["autocorrect"] = 1 + + try: + self.logger.debug( + "Fetching similar artists for: %s (MBID: %s)", + artist_name, + artist_mbid or "none", + ) + data = await self._get_data("artist.getSimilar", **params) + similar_artists: list[dict[str, Any]] | dict[str, Any] = data.get( + "similarartists", {} + ).get("artist", []) + + # Last.fm returns a single dict when only one result is present. + if isinstance(similar_artists, dict): + return [similar_artists] + + return similar_artists + + except (TimeoutError, ClientError, InvalidDataError, KeyError): + return [] + + async def get_similar_tracks( + self, + artist_name: str, + track_name: str, + track_mbid: str | None = None, + limit: int = 10, + ) -> list[dict[str, Any]]: + """Get similar tracks from Last.fm. + + :param artist_name: Name of the track's artist. + :param track_name: Name of the track. + :param track_mbid: Optional MusicBrainz ID for more accurate matching. + :param limit: Maximum number of similar tracks to return. + """ + params: dict[str, Any] = {"limit": limit} + + # Prefer MBID for more accurate matching; fall back to name with autocorrect. + if track_mbid: + params["mbid"] = track_mbid + else: + params["artist"] = artist_name + params["track"] = track_name + params["autocorrect"] = 1 + + try: + self.logger.debug( + "Fetching similar tracks for: %s - %s (MBID: %s)", + artist_name, + track_name, + track_mbid or "none", + ) + data = await self._get_data("track.getSimilar", **params) + similar_tracks: list[dict[str, Any]] | dict[str, Any] = data.get( + "similartracks", {} + ).get("track", []) + + # Last.fm returns a single dict when only one result is present. + if isinstance(similar_tracks, dict): + return [similar_tracks] + + return similar_tracks + + except (TimeoutError, ClientError, InvalidDataError, KeyError): + return [] + + async def get_chart_top_artists(self, limit: int = 10) -> list[dict[str, Any]]: + """Get global top artists chart from Last.fm. + + :param limit: Maximum number of artists to return. + """ + try: + data = await self._get_data("chart.getTopArtists", limit=limit) + artists: list[dict[str, Any]] | dict[str, Any] = data.get("artists", {}).get( + "artist", [] + ) + + # Last.fm returns a single dict when only one result is present. + if isinstance(artists, dict): + return [artists] + + return artists + + except (TimeoutError, ClientError, InvalidDataError, KeyError): + return [] + + async def get_chart_top_tracks(self, limit: int = 10) -> list[dict[str, Any]]: + """Get global top tracks chart from Last.fm. + + :param limit: Maximum number of tracks to return. + """ + try: + data = await self._get_data("chart.getTopTracks", limit=limit) + tracks: list[dict[str, Any]] | dict[str, Any] = data.get("tracks", {}).get("track", []) + + # Last.fm returns a single dict when only one result is present. + if isinstance(tracks, dict): + return [tracks] + + return tracks + + except (TimeoutError, ClientError, InvalidDataError, KeyError): + return [] + + async def get_user_top_tags(self, username: str, limit: int = 1) -> list[dict[str, Any]]: + """Get a user's top tags from Last.fm. + + :param username: Last.fm username. + :param limit: Maximum number of tags to return (default 1 for top genre). + """ + try: + self.logger.debug("Fetching top tags for user: %s", username) + data = await self._get_data("user.getTopTags", user=username, limit=limit) + tags: list[dict[str, Any]] | dict[str, Any] = data.get("toptags", {}).get("tag", []) + + # Last.fm returns a single dict when only one result is present. + if isinstance(tags, dict): + return [tags] + + return tags + + except (TimeoutError, ClientError, InvalidDataError, KeyError): + return [] + + async def get_tag_top_artists(self, tag: str, limit: int = 10) -> list[dict[str, Any]]: + """Get top artists for a tag from Last.fm. + + :param tag: Tag name (genre). + :param limit: Maximum number of artists to return. + """ + try: + self.logger.debug("Fetching top artists for tag: %s (limit: %d)", tag, limit) + data = await self._get_data("tag.getTopArtists", tag=tag, limit=limit) + artists: list[dict[str, Any]] | dict[str, Any] = data.get("topartists", {}).get( + "artist", [] + ) + + # Last.fm returns a single dict when only one result is present. + if isinstance(artists, dict): + return [artists] + + return artists + + except (TimeoutError, ClientError, InvalidDataError, KeyError): + return [] + + async def get_tag_top_albums(self, tag: str, limit: int = 10) -> list[dict[str, Any]]: + """Get top albums for a tag from Last.fm. + + :param tag: Tag name (genre). + :param limit: Maximum number of albums to return. + """ + try: + self.logger.debug("Fetching top albums for tag: %s (limit: %d)", tag, limit) + data = await self._get_data("tag.getTopAlbums", tag=tag, limit=limit) + albums: list[dict[str, Any]] | dict[str, Any] = data.get("albums", {}).get("album", []) + + # Last.fm returns a single dict when only one result is present. + if isinstance(albums, dict): + return [albums] + + return albums + + except (TimeoutError, ClientError, InvalidDataError, KeyError): + return [] + + async def get_tag_top_tracks(self, tag: str, limit: int = 10) -> list[dict[str, Any]]: + """Get top tracks for a tag from Last.fm. + + :param tag: Tag name (genre). + :param limit: Maximum number of tracks to return. + """ + try: + self.logger.debug("Fetching top tracks for tag: %s (limit: %d)", tag, limit) + data = await self._get_data("tag.getTopTracks", tag=tag, limit=limit) + tracks: list[dict[str, Any]] | dict[str, Any] = data.get("tracks", {}).get("track", []) + + # Last.fm returns a single dict when only one result is present. + if isinstance(tracks, dict): + return [tracks] + + return tracks + + except (TimeoutError, ClientError, InvalidDataError, KeyError): + return [] + + async def get_geo_top_artists(self, country: str, limit: int = 10) -> list[dict[str, Any]]: + """Get top artists for a country from Last.fm. + + :param country: Country name (e.g., "United States", "Spain"). + :param limit: Maximum number of artists to return. + """ + try: + self.logger.debug( + "Fetching geo top artists for country: %s (limit: %d)", country, limit + ) + data = await self._get_data("geo.getTopArtists", country=country, limit=limit) + artists: list[dict[str, Any]] | dict[str, Any] = data.get("topartists", {}).get( + "artist", [] + ) + + # Last.fm returns a single dict when only one result is present. + if isinstance(artists, dict): + return [artists] + + return artists + + except (TimeoutError, ClientError, InvalidDataError, KeyError): + return [] + + async def get_geo_top_tracks(self, country: str, limit: int = 10) -> list[dict[str, Any]]: + """Get top tracks for a country from Last.fm. + + :param country: Country name (e.g., "United States", "Spain"). + :param limit: Maximum number of tracks to return. + """ + try: + self.logger.debug("Fetching geo top tracks for country: %s (limit: %d)", country, limit) + data = await self._get_data("geo.getTopTracks", country=country, limit=limit) + tracks: list[dict[str, Any]] | dict[str, Any] = data.get("tracks", {}).get("track", []) + + # Last.fm returns a single dict when only one result is present. + if isinstance(tracks, dict): + return [tracks] + + return tracks + + except (TimeoutError, ClientError, InvalidDataError, KeyError): + return [] diff --git a/music_assistant/providers/lastfm_recommendations/constants.py b/music_assistant/providers/lastfm_recommendations/constants.py new file mode 100644 index 0000000000..8fac2e0713 --- /dev/null +++ b/music_assistant/providers/lastfm_recommendations/constants.py @@ -0,0 +1,53 @@ +"""Constants for Last.fm Recommendations Provider.""" + +from __future__ import annotations + +# Cache settings +# Cache category for resolved Artist/Album/Track objects +CACHE_CATEGORY_RESOLVED_ITEMS = 1 + +# Expiration time for cached resolved items (in seconds) +CACHE_EXPIRATION_SECONDS = 60 * 60 * 24 * 90 # 90 days + +# Concurrency limits +# Maximum number of concurrent provider searches to prevent overwhelming APIs +SEARCH_CONCURRENCY_LIMIT = 5 + +# Item counts and limits +# Target number of items to return in recommendation folders +TARGET_ITEM_COUNT = 10 + +# Number of items to fetch when we expect some resolution failures (small buffer) +RESOLUTION_BUFFER_SMALL = 15 + +# Number of items to fetch when we expect many resolution failures (large buffer) +RESOLUTION_BUFFER_LARGE = 30 + +# Number of top items to always include when sampling (before random selection) +TOP_ITEMS_TO_TAKE = 3 + +# Number of similar items to fetch before filtering to target count +SIMILAR_ITEMS_BUFFER = 12 + +# Number of similar items to fetch for each seed artist/track +SIMILAR_ITEMS_PER_SEED = 3 + +# Number of top artists to use as seeds for personalized recommendations +TOP_ARTISTS_LIMIT = 5 + +# Number of top tracks to use as seeds for personalized recommendations +TOP_TRACKS_LIMIT = 5 + +# Number of top tags to fetch for genre-based recommendations +TOP_TAGS_LIMIT = 1 + +# API search settings +# Search limit for provider API calls (workaround for Spotify API bug with limit=1) +PROVIDER_SEARCH_LIMIT = 2 + +# Image processing +# Priority order for selecting Last.fm images (largest to smallest) +IMAGE_SIZE_PRIORITY = ["mega", "extralarge", "large", "medium", "small"] + +# Suffix used to identify Last.fm placeholder images to filter out +IMAGE_PLACEHOLDER_SUFFIX = "/default.png" diff --git a/music_assistant/providers/lastfm_recommendations/icon.svg b/music_assistant/providers/lastfm_recommendations/icon.svg new file mode 100644 index 0000000000..d4e508951b --- /dev/null +++ b/music_assistant/providers/lastfm_recommendations/icon.svg @@ -0,0 +1,37 @@ + + + + + + diff --git a/music_assistant/providers/lastfm_recommendations/manifest.json b/music_assistant/providers/lastfm_recommendations/manifest.json new file mode 100644 index 0000000000..053d511fe6 --- /dev/null +++ b/music_assistant/providers/lastfm_recommendations/manifest.json @@ -0,0 +1,11 @@ +{ + "type": "metadata", + "domain": "lastfm_recommendations", + "name": "Last.fm Recommendations", + "description": "Get music recommendations from Last.fm based on your listening history", + "codeowners": [ + "ozGav" + ], + "stage": "beta", + "documentation": "https://music-assistant.io/music-providers/lastfm_recommendations" +} diff --git a/music_assistant/providers/lastfm_recommendations/mbid_resolver.py b/music_assistant/providers/lastfm_recommendations/mbid_resolver.py new file mode 100644 index 0000000000..21d1aea9d0 --- /dev/null +++ b/music_assistant/providers/lastfm_recommendations/mbid_resolver.py @@ -0,0 +1,81 @@ +"""MBID to ISRC resolver using MusicBrainz provider.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, cast + +from aiohttp import ClientError +from music_assistant_models.errors import InvalidDataError, ProviderUnavailableError + +if TYPE_CHECKING: + from music_assistant.providers.lastfm_recommendations import LastFMRecommendationsProvider + from music_assistant.providers.musicbrainz import MusicbrainzProvider + + +CACHE_CATEGORY_MBID_ISRC = 0 # Cache category for MBID->ISRC mappings + + +class MBIDResolver: + """Resolves MusicBrainz recording IDs to ISRCs.""" + + # 90 days: ISRC mappings rarely change, and this sits on top of MusicBrainz's own 30-day cache. + CACHE_EXPIRATION = 86400 * 90 + + def __init__(self, provider: LastFMRecommendationsProvider) -> None: + """Initialize MBID resolver. + + :param provider: The Last.fm recommendations provider instance. + """ + self.provider = provider + self.mass = provider.mass + self.logger = provider.logger + + async def get_isrcs_for_recording(self, mbid: str) -> list[str]: + """Get ISRCs for a recording MBID via MusicBrainz. + + :param mbid: MusicBrainz recording ID. + """ + cache_key = f"recording_{mbid}" + + cached = await self.mass.cache.get( + key=cache_key, + category=CACHE_CATEGORY_MBID_ISRC, + provider=self.provider.instance_id, + ) + + if cached is not None: + return cast("list[str]", cached.get("isrcs", [])) + + mb_provider = self.mass.get_provider("musicbrainz") + if not mb_provider: + msg = "MusicBrainz provider not available" + raise ProviderUnavailableError(msg) + + try: + recording = await cast("MusicbrainzProvider", mb_provider).get_recording_details(mbid) + + isrcs = recording.isrcs if recording and recording.isrcs else [] + + # Cache empty results too, to avoid repeated failed lookups. + await self.mass.cache.set( + key=cache_key, + data={"isrcs": isrcs}, + category=CACHE_CATEGORY_MBID_ISRC, + provider=self.provider.instance_id, + expiration=self.CACHE_EXPIRATION, + ) + + return isrcs + + except (TimeoutError, ClientError, AttributeError, InvalidDataError) as err: + self.logger.debug("Failed to get ISRCs for MBID %s: %s", mbid, type(err).__name__) + + await self.mass.cache.set( + key=cache_key, + data={"isrcs": []}, + category=CACHE_CATEGORY_MBID_ISRC, + provider=self.provider.instance_id, + expiration=self.CACHE_EXPIRATION, + ) + + return [] diff --git a/music_assistant/providers/lastfm_recommendations/parsers.py b/music_assistant/providers/lastfm_recommendations/parsers.py new file mode 100644 index 0000000000..202b51f7c5 --- /dev/null +++ b/music_assistant/providers/lastfm_recommendations/parsers.py @@ -0,0 +1,419 @@ +"""Parsers to convert Last.fm API responses to Music Assistant media items.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import TYPE_CHECKING, Any, cast + +from music_assistant_models.enums import ExternalID, ImageType, MediaType, ProviderFeature +from music_assistant_models.errors import MusicAssistantError +from music_assistant_models.media_items import Album, Artist, ItemMapping, MediaItemImage, Track + +from music_assistant.constants import MASS_LOGGER_NAME +from music_assistant.helpers.compare import compare_strings +from music_assistant.providers.lastfm_recommendations.constants import ( + IMAGE_PLACEHOLDER_SUFFIX, + IMAGE_SIZE_PRIORITY, + PROVIDER_SEARCH_LIMIT, + SEARCH_CONCURRENCY_LIMIT, +) + +if TYPE_CHECKING: + from music_assistant import MusicAssistant + from music_assistant.controllers.media.albums import AlbumsController + from music_assistant.controllers.media.artists import ArtistsController + from music_assistant.controllers.media.tracks import TracksController + from music_assistant.providers.lastfm_recommendations.mbid_resolver import MBIDResolver + +LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.lastfm_recommendations") + +# Limit concurrent provider searches to avoid overwhelming their APIs. +_SEARCH_SEMAPHORE = asyncio.Semaphore(SEARCH_CONCURRENCY_LIMIT) + + +def _has_matching_external_ids( + item_mapping: ItemMapping, media_item: Artist | Album | Track +) -> bool: + """Return True if the ItemMapping shares any external IDs with the media item. + + :param item_mapping: ItemMapping with external IDs from Last.fm. + :param media_item: Artist or Track to compare against. + """ + if not item_mapping.external_ids: + return False + + return bool(item_mapping.external_ids & media_item.external_ids) + + +def _extract_image_url(image_array: list[dict[str, Any]]) -> str | None: + """Extract the largest available image URL from Last.fm's image array. + + :param image_array: List of image dicts from Last.fm API. + """ + if not image_array: + return None + + for size in IMAGE_SIZE_PRIORITY: + for img in image_array: + if img.get("size") == size and img.get("#text"): + url = str(img["#text"]).strip() + if url and not url.endswith(IMAGE_PLACEHOLDER_SUFFIX): + return url + + return None + + +def _get_streaming_providers( + mass: MusicAssistant, item_mapping: ItemMapping, provider_instance_to_skip: str +) -> list[Any]: + """Return streaming providers that support the ItemMapping's media type. + + :param mass: MusicAssistant instance. + :param item_mapping: ItemMapping with the media type to search for. + :param provider_instance_to_skip: Provider instance to skip (ourselves). + """ + streaming_providers = [] + for p in mass.music.providers: + if p.instance_id == provider_instance_to_skip: + continue + if not p.is_streaming_provider: + continue + + if item_mapping.media_type == MediaType.ARTIST: + if ProviderFeature.LIBRARY_ARTISTS not in p.supported_features: + continue + elif item_mapping.media_type == MediaType.ALBUM: + if ProviderFeature.LIBRARY_ALBUMS not in p.supported_features: + continue + elif item_mapping.media_type == MediaType.TRACK: + if ProviderFeature.LIBRARY_TRACKS not in p.supported_features: + continue + + streaming_providers.append(p) + return streaming_providers + + +async def _search_provider( + ctrl: ArtistsController | AlbumsController | TracksController, + item_mapping: ItemMapping, + provider: Any, +) -> Artist | Album | Track | None: + """Search a single provider for a matching item. + + :param ctrl: Controller for the media type. + :param item_mapping: ItemMapping to search for. + :param provider: Provider instance to search. + """ + async with _SEARCH_SEMAPHORE: + try: + LOGGER.debug( + "Searching %s on %s for: %s", + item_mapping.media_type.value, + provider.name, + item_mapping.name, + ) + # Use a higher limit to work around provider bugs (e.g. Spotify misbehaves at limit=1). + search_results = await ctrl.search( + item_mapping.name, provider.instance_id, limit=PROVIDER_SEARCH_LIMIT + ) + if not search_results: + return None + + return search_results[0] + except MusicAssistantError as err: + LOGGER.debug("Provider %s search failed: %s", provider.name, type(err).__name__) + return None + + +async def _search_providers_concurrent( + ctrl: ArtistsController | AlbumsController | TracksController, + item_mapping: ItemMapping, + providers: list[Any], + require_external_id_match: bool, +) -> Artist | Album | Track | None: + """Search multiple providers concurrently and return the best match. + + :param ctrl: Controller for the media type. + :param item_mapping: ItemMapping to search for. + :param providers: List of providers to search. + :param require_external_id_match: If True, prefer matches on external IDs and reject + results whose external IDs contradict the ItemMapping. + """ + tasks = [ + asyncio.create_task(_search_provider(ctrl, item_mapping, provider)) + for provider in providers + ] + + fallback_result = None + + for task in asyncio.as_completed(tasks): + result = await task + if result is None: + continue + + if not require_external_id_match: + names_match = compare_strings(item_mapping.name, result.name, strict=False) + + # Album/track search names may be "Artist - Title" while results expose only "Title". + if not names_match and " - " in item_mapping.name: + title_part = item_mapping.name.split(" - ", 1)[1] + names_match = compare_strings(title_part, result.name, strict=False) + + if names_match: + LOGGER.debug( + "Name match on %s: %s (searched: %s)", + result.provider, + result.name, + item_mapping.name, + ) + for t in tasks: + if not t.done(): + t.cancel() + return result + + LOGGER.debug( + "Rejecting %s from %s: name mismatch (searched: %s)", + result.name, + result.provider, + item_mapping.name, + ) + if not fallback_result: + fallback_result = result + continue + + if _has_matching_external_ids(item_mapping, result): + LOGGER.debug( + "External ID match on %s: %s", + result.provider, + result.name, + ) + for t in tasks: + if not t.done(): + t.cancel() + return result + + # A result that exposes external IDs of a matching type but none that match is a + # different item; only consider results without such IDs as a name-based fallback. + result_has_external_ids = any( + ext_id[0] in {ext_id_check[0] for ext_id_check in item_mapping.external_ids} + for ext_id in result.external_ids + ) + + if result_has_external_ids: + LOGGER.debug( + "Rejecting %s from %s: has external IDs but they don't match", + result.name, + result.provider, + ) + elif not fallback_result: + names_match = compare_strings(item_mapping.name, result.name, strict=False) + + if not names_match and " - " in item_mapping.name: + title_part = item_mapping.name.split(" - ", 1)[1] + names_match = compare_strings(title_part, result.name, strict=False) + + if names_match: + LOGGER.debug( + "Saving %s from %s as fallback (no external IDs to verify)", + result.name, + result.provider, + ) + fallback_result = result + else: + LOGGER.debug( + "Not saving %s from %s as fallback: name mismatch", + result.name, + result.provider, + ) + + if fallback_result: + LOGGER.debug("No external ID matches found, using fallback result") + return fallback_result + + +async def _resolve_item( + item_mapping: ItemMapping, mass: MusicAssistant, provider_instance_to_skip: str +) -> Artist | Album | Track | None: + """Resolve an ItemMapping to a library or provider item. + + :param item_mapping: ItemMapping with metadata and external IDs from Last.fm. + :param mass: MusicAssistant instance. + :param provider_instance_to_skip: Provider instance to skip (ourselves). + """ + ctrl: ArtistsController | AlbumsController | TracksController + if item_mapping.media_type == MediaType.ARTIST: + ctrl = mass.music.artists + elif item_mapping.media_type == MediaType.ALBUM: + ctrl = mass.music.albums + elif item_mapping.media_type == MediaType.TRACK: + ctrl = mass.music.tracks + else: + return None + + LOGGER.debug( + "Resolving %s: %s (external IDs: %s)", + item_mapping.media_type.value, + item_mapping.name, + item_mapping.external_ids or "none", + ) + + if library_item := await ctrl.get_library_item_by_external_ids(item_mapping.external_ids): + LOGGER.debug("Found %s in library: %s", item_mapping.media_type.value, library_item.name) + return library_item + + streaming_providers = _get_streaming_providers(mass, item_mapping, provider_instance_to_skip) + if not streaming_providers: + LOGGER.debug("No streaming providers available for resolution") + return None + + # Streaming providers only expose ISRCs for tracks; for artists/albums rely on name matching. + require_external_id_match = False + if item_mapping.media_type == MediaType.TRACK: + has_isrc = any(ext_id[0] == ExternalID.ISRC for ext_id in item_mapping.external_ids) + if has_isrc: + require_external_id_match = True + + result = await _search_providers_concurrent( + ctrl, item_mapping, streaming_providers, require_external_id_match + ) + if result is None: + LOGGER.debug("Could not resolve %s: %s", item_mapping.media_type.value, item_mapping.name) + return result + + +async def parse_artist( + lastfm_artist: dict[str, Any], mass: MusicAssistant, provider_instance: str +) -> Artist | None: + """Parse a Last.fm artist and resolve it to a library or provider Artist. + + :param lastfm_artist: Raw Last.fm artist dict with 'name' and 'mbid' fields. + :param mass: MusicAssistant instance for accessing library and providers. + :param provider_instance: Provider instance ID to skip when searching. + """ + name = lastfm_artist.get("name", "Unknown Artist") + mbid = lastfm_artist.get("mbid") + + external_ids = set() + if mbid: + external_ids.add((ExternalID.MB_ARTIST, mbid)) + + image = None + if image_url := _extract_image_url(lastfm_artist.get("image", [])): + image = MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider="lastfm", + ) + + item_mapping = ItemMapping( + media_type=MediaType.ARTIST, + item_id="temp", + provider="lastfm_recommendations", + name=name, + external_ids=external_ids, + image=image, + ) + + return cast("Artist | None", await _resolve_item(item_mapping, mass, provider_instance)) + + +async def parse_track( + lastfm_track: dict[str, Any], + mbid_resolver: MBIDResolver, + mass: MusicAssistant, + provider_instance: str, +) -> Track | None: + """Parse a Last.fm track and resolve it to a library or provider Track. + + :param lastfm_track: Raw Last.fm track dict with 'name', 'artist', 'mbid', 'duration'. + :param mbid_resolver: MBID resolver instance for ISRC lookups. + :param mass: MusicAssistant instance for accessing library and providers. + :param provider_instance: Provider instance ID to skip when searching. + """ + name = lastfm_track.get("name", "Unknown Track") + mbid = lastfm_track.get("mbid") + + artist_data = lastfm_track.get("artist", {}) + if isinstance(artist_data, str): + artist_name = artist_data + else: + artist_name = artist_data.get("name", "Unknown Artist") + + external_ids = set() + + if mbid: + external_ids.add((ExternalID.MB_RECORDING, mbid)) + + # Streaming providers match tracks on ISRC, so enrich MBIDs with ISRCs where possible. + LOGGER.debug("Resolving MBID %s to ISRCs via MusicBrainz", mbid) + isrcs = await mbid_resolver.get_isrcs_for_recording(mbid) + if isrcs: + LOGGER.debug("Found %d ISRCs for MBID %s: %s", len(isrcs), mbid, isrcs) + for isrc in isrcs: + external_ids.add((ExternalID.ISRC, isrc)) + else: + LOGGER.debug("No ISRCs found for MBID %s", mbid) + else: + LOGGER.debug("Track has no MBID, cannot resolve ISRCs") + + image = None + if image_url := _extract_image_url(lastfm_track.get("image", [])): + image = MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider="lastfm", + ) + + item_mapping = ItemMapping( + media_type=MediaType.TRACK, + item_id="temp", + provider="lastfm_recommendations", + name=f"{artist_name} - {name}", + external_ids=external_ids, + image=image, + ) + + return cast("Track | None", await _resolve_item(item_mapping, mass, provider_instance)) + + +async def parse_album( + lastfm_album: dict[str, Any], mass: MusicAssistant, provider_instance: str +) -> Album | None: + """Parse a Last.fm album and resolve it to a library or provider Album. + + :param lastfm_album: Raw Last.fm album dict with 'name', 'artist', 'mbid'. + :param mass: MusicAssistant instance for accessing library and providers. + :param provider_instance: Provider instance ID to skip when searching. + """ + name = lastfm_album.get("name", "Unknown Album") + mbid = lastfm_album.get("mbid") + + artist_data = lastfm_album.get("artist", {}) + if isinstance(artist_data, str): + artist_name = artist_data + else: + artist_name = artist_data.get("name", "Unknown Artist") + + external_ids = set() + if mbid: + external_ids.add((ExternalID.MB_ALBUM, mbid)) + + image = None + if image_url := _extract_image_url(lastfm_album.get("image", [])): + image = MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider="lastfm", + ) + + item_mapping = ItemMapping( + media_type=MediaType.ALBUM, + item_id="temp", + provider="lastfm_recommendations", + name=f"{artist_name} - {name}", + external_ids=external_ids, + image=image, + ) + + return cast("Album | None", await _resolve_item(item_mapping, mass, provider_instance)) diff --git a/music_assistant/providers/lastfm_recommendations/recommendations.py b/music_assistant/providers/lastfm_recommendations/recommendations.py new file mode 100644 index 0000000000..1d43c06ba4 --- /dev/null +++ b/music_assistant/providers/lastfm_recommendations/recommendations.py @@ -0,0 +1,792 @@ +"""Recommendation logic for Last.fm.""" + +from __future__ import annotations + +import asyncio +import datetime +import random +from typing import TYPE_CHECKING, Any + +from music_assistant_models.enums import ExternalID, MediaType +from music_assistant_models.media_items import ( + Album, + Artist, + RecommendationFolder, + Track, + UniqueList, +) + +from music_assistant.providers.lastfm_recommendations.constants import ( + CACHE_CATEGORY_RESOLVED_ITEMS, + CACHE_EXPIRATION_SECONDS, + RESOLUTION_BUFFER_LARGE, + RESOLUTION_BUFFER_SMALL, + SIMILAR_ITEMS_BUFFER, + SIMILAR_ITEMS_PER_SEED, + TARGET_ITEM_COUNT, + TOP_ARTISTS_LIMIT, + TOP_ITEMS_TO_TAKE, + TOP_TAGS_LIMIT, + TOP_TRACKS_LIMIT, +) +from music_assistant.providers.lastfm_recommendations.parsers import ( + parse_album, + parse_artist, + parse_track, +) + +if TYPE_CHECKING: + from music_assistant.providers.lastfm_recommendations import LastFMRecommendationsProvider + + +class LastFMRecommendationManager: + """Manages Last.fm recommendations.""" + + def __init__(self, provider: LastFMRecommendationsProvider) -> None: + """Initialize recommendation manager. + + :param provider: The Last.fm recommendations provider instance. + """ + self.provider = provider + self.api = provider.api + self.mbid_resolver = provider.mbid_resolver + self.logger = provider.logger + self.mass = provider.mass + + # Resolved items keyed by MBID (preferred) or name to avoid re-resolving. + self._resolved_cache: dict[str, Artist | Album | Track] = {} + + async def clear_cache(self) -> None: + """Clear in-memory and persistent recommendation caches.""" + self._resolved_cache.clear() + + await self.mass.cache.clear( + category_filter=CACHE_CATEGORY_RESOLVED_ITEMS, + provider_filter=self.provider.instance_id, + ) + + self.provider._recommendation_folders.clear() + self.provider._recommendations_populated = False + + self.logger.info("Cleared all recommendation caches (in-memory and persistent)") + + async def _is_in_library(self, item_data: dict[str, Any], media_type: MediaType) -> bool: + """Return True if the Last.fm item already exists in the MA library. + + :param item_data: Raw Last.fm item data (artist, album, or track dict). + :param media_type: Type of media item to check. + """ + # MBID lookup is the most reliable; fall back to name search for items without MBID. + mbid = item_data.get("mbid") + if mbid: + if media_type == MediaType.ARTIST: + if await self.mass.music.artists.get_library_item_by_external_id( + mbid, ExternalID.MB_ARTIST + ): + return True + elif media_type == MediaType.ALBUM: + if await self.mass.music.albums.get_library_item_by_external_id( + mbid, ExternalID.MB_ALBUM + ): + return True + elif media_type == MediaType.TRACK: + if await self.mass.music.tracks.get_library_item_by_external_id( + mbid, ExternalID.MB_RECORDING + ): + return True + + if media_type == MediaType.ARTIST: + name = item_data.get("name", "") + if name: + artist_results = await self.mass.music.artists.library_items(search=name, limit=1) + return len(artist_results) > 0 + + elif media_type == MediaType.ALBUM: + name = item_data.get("name", "") + if name: + album_results = await self.mass.music.albums.library_items(search=name, limit=1) + return len(album_results) > 0 + + elif media_type == MediaType.TRACK: + artist_info = item_data.get("artist", {}) + artist_name = ( + artist_info if isinstance(artist_info, str) else artist_info.get("name", "") + ) + track_name = item_data.get("name", "") + if track_name and artist_name: + search_query = f"{artist_name} {track_name}" + track_results = await self.mass.music.tracks.library_items( + search=search_query, limit=1 + ) + return len(track_results) > 0 + + return False + + def _sample_items( + self, items: list[dict[str, Any]], seed_suffix: str, target_count: int = TARGET_ITEM_COUNT + ) -> list[dict[str, Any]]: + """Sample items using a 'top N + random remainder' strategy with a daily seed. + + :param items: List of items to sample from (already filtered). + :param seed_suffix: Unique suffix for random seed (to vary between recommendation types). + :param target_count: Target number of items to return. + """ + if len(items) <= target_count: + return items + + top_items = items[:TOP_ITEMS_TO_TAKE] + + remaining = items[TOP_ITEMS_TO_TAKE:] + random_count = target_count - TOP_ITEMS_TO_TAKE + + # Daily seed keeps recommendations stable within the day and rotates them overnight. + seed = f"{datetime.datetime.now(tz=datetime.UTC).date().isoformat()}_{seed_suffix}" + random.seed(seed) + random_items = random.sample(remaining, min(random_count, len(remaining))) + + return top_items + random_items + + async def _get_or_resolve_artist(self, lastfm_artist: dict[str, Any]) -> Artist | None: + """Return an Artist from cache (in-memory or persistent) or resolve and cache it. + + :param lastfm_artist: Raw Last.fm artist dict. + """ + cache_key = lastfm_artist.get("mbid") or lastfm_artist.get("name", "") + if not cache_key: + return None + + if cache_key in self._resolved_cache: + cached = self._resolved_cache[cache_key] + if isinstance(cached, Artist): + return cached + + persistent_cache_key = f"artist_{cache_key}" + cached_artist = await self.mass.cache.get( + key=persistent_cache_key, + category=CACHE_CATEGORY_RESOLVED_ITEMS, + provider=self.provider.instance_id, + base_class=Artist, + ) + if isinstance(cached_artist, Artist): + self._resolved_cache[cache_key] = cached_artist + return cached_artist + + artist = await parse_artist(lastfm_artist, self.mass, self.provider.instance_id) + if artist: + self._resolved_cache[cache_key] = artist + await self.mass.cache.set( + persistent_cache_key, + artist.to_dict(), + category=CACHE_CATEGORY_RESOLVED_ITEMS, + provider=self.provider.instance_id, + expiration=CACHE_EXPIRATION_SECONDS, + ) + return artist + + async def _get_or_resolve_track(self, lastfm_track: dict[str, Any]) -> Track | None: + """Return a Track from cache (in-memory or persistent) or resolve and cache it. + + :param lastfm_track: Raw Last.fm track dict. + """ + cache_key = lastfm_track.get("mbid") + if not cache_key: + artist_data = lastfm_track.get("artist", {}) + artist_name = ( + artist_data if isinstance(artist_data, str) else artist_data.get("name", "") + ) + track_name = lastfm_track.get("name", "") + cache_key = f"{artist_name}_{track_name}" if artist_name and track_name else "" + + if not cache_key: + return None + + if cache_key in self._resolved_cache: + cached = self._resolved_cache[cache_key] + if isinstance(cached, Track): + return cached + + persistent_cache_key = f"track_{cache_key}" + cached_track = await self.mass.cache.get( + key=persistent_cache_key, + category=CACHE_CATEGORY_RESOLVED_ITEMS, + provider=self.provider.instance_id, + base_class=Track, + ) + if isinstance(cached_track, Track): + self._resolved_cache[cache_key] = cached_track + return cached_track + + track = await parse_track( + lastfm_track, self.mbid_resolver, self.mass, self.provider.instance_id + ) + if track: + self._resolved_cache[cache_key] = track + await self.mass.cache.set( + persistent_cache_key, + track.to_dict(), + category=CACHE_CATEGORY_RESOLVED_ITEMS, + provider=self.provider.instance_id, + expiration=CACHE_EXPIRATION_SECONDS, + ) + return track + + async def _get_or_resolve_album(self, lastfm_album: dict[str, Any]) -> Album | None: + """Return an Album from cache (in-memory or persistent) or resolve and cache it. + + :param lastfm_album: Raw Last.fm album dict. + """ + cache_key = lastfm_album.get("mbid") + if not cache_key: + artist_data = lastfm_album.get("artist", {}) + artist_name = ( + artist_data if isinstance(artist_data, str) else artist_data.get("name", "") + ) + album_name = lastfm_album.get("name", "") + cache_key = f"{artist_name}_{album_name}" if artist_name and album_name else "" + + if not cache_key: + return None + + if cache_key in self._resolved_cache: + cached = self._resolved_cache[cache_key] + if isinstance(cached, Album): + return cached + + persistent_cache_key = f"album_{cache_key}" + cached_album = await self.mass.cache.get( + key=persistent_cache_key, + category=CACHE_CATEGORY_RESOLVED_ITEMS, + provider=self.provider.instance_id, + base_class=Album, + ) + if isinstance(cached_album, Album): + self._resolved_cache[cache_key] = cached_album + return cached_album + + album = await parse_album(lastfm_album, self.mass, self.provider.instance_id) + if album: + self._resolved_cache[cache_key] = album + await self.mass.cache.set( + persistent_cache_key, + album.to_dict(), + category=CACHE_CATEGORY_RESOLVED_ITEMS, + provider=self.provider.instance_id, + expiration=CACHE_EXPIRATION_SECONDS, + ) + return album + + async def get_recommendations(self) -> list[RecommendationFolder]: + """Return all recommendation folders for this provider. + + Individual category methods return empty lists on failure, so errors should not + bubble up here; if they do it indicates a bug. + """ + folders: list[RecommendationFolder] = [] + + folders.extend(await self._get_personalized_recommendations()) + folders.extend(await self._get_global_recommendations()) + folders.extend(await self._get_genre_based_recommendations()) + folders.extend(await self._get_geo_based_recommendations()) + + return folders + + async def _get_personalized_recommendations(self) -> list[RecommendationFolder]: + """Return personalized recommendation folders based on the user's listening history.""" + folders: list[RecommendationFolder] = [] + + if not self.provider.config.get_value("enable_personalized"): + return folders + + # TODO: evaluate recent play history (e.g. last_played, last 7 days) instead of all-time + # play_count, possibly weighted. Needs user feedback. + + top_artists = await self.mass.music.artists.library_items( + limit=TOP_ARTISTS_LIMIT, order_by="play_count_desc" + ) + + if top_artists: + similar_artists = await self._get_similar_artists_from_seeds(top_artists) + + if similar_artists: + folders.append( + RecommendationFolder( + item_id=f"{self.provider.instance_id}_similar_artists", + name="Discover Similar Artists", + provider=self.provider.instance_id, + items=UniqueList(similar_artists[:TARGET_ITEM_COUNT]), + subtitle=f"Based on your top {len(top_artists)} artists", + icon="mdi-account-music-outline", + ) + ) + + top_tracks = await self.mass.music.tracks.library_items( + limit=TOP_TRACKS_LIMIT, order_by="play_count_desc" + ) + + if top_tracks: + similar_tracks = await self._get_similar_tracks_from_seeds(top_tracks) + + if similar_tracks: + folders.append( + RecommendationFolder( + item_id=f"{self.provider.instance_id}_similar_tracks", + name="Discover Similar Tracks", + provider=self.provider.instance_id, + items=UniqueList(similar_tracks[:TARGET_ITEM_COUNT]), + subtitle=f"Based on your top {len(top_tracks)} tracks", + icon="mdi-music-note-outline", + ) + ) + + return folders + + async def _get_global_recommendations(self) -> list[RecommendationFolder]: + """Return global chart recommendation folders (worldwide top artists and tracks).""" + folders: list[RecommendationFolder] = [] + + if not self.provider.config.get_value("enable_global_charts"): + return folders + + # Over-fetch so deduplication and resolution failures still leave TARGET_ITEM_COUNT. + top_artists_raw = await self.api.get_chart_top_artists(limit=RESOLUTION_BUFFER_SMALL) + if top_artists_raw: + resolved_artists = await asyncio.gather( + *[self._get_or_resolve_artist(artist_data) for artist_data in top_artists_raw] + ) + all_resolved = [artist for artist in resolved_artists if artist is not None] + deduplicated = list(UniqueList(all_resolved))[:TARGET_ITEM_COUNT] + top_artists = UniqueList(deduplicated) + + if len(top_artists) < TARGET_ITEM_COUNT: + self.logger.debug( + "Global Top Artists: only %d/%d items after resolution " + "(requested %d, resolved %d)", + len(top_artists), + TARGET_ITEM_COUNT, + RESOLUTION_BUFFER_SMALL, + len([a for a in resolved_artists if a is not None]), + ) + + if top_artists: + folders.append( + RecommendationFolder( + item_id=f"{self.provider.instance_id}_chart_top_artists", + name="Global Top Artists", + provider=self.provider.instance_id, + items=UniqueList(top_artists), + subtitle="Most popular artists worldwide", + icon="mdi-chart-line", + ) + ) + + top_tracks_raw = await self.api.get_chart_top_tracks(limit=RESOLUTION_BUFFER_SMALL) + if top_tracks_raw: + resolved_tracks = await asyncio.gather( + *[self._get_or_resolve_track(track_data) for track_data in top_tracks_raw] + ) + all_resolved_tracks = [track for track in resolved_tracks if track is not None] + deduplicated_tracks = list(UniqueList(all_resolved_tracks))[:TARGET_ITEM_COUNT] + top_tracks = UniqueList(deduplicated_tracks) + + if len(top_tracks) < TARGET_ITEM_COUNT: + self.logger.debug( + "Global Top Tracks: only %d/%d items after resolution " + "(requested %d, resolved %d)", + len(top_tracks), + TARGET_ITEM_COUNT, + RESOLUTION_BUFFER_SMALL, + len(all_resolved_tracks), + ) + + if top_tracks: + folders.append( + RecommendationFolder( + item_id=f"{self.provider.instance_id}_chart_top_tracks", + name="Global Top Tracks", + provider=self.provider.instance_id, + items=UniqueList(top_tracks), + subtitle="Most popular tracks worldwide", + icon="mdi-chart-box", + ) + ) + + return folders + + async def _get_genre_based_recommendations(self) -> list[RecommendationFolder]: + """Return genre-based recommendation folders derived from the user's top Last.fm tag. + + Requires a username to be configured. + """ + folders: list[RecommendationFolder] = [] + + if not self.provider.config.get_value("enable_genre"): + return folders + + username = self.provider.config.get_value("username") + if not username or not isinstance(username, str): + return folders + + top_tags = await self.api.get_user_top_tags(username, limit=TOP_TAGS_LIMIT) + if not top_tags: + return folders + + tag_name = top_tags[0].get("name") + if not tag_name: + return folders + + # Over-fetch so there's enough left after library filtering and resolution failures. + genre_artists_raw = await self.api.get_tag_top_artists( + tag_name, limit=RESOLUTION_BUFFER_LARGE + ) + if genre_artists_raw: + # Drop items already in the library using a cheap DB lookup, before the + # expensive MusicBrainz + provider resolution step. + non_library_artists_raw = [ + artist_data + for artist_data in genre_artists_raw + if not await self._is_in_library(artist_data, MediaType.ARTIST) + ] + + sampled_artists_raw = self._sample_items( + non_library_artists_raw, + seed_suffix="genre_artists", + target_count=RESOLUTION_BUFFER_SMALL, + ) + + resolved_artists = await asyncio.gather( + *[self._get_or_resolve_artist(artist_data) for artist_data in sampled_artists_raw] + ) + all_resolved = [artist for artist in resolved_artists if artist is not None] + deduplicated = list(UniqueList(all_resolved))[:TARGET_ITEM_COUNT] + genre_artists = UniqueList(deduplicated) + + if len(genre_artists) < TARGET_ITEM_COUNT: + self.logger.debug( + "Genre Artists (%s): only %d/%d items after resolution " + "(requested %d, resolved %d)", + tag_name, + len(genre_artists), + TARGET_ITEM_COUNT, + RESOLUTION_BUFFER_SMALL, + len([a for a in resolved_artists if a is not None]), + ) + + if genre_artists: + folders.append( + RecommendationFolder( + item_id=f"{self.provider.instance_id}_genre_artists", + name=f"Discover {tag_name.title()} Artists", + provider=self.provider.instance_id, + items=UniqueList(genre_artists), + subtitle="Top artists in your most played genre", + icon="mdi-account-music", + ) + ) + + genre_albums_raw = await self.api.get_tag_top_albums( + tag_name, limit=RESOLUTION_BUFFER_LARGE + ) + if genre_albums_raw: + non_library_albums_raw = [ + album_data + for album_data in genre_albums_raw + if not await self._is_in_library(album_data, MediaType.ALBUM) + ] + + sampled_albums_raw = self._sample_items( + non_library_albums_raw, + seed_suffix="genre_albums", + target_count=RESOLUTION_BUFFER_SMALL, + ) + + resolved_albums = await asyncio.gather( + *[self._get_or_resolve_album(album_data) for album_data in sampled_albums_raw] + ) + all_resolved_albums = [album for album in resolved_albums if album is not None] + genre_albums = list(UniqueList(all_resolved_albums))[:TARGET_ITEM_COUNT] + + if len(genre_albums) < TARGET_ITEM_COUNT: + self.logger.debug( + "Genre Albums (%s): only %d/%d items after resolution " + "(requested %d, resolved %d)", + tag_name, + len(genre_albums), + TARGET_ITEM_COUNT, + RESOLUTION_BUFFER_SMALL, + len(all_resolved_albums), + ) + + if genre_albums: + folders.append( + RecommendationFolder( + item_id=f"{self.provider.instance_id}_genre_albums", + name=f"Discover {tag_name.title()} Albums", + provider=self.provider.instance_id, + items=UniqueList(genre_albums), + subtitle="Top albums in your most played genre", + icon="mdi-album", + ) + ) + + genre_tracks_raw = await self.api.get_tag_top_tracks( + tag_name, limit=RESOLUTION_BUFFER_LARGE + ) + if genre_tracks_raw: + non_library_tracks_raw = [ + track_data + for track_data in genre_tracks_raw + if not await self._is_in_library(track_data, MediaType.TRACK) + ] + + sampled_tracks_raw = self._sample_items( + non_library_tracks_raw, + seed_suffix="genre_tracks", + target_count=RESOLUTION_BUFFER_SMALL, + ) + + resolved_tracks = await asyncio.gather( + *[self._get_or_resolve_track(track_data) for track_data in sampled_tracks_raw] + ) + all_resolved_genre_tracks = [track for track in resolved_tracks if track is not None] + genre_tracks = list(UniqueList(all_resolved_genre_tracks))[:TARGET_ITEM_COUNT] + + if len(genre_tracks) < TARGET_ITEM_COUNT: + self.logger.debug( + "Genre Tracks (%s): only %d/%d items after resolution " + "(requested %d, resolved %d)", + tag_name, + len(genre_tracks), + TARGET_ITEM_COUNT, + RESOLUTION_BUFFER_SMALL, + len(all_resolved_genre_tracks), + ) + + if genre_tracks: + folders.append( + RecommendationFolder( + item_id=f"{self.provider.instance_id}_genre_tracks", + name=f"Discover {tag_name.title()} Tracks", + provider=self.provider.instance_id, + items=UniqueList(genre_tracks), + subtitle="Top tracks in your most played genre", + icon="mdi-music", + ) + ) + + return folders + + async def _get_geo_based_recommendations(self) -> list[RecommendationFolder]: + """Return geography-based recommendation folders for the configured country.""" + folders: list[RecommendationFolder] = [] + + if not self.provider.config.get_value("enable_geo"): + return folders + + country = self.provider.config.get_value("geo_country") + if not country or not isinstance(country, str): + return folders + + geo_artists_raw = await self.api.get_geo_top_artists(country, limit=RESOLUTION_BUFFER_SMALL) + if geo_artists_raw: + resolved_artists = await asyncio.gather( + *[self._get_or_resolve_artist(artist_data) for artist_data in geo_artists_raw] + ) + all_resolved = [artist for artist in resolved_artists if artist is not None] + geo_artists = list(UniqueList(all_resolved))[:TARGET_ITEM_COUNT] + + if len(geo_artists) < TARGET_ITEM_COUNT: + self.logger.debug( + "Geo Top Artists (%s): only %d/%d items after resolution " + "(requested %d, resolved %d)", + country, + len(geo_artists), + TARGET_ITEM_COUNT, + RESOLUTION_BUFFER_SMALL, + len([a for a in resolved_artists if a is not None]), + ) + + if geo_artists: + folders.append( + RecommendationFolder( + item_id=f"{self.provider.instance_id}_geo_artists", + name=f"Top artists for {country}", + provider=self.provider.instance_id, + items=UniqueList(geo_artists), + subtitle=f"Most popular artists in {country}", + icon="mdi-earth", + ) + ) + + geo_tracks_raw = await self.api.get_geo_top_tracks(country, limit=RESOLUTION_BUFFER_SMALL) + if geo_tracks_raw: + resolved_tracks = await asyncio.gather( + *[self._get_or_resolve_track(track_data) for track_data in geo_tracks_raw] + ) + all_resolved_geo_tracks = [track for track in resolved_tracks if track is not None] + geo_tracks = list(UniqueList(all_resolved_geo_tracks))[:TARGET_ITEM_COUNT] + + if len(geo_tracks) < TARGET_ITEM_COUNT: + self.logger.debug( + "Geo Top Tracks (%s): only %d/%d items after resolution " + "(requested %d, resolved %d)", + country, + len(geo_tracks), + TARGET_ITEM_COUNT, + RESOLUTION_BUFFER_SMALL, + len(all_resolved_geo_tracks), + ) + + if geo_tracks: + folders.append( + RecommendationFolder( + item_id=f"{self.provider.instance_id}_geo_tracks", + name=f"Top tracks for {country}", + provider=self.provider.instance_id, + items=UniqueList(geo_tracks), + subtitle=f"Most popular tracks in {country}", + icon="mdi-earth", + ) + ) + + return folders + + async def _get_similar_artists_from_seeds(self, seed_artists: list[Artist]) -> list[Artist]: + """Return resolved artists similar to the given seed artists. + + :param seed_artists: Seed artists from the user's library. + """ + all_similar: list[dict[str, Any]] = [] + + # Seed identifiers are tracked so seeds don't appear in their own recommendations. + seed_mbids = { + seed_artist.get_external_id(ExternalID.MB_ARTIST) + for seed_artist in seed_artists + if seed_artist.get_external_id(ExternalID.MB_ARTIST) + } + seed_names = {seed_artist.name.lower() for seed_artist in seed_artists} + + for seed_artist in seed_artists: + mbid = seed_artist.get_external_id(ExternalID.MB_ARTIST) + + similar = await self.api.get_similar_artists( + artist_name=seed_artist.name, artist_mbid=mbid, limit=SIMILAR_ITEMS_PER_SEED + ) + all_similar.extend(similar) + + # Deduplicate by MBID and by name: Last.fm sometimes returns the same artist twice, + # once with an MBID and once without. + seen_mbids = set() + seen_names = set() + unique_similar: list[dict[str, Any]] = [] + for artist_data in all_similar: + mbid = artist_data.get("mbid") + name = artist_data.get("name", "").lower() + + if mbid and mbid in seed_mbids: + continue + if name and name in seed_names: + continue + + if mbid and mbid in seen_mbids: + continue + if name and name in seen_names: + continue + + unique_similar.append(artist_data) + if mbid: + seen_mbids.add(mbid) + if name: + seen_names.add(name) + + unique_similar.sort(key=lambda x: float(x.get("match", 0)), reverse=True) + + resolved_artists = await asyncio.gather( + *[ + self._get_or_resolve_artist(artist_data) + for artist_data in unique_similar[:SIMILAR_ITEMS_BUFFER] + ] + ) + result = [artist for artist in resolved_artists if artist is not None] + + if len(result) < len(resolved_artists): + self.logger.debug( + "Similar artists: %d/%d resolved successfully (%d failed)", + len(result), + len(resolved_artists), + len(resolved_artists) - len(result), + ) + + return result + + async def _get_similar_tracks_from_seeds(self, seed_tracks: list[Track]) -> list[Track]: + """Return resolved tracks similar to the given seed tracks. + + :param seed_tracks: Seed tracks from the user's library. + """ + all_similar: list[dict[str, Any]] = [] + + # Seed identifiers are tracked so seeds don't appear in their own recommendations. + seed_mbids = { + seed_track.get_external_id(ExternalID.MB_RECORDING) + for seed_track in seed_tracks + if seed_track.get_external_id(ExternalID.MB_RECORDING) + } + seed_name_keys = { + f"{seed_track.artists[0].name if seed_track.artists else ''}_{seed_track.name}".lower() + for seed_track in seed_tracks + } + + for seed_track in seed_tracks: + mbid = seed_track.get_external_id(ExternalID.MB_RECORDING) + + artist_name = seed_track.artists[0].name if seed_track.artists else "Unknown Artist" + + similar = await self.api.get_similar_tracks( + artist_name=artist_name, + track_name=seed_track.name, + track_mbid=mbid, + limit=SIMILAR_ITEMS_PER_SEED, + ) + all_similar.extend(similar) + + # Deduplicate by MBID and by artist+name: Last.fm sometimes returns the same track + # twice, once with an MBID and once without. + seen_mbids = set() + seen_names = set() + unique_similar: list[dict[str, Any]] = [] + for track_data in all_similar: + mbid = track_data.get("mbid") + + artist_info = track_data.get("artist", {}) + if isinstance(artist_info, str): + artist_name = artist_info + else: + artist_name = artist_info.get("name", "") + track_name = track_data.get("name", "") + name_key = f"{artist_name}_{track_name}".lower() if artist_name and track_name else "" + + if mbid and mbid in seed_mbids: + continue + if name_key and name_key in seed_name_keys: + continue + + if mbid and mbid in seen_mbids: + continue + if name_key and name_key in seen_names: + continue + + unique_similar.append(track_data) + if mbid: + seen_mbids.add(mbid) + if name_key: + seen_names.add(name_key) + + unique_similar.sort(key=lambda x: float(x.get("match", 0)), reverse=True) + + # Only resolve ISRCs for the top results to avoid unnecessary MusicBrainz lookups. + top_tracks_data = unique_similar[:TARGET_ITEM_COUNT] + + resolved_tracks = await asyncio.gather( + *[self._get_or_resolve_track(track_data) for track_data in top_tracks_data] + ) + return [track for track in resolved_tracks if track is not None] diff --git a/music_assistant/providers/musicbrainz/__init__.py b/music_assistant/providers/musicbrainz/__init__.py index f685697662..68aeb63949 100644 --- a/music_assistant/providers/musicbrainz/__init__.py +++ b/music_assistant/providers/musicbrainz/__init__.py @@ -323,7 +323,7 @@ async def get_artist_details(self, artist_id: str) -> MusicBrainzArtist: async def get_recording_details(self, recording_id: str) -> MusicBrainzRecording: """Get Recording details by providing a MusicBrainz Recording Id.""" - if result := await self.get_data(f"recording/{recording_id}?inc=artists+releases"): + if result := await self.get_data(f"recording/{recording_id}?inc=artists+releases+isrcs"): if "id" not in result: result["id"] = recording_id try: