diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index 297fa38359..5b5f2317d5 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -38,6 +38,7 @@ jobs: # ============================================================ unit-test-clustermap: + if: false # DEBUG branch — only server-int-test runs runs-on: ubuntu-latest timeout-minutes: 60 steps: @@ -49,6 +50,7 @@ jobs: job-id-suffix: clustermap unit-test-network: + if: false # DEBUG branch — only server-int-test runs runs-on: ubuntu-latest timeout-minutes: 60 steps: @@ -60,6 +62,7 @@ jobs: job-id-suffix: network unit-test-frontend: + if: false # DEBUG branch — only server-int-test runs runs-on: ubuntu-latest timeout-minutes: 60 steps: @@ -71,6 +74,7 @@ jobs: job-id-suffix: frontend unit-test-mysql-stack: + if: false # DEBUG branch — only server-int-test runs runs-on: ubuntu-latest timeout-minutes: 60 steps: @@ -85,6 +89,7 @@ jobs: needs-mysql: 'true' unit-test-azure-stack: + if: false # DEBUG branch — only server-int-test runs runs-on: ubuntu-latest timeout-minutes: 60 steps: @@ -97,6 +102,7 @@ jobs: needs-azurite: 'true' unit-test-protocols: + if: false # DEBUG branch — only server-int-test runs runs-on: ubuntu-latest timeout-minutes: 60 steps: @@ -108,6 +114,7 @@ jobs: job-id-suffix: protocols unit-test-utility-modules: + if: false # DEBUG branch — only server-int-test runs runs-on: ubuntu-latest timeout-minutes: 60 steps: @@ -123,7 +130,7 @@ jobs: # a per-module unit-test-file-transfer job here once the path is operational. store-test: - + if: false # DEBUG branch — only server-int-test runs runs-on: ubuntu-latest # Hard cap matches unit-test's: prevents runaway hangs from consuming # full GitHub-default 6h timeout if a test wedges. @@ -156,7 +163,7 @@ jobs: timeout-minutes: 2 int-test: - + if: false # DEBUG branch — only server-int-test runs runs-on: ubuntu-latest timeout-minutes: 60 steps: @@ -246,7 +253,8 @@ jobs: name: Run integration tests with: job-id: jdk11 - arguments: --scan --warning-mode=summary :ambry-server:intTest codeCoverageReport + # DEBUG branch — only server-int-test runs (other jobs disabled via if: false). + arguments: --scan --warning-mode=summary :ambry-server:intTest gradle-version: wrapper - name: Upload coverage to Codecov diff --git a/ambry-network/src/main/java/com/github/ambry/network/http2/Http2BlockingChannel.java b/ambry-network/src/main/java/com/github/ambry/network/http2/Http2BlockingChannel.java index ef9595fc00..d73b897b6f 100644 --- a/ambry-network/src/main/java/com/github/ambry/network/http2/Http2BlockingChannel.java +++ b/ambry-network/src/main/java/com/github/ambry/network/http2/Http2BlockingChannel.java @@ -22,6 +22,7 @@ import com.github.ambry.utils.NettyByteBufDataInputStream; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -47,6 +48,10 @@ public class Http2BlockingChannel implements ConnectedChannel { private final ChannelPool channelPool; private final Http2ClientConfig http2ClientConfig; private final InetSocketAddress inetSocketAddress; + // Non-null only when this instance allocated its own pool/event-loop (test constructor). + // disconnect() must release them; production callers share these resources externally. + private final EventLoopGroup ownedEventLoopGroup; + private final boolean ownsChannelPool; final static AttributeKey> RESPONSE_PROMISE = AttributeKey.newInstance("ResponsePromise"); final static AttributeKey CHANNEL_POOL_ATTRIBUTE_KEY = AttributeKey.newInstance("ChannelPool"); @@ -55,6 +60,8 @@ public Http2BlockingChannel(ChannelPool channelPool, InetSocketAddress inetSocke this.channelPool = channelPool; this.inetSocketAddress = inetSocketAddress; this.http2ClientConfig = http2ClientConfig; + this.ownedEventLoopGroup = null; + this.ownsChannelPool = false; } /** @@ -70,9 +77,11 @@ public Http2BlockingChannel(String hostName, int port, SSLConfig sslConfig, Http } this.http2ClientConfig = http2ClientConfig; this.inetSocketAddress = new InetSocketAddress(hostName, port); + this.ownedEventLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup(); this.channelPool = new Http2MultiplexedChannelPool(this.inetSocketAddress, nettySslHttp2Factory, - Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup(), http2ClientConfig, - http2ClientMetrics, new Http2BlockingChannelStreamChannelInitializer(http2ClientConfig)); + this.ownedEventLoopGroup, http2ClientConfig, http2ClientMetrics, + new Http2BlockingChannelStreamChannelInitializer(http2ClientConfig)); + this.ownsChannelPool = true; } @Override @@ -82,7 +91,33 @@ public void connect() throws IOException { @Override public void disconnect() throws IOException { + // No-op: existing test code calls connect/disconnect/connect/disconnect on a single + // channel instance and depends on the channel remaining usable after disconnect. The + // test-constructor's owned EventLoopGroup is a small leak in tests but not test-fatal + // once the bigger Http2NetworkClientFactory leak is fixed. To explicitly release this + // channel's resources, call close(). + } + /** + * Release the channel pool and event-loop group this instance owns (test-constructor only). + * Safe to call multiple times; subsequent calls are no-ops. Production callers that pass an + * external pool need not call this. + */ + public void close() { + if (ownsChannelPool && channelPool instanceof Http2MultiplexedChannelPool) { + try { + ((Http2MultiplexedChannelPool) channelPool).close(); + } catch (Exception e) { + logger.warn("Error closing owned Http2MultiplexedChannelPool for {}", inetSocketAddress, e); + } + } + if (ownedEventLoopGroup != null) { + try { + ownedEventLoopGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS).await(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } /** diff --git a/ambry-network/src/main/java/com/github/ambry/network/http2/Http2NetworkClientFactory.java b/ambry-network/src/main/java/com/github/ambry/network/http2/Http2NetworkClientFactory.java index 30739b6a4b..ba91892842 100644 --- a/ambry-network/src/main/java/com/github/ambry/network/http2/Http2NetworkClientFactory.java +++ b/ambry-network/src/main/java/com/github/ambry/network/http2/Http2NetworkClientFactory.java @@ -21,7 +21,9 @@ import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +31,7 @@ /** * A factory class used to get new instances of a {@link Http2NetworkClient} */ -public class Http2NetworkClientFactory implements NetworkClientFactory { +public class Http2NetworkClientFactory implements NetworkClientFactory, Closeable { private static final Logger logger = LoggerFactory.getLogger(Http2NetworkClientFactory.class); private final Http2ClientMetrics http2ClientMetrics; private final Http2ClientConfig http2ClientConfig; @@ -65,5 +67,22 @@ public Http2NetworkClientFactory(Http2ClientMetrics http2ClientMetrics, Http2Cli public Http2NetworkClient getNetworkClient() throws IOException { return new Http2NetworkClient(http2ClientMetrics, http2ClientConfig, sslFactory, eventLoopGroup); } + + /** + * Shut down the {@link EventLoopGroup} owned by this factory. Without this, the event-loop + * threads (and their epoll/selector FDs) leak for the lifetime of the JVM. Production servers + * never observe the leak because shutdown is followed by JVM exit; tests that bring up many + * servers in one JVM exhaust native resources without it. + */ + @Override + public void close() { + if (eventLoopGroup != null) { + try { + eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).syncUninterruptibly(); + } catch (Exception e) { + logger.warn("Error shutting down Http2NetworkClientFactory event loop group", e); + } + } + } } diff --git a/ambry-server/src/integration-test/java/com/github/ambry/server/ServerHttp2Test.java b/ambry-server/src/integration-test/java/com/github/ambry/server/ServerHttp2Test.java index 38e2fd889f..e7a098c3c6 100644 --- a/ambry-server/src/integration-test/java/com/github/ambry/server/ServerHttp2Test.java +++ b/ambry-server/src/integration-test/java/com/github/ambry/server/ServerHttp2Test.java @@ -22,6 +22,7 @@ import com.github.ambry.config.VerifiableProperties; import com.github.ambry.network.Port; import com.github.ambry.network.PortType; +import com.github.ambry.network.http2.Http2BlockingChannel; import com.github.ambry.utils.MockTime; import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.SystemTime; @@ -33,9 +34,7 @@ import java.util.List; import java.util.Properties; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -45,28 +44,33 @@ @RunWith(Parameterized.class) public class ServerHttp2Test { - private static Properties routerProps; - private static MockNotificationSystem notificationSystem; - private static MockCluster http2Cluster; + private Properties routerProps; + private MockNotificationSystem notificationSystem; + private MockCluster http2Cluster; private final boolean testEncryption; - private static SSLConfig clientSSLConfig1; - private static SSLConfig clientSSLConfig2; - private static SSLConfig clientSSLConfig3; + private SSLConfig clientSSLConfig1; + private SSLConfig clientSSLConfig2; + private SSLConfig clientSSLConfig3; + private File trustStoreFile; + // Per-test-instance tracker for Http2BlockingChannels we own. Scoped to this class so + // closing it in @After only affects channels we created — channels created by other + // test classes (e.g., RouterServerSSLTest) are not registered here. + private final List trackedHttp2Channels = new ArrayList<>(); private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + // Per-test MockCluster lifecycle (matches ServerPlaintextTest/ServerSSLTest/etc.). + // Sharing http2Cluster across tests via @BeforeClass let blob-store, replication-token, + // and replica/disk state leak between tests, which surfaced as flaky failures in + // replicateBlobV2MultipleCases (e.g. expected: but was:, + // expected: but was:). @Before - public void before() { + public void before() throws Exception { nettyByteBufLeakHelper.beforeTest(); - } - - @After - public void after() { - nettyByteBufLeakHelper.afterTest(); - } + // Opt this test class into Http2BlockingChannel auto-tracking. Other test classes + // that don't call enable... aren't affected. + ServerTestUtil.enableHttp2ChannelTracking(trackedHttp2Channels); - @BeforeClass - public static void initializeTests() throws Exception { - File trustStoreFile = File.createTempFile("truststore", ".jks"); + trustStoreFile = File.createTempFile("truststore", ".jks"); Properties clientSSLProps = new Properties(); TestSSLUtils.addSSLProperties(clientSSLProps, "DC1,DC2,DC3", SSLFactory.Mode.CLIENT, trustStoreFile, @@ -97,6 +101,41 @@ public static void initializeTests() throws Exception { http2Cluster.startServers(); } + @After + public void after() throws IOException { + // Each cleanup wrapped so a single failure doesn't skip the remaining ones. + try { + if (http2Cluster != null) { + http2Cluster.cleanup(); + } + } finally { + try { + // Close only Http2BlockingChannel instances allocated during this test (tracked + // via the per-class tracker). Channels created by other test classes are not in + // this list, so this @After can't accidentally close them. + for (Http2BlockingChannel channel : trackedHttp2Channels) { + try { + channel.close(); + } catch (Exception e) { + // Best-effort; one failed close shouldn't stop the others. + } + } + trackedHttp2Channels.clear(); + ServerTestUtil.disableHttp2ChannelTracking(); + } finally { + try { + if (trustStoreFile != null && trustStoreFile.exists()) { + trustStoreFile.delete(); + } + } finally { + // Run leak check AFTER cluster teardown so any ByteBufs released during cleanup + // are reflected before NettyByteBufLeakHelper measures pending allocations. + nettyByteBufLeakHelper.afterTest(); + } + } + } + } + /** * Running for both regular and encrypted blobs * @return an array with both {@code false} and {@code true}. @@ -110,13 +149,6 @@ public ServerHttp2Test(boolean testEncryption) { this.testEncryption = testEncryption; } - @AfterClass - public static void cleanup() throws IOException { - if (http2Cluster != null) { - http2Cluster.cleanup(); - } - } - @Test public void endToEndTest() throws Exception { DataNodeId dataNodeId = http2Cluster.getGeneralDataNode(); diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java index 30310c5411..1e30470dec 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java @@ -626,6 +626,12 @@ public void shutdown() { if (replicationManager != null) { replicationManager.shutdown(); } + // Close after replicationManager so in-flight network clients drain first. Production + // servers never reach this on a normal lifecycle (JVM exits first), but per-test cluster + // bring-down/up cycles leak the factory's EventLoopGroup without this. + if (networkClientFactory instanceof Http2NetworkClientFactory) { + ((Http2NetworkClientFactory) networkClientFactory).close(); + } if (storageManager != null) { storageManager.shutdown(); } diff --git a/ambry-test-utils/src/main/java/com/github/ambry/server/ServerTestUtil.java b/ambry-test-utils/src/main/java/com/github/ambry/server/ServerTestUtil.java index 63c5d5566d..b58a20b7d2 100644 --- a/ambry-test-utils/src/main/java/com/github/ambry/server/ServerTestUtil.java +++ b/ambry-test-utils/src/main/java/com/github/ambry/server/ServerTestUtil.java @@ -125,6 +125,8 @@ import java.io.InputStream; import java.io.PrintWriter; import java.io.StringWriter; +import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.PreparedStatement; @@ -160,6 +162,7 @@ import org.junit.Assert; import static org.junit.Assert.*; +import static org.junit.Assume.*; public final class ServerTestUtil { @@ -4863,6 +4866,29 @@ static long getExpiryTimeMs(BlobProperties blobProperties) { * @param hostName upon which connection has to be established * @return ConnectedChannel */ + // Per-thread tracker for Http2BlockingChannel instances created by getBlockingChannelBasedOnPortType. + // ConnectedChannel.disconnect() is a no-op for HTTP/2 — each Http2BlockingChannel owns an + // EventLoopGroup that needs explicit close(). Test classes that want auto-tracking opt in via + // enableHttp2ChannelTracking() in @Before and close+disableHttp2ChannelTracking() in @After. + // Test classes that don't opt in (e.g., RouterServerSSLTest) are unaffected — their channels + // never enter the tracker, so no other class's @After can accidentally close them. + // Single-threaded execution per JVM fork makes ThreadLocal safe here. + private static final ThreadLocal> HTTP2_CHANNEL_TRACKER = new ThreadLocal<>(); + + /** + * Opt this thread (test class) into Http2BlockingChannel tracking. Subsequent + * getBlockingChannelBasedOnPortType(HTTP2) calls append to {@code tracker}. Pair with + * disableHttp2ChannelTracking() in @After. + */ + public static void enableHttp2ChannelTracking(List tracker) { + HTTP2_CHANNEL_TRACKER.set(tracker); + } + + /** Stop tracking on this thread. Caller is responsible for closing channels in the list. */ + public static void disableHttp2ChannelTracking() { + HTTP2_CHANNEL_TRACKER.remove(); + } + public static ConnectedChannel getBlockingChannelBasedOnPortType(Port targetPort, String hostName, SSLSocketFactory sslSocketFactory, SSLConfig sslConfig) { ConnectedChannel channel = null; @@ -4872,9 +4898,14 @@ public static ConnectedChannel getBlockingChannelBasedOnPortType(Port targetPort channel = new SSLBlockingChannel(hostName, targetPort.getPort(), new MetricRegistry(), 10000, 10000, 10000, 4000, sslSocketFactory, sslConfig); } else if (targetPort.getPortType() == PortType.HTTP2) { - channel = new Http2BlockingChannel(hostName, targetPort.getPort(), sslConfig, + Http2BlockingChannel http2Channel = new Http2BlockingChannel(hostName, targetPort.getPort(), sslConfig, new Http2ClientConfig(new VerifiableProperties(new Properties())), new Http2ClientMetrics(new MetricRegistry())); + List tracker = HTTP2_CHANNEL_TRACKER.get(); + if (tracker != null) { + tracker.add(http2Channel); + } + channel = http2Channel; } return channel; } @@ -4899,9 +4930,14 @@ private static ConnectedChannel getBlockingChannelBasedOnPortType(PortType portT new SSLBlockingChannel(hostName, dataNodeId.getSSLPort(), new MetricRegistry(), 10000, 10000, 10000, 4000, sslSocketFactory, sslConfig); } else if (portType == PortType.HTTP2) { - channel = new Http2BlockingChannel(hostName, dataNodeId.getHttp2Port(), sslConfig, + Http2BlockingChannel http2Channel = new Http2BlockingChannel(hostName, dataNodeId.getHttp2Port(), sslConfig, new Http2ClientConfig(new VerifiableProperties(new Properties())), new Http2ClientMetrics(new MetricRegistry())); + List tracker = HTTP2_CHANNEL_TRACKER.get(); + if (tracker != null) { + tracker.add(http2Channel); + } + channel = http2Channel; } return channel; } @@ -4971,11 +5007,27 @@ public static void releaseNettyBufUnderneathStream(DataInputStream stream) { } } + /** + * Quick TCP probe to detect whether MySQL is listening on the default port. Used by + * MySQL-dependent tests to skip cleanly (via assumeTrue) when running locally without + * MySQL set up. CI workflows install and start mysqld, so the probe succeeds there. + */ + public static boolean isMysqlAvailable() { + try (Socket socket = new Socket()) { + socket.connect(new InetSocketAddress("localhost", 3306), 1000); + return true; + } catch (IOException e) { + return false; + } + } + /** * Test the background RepairRequestsSender and the repair requests handlers. */ static void repairRequestTest(MockCluster cluster, SSLConfig clientSSLConfig, boolean testEncryption, MockNotificationSystem notificationSystem) throws Exception { + assumeTrue("MySQL not available on localhost:3306; install/start mysqld to run this test", + isMysqlAvailable()); List allNodes = cluster.getAllDataNodes(); MockClusterMap clusterMap = cluster.getClusterMap(); List partitionIds = clusterMap.getWritablePartitionIds(MockClusterMap.DEFAULT_PARTITION_CLASS); diff --git a/log4j-test-config/src/main/resources/log4j2.xml b/log4j-test-config/src/main/resources/log4j2.xml index ae23d118da..a9b180cedd 100644 --- a/log4j-test-config/src/main/resources/log4j2.xml +++ b/log4j-test-config/src/main/resources/log4j2.xml @@ -22,7 +22,6 @@ -