Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions music_assistant/providers/musicbrainz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,112 @@ async def get_artist_details_by_resource_url(
return MusicBrainzArtist.from_raw(artist)
return None

async def get_release_group_by_track_name(
    self, artist_name: str, track_name: str
) -> tuple[MusicBrainzArtist, list[MusicBrainzReleaseGroup]] | None:
    """Look up release groups for a track via a MusicBrainz recording search.

    Recordings whose title and artist (or artist alias) match are collected,
    first with strict comparison and, only if nothing matched, with a looser
    one. Candidates are ordered by first-release-date so the earliest (most
    likely original) recording is preferred when resolving release groups.

    :param artist_name: Artist name to search for.
    :param track_name: Track name to search for.
    :returns: Tuple of (artist, release_groups) or None.
    """
    # Escape Lucene special characters before embedding names in the query.
    escaped_artist = re.sub(LUCENE_SPECIAL, r"\\\1", artist_name)
    escaped_track = re.sub(LUCENE_SPECIAL, r"\\\1", track_name)
    result = await self.get_data(
        "recording",
        query=f'"{escaped_track}" AND artist:"{escaped_artist}"',
        limit="100",
    )
    if not result or "recordings" not in result:
        return None

    # Gather (recording, artist, first-release-date) triples. The strict
    # pass runs first; the loose pass is a fallback only when it found nothing.
    candidates: list[tuple[dict[str, Any], dict[str, Any], str]] = []
    for strict in (True, False):
        for recording in result["recordings"]:
            if not compare_strings(recording["title"], track_name, strict):
                continue
            for credit in recording.get("artist-credit", []):
                credited_artist = credit.get("artist", {})
                # Accept a match on the canonical name or on any alias.
                name_ok = compare_strings(
                    credited_artist.get("name", ""), artist_name, strict
                ) or any(
                    compare_strings(alias.get("name", ""), artist_name, strict)
                    for alias in credited_artist.get("aliases", [])
                )
                if name_ok:
                    first_release = recording.get("first-release-date", "") or ""
                    candidates.append((recording, credited_artist, first_release))
                    break
        if candidates:
            break

    if not candidates:
        return None

    # Earliest first-release-date first; undated entries sort to the end.
    candidates.sort(key=lambda entry: entry[2] or "9999")

    # Return the first candidate that actually yields usable release groups.
    for recording, credited_artist, release_date in candidates:
        groups = self._get_release_groups_by_date(recording, release_date)
        if groups:
            return (MusicBrainzArtist.from_raw(credited_artist), groups)

    # No release groups anywhere: still expose the earliest matched artist.
    _, credited_artist, _ = candidates[0]
    return (MusicBrainzArtist.from_raw(credited_artist), [])

def _get_release_groups_by_date(
    self, recording: dict[str, Any], first_release_date: str
) -> list[MusicBrainzReleaseGroup]:
    """Collect release groups for a recording, sorted by release date.

    Only release groups with a primary type of "Album" or "Single" and no
    secondary types are kept, which filters out compilations, live albums,
    remixes and similar. Each release group is deduplicated by ID and keyed
    on the earliest release date it appears under; undated groups sort last.

    :param recording: MusicBrainz recording dict.
    :param first_release_date: The recording's first-release-date
        (e.g. "1982-03-29"). NOTE(review): currently unused by this method;
        kept for interface compatibility with existing callers — confirm
        whether it should influence filtering or can be dropped.
    """
    releases = recording.get("releases", [])
    if not releases:
        return []

    # Per release-group ID: the parsed group plus the earliest date seen.
    seen: dict[str, tuple[MusicBrainzReleaseGroup, str]] = {}

    for release in releases:
        rg = release.get("release-group", {})
        rg_id = rg.get("id")
        if not rg_id:
            continue

        # Only include singles and studio albums (no compilations, live, etc.)
        if rg.get("primary-type") not in ("Album", "Single"):
            continue
        if rg.get("secondary-types", []):
            continue

        release_date = release.get("date", "") or ""

        if rg_id in seen:
            existing_group, existing_date = seen[rg_id]
            # Keep the earliest release date; reuse the already-parsed
            # group object instead of re-running from_raw on the same data.
            if release_date and (not existing_date or release_date < existing_date):
                seen[rg_id] = (existing_group, release_date)
        else:
            seen[rg_id] = (MusicBrainzReleaseGroup.from_raw(rg), release_date)

    # Chronological order; empty dates sort after any real ISO date.
    sorted_groups = sorted(seen.values(), key=lambda entry: entry[1] or "9999")
    return [group for group, _ in sorted_groups]

@use_cache(86400 * 30) # Cache for 30 days
@throttle_with_retries
async def get_data(self, endpoint: str, **kwargs: str) -> Any:
Expand Down
Loading