Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import lombok.CustomLog;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
Expand Down Expand Up @@ -366,33 +365,24 @@ private CompletableFuture<Long> createNewSchema(String schemaId, byte[] data, by
SchemaLocator locator = new SchemaLocator();
locator.setInfo().copyFrom(info);
locator.addIndex().copyFrom(info);
CompletableFuture<Long> created = createSchemaLocator(
getSchemaPath(schemaId), locator).thenApply(ignore -> 0L);

// Step 3: Handle failure by cleaning up the orphan ledger
// if concurrent schema creation caused a CAS conflict
return created.whenComplete((__, ex) -> {
if (ex == null) {
return;
}
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.warn()
.attr("schemaId", schemaId)
.attr("ledgerId", position.getLedgerId())
.exception(cause)
.log("Failed to create schema locator with position");
if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
bookKeeper.asyncDeleteLedger(position.getLedgerId(), (rc, ctx) -> {
if (rc != BKException.Code.OK) {
log.warn()
.attr("schemaId", schemaId)
.attr("ledgerId", position.getLedgerId())
.attr("rc", rc)
.log("Failed to delete orphan ledger after schema locator creation failed");
return createSchemaLocator(getSchemaPath(schemaId), locator)
.thenApply(ignore -> 0L)
.exceptionallyCompose(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.warn()
.attr("schemaId", schemaId)
.attr("ledgerId", position.getLedgerId())
.exception(cause)
.log("Failed to create schema locator with position");
if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
return deleteLedgerAsync(schemaId, position.getLedgerId(),
"schema locator creation failed")
.thenCompose(__ -> FutureUtil.failedFuture(cause));
}
}, null);
}
});
return FutureUtil.failedFuture(cause);
});
});
}

Expand Down Expand Up @@ -504,32 +494,48 @@ private CompletableFuture<Long> updateSchemaLocator(
return updateSchemaLocator(getSchemaPath(schemaId),
newLocator
, locatorEntry.version
).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
if (ex != null) {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.warn()
.attr("schemaId", schemaId)
.attr("ledgerId", position.getLedgerId())
.exception(cause)
.log("Failed to update schema locator with position");
if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
bookKeeper.asyncDeleteLedger(position.getLedgerId(), new AsyncCallback.DeleteCallback() {
@Override
public void deleteComplete(int rc, Object ctx) {
if (rc != BKException.Code.OK) {
log.warn()
.attr("schemaId", schemaId)
.attr("ledgerId", position.getLedgerId())
.attr("rc", rc)
.log("Failed to delete ledger after updating schema locator failed, rc");
}
}
}, null);
}
).thenApply(ignore -> nextVersion).exceptionallyCompose(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.warn()
.attr("schemaId", schemaId)
.attr("ledgerId", position.getLedgerId())
.exception(cause)
.log("Failed to update schema locator with position");
if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
return deleteLedgerAsync(schemaId, position.getLedgerId(),
"schema locator update failed")
.thenCompose(__ -> FutureUtil.failedFuture(cause));
}
return FutureUtil.failedFuture(cause);
});
}

/**
 * Asks BookKeeper to delete the given schema ledger on a best-effort basis.
 *
 * <p>The returned future always completes normally with {@code null}: a non-OK return code
 * reported by the delete callback, or a throwable raised while issuing the delete request,
 * is only logged as a warning and never propagated to the caller.
 *
 * @param schemaId schema id attached to the warning logs
 * @param ledgerId id of the ledger to delete
 * @param reason   short description of why the ledger is being deleted, attached to the warning logs
 * @return a future completed once the delete callback has fired or the request could not be issued
 */
private CompletableFuture<Void> deleteLedgerAsync(String schemaId, long ledgerId, String reason) {
    final CompletableFuture<Void> deletionDone = new CompletableFuture<>();
    try {
        bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
            final boolean deleted = rc == BKException.Code.OK;
            if (!deleted) {
                // Deletion failure is tolerated; only record it for operators.
                log.warn()
                        .attr("schemaId", schemaId)
                        .attr("ledgerId", ledgerId)
                        .attr("rc", rc)
                        .attr("reason", reason)
                        .log("Failed to delete orphan schema ledger");
            }
            deletionDone.complete(null);
        }, null);
    } catch (Throwable triggerFailure) {
        // The delete request itself could not be issued; still complete so callers are not blocked.
        log.warn()
                .attr("schemaId", schemaId)
                .attr("ledgerId", ledgerId)
                .attr("reason", reason)
                .exception(triggerFailure)
                .log("Failed to trigger orphan schema ledger deletion");
        deletionDone.complete(null);
    }
    return deletionDone;
}

