Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/mopidy_spotify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,7 @@ def get_credentials_dir(cls, config: config.Config) -> pathlib.Path:
credentials_dir = data_dir / "credentials-cache"
credentials_dir.mkdir(mode=0o700, exist_ok=True)
return credentials_dir

@classmethod
def get_auth_state_path(cls, config: config.Config) -> pathlib.Path:
return cls.get_data_dir(config) / "auth.json"
103 changes: 103 additions & 0 deletions src/mopidy_spotify/auth_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Annotated, Literal

import requests
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, ValidationError

from mopidy_spotify import utils
from mopidy_spotify.pkce import CLIENT_ID

if TYPE_CHECKING:
from pathlib import Path

AUTH_FILE_VERSION = 1


class AuthPayloadBase(BaseModel):
model_config = ConfigDict(extra="forbid")

version: Literal[1] = AUTH_FILE_VERSION


class PkceAuthorizedAuthPayload(AuthPayloadBase):
mode: Literal["pkce"] = "pkce"
state: Literal["authorized"] = "authorized"
refresh_token: str


class BridgeConfiguredAuthPayload(AuthPayloadBase):
mode: Literal["bridge"] = "bridge"
state: Literal["configured"] = "configured"


class ClearedAuthPayload(AuthPayloadBase):
mode: Literal["pkce", "bridge"]
state: Literal["cleared"] = "cleared"


class PermanentErrorAuthPayload(AuthPayloadBase):
mode: Literal["pkce", "bridge"]
state: Literal["permanent_error"] = "permanent_error"
error_code: str
error_description: str | None = None


type AuthPayload = Annotated[
PkceAuthorizedAuthPayload
| BridgeConfiguredAuthPayload
| ClearedAuthPayload
| PermanentErrorAuthPayload,
Field(discriminator="state"),
]
AUTH_PAYLOAD_ADAPTER = TypeAdapter(AuthPayload)


class InvalidRefreshTokenError(ValueError):
pass


@dataclass(frozen=True)
class FileAuthStateStore:
path: Path

def load(self) -> AuthPayload | None:
if not self.path.exists():
return None

try:
return AUTH_PAYLOAD_ADAPTER.validate_json(
self.path.read_text(encoding="utf-8")
)
except (ValidationError, ValueError) as exc:
msg = f"Invalid Spotify auth.json: {self.path}"
raise InvalidRefreshTokenError(msg) from exc

def save(self, payload: AuthPayload) -> None:
content = payload.model_dump_json().encode("utf-8")
self.path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
with utils.replace(self.path, mode=0o600) as file_handle:
file_handle.write(content)


def refresh_token_request(auth_state_path: Path) -> requests.Request:
payload = FileAuthStateStore(auth_state_path).load()
if payload is None:
msg = "missing refresh_token"
raise ValueError(msg)
if payload.state != "authorized" or payload.mode != "pkce":
error = (
"Spotify auth.json uses unsupported state for refresh_token: "
f"{auth_state_path} ({payload.mode}/{payload.state})"
)
raise InvalidRefreshTokenError(error)
return requests.Request(
"POST",
"https://accounts.spotify.com/api/token",
data={
"client_id": CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": payload.refresh_token,
},
)
118 changes: 118 additions & 0 deletions src/mopidy_spotify/pkce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from __future__ import annotations

import base64
import hashlib
import secrets
import urllib.parse
from dataclasses import dataclass

import requests

CLIENT_ID = "f88ee52f92724d51b7579a1d1cdb3128"
REDIRECT_URI = "https://mopidy.com/auth/spotify"
SCOPES = (
"playlist-modify-public",
"playlist-modify-private",
"playlist-read-private",
"playlist-read-collaborative",
"user-follow-read",
"user-library-read",
"user-library-modify",
"user-read-recently-played",
"user-read-private",
"user-top-read",
"streaming",
)


@dataclass(frozen=True)
class AuthorizationResult:
code: str
state: str


def generate_state() -> str:
return secrets.token_urlsafe(32)


def generate_pkce_verifier() -> tuple[str, str]:
verifier = secrets.token_urlsafe(96)[:128]
digest = hashlib.sha256(verifier.encode("ascii")).digest()
challenge = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
return verifier, challenge


def generate_authorization_url(challenge: str, state: str) -> str:
query = urllib.parse.urlencode(
{
"client_id": CLIENT_ID,
"response_type": "code",
"redirect_uri": REDIRECT_URI,
"code_challenge_method": "S256",
"code_challenge": challenge,
"state": state,
"scope": " ".join(SCOPES),
}
)
return f"https://accounts.spotify.com/authorize?{query}"


def parse_authorization_result(result: str) -> AuthorizationResult:
result = result.strip()
for parser in (
_parse_authorization_url,
_parse_base64_query_string,
):
if parsed_result := parser(result):
error = parsed_result.get("error")
if error is not None:
error_description = parsed_result.get("error_description")
if error_description is None:
raise ValueError(error)

