Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions .github/workflows/github-actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
# ============================================================

unit-test-clustermap:
if: false # DEBUG branch — only server-int-test runs
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
Expand All @@ -49,6 +50,7 @@ jobs:
job-id-suffix: clustermap

unit-test-network:
if: false # DEBUG branch — only server-int-test runs
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
Expand All @@ -60,6 +62,7 @@ jobs:
job-id-suffix: network

unit-test-frontend:
if: false # DEBUG branch — only server-int-test runs
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
Expand All @@ -71,6 +74,7 @@ jobs:
job-id-suffix: frontend

unit-test-mysql-stack:
if: false # DEBUG branch — only server-int-test runs
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
Expand All @@ -85,6 +89,7 @@ jobs:
needs-mysql: 'true'

unit-test-azure-stack:
if: false # DEBUG branch — only server-int-test runs
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
Expand All @@ -97,6 +102,7 @@ jobs:
needs-azurite: 'true'

unit-test-protocols:
if: false # DEBUG branch — only server-int-test runs
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
Expand All @@ -108,6 +114,7 @@ jobs:
job-id-suffix: protocols

unit-test-utility-modules:
if: false # DEBUG branch — only server-int-test runs
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
Expand All @@ -123,7 +130,7 @@ jobs:
# a per-module unit-test-file-transfer job here once the path is operational.

store-test:

if: false # DEBUG branch — only server-int-test runs
runs-on: ubuntu-latest
# Hard cap matches unit-test's: prevents runaway hangs from consuming
# full GitHub-default 6h timeout if a test wedges.
Expand Down Expand Up @@ -156,7 +163,7 @@ jobs:
timeout-minutes: 2

int-test:

if: false # DEBUG branch — only server-int-test runs
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
Expand Down Expand Up @@ -246,7 +253,8 @@ jobs:
name: Run integration tests
with:
job-id: jdk11
arguments: --scan --warning-mode=summary :ambry-server:intTest codeCoverageReport
# DEBUG branch — only server-int-test runs (other jobs disabled via if: false).
arguments: --scan --warning-mode=summary :ambry-server:intTest
gradle-version: wrapper

