Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ jobs:

- name: Verify MeshLink SDK on Linux
run: |
# The hosted Linux runners still expose tight CPU budgets for this suite.
# Keep verification single-worker so the routing and transfer integration
# tests do not starve while JVM and Android host tasks execute.
./gradlew \
:meshlink:jvmTest \
:meshlink:testAndroidHostTest \
Expand All @@ -49,6 +52,7 @@ jobs:
:meshlink:koverVerify \
verifyDocs \
checkAgp9Invariants \
--max-workers=1 \
--console=plain

- name: Upload verification reports
Expand Down
8 changes: 6 additions & 2 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ jobs:
if: matrix.language == 'java-kotlin'
shell: bash
run: |
# The Java/Kotlin extractor only needs JVM and Android compilation commands.
# Avoid clean/full assemble so PR scans do not rebuild iOS/native artifacts.
# CodeQL must observe real compiler invocations, so do not let Gradle satisfy
# these tasks from the build cache or up-to-date checks.
# Keep the task list scoped to JVM and Android compilation so this stays faster
# than a full build and avoids unnecessary iOS/native work.
./gradlew \
:meshlink:jvmJar \
:meshlink:androidJar \
Expand All @@ -147,6 +149,8 @@ jobs:
:meshlink-reference:bundleAndroidMainAar \
:meshlink-reference:android:compileDebugKotlin \
:meshlink-proof:android:compileDebugKotlin \
--rerun-tasks \
--no-build-cache \
--no-daemon \
--console=plain

Expand Down
5 changes: 5 additions & 0 deletions meshlink/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import io.gitlab.arturbosch.detekt.Detekt
import org.gradle.api.tasks.testing.Test
import org.jetbrains.kotlin.gradle.ExperimentalKotlinGradlePluginApi
import org.jetbrains.kotlin.gradle.dsl.JvmTarget

Expand Down Expand Up @@ -71,3 +72,7 @@ powerAssert {
)
includedSourceSets = listOf("commonTest", "jvmTest", "androidHostTest", "iosTest")
}

tasks.withType<Test>().configureEach {
systemProperty("meshlink.ci", providers.environmentVariable("CI"))
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
package ch.trancee.meshlink.integration

internal actual fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean = false

internal actual fun supportsRelayLargeTransferStressScenarios(): Boolean = false

internal actual fun supportsRelayRoutingStressScenarios(): Boolean = false
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ internal object MeshEngine {
secureStorage: SecureStorage = InMemorySecureStorage(),
bleTransport: BleTransport? = null,
diagnosticSink: DiagnosticSink? = null,
coroutineScope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default),
): MeshLink {
return MeshEngineRuntime.assembleMeshEngineRuntime(
config = config,
localIdentity = localIdentity,
secureStorage = secureStorage,
bleTransport = bleTransport,
diagnosticSink = diagnosticSink,
coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default),
coroutineScope = coroutineScope,
)
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
package ch.trancee.meshlink.integration

internal expect fun supportsSyntheticOutOfOrderChunkDelivery(): Boolean

internal expect fun supportsRelayLargeTransferStressScenarios(): Boolean

