Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloudsmith_cli/cli/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
auth,
check,
copy,
credential_helper,
delete,
dependencies,
docs,
Expand Down
31 changes: 31 additions & 0 deletions cloudsmith_cli/cli/commands/credential_helper/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Credential helper commands for Cloudsmith.

This module provides credential helper commands for package managers
that follow their respective credential helper protocols.
"""

import click

from ..main import main
from .docker import docker as docker_cmd


@click.group()
def credential_helper():
"""
Credential helpers for package managers.

These commands provide credentials for package managers like Docker.
They are typically called by wrapper binaries
(e.g., docker-credential-cloudsmith) or used directly for debugging.

Examples:
# Test Docker credential helper
$ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker
"""


credential_helper.add_command(docker_cmd, name="docker")

main.add_command(credential_helper, name="credential-helper")
80 changes: 80 additions & 0 deletions cloudsmith_cli/cli/commands/credential_helper/docker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
Docker credential helper command.

Implements the Docker credential helper protocol for Cloudsmith registries.

See: https://github.com/docker/docker-credential-helpers
"""

import json
import sys

import click

from ....credential_helpers.docker import get_credentials
from ...decorators import common_api_auth_options, resolve_credentials


@click.command()
@common_api_auth_options
@resolve_credentials
def docker(opts):
"""
Docker credential helper for Cloudsmith registries.

Reads a Docker registry server URL from stdin and returns credentials in JSON format.
This command implements the 'get' operation of the Docker credential helper protocol.

Only provides credentials for Cloudsmith Docker registries (docker.cloudsmith.io).

Input (stdin):
Server URL as plain text (e.g., "docker.cloudsmith.io")

Output (stdout):
JSON: {"Username": "token", "Secret": "<cloudsmith-token>"}

Exit codes:
0: Success
1: Error (no credentials available, not a Cloudsmith registry, etc.)

Examples:
# Manual testing
$ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker
{"Username":"token","Secret":"eyJ0eXAiOiJKV1Qi..."}

# Called by Docker via wrapper
$ echo "docker.cloudsmith.io" | docker-credential-cloudsmith get
{"Username":"token","Secret":"eyJ0eXAiOiJKV1Qi..."}

Environment variables:
CLOUDSMITH_API_KEY: API key for authentication (optional)
CLOUDSMITH_ORG: Organization slug (required for custom domain support)
"""
try:
server_url = sys.stdin.read().strip()

if not server_url:
click.echo("Error: No server URL provided on stdin", err=True)
sys.exit(1)

credentials = get_credentials(
server_url,
credential=opts.credential,
session=opts.session,
api_host=opts.api_host or "https://api.cloudsmith.io",
)

if not credentials:
click.echo(
"Error: Unable to retrieve credentials. "
"Make sure you have a valid cloudsmith-cli session, "
"this can be checked with `cloudsmith whoami`.",
err=True,
)
sys.exit(1)

click.echo(json.dumps(credentials))

except OSError as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)
32 changes: 10 additions & 22 deletions cloudsmith_cli/cli/commands/whoami.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
"""CLI/Commands - Retrieve authentication status."""

import os

import click

from ...core import keyring
from ...core.api.exceptions import ApiException
from ...core.api.user import get_token_metadata, get_user_brief
from .. import decorators, utils
from ..config import CredentialsReader
from ..exceptions import handle_api_exceptions
from .main import main

Expand All @@ -26,26 +23,17 @@ def _get_active_method(api_config):
def _get_api_key_source(opts):
"""Determine where the API key was loaded from.

Checks in priority order matching actual resolution:
CLI --api-key flag > CLOUDSMITH_API_KEY env var > credentials.ini.
Uses the credential provider chain result attached by initialise_api.
"""
if not opts.api_key:
return {"configured": False, "source": None, "source_key": None}

env_key = os.environ.get("CLOUDSMITH_API_KEY")

# If env var is set but differs from the resolved key, CLI flag won
if env_key and opts.api_key != env_key:
source, key = "CLI --api-key flag", "cli_flag"
elif env_key:
suffix = env_key[-4:]
source, key = f"CLOUDSMITH_API_KEY env var (ends with ...{suffix})", "env_var"
elif creds := CredentialsReader.find_existing_files():
source, key = f"credentials.ini ({creds[0]})", "credentials_file"
else:
source, key = "CLI --api-key flag", "cli_flag"