msg = f"{error}: {error_description}"
raise ValueError(msg)

code = parsed_result.get("code")
state = parsed_result.get("state")
if code is None or state is None:
msg = "missing code/state."
raise ValueError(msg)

return AuthorizationResult(code=code, state=state)

msg = "invalid authorization response."
raise ValueError(msg)


def _parse_authorization_url(result: str) -> dict[str, str] | None:
parsed = urllib.parse.urlsplit(result)
query = parsed.query or parsed.fragment
if not query:
return None

return dict(urllib.parse.parse_qsl(query, keep_blank_values=True))


def _parse_base64_query_string(result: str) -> dict[str, str] | None:
try:
padded_result = result + "=" * (-len(result) % 4)
decoded_result = base64.urlsafe_b64decode(padded_result).decode("utf-8")
except (ValueError, UnicodeDecodeError):
return None

return dict(urllib.parse.parse_qsl(decoded_result, keep_blank_values=True))


def exchange_code_request(code: str, verifier: str) -> requests.Request:
return requests.Request(
"POST",
"https://accounts.spotify.com/api/token",
data={
"client_id": CLIENT_ID,
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
"code_verifier": verifier,
},
)
40 changes: 40 additions & 0 deletions src/mopidy_spotify/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import itertools
import logging
import operator
import os
import tempfile
import time
from pathlib import Path as PathlibPath
from typing import TYPE_CHECKING

import requests
Expand All @@ -14,6 +17,7 @@

if TYPE_CHECKING:
from collections.abc import Generator, Iterable
from pathlib import Path

from mopidy.config import ProxyConfig

Expand Down Expand Up @@ -45,6 +49,42 @@ def time_logger(name: str, level: int = TRACE) -> Generator[None]:
logger.log(level, f"{name} took {int(end * 1000)}ms")


@contextlib.contextmanager
def replace(
path: Path,
mode: int | None = None,
) -> Generator[tempfile._TemporaryFileWrapper[bytes]]:
temp_path: PathlibPath | None = None
try:
with tempfile.NamedTemporaryFile(
mode="wb",
dir=path.parent,
prefix=f".{path.name}.",
delete=False,
) as file_handle:
temp_path = PathlibPath(file_handle.name)
if mode is not None:
os.fchmod(file_handle.fileno(), mode)

yield file_handle

file_handle.flush()
os.fsync(file_handle.fileno())

temp_path.replace(path)

if os.name == "posix":
dir_fd = os.open(path.parent, os.O_RDONLY)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
finally:
if temp_path is not None:
with contextlib.suppress(FileNotFoundError):
temp_path.unlink()


def flatten[T](list_of_lists: Iterable[Iterable[T]]) -> list[T]:
return [item for sublist in list_of_lists for item in sublist]

Expand Down
44 changes: 44 additions & 0 deletions tests/test_auth_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from pathlib import Path

import pytest

from mopidy_spotify import auth_state


def test_file_auth_state_store_round_trips_pkce_authorized(tmp_path: Path):
store = auth_state.FileAuthStateStore(tmp_path / "auth.json")
token_id = 1
refresh_token = f"refresh-token-{token_id}"

store.save(auth_state.PkceAuthorizedAuthPayload(refresh_token=refresh_token))

assert store.load() == auth_state.PkceAuthorizedAuthPayload(
refresh_token=refresh_token
)


def test_file_auth_state_store_round_trips_cleared_bridge(tmp_path: Path):
store = auth_state.FileAuthStateStore(tmp_path / "auth.json")

store.save(auth_state.ClearedAuthPayload(mode="bridge"))

assert store.load() == auth_state.ClearedAuthPayload(mode="bridge")


def test_file_auth_state_store_rejects_invalid_json(tmp_path: Path):
auth_state_path = tmp_path / "auth.json"
auth_state_path.write_text("not-json", encoding="utf-8")

with pytest.raises(auth_state.InvalidRefreshTokenError):
auth_state.FileAuthStateStore(auth_state_path).load()


def test_refresh_token_request_requires_pkce_authorized(tmp_path: Path):
auth_state_path = tmp_path / "auth.json"
auth_state_path.write_text(
'{"version":1,"mode":"bridge","state":"configured"}',
encoding="utf-8",
)

with pytest.raises(auth_state.InvalidRefreshTokenError):
auth_state.refresh_token_request(auth_state_path)
6 changes: 6 additions & 0 deletions tests/test_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,9 @@ def test_get_credentials_dir(tmp_path: Path) -> None:

result2 = ext.get_credentials_dir(config) # check exists_ok
assert result == result2


def test_get_auth_state_path(tmp_path: Path) -> None:
config = {"core": {"data_dir": tmp_path}}

assert Extension.get_auth_state_path(config) == (tmp_path / "spotify" / "auth.json")
Loading