Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,24 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;

/**
* The assigner to assign entries to the proper {@link Consumer} in the shared subscription.
*/

Comment thread
zjxxzjwang marked this conversation as resolved.
Outdated
@Slf4j
@RequiredArgsConstructor
public class SharedConsumerAssignor {

Expand All @@ -50,6 +56,8 @@ public class SharedConsumerAssignor {
// Process the unassigned messages, e.g. adding them to the replay queue
private final java.util.function.Consumer<EntryAndMetadata> unassignedMessageProcessor;

private final Subscription subscription;

public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata> entryAndMetadataList,
final int numConsumers) {
assert numConsumers >= 0;
Expand Down Expand Up @@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>

if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
availablePermits--;
} else {
final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
final String uuid = metadata.getUuid();
Consumer consumerForUuid = uuidToConsumer.get(uuid);
if (consumerForUuid == null) {
unassignedMessageProcessor.accept(entryAndMetadata);
continue;
if (metadata.getChunkId() != 0) {
if (subscription != null) {
log.warn("[{}][{}] Skip the message because of it not the first chunk."
Comment thread
zjxxzjwang marked this conversation as resolved.
Outdated
+ " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
// Directly ack the message
if (!(subscription instanceof PulsarCompactorSubscription)) {
Comment thread
zjxxzjwang marked this conversation as resolved.
Outdated
subscription.acknowledgeMessage(Collections.singletonList(
entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
entryAndMetadata.release();
}
}
Comment thread
zjxxzjwang marked this conversation as resolved.
Outdated
Comment thread
zjxxzjwang marked this conversation as resolved.
Outdated
}
consumerForUuid = consumer;
uuidToConsumer.put(uuid, consumerForUuid);
}

final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits);
if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
// The last chunk is received, we should remove the uuid
Comment thread
zjxxzjwang marked this conversation as resolved.
Outdated
uuidToConsumer.remove(uuid);
}

consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new ArrayList<>()).add(entryAndMetadata);
Comment thread
zjxxzjwang marked this conversation as resolved.
Outdated
consumerToPermits.put(consumerForUuid, permits - 1);
if (consumerForUuid == consumer) {
availablePermits--;
}
}
availablePermits--;
}

for (; index < entryAndMetadataList.size(); index++) {
Expand All @@ -111,29 +145,4 @@ private Consumer getConsumer(final int numConsumers) {
}
return null;
}

private Consumer getConsumerForUuid(final MessageMetadata metadata,
final Consumer defaultConsumer,
final int currentAvailablePermits) {
final String uuid = metadata.getUuid();
Consumer consumer = uuidToConsumer.get(uuid);
if (consumer == null) {
if (metadata.getChunkId() != 0) {
// Not the first chunk, skip it
return null;
}
consumer = defaultConsumer;
uuidToConsumer.put(uuid, consumer);
}
final int permits = consumerToPermits.computeIfAbsent(consumer, Consumer::getAvailablePermits);
if (permits <= 0) {
return null;
}
if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
// The last chunk is received, we should remove the cache
uuidToConsumer.remove(uuid);
}
consumerToPermits.put(consumer, currentAvailablePermits - 1);
return consumer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay);
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay, subscription);
ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration();
this.readFailureBackoff = new Backoff(
serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public PersistentDispatcherMultipleConsumersClassic(PersistentTopic topic, Manag
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay);
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay, subscription);
this.readFailureBackoff = new Backoff(
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void prepareData() {
roundRobinConsumerSelector.clear();
entryAndMetadataList.clear();
replayQueue.clear();
assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add);
assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, null);
Comment thread
zjxxzjwang marked this conversation as resolved.
final AtomicLong entryId = new AtomicLong(0L);
final MockProducer producerA = new MockProducer("A", entryId, entryAndMetadataList);
final MockProducer producerB = new MockProducer("B", entryId, entryAndMetadataList);
Expand Down
Loading