diff --git a/ambry-rest/src/main/java/com/github/ambry/rest/NettyMessageProcessor.java b/ambry-rest/src/main/java/com/github/ambry/rest/NettyMessageProcessor.java index 9f66502550..8a8d9fdfd7 100644 --- a/ambry-rest/src/main/java/com/github/ambry/rest/NettyMessageProcessor.java +++ b/ambry-rest/src/main/java/com/github/ambry/rest/NettyMessageProcessor.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.SSLException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,9 +139,12 @@ public void channelInactive(ChannelHandlerContext ctx) { logger.trace("Channel {} inactive", ctx.channel()); nettyMetrics.channelDestructionRate.mark(); if (request != null && request.isOpen()) { - logger.error("Request {} was aborted because the channel {} became inactive", request.getUri(), ctx.channel()); + nettyMetrics.channelInactiveWithActiveRequestCount.inc(); + logger.error("Request {} was aborted because the channel {} became inactive. Method: {}, bytesReceived: {}", + request.getUri(), ctx.channel(), request.getRestMethod(), request.getBlobBytesReceived()); onRequestAborted(Utils.convertToClientTerminationException(new ClosedChannelException())); } else { + nettyMetrics.channelInactiveWithoutActiveRequestCount.inc(); if (request != null) { logger.error("Request {} on channel {} was already closed", request.getUri(), ctx.channel()); } @@ -164,7 +168,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E try { if (request != null && request.isOpen() && cause instanceof Exception) { nettyMetrics.processorExceptionCaughtCount.inc(); - logger.error("Swallowing request {} error on channel {}", request.getUri(), ctx.channel(), cause); + classifyAndCountException(cause); + logger.error("Swallowing request {} error on channel {}. ChannelActive: {}, exception: {} - {}", + request.getUri(), ctx.channel(), ctx.channel().isActive(), cause.getClass().getSimpleName(), + cause.getMessage(), cause); onRequestAborted((Exception) cause); } else if (isOpen()) { if (cause instanceof RestServiceException) { @@ -201,6 +208,27 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } } + /** + * Classifies the exception and increments counters for common network-termination failure categories. + * @param cause the exception to classify. + */ + private void classifyAndCountException(Throwable cause) { + if (cause instanceof ClosedChannelException) { + nettyMetrics.exceptionCaughtClosedChannelCount.inc(); + } else if (cause instanceof SSLException) { + nettyMetrics.exceptionCaughtSSLExceptionCount.inc(); + } else if (cause instanceof IOException) { + String message = cause.getMessage(); + if (message != null && message.contains("Connection reset")) { + nettyMetrics.exceptionCaughtConnectionResetCount.inc(); + } else if (message != null && message.contains("Broken pipe")) { + nettyMetrics.exceptionCaughtBrokenPipeCount.inc(); + } else { + nettyMetrics.exceptionCaughtOtherIOExceptionCount.inc(); + } + } + } + /** * Netty calls this function when events that we have registered for, occur (in this case we are specifically waiting * for {@link IdleStateEvent} so that we close connections that have been idle too long - maybe due to client failure) diff --git a/ambry-rest/src/main/java/com/github/ambry/rest/NettyMetrics.java b/ambry-rest/src/main/java/com/github/ambry/rest/NettyMetrics.java index 670e331e2d..e472654162 100644 --- a/ambry-rest/src/main/java/com/github/ambry/rest/NettyMetrics.java +++ b/ambry-rest/src/main/java/com/github/ambry/rest/NettyMetrics.java @@ -106,8 +106,15 @@ public class NettyMetrics { // NettyMessageProcessor public final Histogram channelReadIntervalInMs; public final Counter idleConnectionCloseCount; + public final Counter channelInactiveWithActiveRequestCount; + public final Counter channelInactiveWithoutActiveRequestCount; public final Counter processorErrorAfterCloseCount; public final Counter processorExceptionCaughtCount; + public final Counter exceptionCaughtConnectionResetCount; + public final Counter exceptionCaughtBrokenPipeCount; + public final Counter exceptionCaughtClosedChannelCount; + public final Counter exceptionCaughtSSLExceptionCount; + public final Counter exceptionCaughtOtherIOExceptionCount; public final Counter processorIOExceptionCount; public final Counter processorRestServiceExceptionCount; public final Counter processorThrowableCount; @@ -115,6 +122,10 @@ public class NettyMetrics { // NettyResponseChannel public final Counter clientEarlyTerminationCount; + public final Counter clientTerminationOnActiveChannelCount; + public final Counter clientTerminationOnInactiveChannelCount; + public final Counter errorResponseSentCount; + public final Counter errorResponseNotSentCount; public final Counter acceptedCount; public final Counter createdCount; public final Counter okCount; @@ -281,10 +292,24 @@ public NettyMetrics(MetricRegistry metricRegistry) { metricRegistry.histogram(MetricRegistry.name(NettyMessageProcessor.class, "ChannelReadIntervalInMs")); idleConnectionCloseCount = metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "IdleConnectionCloseCount")); + channelInactiveWithActiveRequestCount = + metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ChannelInactiveWithActiveRequestCount")); + channelInactiveWithoutActiveRequestCount = metricRegistry.counter( + MetricRegistry.name(NettyMessageProcessor.class, "ChannelInactiveWithoutActiveRequestCount")); processorErrorAfterCloseCount = metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ErrorAfterCloseCount")); processorExceptionCaughtCount = metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtCount")); + exceptionCaughtConnectionResetCount = + metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtConnectionResetCount")); + exceptionCaughtBrokenPipeCount = + metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtBrokenPipeCount")); + exceptionCaughtClosedChannelCount = + metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtClosedChannelCount")); + exceptionCaughtSSLExceptionCount = + metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtSSLExceptionCount")); + exceptionCaughtOtherIOExceptionCount = metricRegistry.counter( + MetricRegistry.name(NettyMessageProcessor.class, "ExceptionCaughtOtherIOExceptionCount")); processorIOExceptionCount = metricRegistry.counter(MetricRegistry.name(NettyMessageProcessor.class, "IOExceptionCount")); processorRestServiceExceptionCount = @@ -296,6 +321,14 @@ public NettyMetrics(MetricRegistry metricRegistry) { // NettyResponseChannel clientEarlyTerminationCount = metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "ClientEarlyTerminationCount")); + clientTerminationOnActiveChannelCount = + metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "ClientTerminationOnActiveChannelCount")); + clientTerminationOnInactiveChannelCount = metricRegistry.counter( + MetricRegistry.name(NettyResponseChannel.class, "ClientTerminationOnInactiveChannelCount")); + errorResponseSentCount = + metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "ErrorResponseSentCount")); + errorResponseNotSentCount = + metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "ErrorResponseNotSentCount")); acceptedCount = metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "AcceptedCount")); createdCount = metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "CreatedCount")); okCount = metricRegistry.counter(MetricRegistry.name(NettyResponseChannel.class, "OkCount")); diff --git a/ambry-rest/src/main/java/com/github/ambry/rest/NettyResponseChannel.java b/ambry-rest/src/main/java/com/github/ambry/rest/NettyResponseChannel.java index fdb58f6763..f712ed5beb 100644 --- a/ambry-rest/src/main/java/com/github/ambry/rest/NettyResponseChannel.java +++ b/ambry-rest/src/main/java/com/github/ambry/rest/NettyResponseChannel.java @@ -548,15 +548,28 @@ private boolean maybeSendErrorResponse(Exception exception) { long processingStartTime = System.currentTimeMillis(); boolean responseSent = false; logger.trace("Sending error response to client on channel {}", ctx.channel()); + // If this is a likely client disconnect and the channel is already inactive, classify it as 4xx + // for accounting purposes but skip any response write attempt. + if (Utils.isPossibleClientTermination(exception) && !ctx.channel().isActive()) { + nettyMetrics.clientEarlyTerminationCount.inc(); + nettyMetrics.clientTerminationOnInactiveChannelCount.inc(); + errorResponseStatus = ResponseStatus.BadRequest; + responseStatus = errorResponseStatus; + logger.debug("Skipping error response write for client termination on inactive channel {}", ctx.channel()); + nettyMetrics.errorResponseNotSentCount.inc(); + return false; + } FullHttpResponse errorResponse = getErrorResponse(exception); if (maybeWriteResponseMetadata(errorResponse, new ErrorResponseWriteListener())) { logger.trace("Scheduled error response sending on channel {}", ctx.channel()); responseStatus = errorResponseStatus; responseSent = true; + nettyMetrics.errorResponseSentCount.inc(); long processingTime = System.currentTimeMillis() - processingStartTime; nettyMetrics.errorResponseProcessingTimeInMs.update(processingTime); } else { logger.error("Could not send error response on channel {}", ctx.channel()); + nettyMetrics.errorResponseNotSentCount.inc(); } return responseSent; } @@ -577,20 +590,28 @@ private FullHttpResponse getErrorResponse(Throwable cause) { errorResponseStatus = ResponseStatus.getResponseStatus(restServiceErrorCode); status = getHttpResponseStatus(errorResponseStatus); if (shouldSendFailureReason(status, restServiceException)) { + Throwable rootCause = Utils.getRootCause(cause); + String rootMessage = rootCause.getMessage(); + if (rootMessage == null) { + rootMessage = rootCause.getClass().getSimpleName(); + } errReason = new String( - Utils.getRootCause(cause).getMessage().replaceAll("[\n\t\r]", " ").getBytes(StandardCharsets.US_ASCII), + rootMessage.replaceAll("[\n\t\r]", " ").getBytes(StandardCharsets.US_ASCII), StandardCharsets.US_ASCII); } if (restServiceException.shouldIncludeExceptionMetadataInResponse()) { errHeaders = restServiceException.getExceptionHeadersMap(); } } else if (Utils.isPossibleClientTermination(cause)) { - // Client closed the connection, it's likely that error response won't be able to reach client. - // If that's the case, then set the status to client error. This would then be recorded as client error - // in ContainerMetrics nettyMetrics.clientEarlyTerminationCount.inc(); - status = HttpResponseStatus.BAD_REQUEST; - errorResponseStatus = ResponseStatus.BadRequest; + // Inactive-channel accounting-only case is handled in maybeSendErrorResponse(). + // Any path reaching response construction should use 500 to avoid emitting 4xx to a still-connected client. + nettyMetrics.clientTerminationOnActiveChannelCount.inc(); + logger.warn("Client termination detected on ACTIVE channel {} for request {}. Exception: {}", ctx.channel(), + request != null ? request.getUri() : "unknown", cause.getMessage()); + nettyMetrics.internalServerErrorCount.inc(); + status = HttpResponseStatus.INTERNAL_SERVER_ERROR; + errorResponseStatus = ResponseStatus.InternalServerError; } else { nettyMetrics.internalServerErrorCount.inc(); status = HttpResponseStatus.INTERNAL_SERVER_ERROR; @@ -846,8 +867,8 @@ private void log(Exception exception) { logger.trace("Error handling request {} with method {}", uri, restMethod, exception); } } else if (Utils.isPossibleClientTermination(exception)) { - logger.trace("Client likely terminated connection while handling request {} with method {}", uri, restMethod, - exception); + logger.debug("Client likely terminated connection while handling request {} with method {}. ChannelActive: {}", + uri, restMethod, ctx.channel().isActive(), exception); } else { logger.error("Unexpected error handling request {} with method {}", uri, restMethod, exception); } diff --git a/ambry-rest/src/test/java/com/github/ambry/rest/NettyResponseChannelTest.java b/ambry-rest/src/test/java/com/github/ambry/rest/NettyResponseChannelTest.java index 86780ef4d9..4536e26fd4 100644 --- a/ambry-rest/src/test/java/com/github/ambry/rest/NettyResponseChannelTest.java +++ b/ambry-rest/src/test/java/com/github/ambry/rest/NettyResponseChannelTest.java @@ -13,6 +13,7 @@ */ package com.github.ambry.rest; +import com.codahale.metrics.Counter; import com.codahale.metrics.MetricRegistry; import com.github.ambry.commons.Callback; import com.github.ambry.config.NettyConfig; @@ -75,6 +76,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import java.util.concurrent.atomic.AtomicLong; import org.junit.After; import org.junit.Before; @@ -600,6 +602,13 @@ public void errorResponseTrackingHeadersTest() { channel.writeInbound(RestTestUtils.createRequest(HttpMethod.HEAD, uri.toString(), httpHeaders)); response = channel.readOutbound(); verifyTrackingHeaders(response, shouldTrackingHeadersExist); + boolean shouldBeAlive = !NettyResponseChannel.CLOSE_CONNECTION_ERROR_STATUSES.contains(response.status()); + assertEquals("Channel state (open/close) not as expected", shouldBeAlive, channel.isActive()); + assertEquals("Connection header should be consistent with channel state", shouldBeAlive, + HttpUtil.isKeepAlive(response)); + if (!shouldBeAlive) { + channel = createEmbeddedChannel(); + } } httpHeaders = new DefaultHttpHeaders(); if (shouldTrackingHeadersExist) { @@ -610,7 +619,15 @@ public void errorResponseTrackingHeadersTest() { channel.writeInbound(RestTestUtils.createRequest(HttpMethod.HEAD, uri.toString(), httpHeaders)); response = channel.readOutbound(); verifyTrackingHeaders(response, shouldTrackingHeadersExist); + boolean shouldBeAlive = !NettyResponseChannel.CLOSE_CONNECTION_ERROR_STATUSES.contains(response.status()); + assertEquals("Channel state (open/close) not as expected", shouldBeAlive, channel.isActive()); + assertEquals("Connection header should be consistent with channel state", shouldBeAlive, + HttpUtil.isKeepAlive(response)); + if (!shouldBeAlive) { + channel = createEmbeddedChannel(); + } } + channel.close(); } /** @@ -635,7 +652,7 @@ public void keepAliveTest() { } /** - * Tests that client initiated terminations don't count towards {@link HttpResponseStatus#INTERNAL_SERVER_ERROR}. + * Tests that client termination on an active channel is treated as a retryable server error. */ @Test public void clientEarlyTerminationTest() throws Exception { @@ -644,25 +661,125 @@ public void clientEarlyTerminationTest() throws Exception { HttpRequest httpRequest = RestTestUtils.createRequest(HttpMethod.POST, uri.toString(), null); HttpUtil.setKeepAlive(httpRequest, false); - String brMetricName = MetricRegistry.name(NettyResponseChannel.class, "BadRequestCount"); - long brBeforeCount = MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(brMetricName).getCount(); String cetMetricName = MetricRegistry.name(NettyResponseChannel.class, "ClientEarlyTerminationCount"); long cetBeforeCount = MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(cetMetricName).getCount(); + String activeMetricName = MetricRegistry.name(NettyResponseChannel.class, "ClientTerminationOnActiveChannelCount"); + long activeBeforeCount = MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(activeMetricName).getCount(); channel.writeInbound(httpRequest); // first outbound has to be response. HttpResponse response = channel.readOutbound(); - assertEquals("Unexpected response status", HttpResponseStatus.BAD_REQUEST, response.status()); + assertEquals("Unexpected response status", HttpResponseStatus.INTERNAL_SERVER_ERROR, response.status()); if (!(response instanceof FullHttpResponse)) { // empty the channel while (channel.readOutbound() != null) { } } - assertEquals("Client terminations should not count towards Bad request count", brBeforeCount, - MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(brMetricName).getCount()); assertEquals("Client terminations should have been tracked", cetBeforeCount + 1, MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(cetMetricName).getCount()); + assertEquals("Client terminations on active channels should have been tracked", activeBeforeCount + 1, + MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(activeMetricName).getCount()); + } + + /** + * Tests that client termination on an inactive channel is classified as bad request and not sent. + */ + @Test + public void clientTerminationOnInactiveChannelTest() { + ChunkedWriteHandler chunkedWriteHandler = new ChunkedWriteHandler(); + EmbeddedChannel channel = new EmbeddedChannel(chunkedWriteHandler); + VerifiableProperties verifiableProperties = new VerifiableProperties(new Properties()); + MetricRegistry metricRegistry = new MetricRegistry(); + NettyMetrics nettyMetrics = new NettyMetrics(metricRegistry); + NettyResponseChannel nettyResponseChannel = + new NettyResponseChannel(new MockChannelHandlerContext(channel), nettyMetrics, + new PerformanceConfig(verifiableProperties), new NettyConfig(verifiableProperties)); + + channel.disconnect().awaitUninterruptibly(); + assertFalse("Channel should be inactive", channel.isActive()); + + long cetBefore = nettyMetrics.clientEarlyTerminationCount.getCount(); + long activeBefore = nettyMetrics.clientTerminationOnActiveChannelCount.getCount(); + long inactiveBefore = nettyMetrics.clientTerminationOnInactiveChannelCount.getCount(); + long notSentBefore = nettyMetrics.errorResponseNotSentCount.getCount(); + + nettyResponseChannel.onResponseComplete(Utils.convertToClientTerminationException(new Exception())); + + assertEquals("Client termination should be tracked", cetBefore + 1, + nettyMetrics.clientEarlyTerminationCount.getCount()); + assertEquals("Client termination on active channel should not be tracked", activeBefore, + nettyMetrics.clientTerminationOnActiveChannelCount.getCount()); + assertEquals("Client termination on inactive channel should be tracked", inactiveBefore + 1, + nettyMetrics.clientTerminationOnInactiveChannelCount.getCount()); + assertEquals("Dropped error response should be tracked", notSentBefore + 1, + nettyMetrics.errorResponseNotSentCount.getCount()); + } + + /** + * Tests that sent error responses are tracked. + */ + @Test + public void errorResponseMetricsTest() { + EmbeddedChannel channel = createEmbeddedChannel(); + HttpRequest httpRequest = RestTestUtils.createRequest(HttpMethod.GET, + TestingUri.OnResponseCompleteWithNonRestException.toString(), null); + String sentMetricName = MetricRegistry.name(NettyResponseChannel.class, "ErrorResponseSentCount"); + long sentBefore = MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(sentMetricName).getCount(); + + channel.writeInbound(httpRequest); + HttpResponse response = channel.readOutbound(); + assertEquals("Unexpected response status", HttpResponseStatus.INTERNAL_SERVER_ERROR, response.status()); + assertEquals("Sent error response should be tracked", sentBefore + 1, + MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(sentMetricName).getCount()); + while (channel.readOutbound() != null) { + } + } + + /** + * Tests error response delivery behavior matrix for 4xx/5xx on active/inactive channels. + */ + @Test + public void errorResponseDeliveryMatrixTest() throws Exception { + // Active channel + 5xx => response should be sent. + EmbeddedChannel activeChannel = createEmbeddedChannel(); + String sentMetricName = MetricRegistry.name(NettyResponseChannel.class, "ErrorResponseSentCount"); + long sentBefore = MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(sentMetricName).getCount(); + HttpRequest internalErrorRequest = + RestTestUtils.createRequest(HttpMethod.GET, TestingUri.OnResponseCompleteWithNonRestException.toString(), null); + activeChannel.writeInbound(internalErrorRequest); + HttpResponse response = activeChannel.readOutbound(); + assertEquals("Unexpected response status for active 5xx", HttpResponseStatus.INTERNAL_SERVER_ERROR, + response.status()); + assertEquals("Sent error response should be tracked for active 5xx", sentBefore + 1, + MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(sentMetricName).getCount()); + while (activeChannel.readOutbound() != null) { + } + + // Active channel + 4xx => response should be sent. + HttpHeaders badRequestHeaders = new DefaultHttpHeaders(); + badRequestHeaders.set(MockNettyMessageProcessor.REST_SERVICE_ERROR_CODE_HEADER_NAME, + RestServiceErrorCode.BadRequest); + sentBefore = MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(sentMetricName).getCount(); + HttpRequest badRequest = + RestTestUtils.createRequest(HttpMethod.HEAD, TestingUri.OnResponseCompleteWithRestException.toString(), + badRequestHeaders); + activeChannel.writeInbound(badRequest); + response = activeChannel.readOutbound(); + assertEquals("Unexpected response status for active 4xx", HttpResponseStatus.BAD_REQUEST, response.status()); + assertEquals("Sent error response should be tracked for active 4xx", sentBefore + 1, + MockNettyMessageProcessor.METRIC_REGISTRY.getCounters().get(sentMetricName).getCount()); + while (activeChannel.readOutbound() != null) { + } + activeChannel.close(); + + // Inactive channel + 5xx => response should not be sent. + assertErrorResponseDroppedOnInactiveChannel(new RuntimeException("server error"), + nettyMetrics -> nettyMetrics.internalServerErrorCount); + + // Inactive channel + 4xx => response should not be sent. + assertErrorResponseDroppedOnInactiveChannel(new RestServiceException("bad request", RestServiceErrorCode.BadRequest), + nettyMetrics -> nettyMetrics.badRequestCount); } /** @@ -1084,6 +1201,37 @@ private void verifyTrackingHeaders(HttpResponse response, boolean shouldTracking } } + /** + * Asserts that an error response is not sent when the channel is inactive. + * @param exception the exception to complete the response with. + * @param statusCounterSelector selects the status counter expected to increment (e.g. 4xx/5xx). + */ + private void assertErrorResponseDroppedOnInactiveChannel(Exception exception, + Function statusCounterSelector) { + ChunkedWriteHandler chunkedWriteHandler = new ChunkedWriteHandler(); + EmbeddedChannel channel = new EmbeddedChannel(chunkedWriteHandler); + VerifiableProperties verifiableProperties = new VerifiableProperties(new Properties()); + MetricRegistry metricRegistry = new MetricRegistry(); + NettyMetrics nettyMetrics = new NettyMetrics(metricRegistry); + NettyResponseChannel nettyResponseChannel = + new NettyResponseChannel(new MockChannelHandlerContext(channel), nettyMetrics, + new PerformanceConfig(verifiableProperties), new NettyConfig(verifiableProperties)); + channel.disconnect().awaitUninterruptibly(); + assertFalse("Channel should be inactive", channel.isActive()); + + long sentBefore = nettyMetrics.errorResponseSentCount.getCount(); + long notSentBefore = nettyMetrics.errorResponseNotSentCount.getCount(); + long statusCounterBefore = statusCounterSelector.apply(nettyMetrics).getCount(); + nettyResponseChannel.onResponseComplete(exception); + + assertNull("No response should be sent on inactive channel", channel.readOutbound()); + assertEquals("Error response sent metric mismatch", sentBefore, nettyMetrics.errorResponseSentCount.getCount()); + assertEquals("Error response not sent metric mismatch", notSentBefore + 1, + nettyMetrics.errorResponseNotSentCount.getCount()); + assertEquals("Expected status counter should be incremented", statusCounterBefore + 1, + statusCounterSelector.apply(nettyMetrics).getCount()); + } + // requestPerformanceEvaluationTest() helpers. /** @@ -1439,7 +1587,7 @@ private void handleRequest(HttpRequest httpRequest) throws Exception { break; case OnResponseCompleteWithEarlyClientTermination: restResponseChannel.onResponseComplete(Utils.convertToClientTerminationException(new Exception())); - assertEquals("ResponseStatus does not reflect error", ResponseStatus.BadRequest, + assertEquals("ResponseStatus does not reflect error", ResponseStatus.InternalServerError, restResponseChannel.getStatus()); assertFalse("Request channel is not closed", request.isOpen()); break; @@ -2074,5 +2222,3 @@ public boolean hasAttr(AttributeKey key) { return false; } } - -