From fd09ad22b3ff1823cddb757201b0c07c023c5b20 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 22 Jun 2026 09:12:29 +0000 Subject: [PATCH 1/5] feat: add Spotify auth state file helpers --- src/mopidy_spotify/__init__.py | 4 + src/mopidy_spotify/tokens.py | 222 +++++++++++++++++++++++++++++++++ src/mopidy_spotify/utils.py | 22 ++++ tests/test_extension.py | 6 + tests/test_tokens.py | 38 ++++++ tests/test_utils.py | 22 ++++ 6 files changed, 314 insertions(+) create mode 100644 src/mopidy_spotify/tokens.py create mode 100644 tests/test_tokens.py diff --git a/src/mopidy_spotify/__init__.py b/src/mopidy_spotify/__init__.py index cc0bfeec..31b9e4db 100644 --- a/src/mopidy_spotify/__init__.py +++ b/src/mopidy_spotify/__init__.py @@ -71,3 +71,7 @@ def get_credentials_dir(cls, config: config.Config) -> pathlib.Path: credentials_dir = data_dir / "credentials-cache" credentials_dir.mkdir(mode=0o700, exist_ok=True) return credentials_dir + + @classmethod + def get_auth_state_path(cls, config: config.Config) -> pathlib.Path: + return cls.get_data_dir(config) / "auth.json" diff --git a/src/mopidy_spotify/tokens.py b/src/mopidy_spotify/tokens.py new file mode 100644 index 00000000..88557840 --- /dev/null +++ b/src/mopidy_spotify/tokens.py @@ -0,0 +1,222 @@ +from __future__ import annotations + +import base64 +import hashlib +import secrets +import urllib.parse +from dataclasses import dataclass +from typing import TYPE_CHECKING, Annotated, Literal + +import requests +from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, ValidationError + +from mopidy_spotify import utils + +if TYPE_CHECKING: + from pathlib import Path + +CLIENT_ID = "f88ee52f92724d51b7579a1d1cdb3128" +REDIRECT_URI = "https://mopidy.com/auth/spotify" +SCOPES = ( + "playlist-modify-public", + "playlist-modify-private", + "playlist-read-private", + "playlist-read-collaborative", + "user-follow-read", + "user-library-read", + "user-library-modify", + "user-read-recently-played", + "user-read-private", + "user-top-read", + "streaming", +) +AUTH_FILE_VERSION = 1 + + +class AuthPayloadBase(BaseModel): + model_config = ConfigDict(extra="forbid") + + version: Literal[1] = AUTH_FILE_VERSION + + +class PkceAuthorizedAuthPayload(AuthPayloadBase): + mode: Literal["pkce"] = "pkce" + state: Literal["authorized"] = "authorized" + refresh_token: str + + +class PkceRevokedAuthPayload(AuthPayloadBase): + mode: Literal["pkce"] = "pkce" + state: Literal["revoked"] = "revoked" + + +class BridgeConfiguredAuthPayload(AuthPayloadBase): + mode: Literal["bridge"] = "bridge" + state: Literal["configured"] = "configured" + + +class BridgeClearedAuthPayload(AuthPayloadBase): + mode: Literal["bridge"] = "bridge" + state: Literal["cleared"] = "cleared" + + +class BridgePermanentErrorAuthPayload(AuthPayloadBase): + mode: Literal["bridge"] = "bridge" + state: Literal["permanent_error"] = "permanent_error" + error_code: str + error_description: str | None = None + + +type AuthPayload = Annotated[ + PkceAuthorizedAuthPayload + | PkceRevokedAuthPayload + | BridgeConfiguredAuthPayload + | BridgeClearedAuthPayload + | BridgePermanentErrorAuthPayload, + Field(discriminator="state"), +] +AUTH_PAYLOAD_ADAPTER = TypeAdapter(AuthPayload) + + +class InvalidRefreshTokenError(ValueError): + pass + + +@dataclass(frozen=True) +class AuthorizationResult: + code: str + state: str + + +@dataclass(frozen=True) +class FileAuthStateStore: + path: Path + + def load(self) -> AuthPayload | None: + if not self.path.exists(): + return None + + try: + return AUTH_PAYLOAD_ADAPTER.validate_json( + self.path.read_text(encoding="utf-8") + ) + except (ValidationError, ValueError) as exc: + msg = f"Invalid Spotify auth.json: {self.path}" + raise InvalidRefreshTokenError(msg) from exc + + def save(self, payload: AuthPayload) -> None: + content = payload.model_dump_json().encode("utf-8") + with utils.replace(self.path, mode=0o600) as file_handle: + file_handle.write(content) + + +def generate_state() -> str: + return secrets.token_urlsafe(32) + + +def generate_pkce_verifier() -> tuple[str, str]: + verifier = secrets.token_urlsafe(96)[:128] + digest = hashlib.sha256(verifier.encode("ascii")).digest() + challenge = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") + return verifier, challenge + + +def generate_authorization_url(challenge: str, state: str) -> str: + query = urllib.parse.urlencode( + { + "client_id": CLIENT_ID, + "response_type": "code", + "redirect_uri": REDIRECT_URI, + "code_challenge_method": "S256", + "code_challenge": challenge, + "state": state, + "scope": " ".join(SCOPES), + } + ) + return f"https://accounts.spotify.com/authorize?{query}" + + +def parse_authorization_result(result: str) -> AuthorizationResult: + result = result.strip() + for parser in ( + _parse_authorization_url, + _parse_base64_query_string, + ): + if parsed_result := parser(result): + error = parsed_result.get("error") + if error is not None: + error_description = parsed_result.get("error_description") + if error_description is None: + raise ValueError(error) + + msg = f"{error}: {error_description}" + raise ValueError(msg) + + code = parsed_result.get("code") + state = parsed_result.get("state") + if code is None or state is None: + msg = "missing code/state." + raise ValueError(msg) + + return AuthorizationResult(code=code, state=state) + + msg = "invalid authorization response." + raise ValueError(msg) + + +def _parse_authorization_url(result: str) -> dict[str, str] | None: + parsed = urllib.parse.urlsplit(result) + query = parsed.query or parsed.fragment + if not query: + return None + + return dict(urllib.parse.parse_qsl(query, keep_blank_values=True)) + + +def _parse_base64_query_string(result: str) -> dict[str, str] | None: + try: + padded_result = result + "=" * (-len(result) % 4) + decoded_result = base64.urlsafe_b64decode(padded_result).decode("utf-8") + except (ValueError, UnicodeDecodeError): + return None + + return dict(urllib.parse.parse_qsl(decoded_result, keep_blank_values=True)) + + +def exchange_code_request(code: str, verifier: str) -> requests.Request: + return requests.Request( + "POST", + "https://accounts.spotify.com/api/token", + data={ + "client_id": CLIENT_ID, + "grant_type": "authorization_code", + "code": code, + "redirect_uri": REDIRECT_URI, + "code_verifier": verifier, + }, + ) + + +def refresh_token_request(auth_state_path: Path) -> requests.Request: + payload = FileAuthStateStore(auth_state_path).load() + if payload is None: + msg = "missing refresh_token" + raise ValueError(msg) + if payload.mode != "pkce": + error = ( + "Spotify auth.json uses unsupported mode for refresh_token: " + f"{auth_state_path} ({payload.mode})" + ) + raise InvalidRefreshTokenError(error) + if payload.state == "revoked": + error = f"Spotify auth.json is revoked: {auth_state_path}" + raise InvalidRefreshTokenError(error) + return requests.Request( + "POST", + "https://accounts.spotify.com/api/token", + data={ + "client_id": CLIENT_ID, + "grant_type": "refresh_token", + "refresh_token": payload.refresh_token, + }, + ) diff --git a/src/mopidy_spotify/utils.py b/src/mopidy_spotify/utils.py index ab5dac8f..2e51f8df 100644 --- a/src/mopidy_spotify/utils.py +++ b/src/mopidy_spotify/utils.py @@ -4,7 +4,10 @@ import itertools import logging import operator +import os +import tempfile import time +from pathlib import Path as PathlibPath from typing import TYPE_CHECKING import requests @@ -14,6 +17,7 @@ if TYPE_CHECKING: from collections.abc import Generator, Iterable + from pathlib import Path from mopidy.config import ProxyConfig @@ -45,6 +49,24 @@ def time_logger(name: str, level: int = TRACE) -> Generator[None]: logger.log(level, f"{name} took {int(end * 1000)}ms") +@contextlib.contextmanager +def replace( + path: Path, + mode: int | None = None, +) -> Generator[tempfile._TemporaryFileWrapper[bytes]]: + path.parent.mkdir(mode=0o700, parents=True, exist_ok=True) + with tempfile.NamedTemporaryFile(dir=path.parent, delete=False) as file_handle: + if mode is not None: + os.fchmod(file_handle.fileno(), mode) + temp_path = PathlibPath(file_handle.name) + try: + yield file_handle + temp_path.replace(path) + finally: + if temp_path.exists(): + temp_path.unlink() + + def flatten[T](list_of_lists: Iterable[Iterable[T]]) -> list[T]: return [item for sublist in list_of_lists for item in sublist] diff --git a/tests/test_extension.py b/tests/test_extension.py index 0b5f2d95..42ad582a 100644 --- a/tests/test_extension.py +++ b/tests/test_extension.py @@ -54,3 +54,9 @@ def test_get_credentials_dir(tmp_path: Path) -> None: result2 = ext.get_credentials_dir(config) # check exists_ok assert result == result2 + + +def test_get_auth_state_path(tmp_path: Path) -> None: + config = {"core": {"data_dir": tmp_path}} + + assert Extension.get_auth_state_path(config) == (tmp_path / "spotify" / "auth.json") diff --git a/tests/test_tokens.py b/tests/test_tokens.py new file mode 100644 index 00000000..4012c518 --- /dev/null +++ b/tests/test_tokens.py @@ -0,0 +1,38 @@ +import base64 + +import pytest + +from mopidy_spotify import tokens + + +def test_parse_authorization_result_parses_redirect_url() -> None: + result = tokens.parse_authorization_result( + "https://example.com/callback?code=code-123&state=state-123" + ) + + assert result == tokens.AuthorizationResult( + code="code-123", + state="state-123", + ) + + +def test_parse_authorization_result_parses_base64_query_string() -> None: + payload = ( + base64.urlsafe_b64encode(b"code=code-123&state=state-123") + .decode("ascii") + .rstrip("=") + ) + + result = tokens.parse_authorization_result(payload) + + assert result == tokens.AuthorizationResult( + code="code-123", + state="state-123", + ) + + +def test_parse_authorization_result_raises_on_provider_error() -> None: + with pytest.raises(ValueError, match="access_denied"): + tokens.parse_authorization_result( + "https://example.com/callback?error=access_denied&error_description=Denied" + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 10c96cd9..8c369183 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,4 +1,5 @@ import re +from pathlib import Path from unittest import mock import pytest @@ -51,3 +52,24 @@ def test_group_by_type_sorts(): [mocks[4]], [mocks[0], mocks[3]], ] + + +def test_replace(tmp_path: Path): + target = tmp_path / "refresh-token.txt" + target.write_text("old-token") + + with utils.replace(target, mode=0o600) as file_handle: + file_handle.write(b"new-token") + + assert target.read_text() == "new-token" + assert target.stat().st_mode & 0o777 == 0o600 + + +def test_replace_without_mode_keeps_tempfile_mode(tmp_path: Path): + target = tmp_path / "refresh-token.txt" + + with utils.replace(target) as file_handle: + file_handle.write(b"new-token") + + assert target.read_text() == "new-token" + assert target.stat().st_mode & 0o777 == 0o600 From 6704f417d606961411248e83db2f3f850edbe451 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Tue, 23 Jun 2026 21:36:59 +0000 Subject: [PATCH 2/5] feat: refine Spotify auth state machine --- src/mopidy_spotify/tokens.py | 27 +++++++++------------------ tests/test_tokens.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/src/mopidy_spotify/tokens.py b/src/mopidy_spotify/tokens.py index 88557840..b6b34199 100644 --- a/src/mopidy_spotify/tokens.py +++ b/src/mopidy_spotify/tokens.py @@ -45,23 +45,18 @@ class PkceAuthorizedAuthPayload(AuthPayloadBase): refresh_token: str -class PkceRevokedAuthPayload(AuthPayloadBase): - mode: Literal["pkce"] = "pkce" - state: Literal["revoked"] = "revoked" - - class BridgeConfiguredAuthPayload(AuthPayloadBase): mode: Literal["bridge"] = "bridge" state: Literal["configured"] = "configured" -class BridgeClearedAuthPayload(AuthPayloadBase): - mode: Literal["bridge"] = "bridge" +class ClearedAuthPayload(AuthPayloadBase): + mode: Literal["pkce", "bridge"] state: Literal["cleared"] = "cleared" -class BridgePermanentErrorAuthPayload(AuthPayloadBase): - mode: Literal["bridge"] = "bridge" +class PermanentErrorAuthPayload(AuthPayloadBase): + mode: Literal["pkce", "bridge"] state: Literal["permanent_error"] = "permanent_error" error_code: str error_description: str | None = None @@ -69,10 +64,9 @@ class BridgePermanentErrorAuthPayload(AuthPayloadBase): type AuthPayload = Annotated[ PkceAuthorizedAuthPayload - | PkceRevokedAuthPayload | BridgeConfiguredAuthPayload - | BridgeClearedAuthPayload - | BridgePermanentErrorAuthPayload, + | ClearedAuthPayload + | PermanentErrorAuthPayload, Field(discriminator="state"), ] AUTH_PAYLOAD_ADAPTER = TypeAdapter(AuthPayload) @@ -202,15 +196,12 @@ def refresh_token_request(auth_state_path: Path) -> requests.Request: if payload is None: msg = "missing refresh_token" raise ValueError(msg) - if payload.mode != "pkce": + if payload.state != "authorized" or payload.mode != "pkce": error = ( - "Spotify auth.json uses unsupported mode for refresh_token: " - f"{auth_state_path} ({payload.mode})" + "Spotify auth.json uses unsupported state for refresh_token: " + f"{auth_state_path} ({payload.mode}/{payload.state})" ) raise InvalidRefreshTokenError(error) - if payload.state == "revoked": - error = f"Spotify auth.json is revoked: {auth_state_path}" - raise InvalidRefreshTokenError(error) return requests.Request( "POST", "https://accounts.spotify.com/api/token", diff --git a/tests/test_tokens.py b/tests/test_tokens.py index 4012c518..4838abfd 100644 --- a/tests/test_tokens.py +++ b/tests/test_tokens.py @@ -1,4 +1,5 @@ import base64 +from pathlib import Path import pytest @@ -36,3 +37,33 @@ def test_parse_authorization_result_raises_on_provider_error() -> None: tokens.parse_authorization_result( "https://example.com/callback?error=access_denied&error_description=Denied" ) + + +def test_file_auth_state_store_round_trips_pkce_authorized(tmp_path: Path): + store = tokens.FileAuthStateStore(tmp_path / "auth.json") + refresh_token = "refresh-token-1" # noqa: S105 + + store.save(tokens.PkceAuthorizedAuthPayload(refresh_token=refresh_token)) + + assert store.load() == tokens.PkceAuthorizedAuthPayload( + refresh_token=refresh_token + ) + + +def test_file_auth_state_store_round_trips_cleared_bridge(tmp_path: Path): + store = tokens.FileAuthStateStore(tmp_path / "auth.json") + + store.save(tokens.ClearedAuthPayload(mode="bridge")) + + assert store.load() == tokens.ClearedAuthPayload(mode="bridge") + + +def test_refresh_token_request_requires_pkce_authorized(tmp_path: Path): + auth_state_path = tmp_path / "auth.json" + auth_state_path.write_text( + '{"version":1,"mode":"bridge","state":"configured"}', + encoding="utf-8", + ) + + with pytest.raises(tokens.InvalidRefreshTokenError): + tokens.refresh_token_request(auth_state_path) From f5be38d6845c4c861f938ceec15df2b495cb52ce Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Tue, 23 Jun 2026 22:17:57 +0000 Subject: [PATCH 3/5] refactor: split auth state and pkce modules --- src/mopidy_spotify/auth_state.py | 102 ++++++++++++++++++++++ src/mopidy_spotify/{tokens.py => pkce.py} | 95 -------------------- tests/test_auth_state.py | 36 ++++++++ tests/test_pkce.py | 38 ++++++++ tests/test_tokens.py | 69 --------------- 5 files changed, 176 insertions(+), 164 deletions(-) create mode 100644 src/mopidy_spotify/auth_state.py rename src/mopidy_spotify/{tokens.py => pkce.py} (55%) create mode 100644 tests/test_auth_state.py create mode 100644 tests/test_pkce.py delete mode 100644 tests/test_tokens.py diff --git a/src/mopidy_spotify/auth_state.py b/src/mopidy_spotify/auth_state.py new file mode 100644 index 00000000..e684a94b --- /dev/null +++ b/src/mopidy_spotify/auth_state.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Annotated, Literal + +import requests +from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, ValidationError + +from mopidy_spotify import utils +from mopidy_spotify.pkce import CLIENT_ID + +if TYPE_CHECKING: + from pathlib import Path + +AUTH_FILE_VERSION = 1 + + +class AuthPayloadBase(BaseModel): + model_config = ConfigDict(extra="forbid") + + version: Literal[1] = AUTH_FILE_VERSION + + +class PkceAuthorizedAuthPayload(AuthPayloadBase): + mode: Literal["pkce"] = "pkce" + state: Literal["authorized"] = "authorized" + refresh_token: str + + +class BridgeConfiguredAuthPayload(AuthPayloadBase): + mode: Literal["bridge"] = "bridge" + state: Literal["configured"] = "configured" + + +class ClearedAuthPayload(AuthPayloadBase): + mode: Literal["pkce", "bridge"] + state: Literal["cleared"] = "cleared" + + +class PermanentErrorAuthPayload(AuthPayloadBase): + mode: Literal["pkce", "bridge"] + state: Literal["permanent_error"] = "permanent_error" + error_code: str + error_description: str | None = None + + +type AuthPayload = Annotated[ + PkceAuthorizedAuthPayload + | BridgeConfiguredAuthPayload + | ClearedAuthPayload + | PermanentErrorAuthPayload, + Field(discriminator="state"), +] +AUTH_PAYLOAD_ADAPTER = TypeAdapter(AuthPayload) + + +class InvalidRefreshTokenError(ValueError): + pass + + +@dataclass(frozen=True) +class FileAuthStateStore: + path: Path + + def load(self) -> AuthPayload | None: + if not self.path.exists(): + return None + + try: + return AUTH_PAYLOAD_ADAPTER.validate_json( + self.path.read_text(encoding="utf-8") + ) + except (ValidationError, ValueError) as exc: + msg = f"Invalid Spotify auth.json: {self.path}" + raise InvalidRefreshTokenError(msg) from exc + + def save(self, payload: AuthPayload) -> None: + content = payload.model_dump_json().encode("utf-8") + with utils.replace(self.path, mode=0o600) as file_handle: + file_handle.write(content) + + +def refresh_token_request(auth_state_path: Path) -> requests.Request: + payload = FileAuthStateStore(auth_state_path).load() + if payload is None: + msg = "missing refresh_token" + raise ValueError(msg) + if payload.state != "authorized" or payload.mode != "pkce": + error = ( + "Spotify auth.json uses unsupported state for refresh_token: " + f"{auth_state_path} ({payload.mode}/{payload.state})" + ) + raise InvalidRefreshTokenError(error) + return requests.Request( + "POST", + "https://accounts.spotify.com/api/token", + data={ + "client_id": CLIENT_ID, + "grant_type": "refresh_token", + "refresh_token": payload.refresh_token, + }, + ) diff --git a/src/mopidy_spotify/tokens.py b/src/mopidy_spotify/pkce.py similarity index 55% rename from src/mopidy_spotify/tokens.py rename to src/mopidy_spotify/pkce.py index b6b34199..772ccc49 100644 --- a/src/mopidy_spotify/tokens.py +++ b/src/mopidy_spotify/pkce.py @@ -5,15 +5,8 @@ import secrets import urllib.parse from dataclasses import dataclass -from typing import TYPE_CHECKING, Annotated, Literal import requests -from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, ValidationError - -from mopidy_spotify import utils - -if TYPE_CHECKING: - from pathlib import Path CLIENT_ID = "f88ee52f92724d51b7579a1d1cdb3128" REDIRECT_URI = "https://mopidy.com/auth/spotify" @@ -30,50 +23,6 @@ "user-top-read", "streaming", ) -AUTH_FILE_VERSION = 1 - - -class AuthPayloadBase(BaseModel): - model_config = ConfigDict(extra="forbid") - - version: Literal[1] = AUTH_FILE_VERSION - - -class PkceAuthorizedAuthPayload(AuthPayloadBase): - mode: Literal["pkce"] = "pkce" - state: Literal["authorized"] = "authorized" - refresh_token: str - - -class BridgeConfiguredAuthPayload(AuthPayloadBase): - mode: Literal["bridge"] = "bridge" - state: Literal["configured"] = "configured" - - -class ClearedAuthPayload(AuthPayloadBase): - mode: Literal["pkce", "bridge"] - state: Literal["cleared"] = "cleared" - - -class PermanentErrorAuthPayload(AuthPayloadBase): - mode: Literal["pkce", "bridge"] - state: Literal["permanent_error"] = "permanent_error" - error_code: str - error_description: str | None = None - - -type AuthPayload = Annotated[ - PkceAuthorizedAuthPayload - | BridgeConfiguredAuthPayload - | ClearedAuthPayload - | PermanentErrorAuthPayload, - Field(discriminator="state"), -] -AUTH_PAYLOAD_ADAPTER = TypeAdapter(AuthPayload) - - -class InvalidRefreshTokenError(ValueError): - pass @dataclass(frozen=True) @@ -82,28 +31,6 @@ class AuthorizationResult: state: str -@dataclass(frozen=True) -class FileAuthStateStore: - path: Path - - def load(self) -> AuthPayload | None: - if not self.path.exists(): - return None - - try: - return AUTH_PAYLOAD_ADAPTER.validate_json( - self.path.read_text(encoding="utf-8") - ) - except (ValidationError, ValueError) as exc: - msg = f"Invalid Spotify auth.json: {self.path}" - raise InvalidRefreshTokenError(msg) from exc - - def save(self, payload: AuthPayload) -> None: - content = payload.model_dump_json().encode("utf-8") - with utils.replace(self.path, mode=0o600) as file_handle: - file_handle.write(content) - - def generate_state() -> str: return secrets.token_urlsafe(32) @@ -189,25 +116,3 @@ def exchange_code_request(code: str, verifier: str) -> requests.Request: "code_verifier": verifier, }, ) - - -def refresh_token_request(auth_state_path: Path) -> requests.Request: - payload = FileAuthStateStore(auth_state_path).load() - if payload is None: - msg = "missing refresh_token" - raise ValueError(msg) - if payload.state != "authorized" or payload.mode != "pkce": - error = ( - "Spotify auth.json uses unsupported state for refresh_token: " - f"{auth_state_path} ({payload.mode}/{payload.state})" - ) - raise InvalidRefreshTokenError(error) - return requests.Request( - "POST", - "https://accounts.spotify.com/api/token", - data={ - "client_id": CLIENT_ID, - "grant_type": "refresh_token", - "refresh_token": payload.refresh_token, - }, - ) diff --git a/tests/test_auth_state.py b/tests/test_auth_state.py new file mode 100644 index 00000000..c7f99d59 --- /dev/null +++ b/tests/test_auth_state.py @@ -0,0 +1,36 @@ +from pathlib import Path + +import pytest + +from mopidy_spotify import auth_state + + +def test_file_auth_state_store_round_trips_pkce_authorized(tmp_path: Path): + store = auth_state.FileAuthStateStore(tmp_path / "auth.json") + token_id = 1 + refresh_token = f"refresh-token-{token_id}" + + store.save(auth_state.PkceAuthorizedAuthPayload(refresh_token=refresh_token)) + + assert store.load() == auth_state.PkceAuthorizedAuthPayload( + refresh_token=refresh_token + ) + + +def test_file_auth_state_store_round_trips_cleared_bridge(tmp_path: Path): + store = auth_state.FileAuthStateStore(tmp_path / "auth.json") + + store.save(auth_state.ClearedAuthPayload(mode="bridge")) + + assert store.load() == auth_state.ClearedAuthPayload(mode="bridge") + + +def test_refresh_token_request_requires_pkce_authorized(tmp_path: Path): + auth_state_path = tmp_path / "auth.json" + auth_state_path.write_text( + '{"version":1,"mode":"bridge","state":"configured"}', + encoding="utf-8", + ) + + with pytest.raises(auth_state.InvalidRefreshTokenError): + auth_state.refresh_token_request(auth_state_path) diff --git a/tests/test_pkce.py b/tests/test_pkce.py new file mode 100644 index 00000000..684afb8d --- /dev/null +++ b/tests/test_pkce.py @@ -0,0 +1,38 @@ +import base64 + +import pytest + +from mopidy_spotify import pkce + + +def test_parse_authorization_result_parses_redirect_url() -> None: + result = pkce.parse_authorization_result( + "https://example.com/callback?code=code-123&state=state-123" + ) + + assert result == pkce.AuthorizationResult( + code="code-123", + state="state-123", + ) + + +def test_parse_authorization_result_parses_base64_query_string() -> None: + payload = ( + base64.urlsafe_b64encode(b"code=code-123&state=state-123") + .decode("ascii") + .rstrip("=") + ) + + result = pkce.parse_authorization_result(payload) + + assert result == pkce.AuthorizationResult( + code="code-123", + state="state-123", + ) + + +def test_parse_authorization_result_raises_on_provider_error() -> None: + with pytest.raises(ValueError, match="access_denied"): + pkce.parse_authorization_result( + "https://example.com/callback?error=access_denied&error_description=Denied" + ) diff --git a/tests/test_tokens.py b/tests/test_tokens.py deleted file mode 100644 index 4838abfd..00000000 --- a/tests/test_tokens.py +++ /dev/null @@ -1,69 +0,0 @@ -import base64 -from pathlib import Path - -import pytest - -from mopidy_spotify import tokens - - -def test_parse_authorization_result_parses_redirect_url() -> None: - result = tokens.parse_authorization_result( - "https://example.com/callback?code=code-123&state=state-123" - ) - - assert result == tokens.AuthorizationResult( - code="code-123", - state="state-123", - ) - - -def test_parse_authorization_result_parses_base64_query_string() -> None: - payload = ( - base64.urlsafe_b64encode(b"code=code-123&state=state-123") - .decode("ascii") - .rstrip("=") - ) - - result = tokens.parse_authorization_result(payload) - - assert result == tokens.AuthorizationResult( - code="code-123", - state="state-123", - ) - - -def test_parse_authorization_result_raises_on_provider_error() -> None: - with pytest.raises(ValueError, match="access_denied"): - tokens.parse_authorization_result( - "https://example.com/callback?error=access_denied&error_description=Denied" - ) - - -def test_file_auth_state_store_round_trips_pkce_authorized(tmp_path: Path): - store = tokens.FileAuthStateStore(tmp_path / "auth.json") - refresh_token = "refresh-token-1" # noqa: S105 - - store.save(tokens.PkceAuthorizedAuthPayload(refresh_token=refresh_token)) - - assert store.load() == tokens.PkceAuthorizedAuthPayload( - refresh_token=refresh_token - ) - - -def test_file_auth_state_store_round_trips_cleared_bridge(tmp_path: Path): - store = tokens.FileAuthStateStore(tmp_path / "auth.json") - - store.save(tokens.ClearedAuthPayload(mode="bridge")) - - assert store.load() == tokens.ClearedAuthPayload(mode="bridge") - - -def test_refresh_token_request_requires_pkce_authorized(tmp_path: Path): - auth_state_path = tmp_path / "auth.json" - auth_state_path.write_text( - '{"version":1,"mode":"bridge","state":"configured"}', - encoding="utf-8", - ) - - with pytest.raises(tokens.InvalidRefreshTokenError): - tokens.refresh_token_request(auth_state_path) From d2eb58526eccc7cbbce7803fe007aaec8d178a14 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 24 Jun 2026 11:11:21 +0000 Subject: [PATCH 4/5] test: cover auth state and pkce input errors --- tests/test_auth_state.py | 8 +++++ tests/test_pkce.py | 64 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/tests/test_auth_state.py b/tests/test_auth_state.py index c7f99d59..f2d66ee4 100644 --- a/tests/test_auth_state.py +++ b/tests/test_auth_state.py @@ -25,6 +25,14 @@ def test_file_auth_state_store_round_trips_cleared_bridge(tmp_path: Path): assert store.load() == auth_state.ClearedAuthPayload(mode="bridge") +def test_file_auth_state_store_rejects_invalid_json(tmp_path: Path): + auth_state_path = tmp_path / "auth.json" + auth_state_path.write_text("not-json", encoding="utf-8") + + with pytest.raises(auth_state.InvalidRefreshTokenError): + auth_state.FileAuthStateStore(auth_state_path).load() + + def test_refresh_token_request_requires_pkce_authorized(tmp_path: Path): auth_state_path = tmp_path / "auth.json" auth_state_path.write_text( diff --git a/tests/test_pkce.py b/tests/test_pkce.py index 684afb8d..da1585a2 100644 --- a/tests/test_pkce.py +++ b/tests/test_pkce.py @@ -1,4 +1,5 @@ import base64 +from dataclasses import dataclass import pytest @@ -36,3 +37,66 @@ def test_parse_authorization_result_raises_on_provider_error() -> None: pkce.parse_authorization_result( "https://example.com/callback?error=access_denied&error_description=Denied" ) + + +@dataclass(frozen=True, kw_only=True) +class ParseAuthorizationResultScenario: + name: str + input_value: str + expected: pkce.AuthorizationResult | None = None + error: str | None = None + + +@pytest.mark.parametrize( + ("scenario"), + [ + ParseAuthorizationResultScenario( + name="empty input", + input_value="", + error="invalid authorization response.", + ), + ParseAuthorizationResultScenario( + name="bad base64 padding", + input_value="a", + error="invalid authorization response.", + ), + ParseAuthorizationResultScenario( + name="missing query params", + input_value="https://example.com/callback", + error="invalid authorization response.", + ), + ParseAuthorizationResultScenario( + name="whitespace trimmed", + input_value=( + " https://example.com/callback?code=code-123&state=state-123 " + ), + expected=pkce.AuthorizationResult(code="code-123", state="state-123"), + ), + ParseAuthorizationResultScenario( + name="error without description", + input_value="https://example.com/callback?error=access_denied", + error="access_denied", + ), + ParseAuthorizationResultScenario( + name="missing code", + input_value="https://example.com/callback?state=state-123", + error="missing code/state.", + ), + ParseAuthorizationResultScenario( + name="missing state", + input_value="https://example.com/callback?code=code-123", + error="missing code/state.", + ), + ], + ids=lambda scenario: scenario.name, +) +def test_parse_authorization_result_edge_cases( + scenario: ParseAuthorizationResultScenario, +) -> None: + if scenario.error is not None: + with pytest.raises(ValueError, match=scenario.error): + pkce.parse_authorization_result(scenario.input_value) + else: + assert ( + pkce.parse_authorization_result(scenario.input_value) == scenario.expected + ) From 9c778baa8224ed11f9f612439dde6baaa5726c39 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Sat, 27 Jun 2026 11:37:05 +0000 Subject: [PATCH 5/5] refactor: harden atomic file replacement --- src/mopidy_spotify/auth_state.py | 1 + src/mopidy_spotify/utils.py | 36 ++++++++++++++++++++++++-------- tests/test_utils.py | 22 +++++++++++++++++++ 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/src/mopidy_spotify/auth_state.py b/src/mopidy_spotify/auth_state.py index e684a94b..37fbe326 100644 --- a/src/mopidy_spotify/auth_state.py +++ b/src/mopidy_spotify/auth_state.py @@ -76,6 +76,7 @@ def load(self) -> AuthPayload | None: def save(self, payload: AuthPayload) -> None: content = payload.model_dump_json().encode("utf-8") + self.path.parent.mkdir(mode=0o700, parents=True, exist_ok=True) with utils.replace(self.path, mode=0o600) as file_handle: file_handle.write(content) diff --git a/src/mopidy_spotify/utils.py b/src/mopidy_spotify/utils.py index 2e51f8df..5e0d7437 100644 --- a/src/mopidy_spotify/utils.py +++ b/src/mopidy_spotify/utils.py @@ -54,16 +54,34 @@ def replace( path: Path, mode: int | None = None, ) -> Generator[tempfile._TemporaryFileWrapper[bytes]]: - path.parent.mkdir(mode=0o700, parents=True, exist_ok=True) - with tempfile.NamedTemporaryFile(dir=path.parent, delete=False) as file_handle: - if mode is not None: - os.fchmod(file_handle.fileno(), mode) - temp_path = PathlibPath(file_handle.name) - try: + temp_path: PathlibPath | None = None + try: + with tempfile.NamedTemporaryFile( + mode="wb", + dir=path.parent, + prefix=f".{path.name}.", + delete=False, + ) as file_handle: + temp_path = PathlibPath(file_handle.name) + if mode is not None: + os.fchmod(file_handle.fileno(), mode) + yield file_handle - temp_path.replace(path) - finally: - if temp_path.exists(): + + file_handle.flush() + os.fsync(file_handle.fileno()) + + temp_path.replace(path) + + if os.name == "posix": + dir_fd = os.open(path.parent, os.O_RDONLY) + try: + os.fsync(dir_fd) + finally: + os.close(dir_fd) + finally: + if temp_path is not None: + with contextlib.suppress(FileNotFoundError): temp_path.unlink() diff --git a/tests/test_utils.py b/tests/test_utils.py index 8c369183..6e09b63d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -73,3 +73,25 @@ def test_replace_without_mode_keeps_tempfile_mode(tmp_path: Path): assert target.read_text() == "new-token" assert target.stat().st_mode & 0o777 == 0o600 + + +def test_replace_requires_existing_parent_directory(tmp_path: Path): + target = tmp_path / "missing" / "refresh-token.txt" + + with pytest.raises(FileNotFoundError), utils.replace(target) as file_handle: + file_handle.write(b"new-token") + + +def test_replace_cleans_up_tempfile_after_write_error(tmp_path: Path): + target = tmp_path / "refresh-token.txt" + message = "boom" + + def write_then_fail() -> None: + with utils.replace(target) as file_handle: + file_handle.write(b"new-token") + raise RuntimeError(message) + + with pytest.raises(RuntimeError, match=message): + write_then_fail() + + assert list(tmp_path.iterdir()) == []