Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,6 @@ protected List<RegionPlan> balanceTable(TableName tableName,
rackManager, regionCacheRatioOnOldServerMap);

long startTime = EnvironmentEdgeManager.currentTime();
cluster.setStopRequestedAt(startTime + maxRunningTime);

initCosts(cluster);
balancerConditionals.loadClusterState(cluster);
Expand Down Expand Up @@ -632,6 +631,10 @@ protected List<RegionPlan> balanceTable(TableName tableName,
currentCost / sumMultiplier, functionCost(), computedMaxSteps);

final String initFunctionTotalCosts = totalCostsPerFunc();
long searchStartTime = EnvironmentEdgeManager.currentTime();
// Budget maxRunningTime for the stochastic walk only; initialization (cluster costs, etc.)
// can be substantial on busy hosts and must not consume the search deadline.
cluster.setStopRequestedAt(searchStartTime + maxRunningTime);
// Perform a stochastic walk to see if we can get a good fit.
long step;
boolean planImprovedConditionals = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class HBaseJupiterExtension implements InvocationInterceptor, BeforeAllCa

private static final Map<String, Duration> TAG_TO_TIMEOUT =
ImmutableMap.of(SmallTests.TAG, Duration.ofMinutes(3), MediumTests.TAG, Duration.ofMinutes(6),
LargeTests.TAG, Duration.ofMinutes(13), IntegrationTests.TAG, Duration.ZERO);
LargeTests.TAG, Duration.ofMinutes(20), IntegrationTests.TAG, Duration.ZERO);

private static final String EXECUTOR = "executor";

Expand Down
5 changes: 5 additions & 0 deletions hbase-compression/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
<artifactId>hbase-resource-bundle</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,17 +435,26 @@ private void createSubDirAndSystemProperty(String propertyName, Path parent, Str
String sysValue = System.getProperty(propertyName);

if (sysValue != null) {
// There is already a value set. So we do nothing but hope
// that there will be no conflicts
LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
+ " so I do NOT create it in " + parent);
String confValue = conf.get(propertyName);
if (confValue != null && !confValue.endsWith(sysValue)) {
LOG.warn(propertyName + " property value differs in configuration and system: "
+ "Configuration=" + confValue + " while System=" + sysValue
+ " Erasing configuration value by system value.");
// There is already a value set by a previous test. Check if the directory still exists.
File sysDir = new File(sysValue);
if (sysDir.exists()) {
// Directory exists, so another test may still be using it. Reuse it to avoid conflicts.
LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
+ " and directory exists, so reusing it for " + parent);
String confValue = conf.get(propertyName);
if (confValue != null && !confValue.endsWith(sysValue)) {
LOG.warn(propertyName + " property value differs in configuration and system: "
+ "Configuration=" + confValue + " while System=" + sysValue
+ " Erasing configuration value by system value.");
}
conf.set(propertyName, sysValue);
} else {
// Directory was deleted (previous test cleaned up), so create our own.
LOG.info("System.getProperty(\"" + propertyName + "\") set to: " + sysValue
+ " but directory no longer exists. Creating new directory under " + parent);
createSubDir(propertyName, parent, subDirName);
System.setProperty(propertyName, conf.get(propertyName));
}
conf.set(propertyName, sysValue);
} else {
// Ok, it's not set, so we create it as a subdirectory
createSubDir(propertyName, parent, subDirName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public static void setUpBeforeClass() throws Exception {
conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 1000);
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, MockLoadBalancer.class,
LoadBalancer.class);
TEST_UTIL.startMiniDFSCluster(2);
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -328,7 +327,8 @@ public void testPrefetchRunTriggersEvictions() throws Exception {
conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0);
blockCache = BlockCacheFactory.createBlockCache(conf);
cacheConf = new CacheConfig(conf, blockCache);
Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
// Use 15000 KVs to ensure file reliably exceeds 1MB cache capacity even with size variance
Path storeFile = writeStoreFile("testPrefetchRunTriggersEvictions", 15000);
// Prefetches the file blocks
createReaderAndWaitForPrefetchInterruption(storeFile);
Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
Expand All @@ -343,14 +343,16 @@ public void testPrefetchRunTriggersEvictions() throws Exception {
}
return true;
});
if (bc.getStats().getFailedInserts() == 0) {
// With no wait time configuration, prefetch should trigger evictions once it reaches
// cache capacity
assertNotEquals(0, bc.getStats().getEvictedCount());
} else {
LOG.info("We had {} cache insert failures, which may cause cache usage "
+ "to never reach capacity.", bc.getStats().getFailedInserts());
}
// With no wait time configuration, prefetch will either trigger evictions when reaching
// cache capacity, or have failed inserts when the writer queue fills faster than it drains.
// Both outcomes are valid - test should only fail if NEITHER happens, which would indicate
// a problem with the capacity management logic.
long evictions = bc.getStats().getEvictedCount();
long failedInserts = bc.getStats().getFailedInserts();
assertTrue(
"Expected either evictions or failed inserts to demonstrate capacity management, "
+ "but got evictions=" + evictions + ", failedInserts=" + failedInserts,
evictions > 0 || failedInserts > 0);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public static void tearDownAfterClass() throws IOException {
@BeforeEach
public void setUp() throws IOException {
UTIL.ensureSomeNonStoppedRegionServersAvailable(2);
// Surefire reruns failed tests in the same JVM without re-running @BeforeClass. Reset injection
// state so compareAndSet in persistToMeta can succeed again and kill-before-store flags clear.
INJECTED.set(false);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(), false);
}

