diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java index 50c0145be9348..2d7fea8532e24 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java @@ -243,10 +243,33 @@ public class NativeS3FileSystemFactory implements FileSystemFactory { .intType() .defaultValue(3) .withDescription( - "Maximum number of retry attempts for failed S3 requests. " - + "Uses the AWS SDK's default retry strategy (exponential backoff with jitter). " + "Maximum number of retries for failed S3 requests (excluding the initial attempt). " + "Set to 0 to disable retries."); + public static final ConfigOption RETRY_BASE_DELAY = + ConfigOptions.key("s3.retry.base-delay") + .durationType() + .defaultValue(Duration.ofMillis(100)) + .withDescription( + "Base delay for exponential backoff on non-throttle retries. " + + "Uses exponential backoff with full jitter, capped by s3.retry.max-backoff."); + + public static final ConfigOption RETRY_THROTTLE_BASE_DELAY = + ConfigOptions.key("s3.retry.throttle.base-delay") + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription( + "Base delay for exponential backoff on throttle retries (HTTP 429, 503). " + + "Uses exponential backoff with full jitter, capped by s3.retry.max-backoff."); + + public static final ConfigOption RETRY_MAX_BACKOFF = + ConfigOptions.key("s3.retry.max-backoff") + .durationType() + .defaultValue(Duration.ofSeconds(20)) + .withDescription( + "Maximum delay cap for exponential backoff, applied to both " + + "normal and throttle retry paths."); + public static final ConfigOption CONNECTION_TIMEOUT = ConfigOptions.key("s3.connection.timeout") .durationType() @@ -458,6 +481,9 @@ public FileSystem create(URI fsUri) throws IOException { .assumeRoleSessionName(assumeRoleSessionName) .assumeRoleSessionDurationSeconds(assumeRoleSessionDuration) .maxRetries(config.get(MAX_RETRIES)) + .retryBaseDelay(config.get(RETRY_BASE_DELAY)) + .retryThrottleBaseDelay(config.get(RETRY_THROTTLE_BASE_DELAY)) + .retryMaxBackoff(config.get(RETRY_MAX_BACKOFF)) .credentialsProviderClasses(credentialsProviderClasses) .encryptionConfig(encryptionConfig) .build(); diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java index cd1c918c486f6..e3c7a1f238054 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java @@ -33,11 +33,12 @@ import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; -import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; +import software.amazon.awssdk.retries.StandardRetryStrategy; +import software.amazon.awssdk.retries.api.BackoffStrategy; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Client; @@ -88,6 +89,9 @@ class S3ClientProvider implements AutoCloseableAsync { private final boolean checksumValidation; private final int maxConnections; private final int maxRetries; + private final Duration retryBaseDelay; + private final Duration retryThrottleBaseDelay; + private final Duration retryMaxBackoff; @Nullable private final String region; @Nullable private final String endpoint; @Nullable private final String assumeRoleArn; @@ -111,6 +115,9 @@ private S3ClientProvider( boolean checksumValidation, int maxConnections, int maxRetries, + Duration retryBaseDelay, + Duration retryThrottleBaseDelay, + Duration retryMaxBackoff, @Nullable String region, @Nullable String endpoint, @Nullable String assumeRoleArn, @@ -141,6 +148,13 @@ private S3ClientProvider( this.checksumValidation = checksumValidation; this.maxConnections = maxConnections; this.maxRetries = maxRetries; + this.retryBaseDelay = + Preconditions.checkNotNull(retryBaseDelay, "retryBaseDelay must not be null"); + this.retryThrottleBaseDelay = + Preconditions.checkNotNull( + retryThrottleBaseDelay, "retryThrottleBaseDelay must not be null"); + this.retryMaxBackoff = + Preconditions.checkNotNull(retryMaxBackoff, "retryMaxBackoff must not be null"); this.region = region; this.endpoint = endpoint; this.assumeRoleArn = assumeRoleArn; @@ -214,6 +228,26 @@ int getMaxRetries() { return maxRetries; } + @VisibleForTesting + int getMaxAttempts() { + return maxRetries + 1; + } + + @VisibleForTesting + Duration getRetryBaseDelay() { + return retryBaseDelay; + } + + @VisibleForTesting + Duration getRetryThrottleBaseDelay() { + return retryThrottleBaseDelay; + } + + @VisibleForTesting + Duration getRetryMaxBackoff() { + return retryMaxBackoff; + } + @VisibleForTesting @Nullable String getRegion() { @@ -312,6 +346,11 @@ public static class Builder { private Duration socketTimeout = Duration.ofSeconds(60); private Duration connectionMaxIdleTime = Duration.ofSeconds(60); private int maxRetries = 3; + private Duration retryBaseDelay = NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue(); + private Duration retryThrottleBaseDelay = + NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue(); + private Duration retryMaxBackoff = + NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue(); private Duration clientCloseTimeout = Duration.ofSeconds(30); // AssumeRole configuration @@ -391,6 +430,37 @@ public Builder maxRetries(int maxRetries) { return this; } + public Builder retryBaseDelay(Duration retryBaseDelay) { + Preconditions.checkNotNull(retryBaseDelay, "retryBaseDelay must not be null"); + Preconditions.checkArgument( + !retryBaseDelay.isNegative(), + "retryBaseDelay must not be negative, but was %s", + retryBaseDelay); + this.retryBaseDelay = retryBaseDelay; + return this; + } + + public Builder retryThrottleBaseDelay(Duration retryThrottleBaseDelay) { + Preconditions.checkNotNull( + retryThrottleBaseDelay, "retryThrottleBaseDelay must not be null"); + Preconditions.checkArgument( + !retryThrottleBaseDelay.isNegative(), + "retryThrottleBaseDelay must not be negative, but was %s", + retryThrottleBaseDelay); + this.retryThrottleBaseDelay = retryThrottleBaseDelay; + return this; + } + + public Builder retryMaxBackoff(Duration retryMaxBackoff) { + Preconditions.checkNotNull(retryMaxBackoff, "retryMaxBackoff must not be null"); + Preconditions.checkArgument( + retryMaxBackoff.toMillis() > 0, + "retryMaxBackoff must be positive, but was %s", + retryMaxBackoff); + this.retryMaxBackoff = retryMaxBackoff; + return this; + } + public Builder assumeRoleArn(@Nullable String assumeRoleArn) { this.assumeRoleArn = assumeRoleArn; return this; @@ -458,9 +528,30 @@ S3ClientProvider build() { .checksumValidationEnabled(checksumValidation) .build(); + Preconditions.checkArgument( + retryMaxBackoff.compareTo(retryBaseDelay) >= 0, + "retryMaxBackoff (%s) must be >= retryBaseDelay (%s)", + retryMaxBackoff, + retryBaseDelay); + Preconditions.checkArgument( + retryMaxBackoff.compareTo(retryThrottleBaseDelay) >= 0, + "retryMaxBackoff (%s) must be >= retryThrottleBaseDelay (%s)", + retryMaxBackoff, + retryThrottleBaseDelay); + ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() - .retryPolicy(RetryPolicy.builder().numRetries(maxRetries).build()) + .retryStrategy( + StandardRetryStrategy.builder() + .maxAttempts(maxRetries + 1) + .backoffStrategy( + BackoffStrategy.exponentialDelay( + retryBaseDelay, retryMaxBackoff)) + .throttlingBackoffStrategy( + BackoffStrategy.exponentialDelay( + retryThrottleBaseDelay, + retryMaxBackoff)) + .build()) .build(); ApacheHttpClient.Builder httpClientBuilder = @@ -516,6 +607,9 @@ S3ClientProvider build() { checksumValidation, maxConnections, maxRetries, + retryBaseDelay, + retryThrottleBaseDelay, + retryMaxBackoff, region, endpoint, assumeRoleArn, diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java index e673af3a55e43..f256b0a2040c5 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java @@ -178,6 +178,57 @@ void testMaxRetriesExplicitlyConfigured() throws Exception { assertThat(createFs(config).getClientProvider().getMaxRetries()).isEqualTo(5); } + @Test + void testMaxAttemptsIsMaxRetriesPlusOne() throws Exception { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.MAX_RETRIES, 4); + assertThat(createFs(config).getClientProvider().getMaxAttempts()).isEqualTo(5); + } + + // --- Retry backoff --- + + @Test + void testRetryBaseDelayDefault() throws Exception { + assertThat(createFs(baseConfig()).getClientProvider().getRetryBaseDelay()) + .isEqualTo(NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue()); + } + + @Test + void testRetryBaseDelayExplicitlyConfigured() throws Exception { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.RETRY_BASE_DELAY, Duration.ofMillis(200)); + assertThat(createFs(config).getClientProvider().getRetryBaseDelay()) + .isEqualTo(Duration.ofMillis(200)); + } + + @Test + void testRetryThrottleBaseDelayDefault() throws Exception { + assertThat(createFs(baseConfig()).getClientProvider().getRetryThrottleBaseDelay()) + .isEqualTo(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue()); + } + + @Test + void testRetryThrottleBaseDelayExplicitlyConfigured() throws Exception { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY, Duration.ofSeconds(2)); + assertThat(createFs(config).getClientProvider().getRetryThrottleBaseDelay()) + .isEqualTo(Duration.ofSeconds(2)); + } + + @Test + void testRetryMaxBackoffDefault() throws Exception { + assertThat(createFs(baseConfig()).getClientProvider().getRetryMaxBackoff()) + .isEqualTo(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue()); + } + + @Test + void testRetryMaxBackoffExplicitlyConfigured() throws Exception { + Configuration config = baseConfig(); + config.set(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF, Duration.ofSeconds(30)); + assertThat(createFs(config).getClientProvider().getRetryMaxBackoff()) + .isEqualTo(Duration.ofSeconds(30)); + } + // --- Timeouts --- @Test diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java index 4e43aac3383f9..51a457cc537ab 100644 --- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import java.lang.reflect.Field; +import java.time.Duration; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @@ -203,6 +204,73 @@ void testEmptyProviderStringThrows() { .hasMessageContaining("no valid provider class names"); } + @Test + void testRetryBuilderDefaultsMatchConfigOptions() { + S3ClientProvider provider = + S3ClientProvider.builder().endpoint(DUMMY_ENDPOINT).region(DUMMY_REGION).build(); + + assertThat(provider.getRetryBaseDelay()) + .isEqualTo(NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue()); + assertThat(provider.getRetryThrottleBaseDelay()) + .isEqualTo(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue()); + assertThat(provider.getRetryMaxBackoff()) + .isEqualTo(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue()); + } + + @Test + void testNegativeRetryBaseDelayThrows() { + assertThatThrownBy(() -> S3ClientProvider.builder().retryBaseDelay(Duration.ofMillis(-1))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("retryBaseDelay must not be negative"); + } + + @Test + void testNegativeRetryThrottleBaseDelayThrows() { + assertThatThrownBy( + () -> + S3ClientProvider.builder() + .retryThrottleBaseDelay(Duration.ofMillis(-1))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("retryThrottleBaseDelay must not be negative"); + } + + @Test + void testZeroRetryMaxBackoffThrows() { + assertThatThrownBy(() -> S3ClientProvider.builder().retryMaxBackoff(Duration.ZERO)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("retryMaxBackoff must be positive"); + } + + @Test + void testRetryMaxBackoffSmallerThanBaseDelayThrows() { + assertThatThrownBy( + () -> + S3ClientProvider.builder() + .endpoint(DUMMY_ENDPOINT) + .region(DUMMY_REGION) + .retryBaseDelay(Duration.ofSeconds(5)) + .retryMaxBackoff(Duration.ofSeconds(1)) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("retryMaxBackoff") + .hasMessageContaining("retryBaseDelay"); + } + + @Test + void testRetryMaxBackoffSmallerThanThrottleBaseDelayThrows() { + assertThatThrownBy( + () -> + S3ClientProvider.builder() + .endpoint(DUMMY_ENDPOINT) + .region(DUMMY_REGION) + .retryThrottleBaseDelay(Duration.ofSeconds(5)) + .retryMaxBackoff(Duration.ofSeconds(1)) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("retryMaxBackoff") + .hasMessageContaining("retryThrottleBaseDelay"); + } + @SuppressWarnings("unchecked") private static List extractChain(AwsCredentialsProvider provider) throws Exception {