From ee60269b31196de256c24778c1bbb8f66cd600d5 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sat, 25 Apr 2026 16:21:03 +0800 Subject: [PATCH] [fix][broker] Wait for orphan schema ledger cleanup before retry --- .../schema/BookkeeperSchemaStorage.java | 102 +++++++++--------- .../org/apache/pulsar/schema/SchemaTest.java | 88 +++++++++++---- 2 files changed, 123 insertions(+), 67 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index f3a1baa538889..a7921b8b8d089 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -39,7 +39,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.function.Function; import lombok.CustomLog; -import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; @@ -366,33 +365,24 @@ private CompletableFuture createNewSchema(String schemaId, byte[] data, by SchemaLocator locator = new SchemaLocator(); locator.setInfo().copyFrom(info); locator.addIndex().copyFrom(info); - CompletableFuture created = createSchemaLocator( - getSchemaPath(schemaId), locator).thenApply(ignore -> 0L); - // Step 3: Handle failure by cleaning up the orphan ledger // if concurrent schema creation caused a CAS conflict - return created.whenComplete((__, ex) -> { - if (ex == null) { - return; - } - Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.warn() - .attr("schemaId", schemaId) - .attr("ledgerId", position.getLedgerId()) - .exception(cause) - .log("Failed to create schema locator with position"); - if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { - bookKeeper.asyncDeleteLedger(position.getLedgerId(), (rc, ctx) -> { - if (rc != BKException.Code.OK) { - log.warn() - .attr("schemaId", schemaId) - .attr("ledgerId", position.getLedgerId()) - .attr("rc", rc) - .log("Failed to delete orphan ledger after schema locator creation failed"); + return createSchemaLocator(getSchemaPath(schemaId), locator) + .thenApply(ignore -> 0L) + .exceptionallyCompose(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn() + .attr("schemaId", schemaId) + .attr("ledgerId", position.getLedgerId()) + .exception(cause) + .log("Failed to create schema locator with position"); + if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { + return deleteLedgerAsync(schemaId, position.getLedgerId(), + "schema locator creation failed") + .thenCompose(__ -> FutureUtil.failedFuture(cause)); } - }, null); - } - }); + return FutureUtil.failedFuture(cause); + }); }); } @@ -504,32 +494,48 @@ private CompletableFuture updateSchemaLocator( return updateSchemaLocator(getSchemaPath(schemaId), newLocator , locatorEntry.version - ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> { - if (ex != null) { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.warn() - .attr("schemaId", schemaId) - .attr("ledgerId", position.getLedgerId()) - .exception(cause) - .log("Failed to update schema locator with position"); - if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { - bookKeeper.asyncDeleteLedger(position.getLedgerId(), new AsyncCallback.DeleteCallback() { - @Override - public void deleteComplete(int rc, Object ctx) { - if (rc != BKException.Code.OK) { - log.warn() - .attr("schemaId", schemaId) - .attr("ledgerId", position.getLedgerId()) - .attr("rc", rc) - .log("Failed to delete ledger after updating schema locator failed, rc"); - } - } - }, null); - } + ).thenApply(ignore -> nextVersion).exceptionallyCompose(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn() + .attr("schemaId", schemaId) + .attr("ledgerId", position.getLedgerId()) + .exception(cause) + .log("Failed to update schema locator with position"); + if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { + return deleteLedgerAsync(schemaId, position.getLedgerId(), + "schema locator update failed") + .thenCompose(__ -> FutureUtil.failedFuture(cause)); } + return FutureUtil.failedFuture(cause); }); } + private CompletableFuture deleteLedgerAsync(String schemaId, long ledgerId, String reason) { + CompletableFuture future = new CompletableFuture<>(); + try { + bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> { + if (rc != BKException.Code.OK) { + log.warn() + .attr("schemaId", schemaId) + .attr("ledgerId", ledgerId) + .attr("rc", rc) + .attr("reason", reason) + .log("Failed to delete orphan schema ledger"); + } + future.complete(null); + }, null); + } catch (Throwable t) { + log.warn() + .attr("schemaId", schemaId) + .attr("ledgerId", ledgerId) + .attr("reason", reason) + .exception(t) + .log("Failed to trigger orphan schema ledger deletion"); + future.complete(null); + } + return future; + } + @NonNull private CompletableFuture findSchemaEntryByVersion( List index, @@ -815,4 +821,4 @@ public static CompletableFuture ignoreUnrecoverableBKException(Completabl throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t); }); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 748818c6bd53b..7e5a698787ded 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -55,6 +55,8 @@ import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.PulsarMockBookKeeper; +import org.apache.bookkeeper.client.PulsarMockLedgerHandle; import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -1394,48 +1396,96 @@ public void testConcurrentCreateSchemaNoOrphanLedger() throws Exception { final String topic = getTopicName(ns, "testConcurrentCreateSchemaNoOrphanLedger"); final String schemaName = TopicName.get(topic).getSchemaName(); - org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk = - (org.apache.bookkeeper.client.PulsarMockBookKeeper) pulsar.getBookKeeperClient(); + PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper) pulsar.getBookKeeperClient(); // Concurrently create producers with the same schema on a brand-new topic int concurrency = 16; + List>> producers = createProducersInParallel( + topic, Schema.AVRO(Schemas.PersonOne.class), concurrency); + try { + FutureUtil.waitForAll(producers).join(); + + // Verify only 1 schema version exists + assertEquals(admin.schemas().getAllSchemas(topic).size(), 1); + + int schemaLedgerCount = countSchemaLedgers(mockBk, schemaName); + assertEquals(schemaLedgerCount, 1, + "Expected exactly 1 schema ledger for the topic, but found " + + schemaLedgerCount + ". Orphan ledgers were not cleaned up."); + } finally { + closeProducers(producers); + } + } + + /** + * Test that concurrent compatible schema updates clean up ledgers created by requests + * that lose the schema locator CAS race. + */ + @Test + public void testConcurrentUpdateSchemaNoOrphanLedger() throws Exception { + final String namespace = "test-namespace-" + randomName(16); + String ns = PUBLIC_TENANT + "/" + namespace; + admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME)); + + final String topic = getTopicName(ns, "testConcurrentUpdateSchemaNoOrphanLedger"); + final String schemaName = TopicName.get(topic).getSchemaName(); + PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper) pulsar.getBookKeeperClient(); + + @Cleanup + Producer initialProducer = pulsarClient + .newProducer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic) + .create(); + assertEquals(admin.schemas().getAllSchemas(topic).size(), 1); + + int concurrency = 16; + List>> producers = createProducersInParallel( + topic, Schema.AVRO(Schemas.PersonThree.class), concurrency); + try { + FutureUtil.waitForAll(producers).join(); + + assertEquals(admin.schemas().getAllSchemas(topic).size(), 2); + int schemaLedgerCount = countSchemaLedgers(mockBk, schemaName); + assertEquals(schemaLedgerCount, 2, + "Expected exactly 2 schema ledgers for the topic, but found " + + schemaLedgerCount + ". Orphan ledgers were not cleaned up."); + } finally { + closeProducers(producers); + } + } + + private List>> createProducersInParallel( + String topic, Schema schema, int concurrency) throws InterruptedException { @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(concurrency); - List>> producers = - Collections.synchronizedList(new ArrayList<>(concurrency)); + List>> producers = Collections.synchronizedList(new ArrayList<>(concurrency)); CountDownLatch latch = new CountDownLatch(concurrency); for (int i = 0; i < concurrency; i++) { executor.execute(() -> { try { - producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)) - .topic(topic).createAsync()); + producers.add(pulsarClient.newProducer(schema).topic(topic).createAsync()); } finally { latch.countDown(); } }); } latch.await(); - FutureUtil.waitForAll(producers).join(); - - // Verify only 1 schema version exists - assertEquals(admin.schemas().getAllSchemas(topic).size(), 1); + return producers; + } - // Count surviving BK ledgers whose customMetadata "pulsar/schemaId" matches this topic's schemaName. - // If orphan ledgers were not cleaned up, there would be more than 1. + private int countSchemaLedgers(PulsarMockBookKeeper mockBk, String schemaName) { int schemaLedgerCount = 0; - for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh : mockBk.getLedgerMap().values()) { + for (PulsarMockLedgerHandle lh : mockBk.getLedgerMap().values()) { Map metadata = lh.getLedgerMetadata().getCustomMetadata(); byte[] schemaIdBytes = metadata.get(LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID); - if (schemaIdBytes != null - && schemaName.equals(new String(schemaIdBytes, java.nio.charset.StandardCharsets.UTF_8))) { + if (schemaIdBytes != null && schemaName.equals(new String(schemaIdBytes, StandardCharsets.UTF_8))) { schemaLedgerCount++; } } - assertEquals(schemaLedgerCount, 1, - "Expected exactly 1 schema ledger for the topic, but found " - + schemaLedgerCount + ". Orphan ledgers were not cleaned up."); + return schemaLedgerCount; + } - // Cleanup + private void closeProducers(List>> producers) { producers.forEach(p -> { try { p.join().close();