Unify CollectingConfigurer and StreamingConfigurer#1225
Unify CollectingConfigurer and StreamingConfigurer#1225
Conversation
There was a problem hiding this comment.
Pull request overview
This PR refactors the configuration builders by extracting shared configuration logic from CollectingConfigurer and StreamingConfigurer into a new common base class (Configurer) to remove duplication and centralize option handling.
Changes:
- Introduce a shared
Configurer<SELF>base class containing common configuration options and option de-duplication logic. - Update
CollectingConfigurerandStreamingConfigurerto extend the new base class and remove duplicated implementations. - Keep streaming-specific configuration (
ordered()) inStreamingConfigurer.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
src/main/java/com/pivovarit/collectors/StreamingConfigurer.java |
Simplifies streaming configurer by inheriting common configuration methods from Configurer and keeping ordered(). |
src/main/java/com/pivovarit/collectors/Configurer.java |
Adds new shared base class implementing common fluent configuration methods and option tracking. |
src/main/java/com/pivovarit/collectors/CollectingConfigurer.java |
Simplifies collecting configurer by inheriting common configuration methods from Configurer. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| sealed abstract class Configurer<SELF extends Configurer<SELF>> permits CollectingConfigurer, StreamingConfigurer { | ||
|
|
||
| private final List<ConfigProcessor.Option> modifiers = new ArrayList<>(); | ||
| private final Set<Class<? extends ConfigProcessor.Option>> seen = new HashSet<>(); | ||
|
|
||
| abstract SELF self(); | ||
|
|
||
| /** | ||
| * Enables batching of work submitted to workers. | ||
| * <p> | ||
| * When enabled, each worker thread receives a batch of input items and processes them in one go, | ||
| * instead of scheduling one task per item. This reduces the number of tasks created and typically | ||
| * decreases contention on the underlying worker queue. | ||
| * | ||
| * <p><b>Note:</b> Depending on batch sizing and workload skew, batching may reduce load balancing and | ||
| * can lead to thread starvation (some workers become idle while others remain overloaded). | ||
| * | ||
| * @return this configurer instance for fluent chaining | ||
| */ | ||
| public SELF batching() { | ||
| addOnce(ConfigProcessor.Option.Batched.INSTANCE); | ||
| return self(); | ||
| } | ||
|
|
||
| /** | ||
| * Sets the maximum level of parallelism. | ||
| * <p> | ||
| * This limits the number of tasks submitted to the worker queue at once, effectively bounding | ||
| * the amount of in-flight work and the maximum concurrency used by the collector. | ||
| * | ||
| * @param parallelism the desired parallelism level (must be positive) | ||
| * | ||
| * @return this configurer instance for fluent chaining | ||
| */ | ||
| public SELF parallelism(int parallelism) { | ||
| Preconditions.requireValidParallelism(parallelism); | ||
| addOnce(new ConfigProcessor.Option.Parallelism(parallelism)); | ||
| return self(); | ||
| } | ||
|
|
||
| /** | ||
| * Sets the {@link Executor} used for running tasks. | ||
| * | ||
| * <p><b>Note:</b> The provided executor must not <em>drop</em> tasks on rejection (e.g. using a | ||
| * {@code RejectedExecutionHandler} that discards submitted work). Dropping tasks will cause the | ||
| * collector to wait for results that will never be produced, which can lead to deadlocks. | ||
| * | ||
| * @param executor the executor to use | ||
| * | ||
| * @return this configurer instance for fluent chaining | ||
| */ | ||
| public SELF executor(Executor executor) { | ||
| Preconditions.requireValidExecutor(executor); | ||
| addOnce(new ConfigProcessor.Option.ThreadPool(executor)); | ||
| return self(); | ||
| } |
There was a problem hiding this comment.
Configurer is package-private, but it now declares the public fluent API methods (batching/parallelism/executor/executorDecorator/taskDecorator). For callers outside com.pivovarit.collectors, these methods become inaccessible (e.g. Configurer.parallelism(int) is defined in an inaccessible class), which breaks the public CollectingConfigurer/StreamingConfigurer API used from README and ParallelCollectors examples. To keep the external API working, either make Configurer public, or redeclare these methods in each public configurer (delegating to the superclass) so the declaring class is accessible to library consumers.
|



No description provided.