internal expect fun supportsRelayRoutingStressScenarios(): Boolean
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import ch.trancee.meshlink.config.meshLinkConfig
import ch.trancee.meshlink.diagnostics.DiagnosticCode
import ch.trancee.meshlink.diagnostics.DiagnosticEvent
import ch.trancee.meshlink.test.MeshTestHarness
import ch.trancee.meshlink.test.NodeHandle
import kotlin.test.AfterTest
import kotlin.test.Test
import kotlin.test.assertContentEquals
Expand All @@ -21,6 +22,7 @@ import kotlin.time.Duration.Companion.seconds
import kotlin.time.TimeSource
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
Expand All @@ -30,11 +32,16 @@ import kotlinx.coroutines.withTimeoutOrNull
class LargeTransferIntegrationTest {
private companion object {
private const val TEST_TIMING_SLACK_MULTIPLIER: Long = 4
private const val ROUTE_DISCOVERY_TIMEOUT_MILLIS: Long = 8_000
}

@Test
fun `a 64 KiB payload can cross a relay hop when the network requires chunking`() =
runBlocking {
if (!supportsRelayLargeTransferStressScenarios()) {
return@runBlocking
}

// Arrange
val harness = harness()
val sender = harness.createNode("peer-a")
Expand All @@ -46,20 +53,20 @@ class LargeTransferIntegrationTest {
harness.linkPeers(relay, recipient)
harness.setMaximumPayloadBytesPerDelivery(512)

sender.meshLink.start()
relay.meshLink.start()
recipient.meshLink.start()
sender.meshLink.start()
testDelay(250)
awaitDiagnosticForPeer(
diagnostics = sender.diagnosticSink::events,
code = DiagnosticCode.ROUTE_DISCOVERED,
peerIdValue = recipient.peerId.value,
routeAvailable = true,
timeoutMillis = 5_000,
timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS,
)
val receivedMessageDeferred =
async(start = CoroutineStart.UNDISPATCHED) {
testWithTimeout(6_000) { recipient.meshLink.messages.first() }
testWithTimeout(10_000) { recipient.meshLink.messages.first() }
}

// Act
Expand All @@ -86,13 +93,7 @@ class LargeTransferIntegrationTest {
sender.meshLink.start()
recipient.meshLink.start()
testDelay(500)
awaitDiagnosticForPeer(
diagnostics = sender.diagnosticSink::events,
code = DiagnosticCode.ROUTE_DISCOVERED,
peerIdValue = recipient.peerId.value,
routeAvailable = true,
timeoutMillis = 5_000,
)
prewarmRoute(sender = sender, recipient = recipient)
val frameCountBeforeSend = harness.sentFrames(sender).size
val receivedMessageDeferred =
async(start = CoroutineStart.UNDISPATCHED) {
Expand Down Expand Up @@ -121,6 +122,10 @@ class LargeTransferIntegrationTest {
@Test
fun `a large transfer resumes after the active route changes before the retry deadline expires`() =
runBlocking {
if (!supportsRelayLargeTransferStressScenarios()) {
return@runBlocking
}

// Arrange
val harness = harness()
val sender = harness.createNode("peer-a")
Expand All @@ -133,26 +138,31 @@ class LargeTransferIntegrationTest {
harness.linkPeers(firstRelay, recipient)
harness.setMaximumPayloadBytesPerDelivery(512)

sender.meshLink.start()
firstRelay.meshLink.start()
recipient.meshLink.start()
alternateRelay.meshLink.start()
sender.meshLink.start()
testDelay(250)
awaitDiagnosticForPeer(
diagnostics = sender.diagnosticSink::events,
code = DiagnosticCode.ROUTE_DISCOVERED,
peerIdValue = recipient.peerId.value,
routeAvailable = true,
timeoutMillis = 5_000,
timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS,
)
val firstRelayFrameCountBeforeSend = harness.sentFrames(firstRelay).size
val sendResultDeferred = async { sender.meshLink.send(recipient.peerId, payload) }
val receivedMessageDeferred =
async(start = CoroutineStart.UNDISPATCHED) {
testWithTimeout(4_000) { recipient.meshLink.messages.first() }
testWithTimeout(10_000) { recipient.meshLink.messages.first() }
}

// Act
testDelay(250)
awaitSentFrameCountAtLeast(
harness = harness,
handle = firstRelay,
expectedCount = firstRelayFrameCountBeforeSend + 1,
)
harness.unlinkPeers(firstRelay, recipient)
harness.linkPeers(sender, alternateRelay)
harness.linkPeers(alternateRelay, recipient)
Expand Down Expand Up @@ -202,6 +212,10 @@ class LargeTransferIntegrationTest {
@Test
fun `pending large-transfer retries do not survive runtime restart until the host resubmits`() =
runBlocking {
if (!supportsRelayLargeTransferStressScenarios()) {
return@runBlocking
}

// Arrange
val harness = harness()
val senderConfig = meshLinkConfig {
Expand Down Expand Up @@ -239,13 +253,13 @@ class LargeTransferIntegrationTest {
}
}
}
restartedSender.meshLink.start()
harness.linkPeers(relay, recipient)
restartedSender.meshLink.start()
restartedSenderFoundRelayDeferred.await()
awaitDiagnostic(
diagnostics = restartedSender.diagnosticSink::events,
code = DiagnosticCode.ROUTE_DISCOVERED,
timeoutMillis = 5_000,
timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS,
)

// Act
Expand All @@ -270,7 +284,10 @@ class LargeTransferIntegrationTest {
@Test
fun `duplicate and out-of-order chunk delivery does not corrupt or redeliver the payload`() =
runBlocking {
if (!supportsSyntheticOutOfOrderChunkDelivery()) {
if (
!supportsSyntheticOutOfOrderChunkDelivery() ||
!supportsRelayLargeTransferStressScenarios()
) {
return@runBlocking
}

Expand All @@ -294,7 +311,7 @@ class LargeTransferIntegrationTest {
code = DiagnosticCode.ROUTE_DISCOVERED,
peerIdValue = recipient.peerId.value,
routeAvailable = true,
timeoutMillis = 5_000,
timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS,
)
val sendResultDeferred = async { sender.meshLink.send(recipient.peerId, payload) }
val receivedMessageDeferred = async {
Expand Down Expand Up @@ -329,6 +346,10 @@ class LargeTransferIntegrationTest {

@Test
fun `partial acknowledgements still allow the sender to complete the transfer`() = runBlocking {
if (!supportsRelayLargeTransferStressScenarios()) {
return@runBlocking
}

// Arrange
val harness = harness()
val sender = harness.createNode("peer-a")
Expand All @@ -349,7 +370,7 @@ class LargeTransferIntegrationTest {
code = DiagnosticCode.ROUTE_DISCOVERED,
peerIdValue = recipient.peerId.value,
routeAvailable = true,
timeoutMillis = 5_000,
timeoutMillis = ROUTE_DISCOVERY_TIMEOUT_MILLIS,
)
val receivedMessageDeferred =
async(start = CoroutineStart.UNDISPATCHED) {
Expand All @@ -375,6 +396,10 @@ class LargeTransferIntegrationTest {

@Test
fun `timed out large transfers clear queued outbound frames before returning`() = runBlocking {
if (!supportsRelayLargeTransferStressScenarios()) {
return@runBlocking
}

// Arrange
val harness = harness()
val sender =
Expand All @@ -395,13 +420,7 @@ class LargeTransferIntegrationTest {
sender.meshLink.start()
recipient.meshLink.start()
testDelay(250)
awaitDiagnosticForPeer(
diagnostics = sender.diagnosticSink::events,
code = DiagnosticCode.ROUTE_DISCOVERED,
peerIdValue = recipient.peerId.value,
routeAvailable = true,
timeoutMillis = 5_000,
)
prewarmRoute(sender = sender, recipient = recipient)
harness.dropNextDeliveries(recipient, sender, count = 256)

// Act
Expand All @@ -417,6 +436,10 @@ class LargeTransferIntegrationTest {
@Test
fun `chunked transfers emit recipient acknowledgement bursts before completion`() =
runBlocking {
if (!supportsRelayLargeTransferStressScenarios()) {
return@runBlocking
}

// Arrange
val harness = harness()
val sender = harness.createNode("peer-a")
Expand All @@ -428,17 +451,11 @@ class LargeTransferIntegrationTest {
harness.linkPeers(relay, recipient)
harness.setMaximumPayloadBytesPerDelivery(512)

sender.meshLink.start()
relay.meshLink.start()
recipient.meshLink.start()
sender.meshLink.start()
testDelay(250)
awaitDiagnosticForPeer(
diagnostics = sender.diagnosticSink::events,
code = DiagnosticCode.ROUTE_DISCOVERED,
peerIdValue = recipient.peerId.value,
routeAvailable = true,
timeoutMillis = 5_000,
)
prewarmRoute(sender = sender, recipient = recipient)
val recipientFrameCountBeforeSend = harness.sentFrames(recipient).size
val receivedMessageDeferred =
async(start = CoroutineStart.UNDISPATCHED) {
Expand Down Expand Up @@ -505,6 +522,35 @@ class LargeTransferIntegrationTest {

private fun harness(): MeshTestHarness = MeshTestHarness().also(harnesses::add)

private suspend fun prewarmRoute(
sender: NodeHandle,
recipient: NodeHandle,
payload: ByteArray = ByteArray(32) { index -> ((index * 29) % 251).toByte() },
): Unit = coroutineScope {
val warmupMessageDeferred =
async(start = CoroutineStart.UNDISPATCHED) {
testWithTimeout(5_000) { recipient.meshLink.messages.first() }
}
val warmupSendResult = sender.meshLink.send(recipient.peerId, payload)
val warmupMessage = warmupMessageDeferred.await()

assertIs<SendResult.Sent>(warmupSendResult)
assertContentEquals(payload, warmupMessage.payload)
}

private suspend fun awaitSentFrameCountAtLeast(
harness: MeshTestHarness,
handle: NodeHandle,
expectedCount: Int,
timeoutMillis: Long = 5_000,
): Unit {
testWithTimeout(timeoutMillis) {
while (harness.sentFrames(handle).size < expectedCount) {
testDelay(10)
}
}
}

private suspend fun testDelay(milliseconds: Int): Unit =
delay(milliseconds.toLong() * TEST_TIMING_SLACK_MULTIPLIER)

Expand Down
Loading
Loading