Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,33 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
.intType()
.defaultValue(3)
.withDescription(
"Maximum number of retry attempts for failed S3 requests. "
+ "Uses the AWS SDK's default retry strategy (exponential backoff with jitter). "
"Maximum number of retries for failed S3 requests (excluding the initial attempt). "
+ "Set to 0 to disable retries.");

public static final ConfigOption<Duration> RETRY_BASE_DELAY =
ConfigOptions.key("s3.retry.base-delay")
.durationType()
.defaultValue(Duration.ofMillis(100))
.withDescription(
"Base delay for exponential backoff on non-throttle retries. "
+ "Uses exponential backoff with full jitter, capped by s3.retry.max-backoff.");

public static final ConfigOption<Duration> RETRY_THROTTLE_BASE_DELAY =
ConfigOptions.key("s3.retry.throttle.base-delay")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription(
"Base delay for exponential backoff on throttle retries (HTTP 429, 503). "
+ "Uses exponential backoff with full jitter, capped by s3.retry.max-backoff.");

public static final ConfigOption<Duration> RETRY_MAX_BACKOFF =
ConfigOptions.key("s3.retry.max-backoff")
.durationType()
.defaultValue(Duration.ofSeconds(20))
.withDescription(
"Maximum delay cap for exponential backoff, applied to both "
+ "normal and throttle retry paths.");

