Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@ private void safelyRemoveConsumer(Consumer consumer) {
public AuthenticationDataSource getAuthenticationData() {
return ctx.channel().attr(AUTH_DATA_ATTRIBUTE_KEY).get();
}

@Override
public String getClientVersion() {
return "mqtt";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.restrictions.ClientRestrictions;
import io.streamnative.pulsar.handlers.mqtt.common.utils.PulsarTopicUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import lombok.Getter;
Expand All @@ -43,6 +45,7 @@
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;

/**
* MQTT consumer.
Expand Down Expand Up @@ -72,7 +75,7 @@ public MQTTConsumer(Subscription subscription, String mqttTopicName, String puls
MQTTServerCnx cnx, MqttQoS qos, PacketIdGenerator packetIdGenerator,
OutstandingPacketContainer outstandingPacketContainer, MQTTMetricsCollector metricsCollector) {
super(subscription, CommandSubscribe.SubType.Shared, pulsarTopicName, 0, 0,
connection.getClientId(), true, cnx, connection.getUserRole(), null, false,
connection.getClientId(), true, cnx, connection.getClientId(), createConsumerMetadata(connection), false,
null, MessageId.latest, Commands.DEFAULT_CONSUMER_EPOCH);
this.pulsarTopicName = pulsarTopicName;
this.mqttTopicName = mqttTopicName;
Expand All @@ -90,6 +93,8 @@ public Future<Void> sendMessages(List<? extends Entry> entries, EntryBatchSizes
EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes,
long totalChunkedMessages, RedeliveryTracker redeliveryTracker) {
ChannelPromise promise = cnx.ctx().newPromise();
long sentMessages = 0;
long sentBytes = 0;
MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
for (Entry entry : entries) {
String toConsumerTopicName = PulsarTopicUtils.getToConsumerTopicName(mqttTopicName, pulsarTopicName);
Expand Down Expand Up @@ -137,6 +142,8 @@ public Future<Void> sendMessages(List<? extends Entry> entries, EntryBatchSizes
CommandAck.AckType.Individual, Collections.emptyMap());
continue;
}
sentMessages++;
sentBytes += readableBytes;
cnx.ctx().channel().write(new MqttAdapterMessage(connection.getClientId(), msg,
connection.isFromProxy()));
}
Expand All @@ -149,7 +156,15 @@ public Future<Void> sendMessages(List<? extends Entry> entries, EntryBatchSizes
CommandAck.AckType.Cumulative, Collections.emptyMap());
}
}
final long deliveredMessages = sentMessages;
final long deliveredBytes = sentBytes;
cnx.ctx().channel().writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
promise.addListener(future -> {
if (future.isSuccess() && (deliveredMessages > 0 || deliveredBytes > 0)) {
recordStatsUpdate(System.currentTimeMillis(), 0, System.currentTimeMillis(),
deliveredMessages, deliveredBytes);
}
});
return promise;
}

Expand All @@ -173,12 +188,44 @@ public void incrementPermits(int permits) {
MESSAGE_PERMITS_UPDATER.addAndGet(this, var);
this.getSubscription().consumerFlow(this, availablePermits);
ADD_PERMITS_UPDATER.set(this, 0);
recordStatsUpdate(0, 0, System.currentTimeMillis(), 0, 0);
}
}

public void addAllPermits() {
this.availablePermits = clientRestrictions.getReceiveMaximum();
this.getSubscription().consumerFlow(this, availablePermits);
recordStatsUpdate(0, 0, System.currentTimeMillis(), 0, 0);
}

public void recordAck() {
recordStatsUpdate(0, System.currentTimeMillis(), 0, 0, 0);
}

private void recordStatsUpdate(long lastConsumedTimestamp, long lastAckedTimestamp,
long lastConsumedFlowTimestamp, long msgOutCounter, long bytesOutCounter) {
ConsumerStatsImpl currentStats = getStats();
ConsumerStatsImpl stats = new ConsumerStatsImpl();
stats.setAvailablePermits(getAvailablePermits());
stats.setAvgMessagesPerEntry(getAvgMessagesPerEntry());
stats.setLastConsumedTimestamp(
lastConsumedTimestamp > 0 ? lastConsumedTimestamp : currentStats.getLastConsumedTimestamp());
stats.setLastAckedTimestamp(lastAckedTimestamp > 0 ? lastAckedTimestamp : currentStats.getLastAckedTimestamp());
stats.setLastConsumedFlowTimestamp(lastConsumedFlowTimestamp > 0
? lastConsumedFlowTimestamp : currentStats.getLastConsumedFlowTimestamp());
stats.setMsgOutCounter(msgOutCounter);
stats.setBytesOutCounter(bytesOutCounter);
updateStats(stats);
}

private static Map<String, String> createConsumerMetadata(Connection connection) {
Map<String, String> metadata = new HashMap<>();
metadata.put("protocol", "mqtt");
metadata.put("appId", connection.getClientId());
if (connection.getUserRole() != null) {
metadata.put("authRole", connection.getUserRole());
}
return metadata;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public void processPubAck(MqttAdapterMessage adapter) {
packet.getConsumer().getSubscription().acknowledgeMessage(Collections.singletonList(position),
CommandAck.AckType.Individual, Collections.emptyMap());
packet.getConsumer().getPendingAcks().remove(packet.getLedgerId(), packet.getEntryId());
packet.getConsumer().recordAck();
packet.getConsumer().incrementPermits();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.TopicStats;
Expand Down Expand Up @@ -156,6 +158,53 @@ public void testSendByPulsarAndReceiveByMqtt(boolean batchEnabled) throws Except
producer.close();
}

@Test(timeOut = TIMEOUT)
public void testMqttConsumerStatsAreReported() throws Exception {
final String topicName = "persistent://public/default/testMqttConsumerStatsAreReported";
final String clientId = "mqtt-stats-client";
final String payload = "Hello MQTT stats";

MQTT mqtt = createMQTTClient();
mqtt.setClientId(clientId);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.subscribe(new Topic[]{new Topic(topicName, QoS.AT_LEAST_ONCE)});

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.create();

producer.send(payload);

Message received = connection.receive();
Assert.assertEquals(received.getTopic(), topicName);
Assert.assertEquals(new String(received.getPayload(), StandardCharsets.UTF_8), payload);
received.ack();

Awaitility.await().untilAsserted(() -> {
TopicStats stats = admin.topics().getStats(topicName);
SubscriptionStats subscriptionStats = stats.getSubscriptions().get(clientId);
Assert.assertNotNull(subscriptionStats);
Assert.assertTrue(subscriptionStats.getMsgOutCounter() > 0);
Assert.assertTrue(subscriptionStats.getBytesOutCounter() > 0);
Assert.assertTrue(subscriptionStats.getLastConsumedTimestamp() > 0);
Assert.assertTrue(subscriptionStats.getLastAckedTimestamp() > 0);

Assert.assertEquals(subscriptionStats.getConsumers().size(), 1);
ConsumerStats consumerStats = subscriptionStats.getConsumers().get(0);
Assert.assertEquals(consumerStats.getClientVersion(), "mqtt");
Assert.assertEquals(consumerStats.getAppId(), clientId);
Assert.assertEquals(consumerStats.getMetadata().get("protocol"), "mqtt");
Assert.assertTrue(consumerStats.getMsgOutCounter() > 0);
Assert.assertTrue(consumerStats.getBytesOutCounter() > 0);
Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0);
});

connection.disconnect();
producer.close();
}

@Test(timeOut = TIMEOUT)
public void testBacklogShouldBeZeroWithQos0() throws Exception {
final String topicName = "persistent://public/default/testBacklogShouldBeZeroWithQos0";
Expand Down
Loading