diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTServerCnx.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTServerCnx.java index ed877104b..93fc3a593 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTServerCnx.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTServerCnx.java @@ -72,4 +72,9 @@ private void safelyRemoveConsumer(Consumer consumer) { public AuthenticationDataSource getAuthenticationData() { return ctx.channel().attr(AUTH_DATA_ATTRIBUTE_KEY).get(); } + + @Override + public String getClientVersion() { + return "mqtt"; + } } diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/consumer/MQTTConsumer.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/consumer/MQTTConsumer.java index b5da1f8b2..7ad8025c1 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/consumer/MQTTConsumer.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/consumer/MQTTConsumer.java @@ -28,7 +28,9 @@ import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.restrictions.ClientRestrictions; import io.streamnative.pulsar.handlers.mqtt.common.utils.PulsarTopicUtils; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import lombok.Getter; @@ -43,6 +45,7 @@ import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; /** * MQTT consumer. @@ -72,7 +75,7 @@ public MQTTConsumer(Subscription subscription, String mqttTopicName, String puls MQTTServerCnx cnx, MqttQoS qos, PacketIdGenerator packetIdGenerator, OutstandingPacketContainer outstandingPacketContainer, MQTTMetricsCollector metricsCollector) { super(subscription, CommandSubscribe.SubType.Shared, pulsarTopicName, 0, 0, - connection.getClientId(), true, cnx, connection.getUserRole(), null, false, + connection.getClientId(), true, cnx, connection.getClientId(), createConsumerMetadata(connection), false, null, MessageId.latest, Commands.DEFAULT_CONSUMER_EPOCH); this.pulsarTopicName = pulsarTopicName; this.mqttTopicName = mqttTopicName; @@ -90,6 +93,8 @@ public Future sendMessages(List entries, EntryBatchSizes EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes, long totalChunkedMessages, RedeliveryTracker redeliveryTracker) { ChannelPromise promise = cnx.ctx().newPromise(); + long sentMessages = 0; + long sentBytes = 0; MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages); for (Entry entry : entries) { String toConsumerTopicName = PulsarTopicUtils.getToConsumerTopicName(mqttTopicName, pulsarTopicName); @@ -137,6 +142,8 @@ public Future sendMessages(List entries, EntryBatchSizes CommandAck.AckType.Individual, Collections.emptyMap()); continue; } + sentMessages++; + sentBytes += readableBytes; cnx.ctx().channel().write(new MqttAdapterMessage(connection.getClientId(), msg, connection.isFromProxy())); } @@ -149,7 +156,15 @@ public Future sendMessages(List entries, EntryBatchSizes CommandAck.AckType.Cumulative, Collections.emptyMap()); } } + final long deliveredMessages = sentMessages; + final long deliveredBytes = sentBytes; cnx.ctx().channel().writeAndFlush(Unpooled.EMPTY_BUFFER, promise); + promise.addListener(future -> { + if (future.isSuccess() && (deliveredMessages > 0 || deliveredBytes > 0)) { + recordStatsUpdate(System.currentTimeMillis(), 0, System.currentTimeMillis(), + deliveredMessages, deliveredBytes); + } + }); return promise; } @@ -173,12 +188,44 @@ public void incrementPermits(int permits) { MESSAGE_PERMITS_UPDATER.addAndGet(this, var); this.getSubscription().consumerFlow(this, availablePermits); ADD_PERMITS_UPDATER.set(this, 0); + recordStatsUpdate(0, 0, System.currentTimeMillis(), 0, 0); } } public void addAllPermits() { this.availablePermits = clientRestrictions.getReceiveMaximum(); this.getSubscription().consumerFlow(this, availablePermits); + recordStatsUpdate(0, 0, System.currentTimeMillis(), 0, 0); + } + + public void recordAck() { + recordStatsUpdate(0, System.currentTimeMillis(), 0, 0, 0); + } + + private void recordStatsUpdate(long lastConsumedTimestamp, long lastAckedTimestamp, + long lastConsumedFlowTimestamp, long msgOutCounter, long bytesOutCounter) { + ConsumerStatsImpl currentStats = getStats(); + ConsumerStatsImpl stats = new ConsumerStatsImpl(); + stats.setAvailablePermits(getAvailablePermits()); + stats.setAvgMessagesPerEntry(getAvgMessagesPerEntry()); + stats.setLastConsumedTimestamp( + lastConsumedTimestamp > 0 ? lastConsumedTimestamp : currentStats.getLastConsumedTimestamp()); + stats.setLastAckedTimestamp(lastAckedTimestamp > 0 ? lastAckedTimestamp : currentStats.getLastAckedTimestamp()); + stats.setLastConsumedFlowTimestamp(lastConsumedFlowTimestamp > 0 + ? lastConsumedFlowTimestamp : currentStats.getLastConsumedFlowTimestamp()); + stats.setMsgOutCounter(msgOutCounter); + stats.setBytesOutCounter(bytesOutCounter); + updateStats(stats); + } + + private static Map createConsumerMetadata(Connection connection) { + Map metadata = new HashMap<>(); + metadata.put("protocol", "mqtt"); + metadata.put("appId", connection.getClientId()); + if (connection.getUserRole() != null) { + metadata.put("authRole", connection.getUserRole()); + } + return metadata; } @Override diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java index 37fb96f9b..ff8990a7e 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java @@ -186,6 +186,7 @@ public void processPubAck(MqttAdapterMessage adapter) { packet.getConsumer().getSubscription().acknowledgeMessage(Collections.singletonList(position), CommandAck.AckType.Individual, Collections.emptyMap()); packet.getConsumer().getPendingAcks().remove(packet.getLedgerId(), packet.getEntryId()); + packet.getConsumer().recordAck(); packet.getConsumer().incrementPermits(); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java index 1e6eec042..a2d8077cc 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java @@ -43,6 +43,8 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.TopicStats; @@ -156,6 +158,53 @@ public void testSendByPulsarAndReceiveByMqtt(boolean batchEnabled) throws Except producer.close(); } + @Test(timeOut = TIMEOUT) + public void testMqttConsumerStatsAreReported() throws Exception { + final String topicName = "persistent://public/default/testMqttConsumerStatsAreReported"; + final String clientId = "mqtt-stats-client"; + final String payload = "Hello MQTT stats"; + + MQTT mqtt = createMQTTClient(); + mqtt.setClientId(clientId); + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.subscribe(new Topic[]{new Topic(topicName, QoS.AT_LEAST_ONCE)}); + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(false) + .create(); + + producer.send(payload); + + Message received = connection.receive(); + Assert.assertEquals(received.getTopic(), topicName); + Assert.assertEquals(new String(received.getPayload(), StandardCharsets.UTF_8), payload); + received.ack(); + + Awaitility.await().untilAsserted(() -> { + TopicStats stats = admin.topics().getStats(topicName); + SubscriptionStats subscriptionStats = stats.getSubscriptions().get(clientId); + Assert.assertNotNull(subscriptionStats); + Assert.assertTrue(subscriptionStats.getMsgOutCounter() > 0); + Assert.assertTrue(subscriptionStats.getBytesOutCounter() > 0); + Assert.assertTrue(subscriptionStats.getLastConsumedTimestamp() > 0); + Assert.assertTrue(subscriptionStats.getLastAckedTimestamp() > 0); + + Assert.assertEquals(subscriptionStats.getConsumers().size(), 1); + ConsumerStats consumerStats = subscriptionStats.getConsumers().get(0); + Assert.assertEquals(consumerStats.getClientVersion(), "mqtt"); + Assert.assertEquals(consumerStats.getAppId(), clientId); + Assert.assertEquals(consumerStats.getMetadata().get("protocol"), "mqtt"); + Assert.assertTrue(consumerStats.getMsgOutCounter() > 0); + Assert.assertTrue(consumerStats.getBytesOutCounter() > 0); + Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0); + }); + + connection.disconnect(); + producer.close(); + } + @Test(timeOut = TIMEOUT) public void testBacklogShouldBeZeroWithQos0() throws Exception { final String topicName = "persistent://public/default/testBacklogShouldBeZeroWithQos0";