return {"configured": True, "source": source, "source_key": key}
credential = getattr(opts, "credential", None)
if credential:
return {
"configured": True,
"source": credential.source_detail or credential.source_name,
"source_key": credential.source_name,
}

return {"configured": False, "source": None, "source_key": None}


def _get_sso_status(api_host):
Expand Down
109 changes: 91 additions & 18 deletions cloudsmith_cli/cli/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
from cloudsmith_cli.cli import validators

from ..core.api.init import initialise_api as _initialise_api
from ..core.credentials.chain import CredentialProviderChain
from ..core.credentials.models import CredentialContext
from ..core.mcp import server
from ..core.rest import create_requests_session as _create_session
from . import config, utils


Expand All @@ -20,6 +23,14 @@ def report_retry(seconds, context=None):
)


def _pop_boolean_flag(kwargs, name, invert=False):
"""Pop a boolean flag from kwargs, optionally inverting it."""
value = kwargs.pop(name)
if value is not None and invert:
value = not value
return value


def common_package_action_options(f):
"""Add common options for package actions."""

Expand Down Expand Up @@ -214,15 +225,17 @@ def common_api_auth_options(f):
def wrapper(ctx, *args, **kwargs):
# pylint: disable=missing-docstring
opts = config.get_or_create_options(ctx)
opts.api_key = kwargs.pop("api_key")
api_key = kwargs.pop("api_key")
if api_key:
opts.api_key = api_key
kwargs["opts"] = opts
return ctx.invoke(f, *args, **kwargs)

return wrapper


def initialise_api(f):
"""Initialise the Cloudsmith API for use."""
def initialise_session(f):
"""Create a shared HTTP session with proxy/SSL/user-agent settings."""

@click.option(
"--api-host", envvar="CLOUDSMITH_API_HOST", help="The API host to connect to."
Expand Down Expand Up @@ -252,6 +265,78 @@ def initialise_api(f):
envvar="CLOUDSMITH_API_HEADERS",
help="A CSV list of extra headers (key=value) to send to the API.",
)
@click.pass_context
@functools.wraps(f)
def wrapper(ctx, *args, **kwargs):
# pylint: disable=missing-docstring
opts = config.get_or_create_options(ctx)
opts.api_host = kwargs.pop("api_host")
opts.api_proxy = kwargs.pop("api_proxy")
opts.api_ssl_verify = _pop_boolean_flag(
kwargs, "without_api_ssl_verify", invert=True
)
opts.api_user_agent = kwargs.pop("api_user_agent")
opts.api_headers = kwargs.pop("api_headers")

opts.session = _create_session(
proxy=opts.api_proxy,
ssl_verify=opts.api_ssl_verify,
user_agent=opts.api_user_agent,
headers=opts.api_headers,
)

kwargs["opts"] = opts
return ctx.invoke(f, *args, **kwargs)

return wrapper


def resolve_credentials(f):
"""Resolve credentials via the provider chain. Depends on initialise_session."""

@click.pass_context
@functools.wraps(f)
def wrapper(ctx, *args, **kwargs):
# pylint: disable=missing-docstring
opts = config.get_or_create_options(ctx)

context = CredentialContext(
session=opts.session,
api_key=opts.api_key,
api_host=opts.api_host or "https://api.cloudsmith.io",
creds_file_path=ctx.meta.get("creds_file"),
profile=ctx.meta.get("profile"),
debug=opts.debug,
)

chain = CredentialProviderChain()
credential = chain.resolve(context)

if context.keyring_refresh_failed:
click.secho(
"An error occurred when attempting to refresh your SSO access token. "
"To refresh this session, run 'cloudsmith auth'",
fg="yellow",
err=True,
)
if credential:
click.secho(
"Falling back to API key authentication.",
fg="yellow",
err=True,
)

opts.credential = credential

kwargs["opts"] = opts
return ctx.invoke(f, *args, **kwargs)