- name: Upload coverage to Codecov
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.ambry.utils.NettyByteBufDataInputStream;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
Expand All @@ -47,6 +48,10 @@ public class Http2BlockingChannel implements ConnectedChannel {
private final ChannelPool channelPool;
private final Http2ClientConfig http2ClientConfig;
private final InetSocketAddress inetSocketAddress;
// Non-null only when this instance allocated its own pool/event-loop (test constructor).
// disconnect() must release them; production callers share these resources externally.
private final EventLoopGroup ownedEventLoopGroup;
private final boolean ownsChannelPool;
final static AttributeKey<CompletableFuture<ByteBuf>> RESPONSE_PROMISE = AttributeKey.newInstance("ResponsePromise");
final static AttributeKey<ChannelPool> CHANNEL_POOL_ATTRIBUTE_KEY = AttributeKey.newInstance("ChannelPool");

Expand All @@ -55,6 +60,8 @@ public Http2BlockingChannel(ChannelPool channelPool, InetSocketAddress inetSocke
this.channelPool = channelPool;
this.inetSocketAddress = inetSocketAddress;
this.http2ClientConfig = http2ClientConfig;
this.ownedEventLoopGroup = null;
this.ownsChannelPool = false;
}

/**
Expand All @@ -70,9 +77,11 @@ public Http2BlockingChannel(String hostName, int port, SSLConfig sslConfig, Http
}
this.http2ClientConfig = http2ClientConfig;
this.inetSocketAddress = new InetSocketAddress(hostName, port);
this.ownedEventLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
this.channelPool = new Http2MultiplexedChannelPool(this.inetSocketAddress, nettySslHttp2Factory,
Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup(), http2ClientConfig,
http2ClientMetrics, new Http2BlockingChannelStreamChannelInitializer(http2ClientConfig));
this.ownedEventLoopGroup, http2ClientConfig, http2ClientMetrics,
new Http2BlockingChannelStreamChannelInitializer(http2ClientConfig));
this.ownsChannelPool = true;
}

@Override
Expand All @@ -82,7 +91,33 @@ public void connect() throws IOException {

@Override
public void disconnect() throws IOException {
// No-op: existing test code calls connect/disconnect/connect/disconnect on a single
// channel instance and depends on the channel remaining usable after disconnect. The
// test-constructor's owned EventLoopGroup is a small leak in tests but not test-fatal
// once the bigger Http2NetworkClientFactory leak is fixed. To explicitly release this
// channel's resources, call close().
}

/**
* Release the channel pool and event-loop group this instance owns (test-constructor only).
* Safe to call multiple times; subsequent calls are no-ops. Production callers that pass an
* external pool need not call this.
*/
public void close() {
if (ownsChannelPool && channelPool instanceof Http2MultiplexedChannelPool) {
try {
((Http2MultiplexedChannelPool) channelPool).close();
} catch (Exception e) {
logger.warn("Error closing owned Http2MultiplexedChannelPool for {}", inetSocketAddress, e);
}
}
if (ownedEventLoopGroup != null) {
try {
ownedEventLoopGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS).await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A factory class used to get new instances of a {@link Http2NetworkClient}
*/
public class Http2NetworkClientFactory implements NetworkClientFactory {
public class Http2NetworkClientFactory implements NetworkClientFactory, Closeable {
private static final Logger logger = LoggerFactory.getLogger(Http2NetworkClientFactory.class);
private final Http2ClientMetrics http2ClientMetrics;
private final Http2ClientConfig http2ClientConfig;
Expand Down Expand Up @@ -65,5 +67,22 @@ public Http2NetworkClientFactory(Http2ClientMetrics http2ClientMetrics, Http2Cli
public Http2NetworkClient getNetworkClient() throws IOException {
return new Http2NetworkClient(http2ClientMetrics, http2ClientConfig, sslFactory, eventLoopGroup);
}

/**
* Shut down the {@link EventLoopGroup} owned by this factory. Without this, the event-loop
* threads (and their epoll/selector FDs) leak for the lifetime of the JVM. Production servers
* never observe the leak because shutdown is followed by JVM exit; tests that bring up many
* servers in one JVM exhaust native resources without it.
*/
@Override
public void close() {
if (eventLoopGroup != null) {
try {
eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).syncUninterruptibly();
} catch (Exception e) {
logger.warn("Error shutting down Http2NetworkClientFactory event loop group", e);
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.network.Port;
import com.github.ambry.network.PortType;
import com.github.ambry.network.http2.Http2BlockingChannel;
import com.github.ambry.utils.MockTime;
import com.github.ambry.utils.NettyByteBufLeakHelper;
import com.github.ambry.utils.SystemTime;
Expand All @@ -33,9 +34,7 @@
import java.util.List;
import java.util.Properties;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand All @@ -45,28 +44,33 @@

@RunWith(Parameterized.class)
public class ServerHttp2Test {
private static Properties routerProps;
private static MockNotificationSystem notificationSystem;
private static MockCluster http2Cluster;
private Properties routerProps;
private MockNotificationSystem notificationSystem;
private MockCluster http2Cluster;
private final boolean testEncryption;
private static SSLConfig clientSSLConfig1;
private static SSLConfig clientSSLConfig2;
private static SSLConfig clientSSLConfig3;
private SSLConfig clientSSLConfig1;
private SSLConfig clientSSLConfig2;
private SSLConfig clientSSLConfig3;
private File trustStoreFile;
// Per-test-instance tracker for Http2BlockingChannels we own. Scoped to this class so
// closing it in @After only affects channels we created — channels created by other
// test classes (e.g., RouterServerSSLTest) are not registered here.
private final List<Http2BlockingChannel> trackedHttp2Channels = new ArrayList<>();
private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper();

// Per-test MockCluster lifecycle (matches ServerPlaintextTest/ServerSSLTest/etc.).
// Sharing http2Cluster across tests via @BeforeClass let blob-store, replication-token,
// and replica/disk state leak between tests, which surfaced as flaky failures in
// replicateBlobV2MultipleCases (e.g. expected:<BlobNotFound> but was:<NoError>,
// expected:<NoError> but was:<ReplicaUnavailable>).
@Before
public void before() {
public void before() throws Exception {
nettyByteBufLeakHelper.beforeTest();
}

@After
public void after() {
nettyByteBufLeakHelper.afterTest();
}
// Opt this test class into Http2BlockingChannel auto-tracking. Other test classes
// that don't call enable... aren't affected.
ServerTestUtil.enableHttp2ChannelTracking(trackedHttp2Channels);

@BeforeClass
public static void initializeTests() throws Exception {
File trustStoreFile = File.createTempFile("truststore", ".jks");
trustStoreFile = File.createTempFile("truststore", ".jks");

Properties clientSSLProps = new Properties();
TestSSLUtils.addSSLProperties(clientSSLProps, "DC1,DC2,DC3", SSLFactory.Mode.CLIENT, trustStoreFile,
Expand Down Expand Up @@ -97,6 +101,41 @@ public static void initializeTests() throws Exception {
http2Cluster.startServers();
}

@After
public void after() throws IOException {
// Each cleanup wrapped so a single failure doesn't skip the remaining ones.
try {
if (http2Cluster != null) {
http2Cluster.cleanup();
}
} finally {
try {
// Close only Http2BlockingChannel instances allocated during this test (tracked
// via the per-class tracker). Channels created by other test classes are not in
// this list, so this @After can't accidentally close them.
for (Http2BlockingChannel channel : trackedHttp2Channels) {
try {
channel.close();
} catch (Exception e) {
// Best-effort; one failed close shouldn't stop the others.
}
}
trackedHttp2Channels.clear();
ServerTestUtil.disableHttp2ChannelTracking();
} finally {
try {
if (trustStoreFile != null && trustStoreFile.exists()) {
trustStoreFile.delete();
}
} finally {
// Run leak check AFTER cluster teardown so any ByteBufs released during cleanup
// are reflected before NettyByteBufLeakHelper measures pending allocations.
nettyByteBufLeakHelper.afterTest();
}
}
}
}

/**
* Running for both regular and encrypted blobs
* @return an array with both {@code false} and {@code true}.
Expand All @@ -110,13 +149,6 @@ public ServerHttp2Test(boolean testEncryption) {
this.testEncryption = testEncryption;
}

@AfterClass
public static void cleanup() throws IOException {
if (http2Cluster != null) {
http2Cluster.cleanup();
}
}

@Test
public void endToEndTest() throws Exception {
DataNodeId dataNodeId = http2Cluster.getGeneralDataNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,12 @@ public void shutdown() {
if (replicationManager != null) {
replicationManager.shutdown();
}
// Close after replicationManager so in-flight network clients drain first. Production
// servers never reach this on a normal lifecycle (JVM exits first), but per-test cluster
// bring-down/up cycles leak the factory's EventLoopGroup without this.
if (networkClientFactory instanceof Http2NetworkClientFactory) {
((Http2NetworkClientFactory) networkClientFactory).close();
}
if (storageManager != null) {
storageManager.shutdown();
}
Expand Down
Loading
Loading