Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions src/main/java/com/pivovarit/collectors/ParallelCollectors.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.stream.Collector;
import java.util.stream.Stream;
import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;

/**
* An umbrella class exposing static factory methods for instantiating parallel {@link Collector}s
Expand Down Expand Up @@ -823,6 +824,135 @@
return Factory.streamingBy(classifier, mapper, c -> c.parallelism(parallelism));
}

// parallelForEach

/**
* A convenience {@link Collector} for performing a parallel side-effecting operation
* on each element using Virtual Threads.
* <p>
* Unlike {@link #parallel(Function)}, this collector does not produce mapped results.
* Instead, the provided {@code action} is executed for its side effects (e.g. sending
* notifications, writing to external systems, updating shared state).
* The returned {@link CompletableFuture} completes when all actions have finished.
*
* <p><b>Note:</b> This collector does not limit parallelism in any way (it may spawn work for every
* element). As a result, it is not suitable for processing huge streams.
*
* <br>
* Example:
* <pre>{@code
* CompletableFuture<Void> result = Stream.of(1, 2, 3)
* .collect(parallelForEach(i -> sendNotification(i)));
* }</pre>
*
* @param action the side-effecting operation to perform on each element
* @param <T> the input element type
*
* @return a {@code Collector} producing a {@link CompletableFuture} that completes
* when all actions have finished
*
* @since 4.1.0
*/
public static <T> Collector<T, ?, CompletableFuture<@Nullable Void>> parallelForEach(
Consumer<? super T> action) {
Objects.requireNonNull(action, "action cannot be null");

Check failure on line 858 in src/main/java/com/pivovarit/collectors/ParallelCollectors.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "action cannot be null" 3 times.

See more on https://sonarcloud.io/project/issues?id=pivovarit_parallel-collectors&issues=AZzHpooLQh-IuXSnKU-Q&open=AZzHpooLQh-IuXSnKU-Q&pullRequest=1223

return Factory.collecting(consumeAndReturn(), asFunction(action), c -> {});
}

/**
* A convenience {@link Collector} for performing a parallel side-effecting operation
* on each element (by default on Virtual Threads), with additional configuration applied
* via the provided {@code configurer}.
* <p>
* Unlike {@link #parallel(Function, Consumer)}, this collector does not produce mapped results.
* Instead, the provided {@code action} is executed for its side effects. The {@code configurer}
* can be used to set a custom executor, parallelism level, batching, or decorators.
*
* <p><b>Note:</b> Unless the {@code configurer} explicitly limits parallelism (e.g. via
* {@link CollectingConfigurer#parallelism(int)}), this collector does not limit parallelism in any
* way (it may spawn work for every element). As a result, it is not suitable for processing huge
* streams.
*
* <p><b>Note:</b> For more information on available configuration options, see
* {@link CollectingConfigurer}.
*
* <br>
* Example:
* <pre>{@code
* CompletableFuture<Void> result = Stream.of(1, 2, 3)
* .collect(parallelForEach(i -> sendNotification(i), c -> c
* .parallelism(64)
* .batching()
* ));
* }</pre>
*
* @param action the side-effecting operation to perform on each element
* @param configurer callback used to configure execution (see {@link CollectingConfigurer})
* @param <T> the input element type
*
* @return a {@code Collector} producing a {@link CompletableFuture} that completes
* when all actions have finished
*
* @since 4.1.0
*/
public static <T> Collector<T, ?, CompletableFuture<@Nullable Void>> parallelForEach(
Consumer<? super T> action,
Consumer<CollectingConfigurer> configurer) {
Objects.requireNonNull(action, "action cannot be null");
Objects.requireNonNull(configurer, "configurer cannot be null");

return Factory.collecting(consumeAndReturn(), asFunction(action), configurer);
}