return initialise_session(wrapper)


def initialise_api(f):
"""Initialise the Cloudsmith API for use. Depends on resolve_credentials."""

@click.option(
"-R",
"--without-rate-limit",
Expand Down Expand Up @@ -294,20 +379,8 @@ def initialise_api(f):
@functools.wraps(f)
def wrapper(ctx, *args, **kwargs):
# pylint: disable=missing-docstring
def _set_boolean(name, invert=False):
value = kwargs.pop(name)
value = value if value is not None else None
if value is not None and invert:
value = not value
return value

opts = config.get_or_create_options(ctx)
opts.api_host = kwargs.pop("api_host")
opts.api_proxy = kwargs.pop("api_proxy")
opts.api_ssl_verify = _set_boolean("without_api_ssl_verify", invert=True)
opts.api_user_agent = kwargs.pop("api_user_agent")
opts.api_headers = kwargs.pop("api_headers")
opts.rate_limit = _set_boolean("without_rate_limit", invert=True)
opts.rate_limit = _pop_boolean_flag(kwargs, "without_rate_limit", invert=True)
opts.rate_limit_warning = kwargs.pop("rate_limit_warning")
opts.error_retry_max = kwargs.pop("error_retry_max")
opts.error_retry_backoff = kwargs.pop("error_retry_backoff")
Expand All @@ -320,7 +393,7 @@ def call_print_rate_limit_info_with_opts(rate_info):
opts.api_config = _initialise_api(
debug=opts.debug,
host=opts.api_host,
key=opts.api_key,
credential=opts.credential,
proxy=opts.api_proxy,
ssl_verify=opts.api_ssl_verify,
user_agent=opts.api_user_agent,
Expand All @@ -336,7 +409,7 @@ def call_print_rate_limit_info_with_opts(rate_info):
kwargs["opts"] = opts
return ctx.invoke(f, *args, **kwargs)

return wrapper
return resolve_credentials(wrapper)


def initialise_mcp(f):
Expand Down
5 changes: 4 additions & 1 deletion cloudsmith_cli/cli/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ...core.api.init import initialise_api
from ...core.api.repos import create_repo, delete_repo
from ...core.credentials.models import CredentialResult
from .utils import random_str


Expand Down Expand Up @@ -51,7 +52,9 @@ def organization():
@pytest.fixture()
def tmp_repository(organization, api_host, api_key):
"""Yield a temporary repository."""
initialise_api(host=api_host, key=api_key)
initialise_api(
host=api_host, credential=CredentialResult(api_key=api_key, source_name="test")
)
repo_data = create_repo(organization, {"name": random_str()})
yield repo_data
delete_repo(organization, repo_data["slug"])
Expand Down
6 changes: 5 additions & 1 deletion cloudsmith_cli/cli/tests/test_webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ def test_refresh_api_config_passes_sso_token(self):

mock_init_api.assert_called_once()
call_kwargs = mock_init_api.call_args.kwargs
assert call_kwargs.get("access_token") == "test_sso_token_123"
credential = call_kwargs.get("credential")
assert credential is not None
assert credential.api_key == "test_sso_token_123"
assert credential.auth_type == "bearer"
assert credential.source_name == "sso"


class TestAuthenticationWebRequestHandlerKeyring:
Expand Down
11 changes: 10 additions & 1 deletion cloudsmith_cli/cli/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from ..core.api.exceptions import ApiException
from ..core.api.init import initialise_api
from ..core.credentials.models import CredentialResult
from ..core.keyring import store_sso_tokens
from .saml import exchange_2fa_token

Expand Down Expand Up @@ -79,7 +80,15 @@ def refresh_api_config_after_auth(self):
user_agent=getattr(self.api_opts, "user_agent", None),
headers=getattr(self.api_opts, "headers", None),
rate_limit=getattr(self.api_opts, "rate_limit", True),
access_token=self.sso_access_token,
credential=(
CredentialResult(
api_key=self.sso_access_token,
source_name="sso",
auth_type="bearer",
)
if self.sso_access_token
else None
),
)

def finish_request(self, request, client_address):
Expand Down
Loading
Loading