public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
ConfigOptions.key("s3.connection.timeout")
.durationType()
Expand Down Expand Up @@ -458,6 +481,9 @@ public FileSystem create(URI fsUri) throws IOException {
.assumeRoleSessionName(assumeRoleSessionName)
.assumeRoleSessionDurationSeconds(assumeRoleSessionDuration)
.maxRetries(config.get(MAX_RETRIES))
.retryBaseDelay(config.get(RETRY_BASE_DELAY))
.retryThrottleBaseDelay(config.get(RETRY_THROTTLE_BASE_DELAY))
.retryMaxBackoff(config.get(RETRY_MAX_BACKOFF))
.credentialsProviderClasses(credentialsProviderClasses)
.encryptionConfig(encryptionConfig)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.retries.StandardRetryStrategy;
import software.amazon.awssdk.retries.api.BackoffStrategy;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
Expand Down Expand Up @@ -88,6 +89,9 @@ class S3ClientProvider implements AutoCloseableAsync {
private final boolean checksumValidation;
private final int maxConnections;
private final int maxRetries;
private final Duration retryBaseDelay;
private final Duration retryThrottleBaseDelay;
private final Duration retryMaxBackoff;
@Nullable private final String region;
@Nullable private final String endpoint;
@Nullable private final String assumeRoleArn;
Expand All @@ -111,6 +115,9 @@ private S3ClientProvider(
boolean checksumValidation,
int maxConnections,
int maxRetries,
Duration retryBaseDelay,
Duration retryThrottleBaseDelay,
Duration retryMaxBackoff,
@Nullable String region,
@Nullable String endpoint,
@Nullable String assumeRoleArn,
Expand Down Expand Up @@ -141,6 +148,13 @@ private S3ClientProvider(
this.checksumValidation = checksumValidation;
this.maxConnections = maxConnections;
this.maxRetries = maxRetries;
this.retryBaseDelay =
Preconditions.checkNotNull(retryBaseDelay, "retryBaseDelay must not be null");
this.retryThrottleBaseDelay =
Preconditions.checkNotNull(
retryThrottleBaseDelay, "retryThrottleBaseDelay must not be null");
this.retryMaxBackoff =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Invalid values are possible. Add condition check for negative durations, zero durations, maxBackoff < baseDelay, maxBackoff < throttleBaseDelay

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Covered.

Preconditions.checkNotNull(retryMaxBackoff, "retryMaxBackoff must not be null");
this.region = region;
this.endpoint = endpoint;
this.assumeRoleArn = assumeRoleArn;
Expand Down Expand Up @@ -214,6 +228,26 @@ int getMaxRetries() {
return maxRetries;
}

@VisibleForTesting
int getMaxAttempts() {
return maxRetries + 1;
}

@VisibleForTesting
Duration getRetryBaseDelay() {
return retryBaseDelay;
}

@VisibleForTesting
Duration getRetryThrottleBaseDelay() {
return retryThrottleBaseDelay;
}

@VisibleForTesting
Duration getRetryMaxBackoff() {
return retryMaxBackoff;
}

@VisibleForTesting
@Nullable
String getRegion() {
Expand Down Expand Up @@ -312,6 +346,11 @@ public static class Builder {
private Duration socketTimeout = Duration.ofSeconds(60);
private Duration connectionMaxIdleTime = Duration.ofSeconds(60);
private int maxRetries = 3;
private Duration retryBaseDelay = NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue();
private Duration retryThrottleBaseDelay =
NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue();
private Duration retryMaxBackoff =
NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue();
private Duration clientCloseTimeout = Duration.ofSeconds(30);

// AssumeRole configuration
Expand Down Expand Up @@ -391,6 +430,37 @@ public Builder maxRetries(int maxRetries) {
return this;
}

public Builder retryBaseDelay(Duration retryBaseDelay) {
Preconditions.checkNotNull(retryBaseDelay, "retryBaseDelay must not be null");
Preconditions.checkArgument(
!retryBaseDelay.isNegative(),
"retryBaseDelay must not be negative, but was %s",
retryBaseDelay);
this.retryBaseDelay = retryBaseDelay;
return this;
}

public Builder retryThrottleBaseDelay(Duration retryThrottleBaseDelay) {
Preconditions.checkNotNull(
retryThrottleBaseDelay, "retryThrottleBaseDelay must not be null");
Preconditions.checkArgument(
!retryThrottleBaseDelay.isNegative(),
"retryThrottleBaseDelay must not be negative, but was %s",
retryThrottleBaseDelay);
this.retryThrottleBaseDelay = retryThrottleBaseDelay;
return this;
}

public Builder retryMaxBackoff(Duration retryMaxBackoff) {
Preconditions.checkNotNull(retryMaxBackoff, "retryMaxBackoff must not be null");
Preconditions.checkArgument(
retryMaxBackoff.toMillis() > 0,
"retryMaxBackoff must be positive, but was %s",
retryMaxBackoff);
this.retryMaxBackoff = retryMaxBackoff;
return this;
}

public Builder assumeRoleArn(@Nullable String assumeRoleArn) {
this.assumeRoleArn = assumeRoleArn;
return this;
Expand Down Expand Up @@ -458,9 +528,30 @@ S3ClientProvider build() {
.checksumValidationEnabled(checksumValidation)
.build();

Preconditions.checkArgument(
retryMaxBackoff.compareTo(retryBaseDelay) >= 0,
"retryMaxBackoff (%s) must be >= retryBaseDelay (%s)",
retryMaxBackoff,
retryBaseDelay);
Preconditions.checkArgument(
retryMaxBackoff.compareTo(retryThrottleBaseDelay) >= 0,
"retryMaxBackoff (%s) must be >= retryThrottleBaseDelay (%s)",
retryMaxBackoff,
retryThrottleBaseDelay);

ClientOverrideConfiguration overrideConfig =
ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(maxRetries).build())
.retryStrategy(
StandardRetryStrategy.builder()
.maxAttempts(maxRetries + 1)
.backoffStrategy(
BackoffStrategy.exponentialDelay(
retryBaseDelay, retryMaxBackoff))
.throttlingBackoffStrategy(
BackoffStrategy.exponentialDelay(
retryThrottleBaseDelay,
retryMaxBackoff))
.build())
.build();

ApacheHttpClient.Builder httpClientBuilder =
Expand Down Expand Up @@ -516,6 +607,9 @@ S3ClientProvider build() {
checksumValidation,
maxConnections,
maxRetries,
retryBaseDelay,
retryThrottleBaseDelay,
retryMaxBackoff,
region,
endpoint,
assumeRoleArn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,57 @@ void testMaxRetriesExplicitlyConfigured() throws Exception {
assertThat(createFs(config).getClientProvider().getMaxRetries()).isEqualTo(5);
}

@Test
void testMaxAttemptsIsMaxRetriesPlusOne() throws Exception {
Configuration config = baseConfig();
config.set(NativeS3FileSystemFactory.MAX_RETRIES, 4);
assertThat(createFs(config).getClientProvider().getMaxAttempts()).isEqualTo(5);
}

// --- Retry backoff ---

@Test
void testRetryBaseDelayDefault() throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No tests for actual retry strategy construction.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

assertThat(createFs(baseConfig()).getClientProvider().getRetryBaseDelay())
.isEqualTo(NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue());
}

@Test
void testRetryBaseDelayExplicitlyConfigured() throws Exception {
Configuration config = baseConfig();
config.set(NativeS3FileSystemFactory.RETRY_BASE_DELAY, Duration.ofMillis(200));
assertThat(createFs(config).getClientProvider().getRetryBaseDelay())
.isEqualTo(Duration.ofMillis(200));
}

@Test
void testRetryThrottleBaseDelayDefault() throws Exception {
assertThat(createFs(baseConfig()).getClientProvider().getRetryThrottleBaseDelay())
.isEqualTo(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue());
}

@Test
void testRetryThrottleBaseDelayExplicitlyConfigured() throws Exception {
Configuration config = baseConfig();
config.set(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY, Duration.ofSeconds(2));
assertThat(createFs(config).getClientProvider().getRetryThrottleBaseDelay())
.isEqualTo(Duration.ofSeconds(2));
}

@Test
void testRetryMaxBackoffDefault() throws Exception {
assertThat(createFs(baseConfig()).getClientProvider().getRetryMaxBackoff())
.isEqualTo(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue());
}

@Test
void testRetryMaxBackoffExplicitlyConfigured() throws Exception {
Configuration config = baseConfig();
config.set(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF, Duration.ofSeconds(30));
assertThat(createFs(config).getClientProvider().getRetryMaxBackoff())
.isEqualTo(Duration.ofSeconds(30));
}

// --- Timeouts ---

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -203,6 +204,73 @@ void testEmptyProviderStringThrows() {
.hasMessageContaining("no valid provider class names");
}

@Test
void testRetryBuilderDefaultsMatchConfigOptions() {
S3ClientProvider provider =
S3ClientProvider.builder().endpoint(DUMMY_ENDPOINT).region(DUMMY_REGION).build();

assertThat(provider.getRetryBaseDelay())
.isEqualTo(NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue());
assertThat(provider.getRetryThrottleBaseDelay())
.isEqualTo(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue());
assertThat(provider.getRetryMaxBackoff())
.isEqualTo(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue());
}

@Test
void testNegativeRetryBaseDelayThrows() {
assertThatThrownBy(() -> S3ClientProvider.builder().retryBaseDelay(Duration.ofMillis(-1)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("retryBaseDelay must not be negative");
}

@Test
void testNegativeRetryThrottleBaseDelayThrows() {
assertThatThrownBy(
() ->
S3ClientProvider.builder()
.retryThrottleBaseDelay(Duration.ofMillis(-1)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("retryThrottleBaseDelay must not be negative");
}

@Test
void testZeroRetryMaxBackoffThrows() {
assertThatThrownBy(() -> S3ClientProvider.builder().retryMaxBackoff(Duration.ZERO))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("retryMaxBackoff must be positive");
}

@Test
void testRetryMaxBackoffSmallerThanBaseDelayThrows() {
assertThatThrownBy(
() ->
S3ClientProvider.builder()
.endpoint(DUMMY_ENDPOINT)
.region(DUMMY_REGION)
.retryBaseDelay(Duration.ofSeconds(5))
.retryMaxBackoff(Duration.ofSeconds(1))
.build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("retryMaxBackoff")
.hasMessageContaining("retryBaseDelay");
}

@Test
void testRetryMaxBackoffSmallerThanThrottleBaseDelayThrows() {
assertThatThrownBy(
() ->
S3ClientProvider.builder()
.endpoint(DUMMY_ENDPOINT)
.region(DUMMY_REGION)
.retryThrottleBaseDelay(Duration.ofSeconds(5))
.retryMaxBackoff(Duration.ofSeconds(1))
.build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("retryMaxBackoff")
.hasMessageContaining("retryThrottleBaseDelay");
}

@SuppressWarnings("unchecked")
private static List<AwsCredentialsProvider> extractChain(AwsCredentialsProvider provider)
throws Exception {
Expand Down