private ServerCrashProcedure getSCPForServer(ServerName serverName) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void testBBSMultiGet() throws Exception {

private void testTraffic(Callable<Long> trafficCallable, long expectedSuccess, long marginOfError)
throws Exception {
TEST_UTIL.waitFor(5_000, () -> {
TEST_UTIL.waitFor(30_000, () -> {
long actualSuccess;
try {
actualSuccess = trafficCallable.call();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1351,18 +1351,23 @@ public void testGetWhileRegionClose() throws IOException {
threads[i].start();
}
} finally {
done.set(true);
for (GetTillDoneOrException t : threads) {
if (t != null) {
try {
t.join(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
if (this.region != null) {
HBaseTestingUtil.closeRegionAndWAL(this.region);
this.region = null;
}
}
done.set(true);
// Check for errors after threads have been stopped
for (GetTillDoneOrException t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (t.e != null) {
LOG.info("Exception=" + t.e);
assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
Expand Down Expand Up @@ -1390,7 +1395,7 @@ class GetTillDoneOrException extends Thread {

@Override
public void run() {
while (!this.done.get()) {
while (!this.done.get() && !Thread.currentThread().isInterrupted()) {
try {
assertTrue(region.get(g).size() > 0);
this.count.incrementAndGet();
Expand Down Expand Up @@ -4574,7 +4579,7 @@ public void checkNoError() {
@Override
public void run() {
done = false;
while (!done) {
while (!done && !Thread.currentThread().isInterrupted()) {
synchronized (this) {
try {
wait();
Expand Down Expand Up @@ -4790,7 +4795,7 @@ public void checkNoError() {
@Override
public void run() {
done = false;
while (!done) {
while (!done && !Thread.currentThread().isInterrupted()) {
try {
for (int r = 0; r < numRows; r++) {
byte[] row = Bytes.toBytes("row" + r);
Expand Down Expand Up @@ -5327,7 +5332,7 @@ public void testParallelIncrementWithMemStoreFlush() throws Exception {
Runnable flusher = new Runnable() {
@Override
public void run() {
while (!incrementDone.get()) {
while (!incrementDone.get() && !Thread.currentThread().isInterrupted()) {
try {
region.flush(true);
} catch (Exception e) {
Expand All @@ -5343,28 +5348,39 @@ public void run() {
long expected = (long) threadNum * incCounter;
Thread[] incrementers = new Thread[threadNum];
Thread flushThread = new Thread(flusher);
flushThread.setName("FlushThread-" + method);
for (int i = 0; i < threadNum; i++) {
incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
incrementers[i].start();
}
flushThread.start();
for (int i = 0; i < threadNum; i++) {
incrementers[i].join();
}
try {
for (int i = 0; i < threadNum; i++) {
incrementers[i].join();
}

incrementDone.set(true);
flushThread.join();
incrementDone.set(true);
flushThread.join();

Get get = new Get(Incrementer.incRow);
get.addColumn(Incrementer.family, Incrementer.qualifier);
get.readVersions(1);
Result res = this.region.get(get);
List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
Get get = new Get(Incrementer.incRow);
get.addColumn(Incrementer.family, Incrementer.qualifier);
get.readVersions(1);
Result res = this.region.get(get);
List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);

// we just got the latest version
assertEquals(1, kvs.size());
Cell kv = kvs.get(0);
assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
// we just got the latest version
assertEquals(1, kvs.size());
Cell kv = kvs.get(0);
assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
} finally {
// Ensure flush thread is stopped even if test fails or times out
incrementDone.set(true);
flushThread.interrupt();
flushThread.join(5000); // Wait up to 5 seconds for thread to stop
if (flushThread.isAlive()) {
LOG.warn("Flush thread did not stop within timeout for test " + method);
}
}
}

/**
Expand Down Expand Up @@ -5412,7 +5428,7 @@ public void testParallelAppendWithMemStoreFlush() throws Exception {
Runnable flusher = new Runnable() {
@Override
public void run() {
while (!appendDone.get()) {
while (!appendDone.get() && !Thread.currentThread().isInterrupted()) {
try {
region.flush(true);
} catch (Exception e) {
Expand All @@ -5432,30 +5448,42 @@ public void run() {
}
Thread[] appenders = new Thread[threadNum];
Thread flushThread = new Thread(flusher);
flushThread.setName("FlushThread-" + method);
for (int i = 0; i < threadNum; i++) {
appenders[i] = new Thread(new Appender(this.region, appendCounter));
appenders[i].start();
}
flushThread.start();
for (int i = 0; i < threadNum; i++) {
appenders[i].join();
}

appendDone.set(true);
flushThread.join();

Get get = new Get(Appender.appendRow);
get.addColumn(Appender.family, Appender.qualifier);
get.readVersions(1);
Result res = this.region.get(get);
List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
try {
for (int i = 0; i < threadNum; i++) {
appenders[i].join();
}

// we just got the latest version
assertEquals(1, kvs.size());
Cell kv = kvs.get(0);
byte[] appendResult = new byte[kv.getValueLength()];
System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
assertArrayEquals(expected, appendResult);
appendDone.set(true);
flushThread.join();

Get get = new Get(Appender.appendRow);
get.addColumn(Appender.family, Appender.qualifier);
get.readVersions(1);
Result res = this.region.get(get);
List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);

// we just got the latest version
assertEquals(1, kvs.size());
Cell kv = kvs.get(0);
byte[] appendResult = new byte[kv.getValueLength()];
System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0,
kv.getValueLength());
assertArrayEquals(expected, appendResult);
} finally {
// Ensure flush thread is stopped even if test fails or times out
appendDone.set(true);
flushThread.interrupt();
flushThread.join(5000); // Wait up to 5 seconds for thread to stop
if (flushThread.isAlive()) {
LOG.warn("Flush thread did not stop within timeout for test " + method);
}
}
}

/**
Expand Down Expand Up @@ -7489,7 +7517,7 @@ public void testMutateRowInParallel() throws Exception {
// Writer thread
Thread writerThread = new Thread(() -> {
try {
while (true) {
while (!Thread.currentThread().isInterrupted()) {
// If all the reader threads finish, then stop the writer thread
if (latch.await(0, TimeUnit.MILLISECONDS)) {
return;
Expand All @@ -7514,15 +7542,19 @@ public void testMutateRowInParallel() throws Exception {
.addColumn(fam1, q3, tsIncrement + 1, Bytes.toBytes(1L))
.addColumn(fam1, q4, tsAppend + 1, Bytes.toBytes("a")) });
}
} catch (InterruptedException e) {
// Test interrupted, exit gracefully
Thread.currentThread().interrupt();
} catch (Exception e) {
assertionError.set(new AssertionError(e));
}
});
writerThread.setName("WriterThread-" + method);
writerThread.start();

// Reader threads
for (int i = 0; i < numReaderThreads; i++) {
new Thread(() -> {
Thread readerThread = new Thread(() -> {
try {
for (int j = 0; j < 10000; j++) {
// Verify the values
Expand Down Expand Up @@ -7551,13 +7583,24 @@ public void testMutateRowInParallel() throws Exception {
}

latch.countDown();
}).start();
});
readerThread.setName("ReaderThread-" + i + "-" + method);
readerThread.start();
}

writerThread.join();
try {
writerThread.join();

if (assertionError.get() != null) {
throw assertionError.get();
if (assertionError.get() != null) {
throw assertionError.get();
}
} finally {
// Ensure writer thread is stopped on test timeout
writerThread.interrupt();
writerThread.join(5000);
if (writerThread.isAlive()) {
LOG.warn("Writer thread did not stop within timeout for test " + method);
}
}
}

Expand Down
Loading