Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/router/Router.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,21 @@ default CompletableFuture<GetBlobResult> getBlob(String blobId, GetBlobOptions o
return future;
}

/**
* Variant that accepts a {@link RestRequest} so the router can resolve a named-blob path
* (e.g. {@code /named/{account}/{container}/{name}}) via {@code IdConverter} before the storage GET.
* Use the {@code String}-only overload when the caller already has a resolved blob ID.
* <p/>
* After {@code IdConverter} resolves named-blob metadata, implementations may translate a subsequent
* {@link RouterErrorCode#BlobDoesNotExist} from storage to {@link RouterErrorCode#AmbryUnavailable}
* (retryable 503) — once metadata says the blob exists, a missing storage response is treated as transient.
* @param restRequest used to invoke {@code IdConverter} for named-blob paths.
* @param blobId blob ID, or a named-blob path when {@code restRequest} is non-null.
* @param options request options.
* @param callback invoked on completion.
* @param quotaChargeCallback for quota accounting.
* @return future containing the {@link GetBlobResult} or an exception.
*/
default Future<GetBlobResult> getBlob(RestRequest restRequest, String blobId, GetBlobOptions options, Callback<GetBlobResult> callback,
QuotaChargeCallback quotaChargeCallback) {
return getBlob(blobId, options, callback, quotaChargeCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,16 +322,42 @@ public Future<GetBlobResult> getBlob(RestRequest restRequest, String blobIdStr,
if (restRequest != null) {
idConverter.convert(restRequest, blobIdStr).whenComplete((convertedId, exception) -> {
if (exception != null) {
completeOperation(futureResult, callback, null, (Exception) exception);
// Skip decrement: only getBlobHelper increments, and we never reached it.
completeOperation(futureResult, callback, null, (Exception) exception, false);
} else {
getBlobHelper(convertedId, options, callback, quotaChargeCallback, futureResult);
// Metadata says the blob exists. Translate storage BlobDoesNotExist to
// AmbryUnavailable so clients retry (503) instead of getting 404.
FutureResult<GetBlobResult> innerFuture = new FutureResult<>();
Callback<GetBlobResult> wrappedCallback = (result, e) -> {
Exception translated = translateNamedBlobMissingInStorage(e);
futureResult.done(result, translated);
if (callback != null) {
callback.onCompletion(result, translated);
}
};
getBlobHelper(convertedId, options, wrappedCallback, quotaChargeCallback, innerFuture);
}
});
}
// Direct path when blobIdStr is already provided
return futureResult;
}

/**
* Translate {@link RouterErrorCode#BlobDoesNotExist} to {@link RouterErrorCode#AmbryUnavailable}
* (retryable 503, not authoritative 404). Other exceptions pass through.
*/
private Exception translateNamedBlobMissingInStorage(Exception e) {
if (e instanceof RouterException
&& ((RouterException) e).getErrorCode() == RouterErrorCode.BlobDoesNotExist) {
routerMetrics.namedBlobMetadataExistsButStorageNotFoundCount.inc();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can a log message please be added here, so that we know why the 503-alert is going off ? Or is the logging done elsewhere ?

return new RouterException(
"Named blob metadata exists but storage returned BlobNotFound for the resolved blob ID.",
RouterErrorCode.AmbryUnavailable);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 503 (Unavailable) the right code ? In other words, is this situation transient and can clients expect to receive the blob upon retrying ?

}
return e;
}

private void getBlobHelper(String blobIdStr, GetBlobOptions options, Callback<GetBlobResult> callback,
QuotaChargeCallback quotaChargeCallback, FutureResult<GetBlobResult> futureResult) {
if (blobIdStr == null || options == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class NonBlockingRouterMetrics {
public final Counter forceDeleteBlobErrorCount;
public final Counter backgroundDeleterNotFoundCount;
public final Counter backgroundDeleterExceptionCount;
public final Counter namedBlobMetadataExistsButStorageNotFoundCount;
public final Counter operationAbortCount;
public final Counter routerRequestErrorCount;

Expand Down Expand Up @@ -450,6 +451,8 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap, RouterConfig routerConfig
operationAbortCount = metricRegistry.counter(MetricRegistry.name(NonBlockingRouter.class, "OperationAbortCount"));
routerRequestErrorCount =
metricRegistry.counter(MetricRegistry.name(NonBlockingRouter.class, "RouterRequestErrorCount"));
namedBlobMetadataExistsButStorageNotFoundCount = metricRegistry.counter(
MetricRegistry.name(NonBlockingRouter.class, "NamedBlobMetadataExistsButStorageNotFoundCount"));

// Counters for various errors.
ambryUnavailableErrorCount =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import com.github.ambry.rest.RequestPath;
import com.github.ambry.rest.RestMethod;
import com.github.ambry.rest.RestRequest;
import com.github.ambry.rest.RestServiceErrorCode;
import com.github.ambry.rest.RestServiceException;
import com.github.ambry.rest.RestUtils;
import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.store.StoreKey;
Expand Down Expand Up @@ -1237,6 +1239,124 @@ public void onBlobCreated(String blobId, BlobProperties blobProperties, Account
}
}

/** Storage NOT_FOUND after metadata resolves → AmbryUnavailable (retryable 503). */
@Test
public void testNamedBlobMissingInStorageTranslatedToAmbryUnavailable() throws Exception {
AtomicReference<String> storedBlobId = new AtomicReference<>();
// PUT a named blob; subsequent GETs will resolve to its blob ID via the mock.
MockServerLayout layout = setUpNamedBlobAndPut(storedBlobId);
try {
// All replicas reply BlobNotFound to trigger storage-side miss.
layout.getMockServers().forEach(s -> s.setServerErrorForAllRequests(ServerErrorCode.BlobNotFound));
assertNamedBlobGetTranslatedToAmbryUnavailable("a", "c", "b");
} finally {
if (router != null) { router.close(); assertClosed(); }
}
}

/** Regression: idConverter NotFound stays as NotFound (translation only fires after success). */
@Test
public void testNamedBlobMetadataNotFoundStillReturnsNotFound() throws Exception {
// Wire idConverter to fail with NotFound on the GET-path convert.
setUpRouterWithFailingIdConverter(
new RestServiceException("Named blob not found", RestServiceErrorCode.NotFound));
try {
setOperationParams(); // tearDown's assertClosed calls putBlob; populate params.
RestServiceException cause = (RestServiceException) expectGetFailure("a", "c", "b").getCause();
Assert.assertEquals(RestServiceErrorCode.NotFound, cause.getErrorCode());
Assert.assertEquals(0L, routerMetrics.namedBlobMetadataExistsButStorageNotFoundCount.getCount());
} finally {
if (router != null) { router.close(); assertClosed(); }
}
}

/** Translation also fires when getBlobHelper short-circuits via the notFoundCache. */
@Test
public void testNamedBlobMissingViaNotFoundCacheStillTranslatedToAmbryUnavailable() throws Exception {
AtomicReference<String> storedBlobId = new AtomicReference<>();
// PUT a named blob; subsequent GETs will resolve to its blob ID via the mock.
setUpNamedBlobAndPut(storedBlobId);
try {
// Pre-populate notFoundCache so getBlobHelper short-circuits.
router.getNotFoundCache().put(storedBlobId.get(), Boolean.TRUE);
assertNamedBlobGetTranslatedToAmbryUnavailable("a", "c", "b");
} finally {
if (router != null) { router.close(); assertClosed(); }
}
}

/** Wire a mock IdConverter, set up the router, PUT a named blob {@code /a/c/b}, and stub the
* GET-path {@code convert(RestRequest, String)} to return the resolved blob ID. */
private MockServerLayout setUpNamedBlobAndPut(AtomicReference<String> storedBlobId) throws Exception {
CountDownLatch putLatch = new CountDownLatch(1);
// Wire a mock IdConverterFactory + IdConverter.
IdConverterFactory factory = mock(IdConverterFactory.class);
IdConverter mockIdConverter = mock(IdConverter.class);
when(factory.getIdConverter()).thenReturn(mockIdConverter);
// PUT-path stub: capture the resolved blob ID into storedBlobId and signal the latch.
when(mockIdConverter.convert(any(RestRequest.class), anyString(), any(), any())).thenAnswer(inv -> {
FutureResult<String> f = new FutureResult<>();
String id = inv.getArgument(1);
Callback<String> cb = inv.getArgument(3);
storedBlobId.set(id);
if (cb != null) {
cb.onCompletion(id, null);
}
f.done(id, null);
putLatch.countDown();
return f;
});
// Build the router with the mock factory.
MockServerLayout layout = new MockServerLayout(mockClusterMap);
setRouterWithIdConverterFactory(getNonBlockingRouterProperties(localDcName), layout,
new LoggingNotificationSystem(), factory);
// PUT a named blob /a/c/b, then wait for the convert mock to have run.
setOperationParams();
router.putBlob(createNamedBlobRestRequest(RestMethod.PUT, "a", "c", "b", false, false, false, false),
putBlobProperties, putUserMetadata, putChannel, new PutBlobOptionsBuilder().build(), null, null)
.get(AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
putLatch.await();
// GET-path stub: resolve the named blob path back to the captured blob ID.
when(mockIdConverter.convert(any(RestRequest.class), anyString()))
.thenAnswer(inv -> CompletableFuture.completedFuture(storedBlobId.get()));
return layout;
}

/** Set up the router with an IdConverter whose GET-path convert fails with {@code failure}. */
private void setUpRouterWithFailingIdConverter(Exception failure) throws Exception {
// Wire a mock IdConverterFactory + IdConverter.
IdConverterFactory factory = mock(IdConverterFactory.class);
IdConverter mockIdConverter = mock(IdConverter.class);
when(factory.getIdConverter()).thenReturn(mockIdConverter);
// GET-path stub: fail with the given exception.
CompletableFuture<String> failed = new CompletableFuture<>();
failed.completeExceptionally(failure);
when(mockIdConverter.convert(any(RestRequest.class), anyString())).thenReturn(failed);
// Build the router with the mock factory.
setRouterWithIdConverterFactory(getNonBlockingRouterProperties(localDcName),
new MockServerLayout(mockClusterMap), new LoggingNotificationSystem(), factory);
}

/** GET {@code /named/{account}/{container}/{blob}} expecting failure; returns the wrapping ExecutionException. */
private ExecutionException expectGetFailure(String account, String container, String blob) throws Exception {
RestRequest get = createNamedBlobRestRequest(RestMethod.GET, account, container, blob, false, false, false, true);
try {
router.getBlob(get, (String) get.getArgs().get(BLOB_ID), new GetBlobOptionsBuilder().build(), null, null)
.get(AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
throw new AssertionError("Expected failure");
} catch (ExecutionException ee) {
return ee;
}
}

/** Verify a named-blob GET surfaces as AmbryUnavailable and increments the translation metric. */
private void assertNamedBlobGetTranslatedToAmbryUnavailable(String account, String container, String blob)
throws Exception {
Assert.assertEquals(RouterErrorCode.AmbryUnavailable,
((RouterException) expectGetFailure(account, container, blob).getCause()).getErrorCode());
Assert.assertEquals(1L, routerMetrics.namedBlobMetadataExistsButStorageNotFoundCount.getCount());
}

/**
* Test named blob stitch after we move id converter into router.
*/
Expand Down
Loading