Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# cloudpathlib Changelog

## UNRELEASED
- Added support for S3 Multi-Region Access Point (MRAP) URLs in `S3Path` (Issue [#556](https://github.com/drivendataorg/cloudpathlib/issues/556), PR [#557](https://github.com/drivendataorg/cloudpathlib/pull/557))
- Added support for Pydantic serialization (Issue [#537](https://github.com/drivendataorg/cloudpathlib/issues/537), PR [#538](https://github.com/drivendataorg/cloudpathlib/pull/538))

## v0.23.0 (2025-10-07)
Expand Down
11 changes: 11 additions & 0 deletions cloudpathlib/s3/s3path.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import re
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Optional, TYPE_CHECKING
Expand All @@ -8,6 +9,10 @@
if TYPE_CHECKING:
from .s3client import S3Client

# Matches an S3 URL whose bucket component is a Multi-Region Access Point ARN.
# Group "arn" captures "arn:aws:s3::<12-digit account id>:accesspoint/<alias>.mrap"
# (note that the ARN itself contains a "/"); the optional group "key" captures
# everything after the slash that follows the ARN and is None when no slash follows.
_MRAP_PATTERN = re.compile(
r"^s3://(?P<arn>arn:aws:s3::\d{12}:accesspoint/[^/]+\.mrap)(?:/(?P<key>.*))?$"
)


@register_path_class("s3")
class S3Path(CloudPath):
Expand Down Expand Up @@ -74,6 +79,12 @@ def stat(self, follow_symlinks=True):

@property
def bucket(self) -> str:
    """Bucket component of this path.

    When the full path string matches the MRAP pattern, this is the complete
    access-point ARN; otherwise it is the first ``/``-separated segment of
    ``self._no_prefix``.

    :type: :class:`str`
    """
    mrap_match = _MRAP_PATTERN.match(str(self))
    if mrap_match is None:
        # Not an MRAP URL: everything before the first "/" is the bucket.
        return self._no_prefix.partition("/")[0]
    return mrap_match.group("arn")

@property
Expand Down
9 changes: 2 additions & 7 deletions tests/mock_clients/mock_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,9 @@ def list_buckets(self):
return {"Buckets": [{"Name": DEFAULT_S3_BUCKET_NAME}]}

def head_object(self, Bucket, Key, **kwargs):
    """Mock ``head_object``: report whether ``Key`` is a file under ``self.root``.

    ``Bucket`` and ``**kwargs`` are accepted but not inspected, so any bucket
    string (for example an MRAP ARN) resolves against the same root.

    Returns ``{"key": Key}`` when ``self.root / Key`` exists and is not a
    directory; otherwise raises ``ClientError({}, {})``.
    """
    # Build the local path once instead of once per check.
    target = self.root / Key
    if not target.exists() or target.is_dir():
        raise ClientError({}, {})
    return {"key": Key}

def generate_presigned_url(self, op: str, Params: dict, ExpiresIn: int):
mock_presigned_url = f"https://{Params['Bucket']}.s3.amazonaws.com/{Params['Key']}?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=TEST%2FTEST%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240131T194721Z&X-Amz-Expires=3600&X-Amz-SignedHeaders=host&X-Amz-Signature=TEST"
Expand Down
114 changes: 114 additions & 0 deletions tests/test_s3_specific.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from boto3.s3.transfer import TransferConfig
import botocore
from cloudpathlib import S3Client, S3Path
import cloudpathlib.s3.s3client
from cloudpathlib.local import LocalS3Path
import psutil

Expand Down Expand Up @@ -290,3 +291,116 @@ def test_as_url_presign(s3_rig):
assert "Signature" in query_params
else:
assert False, "Unknown presigned URL format"


# Sample MRAP ARN (12-digit account ID, alias ending in ".mrap") used by the MRAP tests.
_MRAP_ARN = "arn:aws:s3::123456789012:accesspoint/my-mrap.mrap"


def test_mrap_bucket_and_key():
    """Check ``bucket``/``key`` for MRAP URLs, a plain bucket URL, and near-miss ARNs."""
    # Each entry: (url, expected bucket, expected key)
    bucket_and_key_cases = [
        # bare MRAP ARN with no key
        (f"s3://{_MRAP_ARN}", _MRAP_ARN, ""),
        # trailing slash still gives an empty key
        (f"s3://{_MRAP_ARN}/", _MRAP_ARN, ""),
        # one key segment
        (f"s3://{_MRAP_ARN}/file.txt", _MRAP_ARN, "file.txt"),
        # nested key
        (f"s3://{_MRAP_ARN}/folder/sub/file.txt", _MRAP_ARN, "folder/sub/file.txt"),
        # an ordinary bucket URL is unaffected
        ("s3://my-bucket/folder/file.txt", "my-bucket", "folder/file.txt"),
    ]
    for url, expected_bucket, expected_key in bucket_and_key_cases:
        path = S3Path(url)
        assert path.bucket == expected_bucket
        assert path.key == expected_key

    # ARN-like URLs that are not valid MRAPs fall back to normal bucket parsing.
    # Each entry: (url, expected bucket)
    not_mrap_cases = [
        # account ID is not 12 digits long
        (
            "s3://arn:aws:s3::12345:accesspoint/x.mrap/key",
            "arn:aws:s3::12345:accesspoint",
        ),
        # alias is missing the .mrap suffix
        (
            "s3://arn:aws:s3::123456789012:accesspoint/notmrap/key",
            "arn:aws:s3::123456789012:accesspoint",
        ),
    ]
    for url, expected_bucket in not_mrap_cases:
        assert S3Path(url).bucket == expected_bucket


def test_mrap_path_manipulation():
    """Exercise joining, name parts, parent, with_name/with_suffix and str/repr on MRAP paths."""
    root = S3Path(f"s3://{_MRAP_ARN}")

    # Building a child with the "/" operator
    leaf = root / "folder" / "file.txt"
    assert str(leaf) == f"s3://{_MRAP_ARN}/folder/file.txt"
    assert leaf.bucket == _MRAP_ARN
    assert leaf.key == "folder/file.txt"

    # Name components of the final segment
    assert leaf.name == "file.txt"
    assert leaf.stem == "file"
    assert leaf.suffix == ".txt"

    # Going up one level keeps the MRAP ARN as the bucket
    folder = leaf.parent
    assert str(folder) == f"s3://{_MRAP_ARN}/folder"
    assert folder.bucket == _MRAP_ARN
    assert folder.key == "folder"

    # Replacing the name or only the suffix
    renamed = leaf.with_name("other.csv")
    assert str(renamed) == f"s3://{_MRAP_ARN}/folder/other.csv"
    resuffixed = leaf.with_suffix(".csv")
    assert str(resuffixed) == f"s3://{_MRAP_ARN}/folder/file.csv"

    # A URL survives a str / repr round-trip unchanged
    url = f"s3://{_MRAP_ARN}/folder/file.txt"
    assert str(S3Path(url)) == url
    assert repr(S3Path(url)) == f"S3Path('{url}')"


def test_mrap_file_operations(monkeypatch):
    """Run existence checks, iterdir and write/read/unlink on MRAP paths via the mock S3 session."""
    from tests.mock_clients.mock_s3 import mocked_session_class_factory

    test_dir = "test_mrap_ops"

    # Swap the Session used by the S3 client module for the mock one.
    mock_session_cls = mocked_session_class_factory(test_dir)
    monkeypatch.setattr(cloudpathlib.s3.s3client, "Session", mock_session_cls)

    s3 = S3Client()
    prefix = f"s3://{_MRAP_ARN}/{test_dir}"

    # A file that is expected to already be present in the mock backend
    seeded = s3.CloudPath(f"{prefix}/dir_0/file0_0.txt")
    assert seeded.exists()
    assert seeded.is_file()
    assert not seeded.is_dir()
    assert s3.CloudPath(f"{prefix}/dir_0").is_dir()

    # Listing the test_dir level should yield exactly dir_0 and dir_1
    root_entries = list(s3.CloudPath(prefix).iterdir())
    assert len(root_entries) == 2
    assert all(entry.is_dir() for entry in root_entries)
    assert {entry.name for entry in root_entries} == {"dir_0", "dir_1"}

    # Listing dir_0 should yield three files
    dir0_entries = list(s3.CloudPath(f"{prefix}/dir_0").iterdir())
    assert len(dir0_entries) == 3
    assert all(entry.is_file() for entry in dir0_entries)

    # Write, read back, then delete a new object
    scratch = s3.CloudPath(f"{prefix}/mrap_write_test.txt")
    assert not scratch.exists()
    scratch.write_text("hello from mrap")
    assert scratch.exists()
    assert scratch.read_text() == "hello from mrap"
    assert scratch.bucket == _MRAP_ARN
    scratch.unlink()
    assert not scratch.exists()