Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the new iterator implementation has a thread-safety issue. removeFirst() and isNotEmpty() are called on a plain ArrayList without synchronization. two threads iterating concurrently can corrupt the list.

the AtomicRef doesn't help here because we never atomically swap the reference we just read .value and mutate the underlying mutable list directly.

the fix is to pair AtomicRef with immutable data. thread-safe without locks. my proposal:

public class SideEffects<T>() : Iterable<T> {
    private val sideEffects: AtomicRef<List<T>> = atomic(emptyList())

    private constructor(sideEffects: List<T>) : this() {
        this.sideEffects.value = sideEffects
    }

    public fun add(vararg sideEffectsToAdd: T): SideEffects<T> {
        return SideEffects(sideEffects.value + sideEffectsToAdd)
    }

    public fun clear(): SideEffects<T> {
        return SideEffects()
    }

    override fun iterator(): Iterator<T> {
        return sideEffects.getAndSet(emptyList()).iterator()
    }
}

Copy link
Copy Markdown
Collaborator Author

@matejdro matejdro May 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This proposal has a slight drawback: it does not support partial consumption e.g. when a view starts iterating on the side effects, it will wipe them out, even if it does not consume anything. But I think the better thread safety is worth it, I think this is a really edge edge case that should never happen. So I have updated the implementation with your proposal and removed a test for this behavior.

However, it also does not fix the issue in the title of this PR. shouldBeEmpty() works like this:

  1. Checks emptiness via iterator.hasNext()
  2. Attempts to get the element for the error message via iterable.first() which fails, because the list is empty.

However, I think this is a bug in the kotest that we shouldn't hack around to fix: kotest/kotest#6005

I will rename this PR to only update this implementation and leave the kotest issue.

Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,20 @@ import kotlinx.atomicfu.atomic
* It locks itself, so you can't add and read at the same time, also it's not possible to read it at the same time from different threads, being completely thread-safe.
*/

public class SideEffects<T>() : Iterable<T> {
private val sideEffects: AtomicRef<MutableList<T>> = atomic(ArrayList())
public class SideEffects<T>private constructor(sideEffects: List<T>) : Iterable<T> {
private val sideEffects: AtomicRef<List<T>> = atomic(sideEffects)

// Private constructor to initialize from an Iterable
private constructor(sideEffects: Iterable<T>) : this() {
this.sideEffects.value.addAll(sideEffects)
}
public constructor() : this(emptyList())

public fun add(vararg sideEffectsToAdd: T): SideEffects<T> {
val newList = sideEffects.value.toMutableList()
newList.addAll(sideEffectsToAdd)
return SideEffects(newList)
return SideEffects(sideEffects.value + sideEffectsToAdd)
}

public fun clear(): SideEffects<T> {
return SideEffects()
}

override fun iterator(): Iterator<T> =
iterator {
while (true) {
val currentList = sideEffects.value
if (currentList.isEmpty()) break
val nextSideEffect = currentList.removeFirstOrNull()
nextSideEffect?.let { yield(it) }
}
}
override fun iterator(): Iterator<T> {
return sideEffects.getAndSet(emptyList()).iterator()
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package com.adidas.mvi.sideeffects

import io.kotest.core.spec.style.BehaviorSpec
import io.kotest.matchers.booleans.shouldBeFalse
import io.kotest.matchers.collections.shouldBeEmpty
import io.kotest.matchers.collections.shouldContain
import io.kotest.matchers.collections.shouldContainInOrder
import kotlin.time.DurationUnit
import kotlin.time.ExperimentalTime
import kotlin.time.toDuration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore

@ExperimentalTime
internal class SideEffectsTest : BehaviorSpec({
Expand Down Expand Up @@ -60,42 +54,5 @@ internal class SideEffectsTest : BehaviorSpec({
clearedSideEffects.shouldBeEmpty()
}
}

`when`("I try to read SideEffects and it takes time, simulated by a semaphore") {
val firstSideEffect = TestSideEffect()
val secondSideEffectToBeAddedLater = TestSideEffect()

var returnedSideEffects = sideEffects.add(firstSideEffect)

val semaphore = Semaphore(2)

val readJob =
launch(Dispatchers.Default) {
returnedSideEffects.forEach { _ ->
semaphore.acquire() // Wait for the signal
}
}

val addJob =
launch(Dispatchers.Default) {
returnedSideEffects = sideEffects.add(secondSideEffectToBeAddedLater)
}

semaphore.release()

then("It should be released only by the semaphore").config(
timeout =
5.toDuration(
DurationUnit.SECONDS,
),
) {
readJob.join()
addJob.join()

readJob.isActive.shouldBeFalse()
addJob.isActive.shouldBeFalse()
returnedSideEffects.shouldContain(secondSideEffectToBeAddedLater)
}
}
}
})
Loading