@NonNull
private CompletableFuture<SchemaEntry> findSchemaEntryByVersion(
List<IndexEntry> index,
Expand Down Expand Up @@ -815,4 +821,4 @@ public static <T> CompletableFuture<T> ignoreUnrecoverableBKException(Completabl
throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.apache.avro.Schema.Parser;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand Down Expand Up @@ -1394,48 +1396,96 @@ public void testConcurrentCreateSchemaNoOrphanLedger() throws Exception {
final String topic = getTopicName(ns, "testConcurrentCreateSchemaNoOrphanLedger");
final String schemaName = TopicName.get(topic).getSchemaName();

org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk =
(org.apache.bookkeeper.client.PulsarMockBookKeeper) pulsar.getBookKeeperClient();
PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper) pulsar.getBookKeeperClient();

// Concurrently create producers with the same schema on a brand-new topic
int concurrency = 16;
List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = createProducersInParallel(
topic, Schema.AVRO(Schemas.PersonOne.class), concurrency);
try {
FutureUtil.waitForAll(producers).join();

// Verify only 1 schema version exists
assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);

int schemaLedgerCount = countSchemaLedgers(mockBk, schemaName);
assertEquals(schemaLedgerCount, 1,
"Expected exactly 1 schema ledger for the topic, but found "
+ schemaLedgerCount + ". Orphan ledgers were not cleaned up.");
} finally {
closeProducers(producers);
}
}

/**
 * Verifies that concurrent, compatible schema updates leave no orphan ledgers behind:
 * requests that lose the schema locator CAS race must clean up the ledger they created.
 *
 * <p>A first producer registers the initial schema, then several producers register the
 * same second schema in parallel. Afterwards the topic must have exactly two schema
 * versions and exactly two schema ledgers.
 */
@Test
public void testConcurrentUpdateSchemaNoOrphanLedger() throws Exception {
    final String nsName = PUBLIC_TENANT + "/" + "test-namespace-" + randomName(16);
    admin.namespaces().createNamespace(nsName, Sets.newHashSet(CLUSTER_NAME));

    final String topic = getTopicName(nsName, "testConcurrentUpdateSchemaNoOrphanLedger");
    final String schemaName = TopicName.get(topic).getSchemaName();
    PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper) pulsar.getBookKeeperClient();

    // Register the first schema version before racing the update.
    @Cleanup
    Producer<Schemas.PersonOne> seedProducer = pulsarClient
            .newProducer(Schema.AVRO(Schemas.PersonOne.class))
            .topic(topic)
            .create();
    assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);

    final int parallelism = 16;
    List<CompletableFuture<Producer<Schemas.PersonThree>>> updaters = createProducersInParallel(
            topic, Schema.AVRO(Schemas.PersonThree.class), parallelism);
    try {
        FutureUtil.waitForAll(updaters).join();

        // One ledger per surviving schema version; anything more is an orphan.
        assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
        int ledgerCount = countSchemaLedgers(mockBk, schemaName);
        assertEquals(ledgerCount, 2,
                "Expected exactly 2 schema ledgers for the topic, but found "
                        + ledgerCount + ". Orphan ledgers were not cleaned up.");
    } finally {
        closeProducers(updaters);
    }
}

private <T> List<CompletableFuture<Producer<T>>> createProducersInParallel(
String topic, Schema<T> schema, int concurrency) throws InterruptedException {
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(concurrency);
List<CompletableFuture<Producer<Schemas.PersonOne>>> producers =
Collections.synchronizedList(new ArrayList<>(concurrency));
List<CompletableFuture<Producer<T>>> producers = Collections.synchronizedList(new ArrayList<>(concurrency));
CountDownLatch latch = new CountDownLatch(concurrency);
for (int i = 0; i < concurrency; i++) {
executor.execute(() -> {
try {
producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topic).createAsync());
producers.add(pulsarClient.newProducer(schema).topic(topic).createAsync());
} finally {
latch.countDown();
}
});
}
latch.await();
FutureUtil.waitForAll(producers).join();

// Verify only 1 schema version exists
assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
return producers;
}

// Count surviving BK ledgers whose customMetadata "pulsar/schemaId" matches this topic's schemaName.
// If orphan ledgers were not cleaned up, the count exceeds the number of schema versions the caller expects.
private int countSchemaLedgers(PulsarMockBookKeeper mockBk, String schemaName) {
int schemaLedgerCount = 0;
for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh : mockBk.getLedgerMap().values()) {
for (PulsarMockLedgerHandle lh : mockBk.getLedgerMap().values()) {
Map<String, byte[]> metadata = lh.getLedgerMetadata().getCustomMetadata();
byte[] schemaIdBytes = metadata.get(LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID);
if (schemaIdBytes != null
&& schemaName.equals(new String(schemaIdBytes, java.nio.charset.StandardCharsets.UTF_8))) {
if (schemaIdBytes != null && schemaName.equals(new String(schemaIdBytes, StandardCharsets.UTF_8))) {
schemaLedgerCount++;
}
}
assertEquals(schemaLedgerCount, 1,
"Expected exactly 1 schema ledger for the topic, but found "
+ schemaLedgerCount + ". Orphan ledgers were not cleaned up.");
return schemaLedgerCount;
}

// Cleanup
private <T> void closeProducers(List<CompletableFuture<Producer<T>>> producers) {
producers.forEach(p -> {
try {
p.join().close();
Expand Down
Loading