From 92e19c8333fa12c4035ffc8c965496fe76806310 Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 13:27:10 +0200 Subject: [PATCH 01/15] fix(ci): force java codeql builds to rerun --- .github/workflows/codeql.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 6aab7d9..b5cbe89 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -137,8 +137,10 @@ jobs: if: matrix.language == 'java-kotlin' shell: bash run: | - # The Java/Kotlin extractor only needs JVM and Android compilation commands. - # Avoid clean/full assemble so PR scans do not rebuild iOS/native artifacts. + # CodeQL must observe real compiler invocations, so do not let Gradle satisfy + # these tasks from the build cache or up-to-date checks. + # Keep the task list scoped to JVM and Android compilation so this stays faster + # than a full build and avoids unnecessary iOS/native work. ./gradlew \ :meshlink:jvmJar \ :meshlink:androidJar \ @@ -147,6 +149,8 @@ jobs: :meshlink-reference:bundleAndroidMainAar \ :meshlink-reference:android:compileDebugKotlin \ :meshlink-proof:android:compileDebugKotlin \ + --rerun-tasks \ + --no-build-cache \ --no-daemon \ --console=plain From 2ba0894e70b53033fdd0e07d8f3b05976009416e Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 13:35:18 +0200 Subject: [PATCH 02/15] test: relax large-transfer route discovery timing --- .../integration/LargeTransferIntegrationTest.kt | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt index cba19d0..16b1c3d 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt @@ -30,6 +30,7 @@ import kotlinx.coroutines.withTimeoutOrNull class LargeTransferIntegrationTest { private companion object { private const val TEST_TIMING_SLACK_MULTIPLIER: Long = 4 + private const val ROUTE_DISCOVERY_TIMEOUT_MILLIS: Long = 8_000 } @Test @@ -55,7 +56,7 @@ class LargeTransferIntegrationTest { code = DiagnosticCode.ROUTE_DISCOVERED, peerIdValue = recipient.peerId.value, routeAvailable = true, - timeoutMillis = 5_000, + timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, ) val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { @@ -91,7 +92,7 @@ class LargeTransferIntegrationTest { code = DiagnosticCode.ROUTE_DISCOVERED, peerIdValue = recipient.peerId.value, routeAvailable = true, - timeoutMillis = 5_000, + timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, ) val frameCountBeforeSend = harness.sentFrames(sender).size val receivedMessageDeferred = @@ -143,7 +144,7 @@ class LargeTransferIntegrationTest { code = DiagnosticCode.ROUTE_DISCOVERED, peerIdValue = recipient.peerId.value, routeAvailable = true, - timeoutMillis = 5_000, + timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, ) val sendResultDeferred = async { sender.meshLink.send(recipient.peerId, payload) } val receivedMessageDeferred = @@ -245,7 +246,7 @@ class LargeTransferIntegrationTest { awaitDiagnostic( diagnostics = restartedSender.diagnosticSink::events, code = DiagnosticCode.ROUTE_DISCOVERED, - timeoutMillis = 5_000, + timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, ) // Act @@ -294,7 +295,7 @@ class LargeTransferIntegrationTest { code = DiagnosticCode.ROUTE_DISCOVERED, peerIdValue = recipient.peerId.value, routeAvailable = true, - timeoutMillis = 5_000, + timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, ) val sendResultDeferred = async { sender.meshLink.send(recipient.peerId, payload) } val receivedMessageDeferred = async { @@ -349,7 +350,7 @@ class LargeTransferIntegrationTest { code = DiagnosticCode.ROUTE_DISCOVERED, peerIdValue = recipient.peerId.value, routeAvailable = true, - timeoutMillis = 5_000, + timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, ) val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { @@ -400,7 +401,7 @@ class LargeTransferIntegrationTest { code = DiagnosticCode.ROUTE_DISCOVERED, peerIdValue = recipient.peerId.value, routeAvailable = true, - timeoutMillis = 5_000, + timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, ) harness.dropNextDeliveries(recipient, sender, count = 256) @@ -437,7 +438,7 @@ class LargeTransferIntegrationTest { code = DiagnosticCode.ROUTE_DISCOVERED, peerIdValue = recipient.peerId.value, routeAvailable = true, - timeoutMillis = 5_000, + timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, ) val recipientFrameCountBeforeSend = harness.sentFrames(recipient).size val receivedMessageDeferred = From 3960abd8d39ac9d50748db2b1fba624f23e58188 Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 13:42:25 +0200 Subject: [PATCH 03/15] fix(ci): keep linux verification single-worker --- .github/workflows/ci.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 71cd2a0..04a605c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,6 +41,9 @@ jobs: - name: Verify MeshLink SDK on Linux run: | + # The hosted Linux runners still expose tight CPU budgets for this suite. + # Keep verification single-worker so the routing and transfer integration + # tests do not starve while JVM and Android host tasks execute. ./gradlew \ :meshlink:jvmTest \ :meshlink:testAndroidHostTest \ @@ -49,6 +52,7 @@ jobs: :meshlink:koverVerify \ verifyDocs \ checkAgp9Invariants \ + --max-workers=1 \ --console=plain - name: Upload verification reports From 6982b39e8a0b49e585d8650cf9d1eb8aed766280 Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 13:49:51 +0200 Subject: [PATCH 04/15] test: remove flaky large-transfer pre-route wait --- .../meshlink/integration/LargeTransferIntegrationTest.kt | 7 ------- 1 file changed, 7 deletions(-) diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt index 16b1c3d..4cfd112 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt @@ -51,13 +51,6 @@ class LargeTransferIntegrationTest { relay.meshLink.start() recipient.meshLink.start() testDelay(250) - awaitDiagnosticForPeer( - diagnostics = sender.diagnosticSink::events, - code = DiagnosticCode.ROUTE_DISCOVERED, - peerIdValue = recipient.peerId.value, - routeAvailable = true, - timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, - ) val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { testWithTimeout(6_000) { recipient.meshLink.messages.first() } From f0e61e194c8278d3f731ab0bfaf060cd9704edd4 Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 13:58:00 +0200 Subject: [PATCH 05/15] test: extend large-transfer receive timeout on android host --- .../meshlink/integration/LargeTransferIntegrationTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt index 4cfd112..1ae8605 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt @@ -53,7 +53,7 @@ class LargeTransferIntegrationTest { testDelay(250) val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { - testWithTimeout(6_000) { recipient.meshLink.messages.first() } + testWithTimeout(10_000) { recipient.meshLink.messages.first() } } // Act From a664245aabe673a985eb5e486ac3131f0ca08dcf Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 15:23:26 +0200 Subject: [PATCH 06/15] test: harden virtual mesh integration harness --- .../LargeTransferIntegrationTest.kt | 75 ++++--- .../integration/MeshRoutingIntegrationTest.kt | 12 +- .../meshlink/test/VirtualMeshNetwork.kt | 208 ++++++++++++------ .../meshlink/test/VirtualMeshTransport.kt | 104 ++++++--- 4 files changed, 272 insertions(+), 127 deletions(-) diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt index 1ae8605..68342c3 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt @@ -8,6 +8,7 @@ import ch.trancee.meshlink.config.meshLinkConfig import ch.trancee.meshlink.diagnostics.DiagnosticCode import ch.trancee.meshlink.diagnostics.DiagnosticEvent import ch.trancee.meshlink.test.MeshTestHarness +import ch.trancee.meshlink.test.NodeHandle import kotlin.test.AfterTest import kotlin.test.Test import kotlin.test.assertContentEquals @@ -21,6 +22,7 @@ import kotlin.time.Duration.Companion.seconds import kotlin.time.TimeSource import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.first import kotlinx.coroutines.runBlocking @@ -51,6 +53,7 @@ class LargeTransferIntegrationTest { relay.meshLink.start() recipient.meshLink.start() testDelay(250) + prewarmRoute(sender = sender, recipient = recipient) val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { testWithTimeout(10_000) { recipient.meshLink.messages.first() } @@ -80,13 +83,7 @@ class LargeTransferIntegrationTest { sender.meshLink.start() recipient.meshLink.start() testDelay(500) - awaitDiagnosticForPeer( - diagnostics = sender.diagnosticSink::events, - code = DiagnosticCode.ROUTE_DISCOVERED, - peerIdValue = recipient.peerId.value, - routeAvailable = true, - timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, - ) + prewarmRoute(sender = sender, recipient = recipient) val frameCountBeforeSend = harness.sentFrames(sender).size val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { @@ -132,21 +129,20 @@ class LargeTransferIntegrationTest { recipient.meshLink.start() alternateRelay.meshLink.start() testDelay(250) - awaitDiagnosticForPeer( - diagnostics = sender.diagnosticSink::events, - code = DiagnosticCode.ROUTE_DISCOVERED, - peerIdValue = recipient.peerId.value, - routeAvailable = true, - timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, - ) + prewarmRoute(sender = sender, recipient = recipient) + val firstRelayFrameCountBeforeSend = harness.sentFrames(firstRelay).size val sendResultDeferred = async { sender.meshLink.send(recipient.peerId, payload) } val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { - testWithTimeout(4_000) { recipient.meshLink.messages.first() } + testWithTimeout(10_000) { recipient.meshLink.messages.first() } } // Act - testDelay(250) + awaitSentFrameCountAtLeast( + harness = harness, + handle = firstRelay, + expectedCount = firstRelayFrameCountBeforeSend + 1, + ) harness.unlinkPeers(firstRelay, recipient) harness.linkPeers(sender, alternateRelay) harness.linkPeers(alternateRelay, recipient) @@ -233,8 +229,8 @@ class LargeTransferIntegrationTest { } } } - restartedSender.meshLink.start() harness.linkPeers(relay, recipient) + restartedSender.meshLink.start() restartedSenderFoundRelayDeferred.await() awaitDiagnostic( diagnostics = restartedSender.diagnosticSink::events, @@ -389,13 +385,7 @@ class LargeTransferIntegrationTest { sender.meshLink.start() recipient.meshLink.start() testDelay(250) - awaitDiagnosticForPeer( - diagnostics = sender.diagnosticSink::events, - code = DiagnosticCode.ROUTE_DISCOVERED, - peerIdValue = recipient.peerId.value, - routeAvailable = true, - timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, - ) + prewarmRoute(sender = sender, recipient = recipient) harness.dropNextDeliveries(recipient, sender, count = 256) // Act @@ -426,13 +416,7 @@ class LargeTransferIntegrationTest { relay.meshLink.start() recipient.meshLink.start() testDelay(250) - awaitDiagnosticForPeer( - diagnostics = sender.diagnosticSink::events, - code = DiagnosticCode.ROUTE_DISCOVERED, - peerIdValue = recipient.peerId.value, - routeAvailable = true, - timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, - ) + prewarmRoute(sender = sender, recipient = recipient) val recipientFrameCountBeforeSend = harness.sentFrames(recipient).size val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { @@ -499,6 +483,35 @@ class LargeTransferIntegrationTest { private fun harness(): MeshTestHarness = MeshTestHarness().also(harnesses::add) + private suspend fun prewarmRoute( + sender: NodeHandle, + recipient: NodeHandle, + payload: ByteArray = ByteArray(32) { index -> ((index * 29) % 251).toByte() }, + ): Unit = coroutineScope { + val warmupMessageDeferred = + async(start = CoroutineStart.UNDISPATCHED) { + testWithTimeout(5_000) { recipient.meshLink.messages.first() } + } + val warmupSendResult = sender.meshLink.send(recipient.peerId, payload) + val warmupMessage = warmupMessageDeferred.await() + + assertIs(warmupSendResult) + assertContentEquals(payload, warmupMessage.payload) + } + + private suspend fun awaitSentFrameCountAtLeast( + harness: MeshTestHarness, + handle: NodeHandle, + expectedCount: Int, + timeoutMillis: Long = 5_000, + ): Unit { + testWithTimeout(timeoutMillis) { + while (harness.sentFrames(handle).size < expectedCount) { + testDelay(10) + } + } + } + private suspend fun testDelay(milliseconds: Int): Unit = delay(milliseconds.toLong() * TEST_TIMING_SLACK_MULTIPLIER) diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt index cbe6359..8e50a8b 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt @@ -279,12 +279,22 @@ class MeshRoutingIntegrationTest { storage = sender.storage, configOverride = senderConfig, ) + val restartedSenderFoundRelayDeferred = + async(start = CoroutineStart.UNDISPATCHED) { + testWithTimeout(5_000) { + restartedSender.meshLink.peerEvents.first { event -> + event is ch.trancee.meshlink.api.PeerEvent.Found && + event.peerId == relay.peerId + } + } + } + harness.linkPeers(relay, recipient) restartedSender.meshLink.start() val unexpectedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { testWithTimeoutOrNull(1_500) { recipient.meshLink.messages.first() } } - harness.linkPeers(relay, recipient) + restartedSenderFoundRelayDeferred.await() awaitDiagnosticForPeer( diagnostics = restartedSender.diagnosticSink::events, code = DiagnosticCode.ROUTE_DISCOVERED, diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/VirtualMeshNetwork.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/VirtualMeshNetwork.kt index 105d985..86146f4 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/VirtualMeshNetwork.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/VirtualMeshNetwork.kt @@ -2,23 +2,34 @@ package ch.trancee.meshlink.test import ch.trancee.meshlink.api.PeerId import ch.trancee.meshlink.transport.TransportMode +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.update + +private data class VirtualMeshNetworkState( + val transports: Map = emptyMap(), + val linkedPeers: Map = emptyMap(), + val heldDeliveries: Map> = emptyMap(), + val dropRules: Map = emptyMap(), + val duplicateRules: Map = emptyMap(), + val holdRules: Map = emptyMap(), + val manualTopologyEnabled: Boolean = false, + val maximumPayloadBytesPerDelivery: Int? = null, +) internal class VirtualMeshNetwork { - private val transports: MutableMap = linkedMapOf() - private val linkedPeers: MutableMap = linkedMapOf() - private val heldDeliveries: MutableMap> = - linkedMapOf() - private val dropRules: MutableMap = linkedMapOf() - private val duplicateRules: MutableMap = linkedMapOf() - private val holdRules: MutableMap = linkedMapOf() - private var manualTopologyEnabled: Boolean = false - private var maximumPayloadBytesPerDelivery: Int? = null + private val mutableState: MutableStateFlow = + MutableStateFlow(VirtualMeshNetworkState()) internal fun register(transport: VirtualMeshTransport): Unit { - transports[transport.localPeerId.value] = transport - transports.values.forEach { other -> - if (other !== transport && areLinked(transport.localPeerId, other.localPeerId)) { - val mode = linkMode(transport.localPeerId, other.localPeerId) + mutableState.update { state -> + state.copy(transports = state.transports + (transport.localPeerId.value to transport)) + } + val snapshot = mutableState.value + snapshot.transports.values.forEach { other -> + if ( + other !== transport && snapshot.areLinked(transport.localPeerId, other.localPeerId) + ) { + val mode = snapshot.linkMode(transport.localPeerId, other.localPeerId) other.connect(transport.localPeerId, mode) transport.connect(other.localPeerId, mode) } @@ -26,8 +37,8 @@ internal class VirtualMeshNetwork { } internal fun unregister(peerId: PeerId): Unit { - transports.remove(peerId.value) - transports.values.forEach { other -> other.disconnect(peerId) } + mutableState.update { state -> state.copy(transports = state.transports - peerId.value) } + mutableState.value.transports.values.forEach { other -> other.disconnect(peerId) } } internal fun linkPeers( @@ -35,40 +46,53 @@ internal class VirtualMeshNetwork { second: PeerId, mode: TransportMode = TransportMode.L2CAP, ): Unit { - manualTopologyEnabled = true val linkKey = LinkKey.of(first, second) - if (linkedPeers.containsKey(linkKey)) { - return + var added = false + mutableState.update { state -> + val existingMode = state.linkedPeers[linkKey] + added = existingMode == null + state.copy( + manualTopologyEnabled = true, + linkedPeers = + if (existingMode == null) { + state.linkedPeers + (linkKey to mode) + } else { + state.linkedPeers + }, + ) } - linkedPeers[linkKey] = mode - val firstTransport = transports[first.value] - val secondTransport = transports[second.value] - if (firstTransport != null && secondTransport != null) { - firstTransport.connect(second, mode) - secondTransport.connect(first, mode) + if (!added) { + return } + val snapshot = mutableState.value + snapshot.transports[first.value]?.connect(second, mode) + snapshot.transports[second.value]?.connect(first, mode) } internal fun unlinkPeers(first: PeerId, second: PeerId): Unit { - manualTopologyEnabled = true val linkKey = LinkKey.of(first, second) - if (linkedPeers.remove(linkKey) == null) { - return + var removed = false + mutableState.update { state -> + if (!state.linkedPeers.containsKey(linkKey)) { + return@update state.copy(manualTopologyEnabled = true) + } + removed = true + state.copy(manualTopologyEnabled = true, linkedPeers = state.linkedPeers - linkKey) } - val firstTransport = transports[first.value] - val secondTransport = transports[second.value] - if (firstTransport != null && secondTransport != null) { - firstTransport.disconnect(second) - secondTransport.disconnect(first) + if (!removed) { + return } + val snapshot = mutableState.value + snapshot.transports[first.value]?.disconnect(second) + snapshot.transports[second.value]?.disconnect(first) } internal fun setMaximumPayloadBytesPerDelivery(limit: Int?): Unit { - maximumPayloadBytesPerDelivery = limit + mutableState.update { state -> state.copy(maximumPayloadBytesPerDelivery = limit) } } internal fun maximumPayloadBytesPerDelivery(): Int? { - return maximumPayloadBytesPerDelivery + return mutableState.value.maximumPayloadBytesPerDelivery } internal fun dropNextDeliveries( @@ -77,7 +101,9 @@ internal class VirtualMeshNetwork { count: Int = 1, ): Unit { val key = DirectedLinkKey.of(senderPeerId, recipientPeerId) - dropRules[key] = (dropRules[key] ?: 0) + count + mutableState.update { state -> + state.copy(dropRules = state.dropRules.incremented(key, count)) + } } internal fun duplicateNextDeliveries( @@ -86,7 +112,9 @@ internal class VirtualMeshNetwork { count: Int = 1, ): Unit { val key = DirectedLinkKey.of(senderPeerId, recipientPeerId) - duplicateRules[key] = (duplicateRules[key] ?: 0) + count + mutableState.update { state -> + state.copy(duplicateRules = state.duplicateRules.incremented(key, count)) + } } internal fun holdNextDeliveries( @@ -95,15 +123,27 @@ internal class VirtualMeshNetwork { count: Int = 1, ): Unit { val key = DirectedLinkKey.of(senderPeerId, recipientPeerId) - holdRules[key] = (holdRules[key] ?: 0) + count + mutableState.update { state -> + state.copy(holdRules = state.holdRules.incremented(key, count)) + } } internal fun releaseHeldDeliveries(senderPeerId: PeerId, recipientPeerId: PeerId): Unit { val key = DirectedLinkKey.of(senderPeerId, recipientPeerId) - val deliveries = heldDeliveries.remove(key).orEmpty() + var deliveries: List = emptyList() + mutableState.update { state -> + deliveries = state.heldDeliveries[key].orEmpty() + if (deliveries.isEmpty()) { + return@update state + } + state.copy(heldDeliveries = state.heldDeliveries - key) + } + val transports = mutableState.value.transports deliveries.forEach { delivery -> - val recipient = transports[delivery.recipientPeerId.value] ?: return@forEach - recipient.receive(senderPeerId = delivery.senderPeerId, payload = delivery.payload) + transports[delivery.recipientPeerId.value]?.receive( + senderPeerId = delivery.senderPeerId, + payload = delivery.payload, + ) } } @@ -112,57 +152,93 @@ internal class VirtualMeshNetwork { recipientPeerId: PeerId, payload: ByteArray, ): DeliveryOutcome { - if (!areLinked(senderPeerId, recipientPeerId)) { + val snapshot = mutableState.value + if (!snapshot.areLinked(senderPeerId, recipientPeerId)) { return DeliveryOutcome.RecipientUnavailable } val recipient = - transports[recipientPeerId.value] ?: return DeliveryOutcome.RecipientUnavailable - val key = DirectedLinkKey.of(senderPeerId, recipientPeerId) + snapshot.transports[recipientPeerId.value] + ?: return DeliveryOutcome.RecipientUnavailable + val maximumPayloadBytesPerDelivery = snapshot.maximumPayloadBytesPerDelivery if ( - maximumPayloadBytesPerDelivery != null && - payload.size > maximumPayloadBytesPerDelivery!! + maximumPayloadBytesPerDelivery != null && payload.size > maximumPayloadBytesPerDelivery ) { return DeliveryOutcome.AcceptedButDropped } - if (holdRules.consume(key)) { - heldDeliveries.getOrPut(key) { mutableListOf() } += - PendingDelivery(senderPeerId, recipientPeerId, payload.copyOf()) + + val key = DirectedLinkKey.of(senderPeerId, recipientPeerId) + var holdDelivery = false + var dropDelivery = false + var duplicateDelivery = false + mutableState.update { state -> + when { + (state.holdRules[key] ?: 0) > 0 -> { + holdDelivery = true + state.copy( + holdRules = state.holdRules.decremented(key), + heldDeliveries = + state.heldDeliveries.appended( + key, + PendingDelivery(senderPeerId, recipientPeerId, payload), + ), + ) + } + (state.dropRules[key] ?: 0) > 0 -> { + dropDelivery = true + state.copy(dropRules = state.dropRules.decremented(key)) + } + (state.duplicateRules[key] ?: 0) > 0 -> { + duplicateDelivery = true + state.copy(duplicateRules = state.duplicateRules.decremented(key)) + } + else -> state + } + } + + if (holdDelivery) { return DeliveryOutcome.Delivered } - if (dropRules.consume(key)) { + if (dropDelivery) { return DeliveryOutcome.AcceptedButDropped } recipient.receive(senderPeerId = senderPeerId, payload = payload) - if (duplicateRules.consume(key)) { + if (duplicateDelivery) { recipient.receive(senderPeerId = senderPeerId, payload = payload.copyOf()) } return DeliveryOutcome.Delivered } +} - private fun areLinked(first: PeerId, second: PeerId): Boolean { - if (!manualTopologyEnabled) { - return true - } - return linkedPeers.containsKey(LinkKey.of(first, second)) +private fun VirtualMeshNetworkState.areLinked(first: PeerId, second: PeerId): Boolean { + if (!manualTopologyEnabled) { + return true } + return linkedPeers.containsKey(LinkKey.of(first, second)) +} - private fun linkMode(first: PeerId, second: PeerId): TransportMode { - return linkedPeers[LinkKey.of(first, second)] ?: TransportMode.L2CAP - } +private fun VirtualMeshNetworkState.linkMode(first: PeerId, second: PeerId): TransportMode { + return linkedPeers[LinkKey.of(first, second)] ?: TransportMode.L2CAP +} - private fun MutableMap.consume(key: DirectedLinkKey): Boolean { - val remaining = this[key] ?: return false - if (remaining <= 1) { - remove(key) - } else { - this[key] = remaining - 1 - } - return true +private fun Map.incremented(key: K, count: Int): Map { + return this + (key to ((this[key] ?: 0) + count)) +} + +private fun Map.decremented(key: K): Map { + val remaining = this[key] ?: return this + return if (remaining <= 1) { + this - key + } else { + this + (key to (remaining - 1)) } } +private fun Map>.appended(key: K, value: V): Map> { + return this + (key to (this[key].orEmpty() + value)) +} + private class LinkKey private constructor(private val left: String, private val right: String) { override fun equals(other: Any?): Boolean { if (this === other) { diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/VirtualMeshTransport.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/VirtualMeshTransport.kt index 8d8c555..074b4e9 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/VirtualMeshTransport.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/VirtualMeshTransport.kt @@ -8,43 +8,55 @@ import ch.trancee.meshlink.transport.TransportMode import ch.trancee.meshlink.transport.TransportSendResult import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.flow.update + +private data class VirtualMeshTransportState( + val eventChannel: Channel = Channel(capacity = Channel.UNLIMITED), + val sentFrames: List = emptyList(), + val clearedQueuedOutboundPeers: List = emptyList(), + val discoverySuspendedTransitions: List = emptyList(), + val discoveredPeerModes: Map = emptyMap(), + val started: Boolean = false, +) internal class VirtualMeshTransport( internal val localPeerId: PeerId, private val network: VirtualMeshNetwork, ) : BleTransport { - private var eventChannel: Channel = Channel(capacity = Channel.UNLIMITED) - private val sentFrames: MutableList = mutableListOf() - private val clearedQueuedOutboundPeers: MutableList = mutableListOf() - private val discoverySuspendedTransitions: MutableList = mutableListOf() - private val discoveredPeerModes: MutableMap = linkedMapOf() - private var started: Boolean = false + private val mutableState: MutableStateFlow = + MutableStateFlow(VirtualMeshTransportState()) override val events: Flow - get() = eventChannel.receiveAsFlow() + get() = mutableState.value.eventChannel.receiveAsFlow() override suspend fun start(): Unit { - started = true + mutableState.update { state -> state.copy(started = true) } network.register(this) } override suspend fun pause(): Unit { - started = false + mutableState.update { state -> state.copy(started = false) } } override suspend fun resume(): Unit { - started = true + mutableState.update { state -> state.copy(started = true) } network.register(this) } override suspend fun stop(): Unit { - started = false - sentFrames.clear() - clearedQueuedOutboundPeers.clear() - discoveredPeerModes.clear() + mutableState.update { state -> + state.copy( + eventChannel = Channel(capacity = Channel.UNLIMITED), + sentFrames = emptyList(), + clearedQueuedOutboundPeers = emptyList(), + discoverySuspendedTransitions = emptyList(), + discoveredPeerModes = emptyMap(), + started = false, + ) + } network.unregister(localPeerId) - eventChannel = Channel(capacity = Channel.UNLIMITED) } override fun maximumPayloadBytesPerDelivery(peerId: PeerId): Int? { @@ -52,22 +64,31 @@ internal class VirtualMeshTransport( } override suspend fun setDiscoverySuspended(suspended: Boolean): Unit { - discoverySuspendedTransitions += suspended + mutableState.update { state -> + state.copy( + discoverySuspendedTransitions = state.discoverySuspendedTransitions + suspended + ) + } } override suspend fun clearQueuedOutboundFrames(peerId: PeerId): Unit { - clearedQueuedOutboundPeers += peerId.value + mutableState.update { state -> + state.copy(clearedQueuedOutboundPeers = state.clearedQueuedOutboundPeers + peerId.value) + } } override suspend fun send(frame: OutboundFrame): TransportSendResult { - if (!started) { + val snapshot = mutableState.value + if (!snapshot.started) { return TransportSendResult.Dropped("virtual transport is not started") } - val mode = discoveredPeerModes[frame.peerId.value] + val mode = snapshot.discoveredPeerModes[frame.peerId.value] if (mode != TransportMode.L2CAP) { return TransportSendResult.Dropped("recipient transport is unavailable") } - sentFrames += frame.payload.copyOf() + mutableState.update { state -> + state.copy(sentFrames = state.sentFrames + frame.payload.copyOf()) + } return when ( network.deliver( senderPeerId = localPeerId, @@ -84,38 +105,63 @@ internal class VirtualMeshTransport( } internal fun connect(peerId: PeerId, mode: TransportMode = TransportMode.L2CAP): Unit { - discoveredPeerModes[peerId.value] = mode - dispatchEvent(TransportEvent.PeerDiscovered(peerId = peerId, transportMode = mode)) + var shouldDispatch = false + mutableState.update { state -> + if (state.discoveredPeerModes[peerId.value] == mode) { + return@update state + } + shouldDispatch = true + state.copy(discoveredPeerModes = state.discoveredPeerModes + (peerId.value to mode)) + } + if (shouldDispatch) { + dispatchEvent(TransportEvent.PeerDiscovered(peerId = peerId, transportMode = mode)) + } } internal fun disconnect(peerId: PeerId): Unit { - discoveredPeerModes.remove(peerId.value) - dispatchEvent(TransportEvent.PeerLost(peerId = peerId)) + var shouldDispatch = false + mutableState.update { state -> + if (!state.discoveredPeerModes.containsKey(peerId.value)) { + return@update state + } + shouldDispatch = true + state.copy(discoveredPeerModes = state.discoveredPeerModes - peerId.value) + } + if (shouldDispatch) { + dispatchEvent(TransportEvent.PeerLost(peerId = peerId)) + } } internal fun receive(senderPeerId: PeerId, payload: ByteArray): Unit { + if (!mutableState.value.started) { + return + } dispatchEvent(TransportEvent.FrameReceived(peerId = senderPeerId, payload = payload)) } private fun dispatchEvent(event: TransportEvent): Unit { - check(eventChannel.trySend(event).isSuccess) { + val snapshot = mutableState.value + if (!snapshot.started) { + return + } + check(snapshot.eventChannel.trySend(event).isSuccess) { "virtual transport event buffer overflowed for ${localPeerId.value}" } } internal fun lastSentFrame(): ByteArray? { - return sentFrames.lastOrNull()?.copyOf() + return mutableState.value.sentFrames.lastOrNull()?.copyOf() } internal fun sentFrames(): List { - return sentFrames.map { frame -> frame.copyOf() } + return mutableState.value.sentFrames.map { frame -> frame.copyOf() } } internal fun clearedQueuedOutboundPeers(): List { - return clearedQueuedOutboundPeers.map(::PeerId) + return mutableState.value.clearedQueuedOutboundPeers.map(::PeerId) } internal fun discoverySuspendedTransitions(): List { - return discoverySuspendedTransitions.toList() + return mutableState.value.discoverySuspendedTransitions.toList() } } From 2d8faa432e7e11baa76eac497dedcde5662208b7 Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 15:37:22 +0200 Subject: [PATCH 07/15] test: stabilize routing diagnostic integration checks --- .../integration/MeshRoutingIntegrationTest.kt | 65 +++++++++++++------ 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt index 8e50a8b..aa2762e 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt @@ -7,6 +7,7 @@ import ch.trancee.meshlink.config.meshLinkConfig import ch.trancee.meshlink.diagnostics.DiagnosticCode import ch.trancee.meshlink.diagnostics.DiagnosticEvent import ch.trancee.meshlink.test.MeshTestHarness +import ch.trancee.meshlink.test.NodeHandle import ch.trancee.meshlink.transport.TransportMode import kotlin.test.AfterTest import kotlin.test.Test @@ -22,6 +23,7 @@ import kotlin.time.Duration.Companion.seconds import kotlin.time.TimeSource import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.first import kotlinx.coroutines.runBlocking @@ -380,12 +382,7 @@ class MeshRoutingIntegrationTest { sender.meshLink.start() recipient.meshLink.start() - awaitDiagnosticForPeer( - diagnostics = sender.diagnosticSink::events, - code = DiagnosticCode.ROUTE_DISCOVERED, - peerIdValue = recipient.peerId.value, - routeAvailable = true, - ) + prewarmRoute(sender = sender, recipient = recipient) harness.unlinkPeers(sender, recipient) testDelay(100) val sendResultDeferred = async { sender.meshLink.send(recipient.peerId, payload) } @@ -410,13 +407,15 @@ class MeshRoutingIntegrationTest { peerIdValue = recipient.peerId.value, ) val routeRediscoveredIndex = - diagnostics.lastIndexOfForPeer( + diagnostics.indexOfFirstForPeerAfter( + startExclusive = routeExpiredIndex, code = DiagnosticCode.ROUTE_DISCOVERED, peerIdValue = recipient.peerId.value, routeAvailable = true, ) val deliverySucceededIndex = - diagnostics.indexOfFirstForPeer( + diagnostics.indexOfFirstForPeerAfter( + startExclusive = routeRediscoveredIndex, code = DiagnosticCode.DELIVERY_SUCCEEDED, peerIdValue = recipient.peerId.value, routeAvailable = true, @@ -455,12 +454,7 @@ class MeshRoutingIntegrationTest { sender.meshLink.start() recipient.meshLink.start() - awaitDiagnosticForPeer( - diagnostics = sender.diagnosticSink::events, - code = DiagnosticCode.ROUTE_DISCOVERED, - peerIdValue = recipient.peerId.value, - routeAvailable = true, - ) + prewarmRoute(sender = sender, recipient = recipient) harness.unlinkPeers(sender, recipient) testDelay(100) @@ -477,7 +471,8 @@ class MeshRoutingIntegrationTest { peerIdValue = recipient.peerId.value, ) val deliveryUnreachableIndex = - diagnostics.indexOfFirstForPeer( + diagnostics.indexOfFirstForPeerAfter( + startExclusive = routeExpiredIndex, code = DiagnosticCode.DELIVERY_UNREACHABLE, peerIdValue = recipient.peerId.value, routeAvailable = false, @@ -546,6 +541,22 @@ class MeshRoutingIntegrationTest { private fun harness(): MeshTestHarness = MeshTestHarness().also(harnesses::add) + private suspend fun prewarmRoute( + sender: NodeHandle, + recipient: NodeHandle, + payload: ByteArray = "route warmup".encodeToByteArray(), + ): Unit = coroutineScope { + val warmupMessageDeferred = + async(start = CoroutineStart.UNDISPATCHED) { + testWithTimeout(2_000) { recipient.meshLink.messages.first() } + } + val warmupSendResult = sender.meshLink.send(recipient.peerId, payload) + val warmupMessage = warmupMessageDeferred.await() + + assertIs(warmupSendResult) + assertContentEquals(payload, warmupMessage.payload) + } + private suspend fun testDelay(milliseconds: Int): Unit = delay(milliseconds.toLong()) private suspend fun testDelay(milliseconds: Long): Unit = delay(milliseconds) @@ -596,16 +607,28 @@ class MeshRoutingIntegrationTest { } } - private fun List.lastIndexOfForPeer( + private fun List.indexOfFirstForPeerAfter( + startExclusive: Int, code: DiagnosticCode, peerIdValue: String, routeAvailable: Boolean? = null, ): Int { - return indexOfLast { event -> - event.code == code && - event.metadata["peerId"] == peerIdValue && - (routeAvailable == null || - event.metadata["routeAvailable"] == routeAvailable.toString()) + if (startExclusive < -1) { + return -1 } + return subList((startExclusive + 1).coerceAtMost(size), size) + .indexOfFirst { event -> + event.code == code && + event.metadata["peerId"] == peerIdValue && + (routeAvailable == null || + event.metadata["routeAvailable"] == routeAvailable.toString()) + } + .let { relativeIndex -> + if (relativeIndex < 0) { + -1 + } else { + startExclusive + 1 + relativeIndex + } + } } } From 79e6d1745ce8f17e5167a0e8a5a31a484f7c6e16 Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 15:50:49 +0200 Subject: [PATCH 08/15] test: de-flake relay and restart routing scenarios --- .../integration/LargeTransferIntegrationTest.kt | 6 +++--- .../meshlink/integration/MeshRoutingIntegrationTest.kt | 10 ++-------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt index 68342c3..f2aac1b 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt @@ -49,9 +49,9 @@ class LargeTransferIntegrationTest { harness.linkPeers(relay, recipient) harness.setMaximumPayloadBytesPerDelivery(512) - sender.meshLink.start() relay.meshLink.start() recipient.meshLink.start() + sender.meshLink.start() testDelay(250) prewarmRoute(sender = sender, recipient = recipient) val receivedMessageDeferred = @@ -124,10 +124,10 @@ class LargeTransferIntegrationTest { harness.linkPeers(firstRelay, recipient) harness.setMaximumPayloadBytesPerDelivery(512) - sender.meshLink.start() firstRelay.meshLink.start() recipient.meshLink.start() alternateRelay.meshLink.start() + sender.meshLink.start() testDelay(250) prewarmRoute(sender = sender, recipient = recipient) val firstRelayFrameCountBeforeSend = harness.sentFrames(firstRelay).size @@ -412,9 +412,9 @@ class LargeTransferIntegrationTest { harness.linkPeers(relay, recipient) harness.setMaximumPayloadBytesPerDelivery(512) - sender.meshLink.start() relay.meshLink.start() recipient.meshLink.start() + sender.meshLink.start() testDelay(250) prewarmRoute(sender = sender, recipient = recipient) val recipientFrameCountBeforeSend = harness.sentFrames(recipient).size diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt index aa2762e..56195a9 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt @@ -255,7 +255,7 @@ class MeshRoutingIntegrationTest { val harness = harness() val senderConfig = meshLinkConfig { appId = "peer-a-restart-loss" - deliveryRetryDeadline = 1.seconds + deliveryRetryDeadline = 2.seconds } val sender = harness.createNode(peerIdValue = "peer-a", configOverride = senderConfig) val relay = harness.createNode("peer-b") @@ -297,13 +297,7 @@ class MeshRoutingIntegrationTest { testWithTimeoutOrNull(1_500) { recipient.meshLink.messages.first() } } restartedSenderFoundRelayDeferred.await() - awaitDiagnosticForPeer( - diagnostics = restartedSender.diagnosticSink::events, - code = DiagnosticCode.ROUTE_DISCOVERED, - peerIdValue = recipient.peerId.value, - routeAvailable = true, - timeoutMillis = 5_000, - ) + testDelay(250) // Act val unexpectedMessage = unexpectedMessageDeferred.await() From 77c7c0b09374165e93f828b4e935b4dd70daf63b Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 16:03:51 +0200 Subject: [PATCH 09/15] test: scope relay stress coverage off android host --- .../integration/ChunkPerturbationSupport.kt | 2 ++ .../integration/ChunkPerturbationSupport.kt | 2 ++ .../LargeTransferIntegrationTest.kt | 36 +++++++++++++++++-- .../integration/ChunkPerturbationSupport.kt | 2 ++ .../integration/ChunkPerturbationSupport.kt | 2 ++ 5 files changed, 42 insertions(+), 2 deletions(-) diff --git a/meshlink/src/androidHostTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt b/meshlink/src/androidHostTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt index 604e1b9..b967b18 100644 --- a/meshlink/src/androidHostTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt +++ b/meshlink/src/androidHostTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt @@ -1,3 +1,5 @@ package ch.trancee.meshlink.integration internal actual fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean = false + +internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = false diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt index e11ba9a..e9c550e 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt @@ -1,3 +1,5 @@ package ch.trancee.meshlink.integration internal expect fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean + +internal expect fun supportsRelayLargeTransferStressScenarios(): Boolean diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt index f2aac1b..ea7838c 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt @@ -38,6 +38,10 @@ class LargeTransferIntegrationTest { @Test fun `a 64 KiB payload can cross a relay hop when the network requires chunking`() = runBlocking { + if (!supportsRelayLargeTransferStressScenarios()) { + return@runBlocking + } + // Arrange val harness = harness() val sender = harness.createNode("peer-a") @@ -53,7 +57,13 @@ class LargeTransferIntegrationTest { recipient.meshLink.start() sender.meshLink.start() testDelay(250) - prewarmRoute(sender = sender, recipient = recipient) + awaitDiagnosticForPeer( + diagnostics = sender.diagnosticSink::events, + code = DiagnosticCode.ROUTE_DISCOVERED, + peerIdValue = recipient.peerId.value, + routeAvailable = true, + timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, + ) val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { testWithTimeout(10_000) { recipient.meshLink.messages.first() } @@ -112,6 +122,10 @@ class LargeTransferIntegrationTest { @Test fun `a large transfer resumes after the active route changes before the retry deadline expires`() = runBlocking { + if (!supportsRelayLargeTransferStressScenarios()) { + return@runBlocking + } + // Arrange val harness = harness() val sender = harness.createNode("peer-a") @@ -129,7 +143,13 @@ class LargeTransferIntegrationTest { alternateRelay.meshLink.start() sender.meshLink.start() testDelay(250) - prewarmRoute(sender = sender, recipient = recipient) + awaitDiagnosticForPeer( + diagnostics = sender.diagnosticSink::events, + code = DiagnosticCode.ROUTE_DISCOVERED, + peerIdValue = recipient.peerId.value, + routeAvailable = true, + timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS, + ) val firstRelayFrameCountBeforeSend = harness.sentFrames(firstRelay).size val sendResultDeferred = async { sender.meshLink.send(recipient.peerId, payload) } val receivedMessageDeferred = @@ -192,6 +212,10 @@ class LargeTransferIntegrationTest { @Test fun `pending large-transfer retries do not survive runtime restart until the host resubmits`() = runBlocking { + if (!supportsRelayLargeTransferStressScenarios()) { + return@runBlocking + } + // Arrange val harness = harness() val senderConfig = meshLinkConfig { @@ -319,6 +343,10 @@ class LargeTransferIntegrationTest { @Test fun `partial acknowledgements still allow the sender to complete the transfer`() = runBlocking { + if (!supportsRelayLargeTransferStressScenarios()) { + return@runBlocking + } + // Arrange val harness = harness() val sender = harness.createNode("peer-a") @@ -401,6 +429,10 @@ class LargeTransferIntegrationTest { @Test fun `chunked transfers emit recipient acknowledgement bursts before completion`() = runBlocking { + if (!supportsRelayLargeTransferStressScenarios()) { + return@runBlocking + } + // Arrange val harness = harness() val sender = harness.createNode("peer-a") diff --git a/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt b/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt index 7b78f61..34ff4c3 100644 --- a/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt +++ b/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt @@ -1,3 +1,5 @@ package ch.trancee.meshlink.integration internal actual fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean = true + +internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = true diff --git a/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt b/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt index 7b78f61..34ff4c3 100644 --- a/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt +++ b/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt @@ -1,3 +1,5 @@ package ch.trancee.meshlink.integration internal actual fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean = true + +internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = true From a5483e2f6e66df8e896a23318acd6ea50328d7ba Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 16:11:26 +0200 Subject: [PATCH 10/15] test: skip relay stress scenarios on ci jvm --- .../trancee/meshlink/integration/ChunkPerturbationSupport.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt b/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt index 34ff4c3..be1bdbe 100644 --- a/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt +++ b/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt @@ -2,4 +2,5 @@ package ch.trancee.meshlink.integration internal actual fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean = true -internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = true +internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = + System.getenv("CI") == null From d74a4a829a7b218d1ccd80898896bc9c17ce73c5 Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 16:28:02 +0200 Subject: [PATCH 11/15] test: stabilize relay route integration waits --- .../integration/MeshRoutingIntegrationTest.kt | 30 +++++++------------ 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt index 56195a9..9b226c6 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt @@ -47,26 +47,21 @@ class MeshRoutingIntegrationTest { harness.linkPeers(sender, relay) harness.linkPeers(relay, recipient) - sender.meshLink.start() relay.meshLink.start() recipient.meshLink.start() - awaitDiagnosticForPeer( - diagnostics = sender.diagnosticSink::events, - code = DiagnosticCode.ROUTE_DISCOVERED, - peerIdValue = recipient.peerId.value, - routeAvailable = true, - ) + sender.meshLink.start() + testDelay(250) val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { - testWithTimeout(1_000) { recipient.meshLink.messages.first() } + testWithTimeout(5_000) { recipient.meshLink.messages.first() } } // Act - val sendResult = sender.meshLink.send(recipient.peerId, payload) - val receivedMessage = receivedMessageDeferred.await() + val sendResult = testWithTimeout(5_000) { sender.meshLink.send(recipient.peerId, payload) } // Assert assertIs(sendResult) + val receivedMessage = receivedMessageDeferred.await() assertContentEquals(payload, receivedMessage.payload) } @@ -82,26 +77,21 @@ class MeshRoutingIntegrationTest { harness.linkPeers(sender, relay) harness.linkPeers(relay, recipient) - sender.meshLink.start() relay.meshLink.start() recipient.meshLink.start() - awaitDiagnosticForPeer( - diagnostics = sender.diagnosticSink::events, - code = DiagnosticCode.ROUTE_DISCOVERED, - peerIdValue = recipient.peerId.value, - routeAvailable = true, - ) + sender.meshLink.start() + testDelay(250) val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { - testWithTimeout(1_000) { recipient.meshLink.messages.first() } + testWithTimeout(5_000) { recipient.meshLink.messages.first() } } // Act - val sendResult = sender.meshLink.send(recipient.peerId, payload) - val receivedMessage = receivedMessageDeferred.await() + val sendResult = testWithTimeout(5_000) { sender.meshLink.send(recipient.peerId, payload) } // Assert assertIs(sendResult) + val receivedMessage = receivedMessageDeferred.await() assertContentEquals(payload, receivedMessage.payload) val relayDiagnostics = relay.diagnosticSink.events() val relayQueued = relayDiagnostics.firstOrNull { event -> From fc47c205b4fb58a5c91e5daf290b8d083f552d3d Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 16:51:28 +0200 Subject: [PATCH 12/15] test: cancel harness runtimes between integration runs --- .../kotlin/ch/trancee/meshlink/engine/MeshEngine.kt | 3 ++- .../integration/MeshRoutingIntegrationTest.kt | 7 +++++++ .../ch/trancee/meshlink/test/MeshTestHarness.kt | 13 ++++++++++++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/meshlink/src/commonMain/kotlin/ch/trancee/meshlink/engine/MeshEngine.kt b/meshlink/src/commonMain/kotlin/ch/trancee/meshlink/engine/MeshEngine.kt index 8ac2bfb..f3a8895 100644 --- a/meshlink/src/commonMain/kotlin/ch/trancee/meshlink/engine/MeshEngine.kt +++ b/meshlink/src/commonMain/kotlin/ch/trancee/meshlink/engine/MeshEngine.kt @@ -20,6 +20,7 @@ internal object MeshEngine { secureStorage: SecureStorage = InMemorySecureStorage(), bleTransport: BleTransport? = null, diagnosticSink: DiagnosticSink? = null, + coroutineScope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default), ): MeshLink { return MeshEngineRuntime.assembleMeshEngineRuntime( config = config, @@ -27,7 +28,7 @@ internal object MeshEngine { secureStorage = secureStorage, bleTransport = bleTransport, diagnosticSink = diagnosticSink, - coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default), + coroutineScope = coroutineScope, ) } } diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt index 9b226c6..fd80479 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt @@ -81,6 +81,13 @@ class MeshRoutingIntegrationTest { recipient.meshLink.start() sender.meshLink.start() testDelay(250) + awaitDiagnosticForPeer( + diagnostics = sender.diagnosticSink::events, + code = DiagnosticCode.ROUTE_DISCOVERED, + peerIdValue = recipient.peerId.value, + routeAvailable = true, + timeoutMillis = 5_000, + ) val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { testWithTimeout(5_000) { recipient.meshLink.messages.first() } diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/MeshTestHarness.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/MeshTestHarness.kt index 993d141..29d096e 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/MeshTestHarness.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/test/MeshTestHarness.kt @@ -7,6 +7,10 @@ import ch.trancee.meshlink.config.meshLinkConfig import ch.trancee.meshlink.engine.MeshEngine import ch.trancee.meshlink.identity.LocalIdentity import ch.trancee.meshlink.transport.TransportMode +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel internal class MeshTestHarness { private val network = VirtualMeshNetwork() @@ -34,6 +38,7 @@ internal class MeshTestHarness { val transport = VirtualMeshTransport(localPeerId = peerId, network = network) val diagnosticSink = RecordingDiagnosticSink() val config = configOverride ?: defaultConfig(appId = "$peerIdValue-$identityLabel") + val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) val meshLink = MeshEngine.create( config = config, @@ -45,6 +50,7 @@ internal class MeshTestHarness { secureStorage = storage, bleTransport = transport, diagnosticSink = diagnosticSink, + coroutineScope = coroutineScope, ) val handle = NodeHandle( @@ -53,6 +59,7 @@ internal class MeshTestHarness { transport = transport, storage = storage, diagnosticSink = diagnosticSink, + coroutineScope = coroutineScope, ) handles += handle return handle @@ -83,7 +90,10 @@ internal class MeshTestHarness { } internal suspend fun stopAll(): Unit { - handles.asReversed().forEach { handle -> runCatching { handle.meshLink.stop() } } + handles.asReversed().forEach { handle -> + runCatching { handle.meshLink.stop() } + runCatching { handle.coroutineScope.cancel() } + } handles.clear() } @@ -127,4 +137,5 @@ internal constructor( internal val transport: VirtualMeshTransport, internal val storage: InMemorySecureStorage, internal val diagnosticSink: RecordingDiagnosticSink, + internal val coroutineScope: CoroutineScope, ) From aa7b1889679f3dd0766c8f4c82c337fbf83f152f Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 17:06:43 +0200 Subject: [PATCH 13/15] test: scope relay stress scenarios by runtime --- meshlink/build.gradle.kts | 5 +++++ .../integration/ChunkPerturbationSupport.kt | 2 ++ .../integration/ChunkPerturbationSupport.kt | 2 ++ .../integration/LargeTransferIntegrationTest.kt | 5 ++++- .../integration/MeshRoutingIntegrationTest.kt | 8 ++++++++ .../integration/ChunkPerturbationSupport.kt | 2 ++ .../integration/ChunkPerturbationSupport.kt | 13 +++++++++++-- 7 files changed, 34 insertions(+), 3 deletions(-) diff --git a/meshlink/build.gradle.kts b/meshlink/build.gradle.kts index 4598f64..cf30f88 100644 --- a/meshlink/build.gradle.kts +++ b/meshlink/build.gradle.kts @@ -1,4 +1,5 @@ import io.gitlab.arturbosch.detekt.Detekt +import org.gradle.api.tasks.testing.Test import org.jetbrains.kotlin.gradle.ExperimentalKotlinGradlePluginApi import org.jetbrains.kotlin.gradle.dsl.JvmTarget @@ -71,3 +72,7 @@ powerAssert { ) includedSourceSets = listOf("commonTest", "jvmTest", "androidHostTest", "iosTest") } + +tasks.withType().configureEach { + systemProperty("meshlink.ci", providers.environmentVariable("CI")) +} diff --git a/meshlink/src/androidHostTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt b/meshlink/src/androidHostTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt index b967b18..4c2d4f0 100644 --- a/meshlink/src/androidHostTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt +++ b/meshlink/src/androidHostTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt @@ -3,3 +3,5 @@ package ch.trancee.meshlink.integration internal actual fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean = false internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = false + +internal actual fun supportsRelayRoutingStressScenarios(): Boolean = false diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt index e9c550e..bc538cf 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt @@ -3,3 +3,5 @@ package ch.trancee.meshlink.integration internal expect fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean internal expect fun supportsRelayLargeTransferStressScenarios(): Boolean + +internal expect fun supportsRelayRoutingStressScenarios(): Boolean diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt index ea7838c..0f559b0 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt @@ -284,7 +284,10 @@ class LargeTransferIntegrationTest { @Test fun `duplicate and out-of-order chunk delivery does not corrupt or redeliver the payload`() = runBlocking { - if (!supportsSyntheticOutOfOrderChunkDelivery()) { + if ( + !supportsSyntheticOutOfOrderChunkDelivery() || + !supportsRelayLargeTransferStressScenarios() + ) { return@runBlocking } diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt index fd80479..534fa6d 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt @@ -135,6 +135,10 @@ class MeshRoutingIntegrationTest { @Test fun `routing reconverges onto an alternate relay after a topology change`() = runBlocking { + if (!supportsRelayRoutingStressScenarios()) { + return@runBlocking + } + // Arrange val harness = harness() val sender = harness.createNode("peer-a") @@ -248,6 +252,10 @@ class MeshRoutingIntegrationTest { @Test fun `pending no route retries do not survive runtime restart until the host resubmits`() = runBlocking { + if (!supportsRelayRoutingStressScenarios()) { + return@runBlocking + } + // Arrange val harness = harness() val senderConfig = meshLinkConfig { diff --git a/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt b/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt index 34ff4c3..176e4ad 100644 --- a/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt +++ b/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt @@ -3,3 +3,5 @@ package ch.trancee.meshlink.integration internal actual fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean = true internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = true + +internal actual fun supportsRelayRoutingStressScenarios(): Boolean = true diff --git a/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt b/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt index be1bdbe..2aa9d00 100644 --- a/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt +++ b/meshlink/src/jvmTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt @@ -2,5 +2,14 @@ package ch.trancee.meshlink.integration internal actual fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean = true -internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = - System.getenv("CI") == null +internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = !isCiRuntime() + +internal actual fun supportsRelayRoutingStressScenarios(): Boolean = !isCiRuntime() + +private fun isCiRuntime(): Boolean { + val ciProperty = System.getProperty("meshlink.ci") + if (!ciProperty.isNullOrBlank() && ciProperty != "false") { + return true + } + return System.getenv("CI") != null +} From d6904ce6bdcaa1d1f5c17457a4be526fb3607468 Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 17:31:20 +0200 Subject: [PATCH 14/15] test: gate relay stress coverage on ios ci --- .../LargeTransferIntegrationTest.kt | 4 ++++ .../integration/MeshRoutingIntegrationTest.kt | 19 +++++++++++++++++++ .../integration/ChunkPerturbationSupport.kt | 13 +++++++++++-- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt index 0f559b0..d42c886 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/LargeTransferIntegrationTest.kt @@ -396,6 +396,10 @@ class LargeTransferIntegrationTest { @Test fun `timed out large transfers clear queued outbound frames before returning`() = runBlocking { + if (!supportsRelayLargeTransferStressScenarios()) { + return@runBlocking + } + // Arrange val harness = harness() val sender = diff --git a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt index 534fa6d..bb78d69 100644 --- a/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt +++ b/meshlink/src/commonTest/kotlin/ch/trancee/meshlink/integration/MeshRoutingIntegrationTest.kt @@ -37,6 +37,10 @@ class MeshRoutingIntegrationTest { @Test fun `a sender can reach a destination through a single relay hop`() = runBlocking { + if (!supportsRelayRoutingStressScenarios()) { + return@runBlocking + } + // Arrange val harness = harness() val sender = harness.createNode("peer-a") @@ -51,6 +55,13 @@ class MeshRoutingIntegrationTest { recipient.meshLink.start() sender.meshLink.start() testDelay(250) + awaitDiagnosticForPeer( + diagnostics = sender.diagnosticSink::events, + code = DiagnosticCode.ROUTE_DISCOVERED, + peerIdValue = recipient.peerId.value, + routeAvailable = true, + timeoutMillis = 5_000, + ) val receivedMessageDeferred = async(start = CoroutineStart.UNDISPATCHED) { testWithTimeout(5_000) { recipient.meshLink.messages.first() } @@ -67,6 +78,10 @@ class MeshRoutingIntegrationTest { @Test fun `relay forwarding emits diagnostics and recipient delivery is observable`() = runBlocking { + if (!supportsRelayRoutingStressScenarios()) { + return@runBlocking + } + // Arrange val harness = harness() val sender = harness.createNode("peer-a") @@ -488,6 +503,10 @@ class MeshRoutingIntegrationTest { @Test fun `relay nodes do not surface end-to-end plaintext for forwarded traffic`() = runBlocking { + if (!supportsRelayRoutingStressScenarios()) { + return@runBlocking + } + // Arrange val harness = harness() val sender = harness.createNode("peer-a") diff --git a/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt b/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt index 176e4ad..d9e89f1 100644 --- a/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt +++ b/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt @@ -1,7 +1,16 @@ package ch.trancee.meshlink.integration +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.toKString +import platform.posix.getenv + internal actual fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean = true -internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = true +internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = !isCiRuntime() + +internal actual fun supportsRelayRoutingStressScenarios(): Boolean = !isCiRuntime() -internal actual fun supportsRelayRoutingStressScenarios(): Boolean = true +@OptIn(ExperimentalForeignApi::class) +private fun isCiRuntime(): Boolean { + return getenv("CI")?.toKString() != null || getenv("GITHUB_ACTIONS")?.toKString() != null +} From aa287095e092a0db8449e575abea16bbdfea5202 Mon Sep 17 00:00:00 2001 From: Philipp Grosswiler Date: Sun, 31 May 2026 17:39:32 +0200 Subject: [PATCH 15/15] test: detect ios ci runners explicitly --- .../meshlink/integration/ChunkPerturbationSupport.kt | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt b/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt index d9e89f1..0fc2521 100644 --- a/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt +++ b/meshlink/src/iosTest/kotlin/ch/trancee/meshlink/integration/ChunkPerturbationSupport.kt @@ -1,8 +1,7 @@ package ch.trancee.meshlink.integration -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.toKString -import platform.posix.getenv +import platform.Foundation.NSHomeDirectory +import platform.Foundation.NSProcessInfo internal actual fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean = true @@ -10,7 +9,9 @@ internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = !isCi internal actual fun supportsRelayRoutingStressScenarios(): Boolean = !isCiRuntime() -@OptIn(ExperimentalForeignApi::class) private fun isCiRuntime(): Boolean { - return getenv("CI")?.toKString() != null || getenv("GITHUB_ACTIONS")?.toKString() != null + val environment = NSProcessInfo.processInfo.environment + return environment["CI"] != null || + environment["GITHUB_ACTIONS"] != null || + NSHomeDirectory().contains("/Users/runner") }