/**
* A convenience {@link Collector} for performing a parallel side-effecting operation
* on each element using Virtual Threads, with a maximum parallelism level.
* <p>
* This overload is a convenience for applying an easy parallelism cap. For additional configuration
* options (e.g. batching or a custom {@link java.util.concurrent.Executor}), use the overload
* accepting a {@link CollectingConfigurer}.
*
* <br>
* Example:
* <pre>{@code
* CompletableFuture<Void> result = Stream.of(1, 2, 3)
* .collect(parallelForEach(i -> sendNotification(i), 64));
* }</pre>
*
* @param action the side-effecting operation to perform on each element
* @param parallelism maximum parallelism (must be positive)
* @param <T> the input element type
*
* @return a {@code Collector} producing a {@link CompletableFuture} that completes
* when all actions have finished
*
* @since 4.1.0
*/
public static <T> Collector<T, ?, CompletableFuture<@Nullable Void>> parallelForEach(
Consumer<? super T> action, int parallelism) {
Objects.requireNonNull(action, "action cannot be null");

return Factory.collecting(
consumeAndReturn(),
asFunction(action),
c -> c.parallelism(parallelism));
}

private static <T> Function<T, @Nullable Void> asFunction(Consumer<? super T> action) {
return t -> {
action.accept(t);
return null;
};
}

private static <T> Function<Stream<T>, @Nullable Void> consumeAndReturn() {
return s -> {
s.forEach(__ -> {});

Check warning on line 951 in src/main/java/com/pivovarit/collectors/ParallelCollectors.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this local variable to match the regular expression '^[a-z][a-zA-Z0-9]*$'.

See more on https://sonarcloud.io/project/issues?id=pivovarit_parallel-collectors&issues=AZzHpooLQh-IuXSnKU-P&open=AZzHpooLQh-IuXSnKU-P&pullRequest=1223
return null;
};
}

