diff --git a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java index 8b531b43..40b2baba 100644 --- a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java +++ b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java @@ -23,6 +23,7 @@ import java.util.stream.Collector; import java.util.stream.Stream; import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; /** * An umbrella class exposing static factory methods for instantiating parallel {@link Collector}s @@ -823,6 +824,135 @@ private ParallelCollectors() { return Factory.streamingBy(classifier, mapper, c -> c.parallelism(parallelism)); } + // parallelForEach + + /** + * A convenience {@link Collector} for performing a parallel side-effecting operation + * on each element using Virtual Threads. + *

+ * Unlike {@link #parallel(Function)}, this collector does not produce mapped results. + * Instead, the provided {@code action} is executed for its side effects (e.g. sending + * notifications, writing to external systems, updating shared state). + * The returned {@link CompletableFuture} completes when all actions have finished. + * + *

Note: This collector does not limit parallelism in any way (it may spawn work for every + * element). As a result, it is not suitable for processing huge streams. + * + *
+ * Example: + *

{@code
+     * CompletableFuture result = Stream.of(1, 2, 3)
+     *   .collect(parallelForEach(i -> sendNotification(i)));
+     * }
+ * + * @param action the side-effecting operation to perform on each element + * @param the input element type + * + * @return a {@code Collector} producing a {@link CompletableFuture} that completes + * when all actions have finished + * + * @since 4.1.0 + */ + public static Collector> parallelForEach( + Consumer action) { + Objects.requireNonNull(action, "action cannot be null"); + + return Factory.collecting(consumeAndReturn(), asFunction(action), c -> {}); + } + + /** + * A convenience {@link Collector} for performing a parallel side-effecting operation + * on each element (by default on Virtual Threads), with additional configuration applied + * via the provided {@code configurer}. + *

+ * Unlike {@link #parallel(Function, Consumer)}, this collector does not produce mapped results. + * Instead, the provided {@code action} is executed for its side effects. The {@code configurer} + * can be used to set a custom executor, parallelism level, batching, or decorators. + * + *

Note: Unless the {@code configurer} explicitly limits parallelism (e.g. via + * {@link CollectingConfigurer#parallelism(int)}), this collector does not limit parallelism in any + * way (it may spawn work for every element). As a result, it is not suitable for processing huge + * streams. + * + *

Note: For more information on available configuration options, see + * {@link CollectingConfigurer}. + * + *
+ * Example: + *

{@code
+     * CompletableFuture result = Stream.of(1, 2, 3)
+     *   .collect(parallelForEach(i -> sendNotification(i), c -> c
+     *     .parallelism(64)
+     *     .batching()
+     *   ));
+     * }
+ * + * @param action the side-effecting operation to perform on each element + * @param configurer callback used to configure execution (see {@link CollectingConfigurer}) + * @param the input element type + * + * @return a {@code Collector} producing a {@link CompletableFuture} that completes + * when all actions have finished + * + * @since 4.1.0 + */ + public static Collector> parallelForEach( + Consumer action, + Consumer configurer) { + Objects.requireNonNull(action, "action cannot be null"); + Objects.requireNonNull(configurer, "configurer cannot be null"); + + return Factory.collecting(consumeAndReturn(), asFunction(action), configurer); + } + + /** + * A convenience {@link Collector} for performing a parallel side-effecting operation + * on each element using Virtual Threads, with a maximum parallelism level. + *

+ * This overload is a convenience for applying an easy parallelism cap. For additional configuration + * options (e.g. batching or a custom {@link java.util.concurrent.Executor}), use the overload + * accepting a {@link CollectingConfigurer}. + * + *
+ * Example: + *

{@code
+     * CompletableFuture result = Stream.of(1, 2, 3)
+     *   .collect(parallelForEach(i -> sendNotification(i), 64));
+     * }
+ * + * @param action the side-effecting operation to perform on each element + * @param parallelism maximum parallelism (must be positive) + * @param the input element type + * + * @return a {@code Collector} producing a {@link CompletableFuture} that completes + * when all actions have finished + * + * @since 4.1.0 + */ + public static Collector> parallelForEach( + Consumer action, int parallelism) { + Objects.requireNonNull(action, "action cannot be null"); + + return Factory.collecting( + consumeAndReturn(), + asFunction(action), + c -> c.parallelism(parallelism)); + } + + private static Function asFunction(Consumer action) { + return t -> { + action.accept(t); + return null; + }; + } + + private static Function, @Nullable Void> consumeAndReturn() { + return s -> { + s.forEach(__ -> {}); + return null; + }; + } + /** * A convenience {@code Collector} for collecting a {@code Stream>} * into a {@code CompletableFuture} using a provided {@code Collector} diff --git a/src/test/java/com/pivovarit/collectors/TypeChecks.java b/src/test/java/com/pivovarit/collectors/TypeChecks.java index 75ac7247..80cadcc9 100644 --- a/src/test/java/com/pivovarit/collectors/TypeChecks.java +++ b/src/test/java/com/pivovarit/collectors/TypeChecks.java @@ -15,6 +15,7 @@ */ package com.pivovarit.collectors; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collector; @@ -43,6 +44,12 @@ private static void expectCollector(Collector c) { static final class Covariance { + record Consumers( + Consumer superConsumer, + Consumer subConsumer + ) { + } + record Functions( Function subToSuper, Function subToSub @@ -105,12 +112,28 @@ record StreamingBy(Functions f) { expectCollector(ParallelCollectors.parallelToStreamBy(f.subToSub, f.subToSub, c -> {})); } } + + record ParallelForEach(Consumers cons) { + ParallelForEach { + // Consumer should be accepted for Collector + expectCollector(ParallelCollectors.parallelForEach(cons.superConsumer)); + expectCollector(ParallelCollectors.parallelForEach(cons.subConsumer)); + + expectCollector(ParallelCollectors.parallelForEach(cons.superConsumer, c -> {})); + expectCollector(ParallelCollectors.parallelForEach(cons.subConsumer, c -> {})); + + expectCollector(ParallelCollectors.parallelForEach(cons.superConsumer, 42)); + expectCollector(ParallelCollectors.parallelForEach(cons.subConsumer, 42)); + } + } } static final class Contravariance { private static final Function superToSub = x -> new SubClass(); private static final Function objToSub = x -> new SubClass(); + private static final Consumer superConsumer = x -> {}; + private static final Consumer objConsumer = x -> {}; record Parallel() { Parallel { @@ -165,5 +188,18 @@ record StreamingBy() { expectCollector(ParallelCollectors.parallelToStreamBy(objToSub, objToSub, c -> {})); } } + + record ParallelForEach() { + ParallelForEach { + expectCollector(ParallelCollectors.parallelForEach(superConsumer)); + expectCollector(ParallelCollectors.parallelForEach(objConsumer)); + + expectCollector(ParallelCollectors.parallelForEach(superConsumer, c -> {})); + expectCollector(ParallelCollectors.parallelForEach(objConsumer, c -> {})); + + expectCollector(ParallelCollectors.parallelForEach(superConsumer, 42)); + expectCollector(ParallelCollectors.parallelForEach(objConsumer, 42)); + } + } } } diff --git a/src/test/java/com/pivovarit/collectors/test/ParallelForEachTest.java b/src/test/java/com/pivovarit/collectors/test/ParallelForEachTest.java new file mode 100644 index 00000000..98132c68 --- /dev/null +++ b/src/test/java/com/pivovarit/collectors/test/ParallelForEachTest.java @@ -0,0 +1,125 @@ +/* + * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pivovarit.collectors.test; + +import com.pivovarit.collectors.ParallelCollectors; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collector; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.TestFactory; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class ParallelForEachTest { + + @TestFactory + Stream shouldProcessAllElements() { + return allForEach() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + var processed = new ConcurrentLinkedQueue(); + var list = IntStream.range(0, 100).boxed().toList(); + + list.stream() + .collect(c.collector(processed::add)) + .join(); + + assertThat(processed).containsExactlyInAnyOrderElementsOf(list); + })); + } + + @TestFactory + Stream shouldProcessEmpty() { + return allForEach() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + var counter = new AtomicInteger(); + + Stream.empty() + .collect(c.collector(i -> counter.incrementAndGet())) + .join(); + + assertThat(counter.get()).isZero(); + })); + } + + @TestFactory + Stream shouldPropagateException() { + return allForEach() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + assertThatThrownBy(() -> IntStream.range(0, 10) + .boxed() + .collect(c.collector(i -> { + if (i == 7) { + throw new IllegalArgumentException(); + } + })) + .join()) + .isInstanceOf(CompletionException.class) + .hasCauseExactlyInstanceOf(IllegalArgumentException.class); + })); + } + + @TestFactory + Stream shouldReturnCompletableFutureVoid() { + return allForEach() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + CompletableFuture result = Stream.of(1, 2, 3) + .collect(c.collector(i -> {})); + + assertThat(result.join()).isNull(); + })); + } + + record ForEachCollector(String name, ForEachFactory factory) { + Collector> collector(Consumer action) { + return factory.create(action); + } + } + + @FunctionalInterface + interface ForEachFactory { + Collector> create(Consumer action); + } + + private static Stream allForEach() { + var executor = Executors.newCachedThreadPool(); + return Stream.of( + new ForEachCollector("parallelForEach()", + ParallelCollectors::parallelForEach), + new ForEachCollector("parallelForEach(c -> {})", + a -> ParallelCollectors.parallelForEach(a, c -> {})), + new ForEachCollector("parallelForEach(c -> c.parallelism(4))", + a -> ParallelCollectors.parallelForEach(a, c -> c.parallelism(4))), + new ForEachCollector("parallelForEach(c -> c.executor(e))", + a -> ParallelCollectors.parallelForEach(a, c -> c.executor(executor))), + new ForEachCollector("parallelForEach(c -> c.executor(e).parallelism(4))", + a -> ParallelCollectors.parallelForEach(a, c -> c.executor(executor).parallelism(4))), + new ForEachCollector("parallelForEach(c -> c.executor(e).parallelism(4).batching())", + a -> ParallelCollectors.parallelForEach(a, c -> c.executor(executor).parallelism(4).batching())), + new ForEachCollector("parallelForEach(4)", + a -> ParallelCollectors.parallelForEach(a, 4)), + new ForEachCollector("parallelForEach(1)", + a -> ParallelCollectors.parallelForEach(a, 1)) + ); + } +}