Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -28,6 +29,8 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;

/**
Expand Down Expand Up @@ -88,15 +91,46 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>

if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
availablePermits--;
} else {
final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
final String uuid = metadata.getUuid();
Consumer consumerForUuid = uuidToConsumer.get(uuid);
if (consumerForUuid == null) {
if (metadata.getChunkId() != 0) {
if (subscription != null) {
log.warn("[{}][{}] Skip the message because it is not the first chunk."
+ " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
// Directly ack the message.
if (!(subscription instanceof PulsarCompactorSubscription)) {
subscription.acknowledgeMessage(Collections.singletonList(
entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
entryAndMetadata.release();
}
}
Comment thread
zjxxzjwang marked this conversation as resolved.
Outdated
Comment thread
zjxxzjwang marked this conversation as resolved.
Outdated
continue;
}
consumerForUuid = consumer;
uuidToConsumer.put(uuid, consumerForUuid);
}

final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits);
if (permits <= 0) {
unassignedMessageProcessor.accept(entryAndMetadata);
continue;
}
if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
// The last chunk is received, we should remove the uuid from the cache.
uuidToConsumer.remove(uuid);
}

consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new ArrayList<>()).add(entryAndMetadata);
consumerToPermits.put(consumerForUuid, permits - 1);
if (consumerForUuid == consumer) {
availablePermits--;
}
}
availablePermits--;
}

for (; index < entryAndMetadataList.size(); index++) {
Expand All @@ -119,29 +153,4 @@ private Consumer getConsumer(final int numConsumers) {
}
return null;
}

private Consumer getConsumerForUuid(final MessageMetadata metadata,
final Consumer defaultConsumer,
final int currentAvailablePermits) {
final String uuid = metadata.getUuid();
Consumer consumer = uuidToConsumer.get(uuid);
if (consumer == null) {
if (metadata.getChunkId() != 0) {
// Not the first chunk, skip it
return null;
}
consumer = defaultConsumer;
uuidToConsumer.put(uuid, consumer);
}
final int permits = consumerToPermits.computeIfAbsent(consumer, Consumer::getAvailablePermits);
if (permits <= 0) {
return null;
}
if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
// The last chunk is received, we should remove the cache
uuidToConsumer.remove(uuid);
}
consumerToPermits.put(consumer, currentAvailablePermits - 1);
return consumer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
*/
package org.apache.pulsar.broker.service;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
Expand All @@ -37,6 +43,7 @@
import lombok.RequiredArgsConstructor;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -294,4 +301,52 @@ public void testChunkMessagesNotBeLostNoConsumer() {
assertTrue(assignor.getUuidToConsumer().isEmpty());
}

/**
* Simulate the occurrence of chunk messages. When a message with chunk ID 0 is abnormally lost, subsequent chunk
* messages for that batch should be skipped instead of blocking the entire subscription.
*/
@Test
public void testSkipOrphanChunk() {
cleanupQueue.clear();
Subscription subscription = mock(Subscription.class);
when(subscription.getTopicName()).thenReturn("test-topic");
when(subscription.getName()).thenReturn("test-sub");

assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, subscription);

final Consumer consumer = new Consumer("C1", 10);
roundRobinConsumerSelector.addConsumers(consumer);

List<EntryAndMetadata> entries = new ArrayList<>();
AtomicLong entryId = new AtomicLong(0);
MockProducer producer = new MockProducer("P", entryId, entries);

// 0:0@P-0
producer.sendMessage();

// Simulate the sending of chunk messages with missing chunkId '0'
producer.sendChunk(1, 3);
producer.sendChunk(2, 3);

// 0:3@P-2
producer.sendMessage();

// Add to cleanupQueue but skip the orphan chunk as it will be released by assignor
cleanupQueue.add(entries.get(0));
cleanupQueue.add(entries.get(3));

Comment thread
zjxxzjwang marked this conversation as resolved.
Outdated
Map<Consumer, List<EntryAndMetadata>> result = assignor.assign(entries, 1);

List<EntryAndMetadata> assigned = result.get(consumer);
assertEquals(assigned.size(), 2);
assertEquals(assigned.get(0).toString(), "0:0@P-0");
assertEquals(assigned.get(1).toString(), "0:3@P-2");

verify(subscription, times(2)).acknowledgeMessage(any(), eq(AckType.Individual), any());

assertTrue(replayQueue.isEmpty());
assertTrue(assignor.getUuidToConsumer().isEmpty());
}


}
Loading