From 55e531b91c6a6a3e6ccf7c66dda96f812f36fa62 Mon Sep 17 00:00:00 2001 From: Dave Cridland Date: Thu, 7 May 2026 11:48:09 +0100 Subject: [PATCH 1/5] fix: synchronize CsiManager.activate() and flush full queue atomically activate() was not synchronized, causing a race with queueOrPush() on the shared queue field. It also used an indirect flush (pollLast + re- deliver) that could lose stanzas queued between pollLast and the re- entry into queueOrPush. Fix: drain the entire queue atomically under the lock, set active=true, then deliver all stanzas outside the lock so I/O does not block other threads from queuing new stanzas. Co-authored-by: Junie --- .../jivesoftware/openfire/csi/CsiManager.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java b/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java index f5b251952b..6b26bfbe77 100644 --- a/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java +++ b/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java @@ -138,12 +138,21 @@ public synchronized void process(@Nonnull final Element nonza) public void activate() { Log.trace("Session for '{}' to CSI 'active'", session.getAddress()); - active = true; - // If there are delayed stanzas, cause them to be delivered by rescheduling the last one. - if (!queue.isEmpty()) { + // Drain the queue under the lock, then deliver outside it to avoid holding the lock during I/O. + final List toDeliver; + synchronized (this) { + active = true; + toDeliver = new LinkedList<>(queue); + queue.clear(); + if (!toDeliver.isEmpty()) { + lastPush = Instant.now(); + } + } + + for (final Packet packet : toDeliver) { try { - session.deliver(queue.pollLast()); + session.deliver(packet); } catch (UnauthorizedException e) { Log.error("Unexpected exception while activating CSI.", e); } From 69e60ea3c233ad1181123dc410cb01dc9e587b4b Mon Sep 17 00:00:00 2001 From: Guus der Kinderen Date: Thu, 7 May 2026 16:45:42 +0200 Subject: [PATCH 2/5] OF-3272: Fix concurrency (and off-by-one) issue in CSI The implementation was partially synchronized, but lacked synchronization on activate/deactivate. By marking both methods as synchronized, there no longer is a data race. In activate, the rescheduling of queued stanzas was previously performed outside of a lock, although its execution (in `queueOrPush`) was fully synchronized. Because of that, I don't expect this commit to introduce significant more (possibly problematic, remember Stream Management?) locking. This commit also addresses a minor off-by-one error when checking the queue capacity. --- .../jivesoftware/openfire/csi/CsiManager.java | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java b/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java index 6b26bfbe77..65c527b355 100644 --- a/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java +++ b/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java @@ -135,24 +135,17 @@ public synchronized void process(@Nonnull final Element nonza) /** * Switch to the client state of 'active'. */ - public void activate() + public synchronized void activate() { Log.trace("Session for '{}' to CSI 'active'", session.getAddress()); + active = true; - // Drain the queue under the lock, then deliver outside it to avoid holding the lock during I/O. - final List toDeliver; - synchronized (this) { - active = true; - toDeliver = new LinkedList<>(queue); - queue.clear(); - if (!toDeliver.isEmpty()) { - lastPush = Instant.now(); - } - } - - for (final Packet packet : toDeliver) { + // Re-submit the tail of the queue through the normal queueOrPush path. Because active is now true, queueOrPush + // will flush the entire queue (including all preceding stanzas) in one atomic operation. + final Packet tail = queue.pollLast(); + if (tail != null) { try { - session.deliver(packet); + session.deliver(tail); } catch (UnauthorizedException e) { Log.error("Unexpected exception while activating CSI.", e); } @@ -162,7 +155,7 @@ public void activate() /** * Switch to the client state of 'inactive'. */ - public void deactivate() + public synchronized void deactivate() { Log.trace("Session for '{}' to CSI 'inactive'", session.getAddress()); active = false; @@ -202,7 +195,7 @@ public synchronized List queueOrPush(@Nonnull final Packet packet) final boolean mustPush = !DELAY_ENABLED.getValue() // The feature is disabled by configuration. Always send stanzas immediately. || active // The client is active! Do not delay. - || queue.size() > DELAY_QUEUE_CAPACITY.getValue() // The delay queue has reached its capacity. Flush the entire thing. + || queue.size() >= DELAY_QUEUE_CAPACITY.getValue() // The delay queue has reached its capacity. Flush the entire thing. || Instant.now().isAfter(lastPush.plus(DELAY_MAX_DURATION.getValue())) // Ensure that periodically, delayed data is sent anyway. || !canDelay(packet); From e672ef8bc3fdb8a082b3e2bb025f1ba64881ed2c Mon Sep 17 00:00:00 2001 From: Guus der Kinderen Date: Thu, 7 May 2026 21:54:33 +0200 Subject: [PATCH 3/5] OF-3272: Increase unit test coverage for CsiManager --- .../openfire/csi/CsiManagerTest.java | 807 +++++++++++++++++- 1 file changed, 802 insertions(+), 5 deletions(-) diff --git a/xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java b/xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java index 33d30ad1c9..1f16308828 100644 --- a/xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java +++ b/xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023-2025 Ignite Realtime Foundation. All rights reserved. + * Copyright (C) 2023-2026 Ignite Realtime Foundation. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,33 +18,518 @@ import org.dom4j.DocumentException; import org.dom4j.Element; import org.dom4j.io.XMPPPacketReader; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import org.xmlpull.v1.XmlPullParserException; import org.xmpp.packet.IQ; import org.xmpp.packet.Message; import org.xmpp.packet.Packet; import org.xmpp.packet.Presence; +import org.jivesoftware.openfire.session.LocalClientSession; import java.io.IOException; import java.io.StringReader; +import java.util.Collections; +import java.util.List; -import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; /** * Verifies the implementation of {@link CsiManager} * * @author Guus der Kinderen, guus@goodbytes.nl */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) public class CsiManagerTest { + @Mock + private LocalClientSession mockSession; + + private CsiManager csiManager; + + @BeforeEach + public void setUp() throws Exception + { + csiManager = new CsiManager(mockSession); + // Wire the mock so that deliver() forwards the stanza back through queueOrPush(), + // mirroring what LocalClientSession.deliver() does in production. + doAnswer(invocation -> { + csiManager.queueOrPush(invocation.getArgument(0)); + return Collections.emptyList(); // For some invocations (when 'mustpush==true), this is expected to return the queue, otherwise it returns an empty list. We're picking one of the two here. This is a simplification for testing purposes. + }).when(mockSession).deliver(any(Packet.class)); + } + + /** + * Verifies initial state is active. + */ + @Test + public void testInitialStateIsActive() + { + assertTrue(csiManager.isActive(), "CsiManager should initialize with active state"); + } + + /** + * Verifies activation updates CSI state. + */ + @Test + public void testActivate() + { + // Setup test fixture + csiManager.deactivate(); + assertFalse(csiManager.isActive(), "CsiManager should be inactive"); + + // Execute system under test + csiManager.activate(); + + // Verify result + assertTrue(csiManager.isActive(), "CsiManager should be active after calling activate()"); + } + + /** + * Verifies deactivation updates CSI state. + */ + @Test + public void testDeactivate() + { + // Setup test fixture + assertTrue(csiManager.isActive(), "CsiManager should be active"); + + // Execute system under test + csiManager.deactivate(); + + // Verify result + assertFalse(csiManager.isActive(), "CsiManager should be inactive after calling deactivate()"); + } + + /** + * Verifies processing of active nonza. + */ + @Test + public void testProcessActiveNonza() throws Exception + { + // Setup test fixture + csiManager.deactivate(); + final Element activeElement = parseElement(""); + + // Execute system under test + csiManager.process(activeElement); + + // Verify result + assertTrue(csiManager.isActive(), "CsiManager should be active after processing active nonza"); + } + + /** + * Verifies processing of inactive nonza. + */ + @Test + public void testProcessInactiveNonza() throws Exception + { + // Setup test fixture + final Element inactiveElement = parseElement(""); + + // Execute system under test + csiManager.process(inactiveElement); + + // Verify result + assertFalse(csiManager.isActive(), "CsiManager should be inactive after processing inactive nonza"); + } + + /** + * Verifies initial queue size is zero. + */ + @Test + public void testInitialQueueSizeIsZero() + { + assertEquals(0, csiManager.getDelayQueueSize(), "Initial queue size should be zero"); + } + + /** + * Verifies delay queue size after queueing. + */ + @Test + public void testQueueSizeAfterQueueing() throws Exception + { + // Setup test fixture + csiManager.deactivate(); + final Packet delayableMessage = createDelayableMessage(); + + // Execute system under test + csiManager.queueOrPush(delayableMessage); + + // Verify result + assertEquals(1, csiManager.getDelayQueueSize(), "Queue size should be 1 after queueing one message"); + } + + /** + * Verifies queueOrPush behavior when active flushes immediately. + */ + @Test + public void testQueueOrPushWhenActiveFlushesImmediately() throws Exception + { + // Setup test fixture + assertTrue(csiManager.isActive(), "CsiManager should be active"); + final Packet delayableMessage = createDelayableMessage(); + + // Execute system under test + final List result = csiManager.queueOrPush(delayableMessage); + + // Verify result + assertEquals(1, result.size(), "Should return the message immediately when active"); + assertEquals(0, csiManager.getDelayQueueSize(), "Queue should be empty when active"); + } + + /** + * Verifies queueOrPush behavior when inactive queues. + */ + @Test + public void testQueueOrPushWhenInactiveQueues() throws Exception + { + // Setup test fixture + csiManager.deactivate(); + final Packet delayableMessage = createDelayableMessage(); + + // Execute system under test + final List result = csiManager.queueOrPush(delayableMessage); + + // Verify result + assertTrue(result.isEmpty(), "Should queue the message when inactive"); + assertEquals(1, csiManager.getDelayQueueSize(), "Queue should contain the message"); + } + + /** + * Verifies queueOrPush behavior when non delayable packet always flushed. + */ + @Test + public void testQueueOrPushNonDelayablePacketAlwaysFlushed() throws Exception + { + // Setup test fixture + csiManager.deactivate(); + final Packet nonDelayableIQ = createIQ(); + + // Execute system under test + final List result = csiManager.queueOrPush(nonDelayableIQ); + + // Verify result + assertEquals(1, result.size(), "Should return non-delayable packet immediately"); + assertEquals(0, csiManager.getDelayQueueSize(), "Queue should remain empty"); + } + + /** + * Verifies queueOrPush behavior when returns both queued and new packet when flushing. + */ + @Test + public void testQueueOrPushReturnsBothQueuedAndNewPacketWhenFlushing() throws Exception + { + // Setup test fixture: queue one delayable packet while inactive. + csiManager.deactivate(); + final Packet queuedMessage = createDelayableMessage(); + final List initiallyQueued = csiManager.queueOrPush(queuedMessage); + + assertTrue(initiallyQueued.isEmpty(), "First delayable message should be queued while client is inactive"); + assertEquals(1, csiManager.getDelayQueueSize(), "First delayable message should be queued"); + + // Execute system under test: a non-delayable packet should force an immediate flush that returns both the + // previously queued packet and the new packet. + final Packet nonDelayableIQ = createIQ(); + final List result = csiManager.queueOrPush(nonDelayableIQ); + + // Verify result + assertEquals(2, result.size(), "Flushing should return both the queued packet and the new packet"); + assertTrue(result.contains(queuedMessage), "Flushed result should contain the previously queued packet"); + assertTrue(result.contains(nonDelayableIQ), "Flushed result should contain the new non-delayable packet"); + assertEquals(0, csiManager.getDelayQueueSize(), "Queue should be empty after flushing"); + } + + /** + * Verifies activation updates CSI state and queued stanza behavior. + */ + @Test + public void testActivateWithQueuedMessagesFlushesAll() throws Exception + { + // Setup test fixture + csiManager.deactivate(); + final Packet message1 = createDelayableMessage(); + final Packet message2 = createDelayableMessage(); + + csiManager.queueOrPush(message1); + csiManager.queueOrPush(message2); + + assertEquals(2, csiManager.getDelayQueueSize(), "Should have 2 messages queued"); + + // Execute system under test + csiManager.activate(); + + // Verify result - after activation, the queue should be flushed + // Note: activate() only delivers the tail, but that should trigger a flush as the delivery immediately invokes + // the next operation. The queue should be empty after session.deliver() is called with the tail. + assertEquals(0, csiManager.getDelayQueueSize(), "After activate, the delay queue should be flushed."); + } + + /** + * Verifies that an IQ stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayIQReturnsFalse() throws Exception + { + // Setup test fixture + final Packet iq = createIQ(); + + // Execute system under test + final boolean result = CsiManager.canDelay(iq); + + // Verify result + assertFalse(result, "IQ stanzas should not be delayable"); + } + + /** + * Verifies that an available presence stanza is identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayAvailablePresenceReturnsTrue() throws Exception + { + // Setup test fixture - Note: Presence with type other than null/unavailable is not delayable + final Packet presenceAvailable = parse(""" + + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(presenceAvailable); + + // Verify result + assertTrue(result, "Available presence (type=null) should be delayable"); + } + + /** + * Verifies that an unavailable presence stanza is identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayUnavailablePresenceReturnsTrue() throws Exception + { + // Setup test fixture + final Packet presenceUnavailable = parse(""" + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(presenceUnavailable); + + // Verify result + assertTrue(result, "Unavailable presence should be delayable"); + } + + /** + * Verifies that a subscription request presence stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayPresenceWithOtherTypeReturnsFalse() throws Exception + { + // Setup test fixture + final Packet presenceSubscribed = parse(""" + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(presenceSubscribed); + + // Verify result + assertFalse(result, "Presence with type other than null/unavailable should not be delayable"); + } + + /** + * Verifies that a MUC self-presence stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelaySelfPresenceFromMUCReturnsFalse() throws Exception + { + // Setup test fixture + final Packet selfPresence = parse(""" + + + + + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(selfPresence); + + // Verify result + assertFalse(result, "MUC self-presence should not be delayable"); + } + + /** + * Verifies that a message stanza with a body is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayMessageWithBodyReturnsFalse() throws Exception + { + // Setup test fixture - Messages WITH body are important and not delayable + final Packet messageWithBody = parse(""" + + Hello, World! + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(messageWithBody); + + // Verify result + assertFalse(result, "Messages with body should not be delayable"); + } + + /** + * Verifies that a chat state notification stanza is identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayChatStateNotificationReturnsTrue() throws Exception + { + // Setup test fixture - Messages WITHOUT body (like chat state notifications) are delayable + final Packet chatStateNotification = parse(""" + + + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(chatStateNotification); + + // Verify result + assertTrue(result, "Chat state notifications (messages without body) should be delayable"); + } + + /** + * Verifies that a typing notification stanza is identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayTypingNotificationReturnsTrue() throws Exception + { + // Setup test fixture - Typing notifications (paused) should be delayable + final Packet typingPaused = parse(""" + + + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(typingPaused); + + // Verify result + assertTrue(result, "Typing paused notifications should be delayable"); + } + + /** + * Verifies that a message delivery receipt stanza is identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayReceiptNotificationReturnsTrue() throws Exception + { + // Setup test fixture - Message delivery receipts should be delayable + final Packet receipt = parse(""" + + + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(receipt); + + // Verify result + assertTrue(result, "Message delivery receipts should be delayable"); + } + + /** + * Verifies that a (non-empty) MUC room subject change stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayMUCRoomSubjectReturnsFalse() throws Exception + { + // Setup test fixture + final Packet roomSubject = parse(""" + + Room Subject + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(roomSubject); + + // Verify result + assertFalse(result, "MUC room subject should not be delayable"); + } + + /** + * Verifies that an empty MUC room subject change stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayEmptyMUCRoomSubjectReturnsFalse() throws Exception + { + // Setup test fixture + final Packet emptyRoomSubject = parse(""" + + + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(emptyRoomSubject); + + // Verify result + assertFalse(result, "Empty MUC room subject should not be delayable"); + } + + /** + * Verifies that a MUC room invitation stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayMUCInvitationReturnsFalse() throws Exception + { + // Setup test fixture + final Packet mucInvitation = parse(""" + + + + Come join our groupchat! + + + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(mucInvitation); + + // Verify result + assertFalse(result, "MUC invitations should not be delayable"); + } + + /** + * Verifies that an OMEMO-encrypted message stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayOMEMOMessageReturnsFalse() throws Exception + { + // Setup test fixture + final Packet omemoMessage = parse(""" + + +
+ base64key +
+ base64payload +
+
"""); + + // Execute system under test + final boolean result = CsiManager.canDelay(omemoMessage); + + // Verify result + assertFalse(result, "OMEMO encrypted messages should not be delayable"); + } + /** - * Verifies that a stanza Indicating Intent to Start a Session (Jingle) is not identified as a stanza that - * can be delayed/queued in context of CSI. + * Verifies that a stanza Indicating Intent to Start a Session (Jingle) is not identified as a stanza that can be delayed/queued in context of CSI. * * @see OF-2750: CSI-enabled client does not receive Jingle invitations */ @Test - public void testJinglePropose() throws Exception + public void testCanDelayJingleProposeReturnsFalse() throws Exception { // Setup test fixture. final Packet input = parse(""" @@ -64,6 +549,318 @@ public void testJinglePropose() throws Exception assertFalse(result); } + /** + * Verifies that a Jingle 'accept' stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayJingleAcceptReturnsFalse() throws Exception + { + // Setup test fixture + final Packet jingleAccept = parse(""" + + + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(jingleAccept); + + // Verify result + assertFalse(result, "Jingle accept messages should not be delayable"); + } + + /** + * Verifies that a Jingle 'reject' stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayJingleRejectReturnsFalse() throws Exception + { + // Setup test fixture + final Packet jingleReject = parse(""" + + + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(jingleReject); + + // Verify result + assertFalse(result, "Jingle reject messages should not be delayable"); + } + + /** + * Verifies that a group chat message stanza with a body is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayProceedingToGroupChatReturnsFalse() throws Exception + { + // Setup test fixture - Groupchat messages with body are not delayable + final Packet groupchatMessage = parse(""" + + Group message + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(groupchatMessage); + + // Verify result + assertFalse(result, "Groupchat messages with body are not delayable"); + } + + /** + * Verifies that a headline message stanza is identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayHeadlineMessageReturnsTrue() throws Exception + { + // Setup test fixture - Headline messages should be delayable + final Packet headlineMessage = parse(""" + + News Headline + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(headlineMessage); + + // Verify result + assertTrue(result, "Headline messages without body should be delayable"); + } + + /** + * Verifies that an HTTP File Upload slot response (an IQ stanza) is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayHTTPUploadSlotResponseReturnsFalse() throws Exception + { + // Setup test fixture - HTTP Upload slot responses contain important data + final Packet uploadSlot = parse(""" + + + + + + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(uploadSlot); + + // Verify result + assertFalse(result, "IQ stanzas should not be delayable"); + } + + /** + * Verifies that a 'subscribed' presence stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelaySubscribedPresenceReturnsFalse() throws Exception + { + // Setup test fixture - Subscribed presence (type='subscribed') is important + final Packet subscribedPresence = parse(""" + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(subscribedPresence); + + // Verify result + assertFalse(result, "Subscribed presence should not be delayable"); + } + + /** + * Verifies that a 'subscribe' presence stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelaySubscriptionPresenceReturnsFalse() throws Exception + { + // Setup test fixture + final Packet subscribePresence = parse(""" + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(subscribePresence); + + // Verify result + assertFalse(result, "Subscription presence should not be delayable"); + } + + /** + * Verifies that an 'unsubscribe' presence stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayUnsubscribePresenceReturnsFalse() throws Exception + { + // Setup test fixture + final Packet unsubscribePresence = parse(""" + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(unsubscribePresence); + + // Verify result + assertFalse(result, "Unsubscribe presence should not be delayable"); + } + + /** + * Verifies that a 'probe' presence stanza is not identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayProbePresenceReturnsFalse() throws Exception + { + // Setup test fixture + final Packet probePresence = parse(""" + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(probePresence); + + // Verify result + assertFalse(result, "Probe presence should not be delayable"); + } + + /** + * Verifies that a message stanza without a body is identified as a stanza that can be delayed/queued in context of CSI. + */ + @Test + public void testCanDelayNormalMessageWithoutBodyReturnsTrue() throws Exception + { + // Setup test fixture - Normal (type not specified or type="normal") messages without body + final Packet normalMessage = parse(""" + + + """); + + // Execute system under test + final boolean result = CsiManager.canDelay(normalMessage); + + // Verify result + assertTrue(result, "Normal messages without body should be delayable"); + } + + /** + * Verifies CSI nonza recognition for an 'active' element. + */ + @Test + public void testIsCsiNonzaWithActiveElement() throws Exception + { + // Setup test fixture + final Element activeElement = parseElement(""); + + // Execute system under test + final boolean result = CsiManager.isCsiNonza(activeElement); + + // Verify result + assertTrue(result, "Active element should be recognized as CSI nonza"); + } + + /** + * Verifies CSI nonza recognition for an 'inactive' element. + */ + @Test + public void testIsCsiNonzaWithInactiveElement() throws Exception + { + // Setup test fixture + final Element inactiveElement = parseElement(""); + + // Execute system under test + final boolean result = CsiManager.isCsiNonza(inactiveElement); + + // Verify result + assertTrue(result, "Inactive element should be recognized as CSI nonza"); + } + + /** + * Verifies CSI nonza recognition with an invalid namespace. + */ + @Test + public void testIsCsiNonzaWithInvalidNamespace() throws Exception + { + // Setup test fixture + final Element invalidElement = parseElement(""); + + // Execute system under test + final boolean result = CsiManager.isCsiNonza(invalidElement); + + // Verify result + assertFalse(result, "Element with invalid namespace should not be recognized as CSI nonza"); + } + + /** + * Verifies CSI nonza recognition with an invalid name. + */ + @Test + public void testIsCsiNonzaWithInvalidName() throws Exception + { + // Setup test fixture + final Element invalidElement = parseElement(""); + + // Execute system under test + final boolean result = CsiManager.isCsiNonza(invalidElement); + + // Verify result + assertFalse(result, "Element with invalid name should not be recognized as CSI nonza"); + } + + /** + * Verifies CSI nonza recognition with a null element. + */ + @Test + public void testIsCsiNonzaWithNullElement() + { + // Execute system under test + final boolean result = CsiManager.isCsiNonza(null); + + // Verify result + assertFalse(result, "Null element should not be recognized as CSI nonza"); + } + + /** + * Verifies CSI nonza recognition with a message element. + */ + @Test + public void testIsCsiNonzaWithMessageElement() throws Exception + { + // Setup test fixture + final Packet message = createDelayableMessage(); + final Element messageElement = message.getElement(); + + // Execute system under test + final boolean result = CsiManager.isCsiNonza(messageElement); + + // Verify result + assertFalse(result, "Message element should not be recognized as CSI nonza"); + } + + /** + * Creates a delayable message for testing purposes. + * + * This is a chat state notification (message without body), which is delayable. + */ + private Packet createDelayableMessage() throws Exception + { + return parse(""" + + + """); + } + + /** + * Creates an IQ stanza for testing purposes. + */ + private Packet createIQ() throws Exception + { + return parse(""" + + + """); + } + + /** + * Parses an element from a string representation. + */ + private Element parseElement(final String input) throws DocumentException, XmlPullParserException, IOException + { + final XMPPPacketReader reader = new XMPPPacketReader(); + return reader.read(new StringReader(input)).getRootElement(); + } + /** * Tries to parse a stanza from an input text. This method throws an exception when the input cannot be parsed. * From b62ea668d9cdb9dcae3b4dff1af90dbccf331c62 Mon Sep 17 00:00:00 2001 From: Guus der Kinderen Date: Fri, 8 May 2026 11:25:38 +0200 Subject: [PATCH 4/5] OF-3272 CSI: flush delayed queue outside lock and serialize activation flush Refactor CSI activation flushing to avoid holding the CsiManager monitor during delivery I/O and preserve stanza ordering across activation. --- .../jivesoftware/openfire/csi/CsiManager.java | 92 ++++++++++--- .../openfire/session/LocalClientSession.java | 20 ++- .../openfire/csi/CsiManagerTest.java | 127 ++++++++++++++++-- 3 files changed, 208 insertions(+), 31 deletions(-) diff --git a/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java b/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java index 65c527b355..a05aede8a9 100644 --- a/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java +++ b/xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java @@ -107,6 +107,15 @@ public class CsiManager */ private final Deque queue = new LinkedList<>(); + /** + * Indicates that activation-triggered queue flushing is in progress. + * + * While true, this field serves two purposes: + * 1) queue gating: {@link #queueOrPush(Packet)} keeps new stanzas queued (not pushed) to preserve ordering; + * 2) flusher ownership: concurrent {@link #activate()} calls do not start a second flush loop. + */ + private boolean flushingOnActivate = false; + public CsiManager(@Nonnull final LocalClientSession session) { this.session = session; @@ -118,7 +127,7 @@ public CsiManager(@Nonnull final LocalClientSession session) * * @param nonza The CSI nonza to be processed. */ - public synchronized void process(@Nonnull final Element nonza) + public void process(@Nonnull final Element nonza) { switch(nonza.getName()) { case "active": @@ -135,19 +144,51 @@ public synchronized void process(@Nonnull final Element nonza) /** * Switch to the client state of 'active'. */ - public synchronized void activate() + public void activate() { - Log.trace("Session for '{}' to CSI 'active'", session.getAddress()); - active = true; - - // Re-submit the tail of the queue through the normal queueOrPush path. Because active is now true, queueOrPush - // will flush the entire queue (including all preceding stanzas) in one atomic operation. - final Packet tail = queue.pollLast(); - if (tail != null) { - try { - session.deliver(tail); - } catch (UnauthorizedException e) { - Log.error("Unexpected exception while activating CSI.", e); + synchronized (this) + { + Log.trace("Session for '{}' to CSI 'active'", session.getAddress()); + active = true; + + // If another thread is already flushing the queue as part of activation, avoid starting a second flusher. + if (flushingOnActivate) { + return; + } + + flushingOnActivate = true; + } + + try { + while (true) { + final List stanzasToPush; + synchronized (this) { + // Stop flushing as soon as the session became inactive again. + if (!active) { + flushingOnActivate = false; + return; + } + + stanzasToPush = drainQueue(); + if (stanzasToPush.isEmpty()) { + flushingOnActivate = false; + lastPush = Instant.now(); + return; + } + } + + // I/O is intentionally performed outside the CSI lock. + session.pushPackets(stanzasToPush); + + synchronized (this) { + lastPush = Instant.now(); + } + } + } catch (UnauthorizedException e) { + Log.error("Unexpected exception while activating CSI.", e); + } finally { + synchronized (this) { + flushingOnActivate = false; } } } @@ -192,12 +233,13 @@ public synchronized List queueOrPush(@Nonnull final Packet packet) { queue.add(packet); - final boolean mustPush = - !DELAY_ENABLED.getValue() // The feature is disabled by configuration. Always send stanzas immediately. - || active // The client is active! Do not delay. - || queue.size() >= DELAY_QUEUE_CAPACITY.getValue() // The delay queue has reached its capacity. Flush the entire thing. - || Instant.now().isAfter(lastPush.plus(DELAY_MAX_DURATION.getValue())) // Ensure that periodically, delayed data is sent anyway. - || !canDelay(packet); + final boolean mustPush = !flushingOnActivate // Never flush while activation is in progress, as this can cause out-of-order delivery. + && ( !DELAY_ENABLED.getValue() // The feature is disabled by configuration. Always send stanzas immediately. + || active // The client is active! Do not delay. + || queue.size() >= DELAY_QUEUE_CAPACITY.getValue() // The delay queue has reached its capacity. Flush the entire thing. + || Instant.now().isAfter(lastPush.plus(DELAY_MAX_DURATION.getValue())) // Ensure that periodically, delayed data is sent anyway. + || !canDelay(packet) + ); final List result = new LinkedList<>(); if (mustPush) { @@ -211,6 +253,18 @@ public synchronized List queueOrPush(@Nonnull final Packet packet) return result; } + /** + * Returns all queued stanzas and clears the queue. + * + * @return The queued stanzas + */ + private List drainQueue() + { + final List result = new LinkedList<>(queue); + queue.clear(); + return result; + } + /** * Inspects a stanza and evaluates if it is eligible for delayed delivery to inactive clients. * diff --git a/xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.java b/xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.java index fb97cdbe5a..414c386f59 100644 --- a/xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.java +++ b/xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.java @@ -962,11 +962,25 @@ static void returnPrivacyListErrorToSender(Packet packet) { } @Override - public void deliver(Packet queueOrPushStanza) throws UnauthorizedException { - // Queue this stanza, possibly returning it immediately in line with any previously queued stanzas if this - // stanza needs to be pushed to the client immediately. + public void deliver(Packet queueOrPushStanza) throws UnauthorizedException + { final List stanzasToPush = csiManager.queueOrPush(queueOrPushStanza); + if (stanzasToPush.isEmpty()) { + return; + } + pushPackets(stanzasToPush); + } + /** + * Delivers stanzas to the client, without evaluating CSI. + * + * This method should generally not be used, as it is designed to be used by the CSI implementation specifically. + * Prefer using {@link #deliver(Packet)} instead. + * + * @param stanzasToPush The stanzas to deliver + */ + public void pushPackets(@Nonnull final List stanzasToPush) throws UnauthorizedException + { if (stanzasToPush.isEmpty()) { return; } diff --git a/xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java b/xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java index 1f16308828..fb77d5bf35 100644 --- a/xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java +++ b/xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java @@ -34,12 +34,18 @@ import java.io.IOException; import java.io.StringReader; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** * Verifies the implementation of {@link CsiManager} @@ -59,12 +65,7 @@ public class CsiManagerTest public void setUp() throws Exception { csiManager = new CsiManager(mockSession); - // Wire the mock so that deliver() forwards the stanza back through queueOrPush(), - // mirroring what LocalClientSession.deliver() does in production. - doAnswer(invocation -> { - csiManager.queueOrPush(invocation.getArgument(0)); - return Collections.emptyList(); // For some invocations (when 'mustpush==true), this is expected to return the queue, otherwise it returns an empty list. We're picking one of the two here. This is a simplification for testing purposes. - }).when(mockSession).deliver(any(Packet.class)); + doAnswer(invocation -> null).when(mockSession).pushPackets(any()); } /** @@ -268,9 +269,117 @@ public void testActivateWithQueuedMessagesFlushesAll() throws Exception csiManager.activate(); // Verify result - after activation, the queue should be flushed - // Note: activate() only delivers the tail, but that should trigger a flush as the delivery immediately invokes - // the next operation. The queue should be empty after session.deliver() is called with the tail. assertEquals(0, csiManager.getDelayQueueSize(), "After activate, the delay queue should be flushed."); + verify(mockSession, times(1)).pushPackets(any()); + } + + /** + * Verifies that deactivation interrupts activation-triggered flushing after the in-flight push completes. + */ + @Test + public void testDeactivateStopsActivationFlushing() throws Exception + { + final Packet message1 = createDelayableMessage(); + final Packet message2 = createDelayableMessage(); + final Packet message3 = createDelayableMessage(); + final Packet message4 = createDelayableMessage(); + + csiManager.deactivate(); + csiManager.queueOrPush(message1); + csiManager.queueOrPush(message2); + + final CountDownLatch firstPushStarted = new CountDownLatch(1); + final CountDownLatch unblockFirstPush = new CountDownLatch(1); + final AtomicInteger pushCount = new AtomicInteger(0); + final List pushedBatchSizes = Collections.synchronizedList(new ArrayList<>()); + + doAnswer(invocation -> { + final List pushed = invocation.getArgument(0); + pushedBatchSizes.add(pushed.size()); + if (pushCount.incrementAndGet() == 1) { + firstPushStarted.countDown(); + assertTrue(unblockFirstPush.await(5, TimeUnit.SECONDS), "Timed out waiting to unblock first push"); + } + return null; + }).when(mockSession).pushPackets(any()); + + final Thread activation = new Thread(csiManager::activate); + activation.start(); + + assertTrue(firstPushStarted.await(5, TimeUnit.SECONDS), "First push should start"); + + // While activation is flushing, newly queued stanzas should remain queued. + assertTrue(csiManager.queueOrPush(message3).isEmpty()); + csiManager.deactivate(); + + unblockFirstPush.countDown(); + activation.join(5000); + assertFalse(activation.isAlive(), "Activation thread should have finished"); + + assertFalse(csiManager.isActive(), "Session should be inactive"); + assertEquals(1, csiManager.getDelayQueueSize(), "Messages queued during flush should remain queued after deactivation"); + + // Still inactive: additional delayable stanza is queued, not pushed. + assertTrue(csiManager.queueOrPush(message4).isEmpty()); + assertEquals(2, csiManager.getDelayQueueSize(), "Additional delayable stanza should be queued while inactive"); + + // Reactivating should flush the remaining queued stanzas in one batch. + csiManager.activate(); + assertEquals(0, csiManager.getDelayQueueSize()); + assertEquals(List.of(2, 2), pushedBatchSizes, "Expected one initial batch and one batch after reactivation"); + } + + /** + * Verifies that overlapping activate() calls are handled by one flusher thread. + */ + @Test + public void testDualActivateUsesSingleFlusher() throws Exception + { + final Packet message1 = createDelayableMessage(); + final Packet message2 = createDelayableMessage(); + final Packet message3 = createDelayableMessage(); + + csiManager.deactivate(); + csiManager.queueOrPush(message1); + csiManager.queueOrPush(message2); + + final CountDownLatch firstPushStarted = new CountDownLatch(1); + final CountDownLatch unblockFirstPush = new CountDownLatch(1); + final AtomicInteger pushCount = new AtomicInteger(0); + final List pushedBatchSizes = Collections.synchronizedList(new ArrayList<>()); + + doAnswer(invocation -> { + final List pushed = invocation.getArgument(0); + pushedBatchSizes.add(pushed.size()); + + if (pushCount.incrementAndGet() == 1) { + firstPushStarted.countDown(); + assertTrue(unblockFirstPush.await(5, TimeUnit.SECONDS), "Timed out waiting to unblock first push"); + } + return null; + }).when(mockSession).pushPackets(any()); + + final Thread firstActivator = new Thread(csiManager::activate); + firstActivator.start(); + + assertTrue(firstPushStarted.await(5, TimeUnit.SECONDS), "First activate() should start flushing"); + + final Thread secondActivator = new Thread(csiManager::activate); + secondActivator.start(); + secondActivator.join(1000); + assertFalse(secondActivator.isAlive(), "Second activate() should return quickly while flush is in progress"); + + // While the first flush is in-flight, this stanza should be queued and then flushed by the first activator. + assertTrue(csiManager.queueOrPush(message3).isEmpty()); + + unblockFirstPush.countDown(); + firstActivator.join(5000); + assertFalse(firstActivator.isAlive(), "First activate() should finish"); + + assertTrue(csiManager.isActive(), "Session should remain active"); + assertEquals(0, csiManager.getDelayQueueSize(), "All queued stanzas should be flushed"); + assertEquals(2, pushCount.get(), "Only one flusher loop should perform two push cycles"); + assertEquals(List.of(2, 1), pushedBatchSizes, "Expected initial flush and one follow-up batch"); } /** @@ -591,7 +700,7 @@ public void testCanDelayJingleRejectReturnsFalse() throws Exception * Verifies that a group chat message stanza with a body is not identified as a stanza that can be delayed/queued in context of CSI. */ @Test - public void testCanDelayProceedingToGroupChatReturnsFalse() throws Exception + public void testCanDelayGroupChatMessageWithBodyReturnsFalse() throws Exception { // Setup test fixture - Groupchat messages with body are not delayable final Packet groupchatMessage = parse(""" From 97ca746a9284ff25193f69df6c39b15e57d2bc8f Mon Sep 17 00:00:00 2001 From: Guus der Kinderen Date: Fri, 8 May 2026 21:48:46 +0200 Subject: [PATCH 5/5] (code review) remove redundant check for empty collection --- .../jivesoftware/openfire/session/LocalClientSession.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.java b/xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.java index 414c386f59..432d5a2888 100644 --- a/xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.java +++ b/xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.java @@ -964,11 +964,7 @@ static void returnPrivacyListErrorToSender(Packet packet) { @Override public void deliver(Packet queueOrPushStanza) throws UnauthorizedException { - final List stanzasToPush = csiManager.queueOrPush(queueOrPushStanza); - if (stanzasToPush.isEmpty()) { - return; - } - pushPackets(stanzasToPush); + pushPackets(csiManager.queueOrPush(queueOrPushStanza)); } /**