/**
* A convenience {@code Collector} for collecting a {@code Stream<CompletableFuture<T>>}
* into a {@code CompletableFuture<R>} using a provided {@code Collector<T, ?, R>}
Expand Down
36 changes: 36 additions & 0 deletions src/test/java/com/pivovarit/collectors/TypeChecks.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.pivovarit.collectors;

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collector;

Expand Down Expand Up @@ -43,6 +44,12 @@ private static <A, R> void expectCollector(Collector<A, ?, R> c) {

static final class Covariance {

record Consumers(
Consumer<SuperClass> superConsumer,
Consumer<SubClass> subConsumer
) {
}

record Functions(
Function<SubClass, SuperClass> subToSuper,
Function<SubClass, SubClass> subToSub
Expand Down Expand Up @@ -105,12 +112,28 @@ record StreamingBy(Functions f) {
expectCollector(ParallelCollectors.parallelToStreamBy(f.subToSub, f.subToSub, c -> {}));
}
}

record ParallelForEach(Consumers cons) {
ParallelForEach {
// Consumer<SuperClass> should be accepted for Collector<SubClass, ...>
expectCollector(ParallelCollectors.<SubClass>parallelForEach(cons.superConsumer));
expectCollector(ParallelCollectors.<SubClass>parallelForEach(cons.subConsumer));

expectCollector(ParallelCollectors.<SubClass>parallelForEach(cons.superConsumer, c -> {}));
expectCollector(ParallelCollectors.<SubClass>parallelForEach(cons.subConsumer, c -> {}));

expectCollector(ParallelCollectors.<SubClass>parallelForEach(cons.superConsumer, 42));
expectCollector(ParallelCollectors.<SubClass>parallelForEach(cons.subConsumer, 42));
}
}
}

static final class Contravariance {

private static final Function<SuperClass, SubClass> superToSub = x -> new SubClass();
private static final Function<Object, SubClass> objToSub = x -> new SubClass();
private static final Consumer<SuperClass> superConsumer = x -> {};
private static final Consumer<Object> objConsumer = x -> {};

record Parallel() {
Parallel {
Expand Down Expand Up @@ -165,5 +188,18 @@ record StreamingBy() {
expectCollector(ParallelCollectors.parallelToStreamBy(objToSub, objToSub, c -> {}));
}
}

record ParallelForEach() {
ParallelForEach {
expectCollector(ParallelCollectors.parallelForEach(superConsumer));
expectCollector(ParallelCollectors.parallelForEach(objConsumer));

expectCollector(ParallelCollectors.parallelForEach(superConsumer, c -> {}));
expectCollector(ParallelCollectors.parallelForEach(objConsumer, c -> {}));

expectCollector(ParallelCollectors.parallelForEach(superConsumer, 42));
expectCollector(ParallelCollectors.parallelForEach(objConsumer, 42));
}
}
}
}
125 changes: 125 additions & 0 deletions src/test/java/com/pivovarit/collectors/test/ParallelForEachTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pivovarit.collectors.test;

import com.pivovarit.collectors.ParallelCollectors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class ParallelForEachTest {

@TestFactory
Stream<DynamicTest> shouldProcessAllElements() {
return allForEach()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var processed = new ConcurrentLinkedQueue<Integer>();
var list = IntStream.range(0, 100).boxed().toList();

list.stream()
.collect(c.collector(processed::add))
.join();

assertThat(processed).containsExactlyInAnyOrderElementsOf(list);
}));
}

@TestFactory
Stream<DynamicTest> shouldProcessEmpty() {
return allForEach()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var counter = new AtomicInteger();

Stream.<Integer>empty()
.collect(c.collector(i -> counter.incrementAndGet()))
.join();

assertThat(counter.get()).isZero();
}));
}

@TestFactory
Stream<DynamicTest> shouldPropagateException() {
return allForEach()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
assertThatThrownBy(() -> IntStream.range(0, 10)

Check warning on line 69 in src/test/java/com/pivovarit/collectors/test/ParallelForEachTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor the code of the lambda to have only one invocation possibly throwing a runtime exception.

See more on https://sonarcloud.io/project/issues?id=pivovarit_parallel-collectors&issues=AZzHpoqXQh-IuXSnKU-R&open=AZzHpoqXQh-IuXSnKU-R&pullRequest=1223
.boxed()
.collect(c.collector(i -> {
if (i == 7) {
throw new IllegalArgumentException();
}
}))
.join())
.isInstanceOf(CompletionException.class)
.hasCauseExactlyInstanceOf(IllegalArgumentException.class);
}));
}

@TestFactory
Stream<DynamicTest> shouldReturnCompletableFutureVoid() {
return allForEach()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
CompletableFuture<Void> result = Stream.of(1, 2, 3)
.collect(c.collector(i -> {}));

assertThat(result.join()).isNull();
}));
}

record ForEachCollector(String name, ForEachFactory factory) {
Collector<Integer, ?, CompletableFuture<Void>> collector(Consumer<Integer> action) {
return factory.create(action);
}
}

@FunctionalInterface
interface ForEachFactory {
Collector<Integer, ?, CompletableFuture<Void>> create(Consumer<Integer> action);
}

private static Stream<ForEachCollector> allForEach() {
var executor = Executors.newCachedThreadPool();
return Stream.of(
new ForEachCollector("parallelForEach()",
ParallelCollectors::parallelForEach),
new ForEachCollector("parallelForEach(c -> {})",
a -> ParallelCollectors.parallelForEach(a, c -> {})),
new ForEachCollector("parallelForEach(c -> c.parallelism(4))",
a -> ParallelCollectors.parallelForEach(a, c -> c.parallelism(4))),
new ForEachCollector("parallelForEach(c -> c.executor(e))",
a -> ParallelCollectors.parallelForEach(a, c -> c.executor(executor))),
new ForEachCollector("parallelForEach(c -> c.executor(e).parallelism(4))",
a -> ParallelCollectors.parallelForEach(a, c -> c.executor(executor).parallelism(4))),
new ForEachCollector("parallelForEach(c -> c.executor(e).parallelism(4).batching())",
a -> ParallelCollectors.parallelForEach(a, c -> c.executor(executor).parallelism(4).batching())),
new ForEachCollector("parallelForEach(4)",
a -> ParallelCollectors.parallelForEach(a, 4)),
new ForEachCollector("parallelForEach(1)",
a -> ParallelCollectors.parallelForEach(a, 1))
);
}
}