From 963183e4a3d7c80d4fcc96512d0137c65fbacc21 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 14 Apr 2026 16:15:02 -0500 Subject: [PATCH 1/8] Remove legacy output concurrenncy --- logstash-core/lib/logstash/outputs/base.rb | 23 ++--- .../src/main/java/org/logstash/RubyUtil.java | 10 --- .../config/ir/compiler/OutputStrategyExt.java | 85 ------------------- .../ir/compiler/OutputDelegatorTest.java | 6 +- 4 files changed, 10 insertions(+), 114 deletions(-) diff --git a/logstash-core/lib/logstash/outputs/base.rb b/logstash-core/lib/logstash/outputs/base.rb index 13d10f92ca..ce958312c9 100644 --- a/logstash-core/lib/logstash/outputs/base.rb +++ b/logstash-core/lib/logstash/outputs/base.rb @@ -33,17 +33,20 @@ class LogStash::Outputs::Base < LogStash::Plugin # The codec used for output data. Output codecs are a convenient method for encoding your data before it leaves the output, without needing a separate filter in your Logstash pipeline. config :codec, :validate => :codec, :default => "plain" - # TODO remove this in Logstash 6.0 - # when we no longer support the :legacy type - # This is hacky, but it can only be herne - config :workers, :type => :number, :default => 1 # Set or return concurrency type def self.concurrency(type = nil) if type + if type == :legacy + self.logger.warn("Output plugin #{self.name} declares `concurrency :legacy` which is removed. Defaulting to :single. Please update the plugin to use `concurrency :single` or `concurrency :shared`.") + type = :single + end + if ![:shared, :single].include?(type) + raise ArgumentError, "Invalid concurrency type '#{type}', must be one of :shared, :single" + end @concurrency = type else - @concurrency || :legacy # default is :legacyo + @concurrency || :single end end @@ -57,12 +60,6 @@ def self.threadsafe? concurrency == :shared end - # Deprecated: Favor `concurrency :single` - # Remove in Logstash 6.0.0 - def self.declare_workers_not_supported!(message = nil) - concurrency :single - end - public def self.plugin_type @@ -74,10 +71,6 @@ def initialize(params = {}) super config_init(@params) - if self.workers != 1 - raise LogStash::ConfigurationError, "You are using a plugin that doesn't support workers but have set the workers value explicitly! This plugin uses the #{concurrency} and doesn't need this option" - end - # If we're running with a single thread we must enforce single-threaded concurrency by default # Maybe in a future version we'll assume output plugins are threadsafe @single_worker_mutex = Mutex.new diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index f4c296e352..3bc5682639 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -149,8 +149,6 @@ public final class RubyUtil { public static final RubyClass OUTPUT_STRATEGY_SIMPLE_ABSTRACT; - public static final RubyClass OUTPUT_STRATEGY_LEGACY; - public static final RubyClass OUTPUT_STRATEGY_SINGLE; public static final RubyClass OUTPUT_STRATEGY_SHARED; @@ -373,10 +371,6 @@ public final class RubyUtil { context, "SimpleAbstractStrategy", OUTPUT_STRATEGY_ABSTRACT, ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR ); - OUTPUT_STRATEGY_LEGACY = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder( - context, "Legacy", OUTPUT_STRATEGY_ABSTRACT, - OutputStrategyExt.LegacyOutputStrategyExt::new - ); OUTPUT_STRATEGY_SINGLE = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder( context, "Single", OUTPUT_STRATEGY_SIMPLE_ABSTRACT, OutputStrategyExt.SingleOutputStrategyExt::new @@ -389,7 +383,6 @@ public final class RubyUtil { OUTPUT_STRATEGY_ABSTRACT.defineMethods(context, OutputStrategyExt.SimpleAbstractOutputStrategyExt.class); OUTPUT_STRATEGY_SHARED.defineMethods(context, OutputStrategyExt.SharedOutputStrategyExt.class); OUTPUT_STRATEGY_SINGLE.defineMethods(context, OutputStrategyExt.SingleOutputStrategyExt.class); - OUTPUT_STRATEGY_LEGACY.defineMethods(context, OutputStrategyExt.LegacyOutputStrategyExt.class); final OutputStrategyExt.OutputStrategyRegistryExt outputStrategyRegistry = OutputStrategyExt.OutputStrategyRegistryExt.instance( context, OUTPUT_DELEGATOR_STRATEGIES @@ -397,9 +390,6 @@ public final class RubyUtil { outputStrategyRegistry.register( context, RUBY.newSymbol("shared"), OUTPUT_STRATEGY_SHARED ); - outputStrategyRegistry.register( - context, RUBY.newSymbol("legacy"), OUTPUT_STRATEGY_LEGACY - ); outputStrategyRegistry.register( context, RUBY.newSymbol("single"), OUTPUT_STRATEGY_SINGLE ); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java index e247552949..4cc9c62496 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java @@ -21,13 +21,9 @@ package org.logstash.config.ir.compiler; import org.jruby.Ruby; -import org.jruby.RubyArray; import org.jruby.RubyClass; -import org.jruby.RubyFixnum; import org.jruby.RubyHash; import org.jruby.RubyObject; -import org.jruby.api.Convert; -import org.jruby.api.Create; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.internal.runtime.methods.DynamicMethod; @@ -37,8 +33,6 @@ import org.logstash.execution.ExecutionContextExt; import org.logstash.plugins.factory.ContextualizerExt; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.stream.Collectors; public final class OutputStrategyExt { @@ -166,85 +160,6 @@ protected abstract IRubyObject output(ThreadContext context, IRubyObject events) protected abstract IRubyObject reg(ThreadContext context); } - @JRubyClass(name = "Legacy", parent = "AbstractStrategy") - public static final class LegacyOutputStrategyExt extends OutputStrategyExt.AbstractOutputStrategyExt { - - private static final long serialVersionUID = 1L; - - private transient BlockingQueue workerQueue; - - private transient IRubyObject workerCount; - - private @SuppressWarnings({"rawtypes"}) RubyArray workers; - - public LegacyOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) { - super(runtime, metaClass); - } - - @JRubyMethod(required = 4) - public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) { - final RubyClass outputClass = (RubyClass) args[0]; - final IRubyObject metric = args[1]; - final ExecutionContextExt executionContext = (ExecutionContextExt) args[2]; - final RubyHash pluginArgs = (RubyHash) args[3]; - workerCount = pluginArgs.op_aref(context, context.runtime.newString("workers")); - if (workerCount.isNil()) { - workerCount = RubyFixnum.one(context.runtime); - } - final int count = Convert.toInt(context, workerCount.convertToInteger()); - workerQueue = new ArrayBlockingQueue<>(count); - workers = (RubyArray) Create.allocArray(context, count); - for (int i = 0; i < count; ++i) { - final IRubyObject output = ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs); - initOutputCallsite(outputClass); - output.callMethod(context, "metric=", metric); - workers.append(context, output); - workerQueue.add(output); - } - return this; - } - - @JRubyMethod(name = "worker_count") - public IRubyObject workerCount() { - return workerCount; - } - - @JRubyMethod - public IRubyObject workers() { - return workers; - } - - @Override - public IRubyObject getRubyPlugin(final ThreadContext context) { - return workers.isEmpty() ? context.nil : (IRubyObject) workers.get(0); - } - - @Override - protected IRubyObject output(final ThreadContext context, final IRubyObject events) throws InterruptedException { - final IRubyObject worker = workerQueue.take(); - try { - invokeOutput(context, events, worker); - return context.nil; - } finally { - workerQueue.put(worker); - } - } - - @Override - @SuppressWarnings("unchecked") - protected IRubyObject close(final ThreadContext context) { - workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "do_close")); - return this; - } - - @Override - @SuppressWarnings("unchecked") - protected IRubyObject reg(final ThreadContext context) { - workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "register")); - return this; - } - } - @JRubyClass(name = "SimpleAbstractStrategy", parent = "AbstractStrategy") public abstract static class SimpleAbstractOutputStrategyExt extends OutputStrategyExt.AbstractOutputStrategyExt { diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java index 5c378d571a..8d0200f3e5 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java @@ -154,8 +154,7 @@ public void singleConcurrencyStrategyIsDefault() { public void outputStrategyTests() { StrategyPair[] outputStrategies = new StrategyPair[]{ new StrategyPair("shared", OutputStrategyExt.SharedOutputStrategyExt.class), - new StrategyPair("single", OutputStrategyExt.SingleOutputStrategyExt.class), - new StrategyPair("legacy", OutputStrategyExt.LegacyOutputStrategyExt.class) + new StrategyPair("single", OutputStrategyExt.SingleOutputStrategyExt.class) }; for (StrategyPair pair : outputStrategies) { @@ -184,8 +183,7 @@ public void outputStrategyTests() { public void outputStrategyMethodDelegationTests() { RubySymbol[] outputStrategies = new RubySymbol[]{ RUBY.newSymbol("shared"), - RUBY.newSymbol("single"), - RUBY.newSymbol("legacy") + RUBY.newSymbol("single") }; final ThreadContext context = RUBY.getCurrentContext(); for (RubySymbol symbol : outputStrategies) { From 3ecd923a5334846aad4d628cbcd8d75aae959f50 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 14 Apr 2026 16:38:12 -0500 Subject: [PATCH 2/8] Fix failing tests --- .../spec/logstash/outputs/base_spec.rb | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/logstash-core/spec/logstash/outputs/base_spec.rb b/logstash-core/spec/logstash/outputs/base_spec.rb index e1f52efd32..8a2c152f9b 100644 --- a/logstash-core/spec/logstash/outputs/base_spec.rb +++ b/logstash-core/spec/logstash/outputs/base_spec.rb @@ -39,7 +39,7 @@ class LogStash::Outputs::NOOPShared < ::LogStash::Outputs::Base def register; end end -class LogStash::Outputs::NOOPLegacy < ::LogStash::Outputs::Base +class LogStash::Outputs::NOOPDefault < ::LogStash::Outputs::Base def register; end end @@ -60,9 +60,6 @@ def multi_receive_encoded(events_and_encoded) let(:klass) { LogStash::Outputs::NOOPSingle } it "should instantiate cleanly" do - params = { "dummy_option" => "potatoes", "codec" => "json", "workers" => 2 } - worker_params = params.dup; worker_params["workers"] = 1 - expect { subject }.not_to raise_error end @@ -79,19 +76,11 @@ def multi_receive_encoded(events_and_encoded) end end - context "legacy" do - let(:klass) { LogStash::Outputs::NOOPLegacy } - - it "should set concurrency correctly" do - expect(subject.concurrency).to eq(:legacy) - end + context "default (no concurrency declared)" do + let(:klass) { LogStash::Outputs::NOOPDefault } - it "should default the # of workers to 1" do - expect(subject.workers).to eq(1) - end - - it "should default concurrency to :legacy" do - expect(subject.concurrency).to eq(:legacy) + it "should default concurrency to :single" do + expect(subject.concurrency).to eq(:single) end end From 6eb1e2d59a3864d87f6fe817c7f7fa5673dea171 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Wed, 15 Apr 2026 14:14:19 -0500 Subject: [PATCH 3/8] Add safeguard for 'workers => 1' --- logstash-core/lib/logstash/outputs/base.rb | 8 ++++++++ logstash-core/spec/logstash/outputs/base_spec.rb | 2 ++ 2 files changed, 10 insertions(+) diff --git a/logstash-core/lib/logstash/outputs/base.rb b/logstash-core/lib/logstash/outputs/base.rb index ce958312c9..d427113f25 100644 --- a/logstash-core/lib/logstash/outputs/base.rb +++ b/logstash-core/lib/logstash/outputs/base.rb @@ -69,6 +69,14 @@ def self.plugin_type public def initialize(params = {}) super + + # Outputs that never declared a concurrency strategy used the now-removed :legacy + # strategy, which accepted a `workers` setting. Strip it so existing configs with + # `workers => 1` don't blow up. + if !self.class.instance_variable_defined?(:@concurrency) && @params.delete("workers") + self.logger.warn("Output plugin #{self.class.name}: the `workers` setting is no longer used and will be removed in a future release.") + end + config_init(@params) # If we're running with a single thread we must enforce single-threaded concurrency by default diff --git a/logstash-core/spec/logstash/outputs/base_spec.rb b/logstash-core/spec/logstash/outputs/base_spec.rb index 8a2c152f9b..594c2ecb3c 100644 --- a/logstash-core/spec/logstash/outputs/base_spec.rb +++ b/logstash-core/spec/logstash/outputs/base_spec.rb @@ -60,6 +60,8 @@ def multi_receive_encoded(events_and_encoded) let(:klass) { LogStash::Outputs::NOOPSingle } it "should instantiate cleanly" do + params = { "dummy_option" => "potatoes", "codec" => "json" } + expect { subject }.not_to raise_error end From a8d2940342f3836c953811c7f32e28ecc1144533 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Wed, 15 Apr 2026 14:16:26 -0500 Subject: [PATCH 4/8] Add safeguard for 'workers => 1' --- logstash-core/spec/logstash/outputs/base_spec.rb | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/logstash-core/spec/logstash/outputs/base_spec.rb b/logstash-core/spec/logstash/outputs/base_spec.rb index 594c2ecb3c..454fa1bf82 100644 --- a/logstash-core/spec/logstash/outputs/base_spec.rb +++ b/logstash-core/spec/logstash/outputs/base_spec.rb @@ -84,6 +84,15 @@ def multi_receive_encoded(events_and_encoded) it "should default concurrency to :single" do expect(subject.concurrency).to eq(:single) end + + it "should accept the deprecated workers setting without error" do + expect { klass.new("workers" => 1) }.not_to raise_error + end + + it "should log a warning when workers is set" do + expect_any_instance_of(klass).to receive(:logger).and_return(double("logger").as_null_object) + klass.new("workers" => 1) + end end context "execution context" do From 1d56ce0c31e9db2bde87d0723bd9ebef8b83f50d Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Wed, 15 Apr 2026 18:51:48 -0500 Subject: [PATCH 5/8] Fix failing test --- logstash-core/spec/logstash/outputs/base_spec.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/logstash-core/spec/logstash/outputs/base_spec.rb b/logstash-core/spec/logstash/outputs/base_spec.rb index 454fa1bf82..3a46a5d266 100644 --- a/logstash-core/spec/logstash/outputs/base_spec.rb +++ b/logstash-core/spec/logstash/outputs/base_spec.rb @@ -90,8 +90,10 @@ def multi_receive_encoded(events_and_encoded) end it "should log a warning when workers is set" do - expect_any_instance_of(klass).to receive(:logger).and_return(double("logger").as_null_object) - klass.new("workers" => 1) + logger = klass.logger + expect(logger).to receive(:warn).with(/workers.*no longer used/) + output = klass.new("workers" => 1) + expect(output.params).not_to include("workers") end end From b30ee46caf92f8f6f379bad47504fce5603a8300 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 16 Apr 2026 18:03:22 -0500 Subject: [PATCH 6/8] Incorporate PR feedback, switch to parameter --- logstash-core/lib/logstash/config/mixin.rb | 4 ++-- logstash-core/lib/logstash/outputs/base.rb | 9 ++------- logstash-core/spec/logstash/outputs/base_spec.rb | 10 +--------- 3 files changed, 5 insertions(+), 18 deletions(-) diff --git a/logstash-core/lib/logstash/config/mixin.rb b/logstash-core/lib/logstash/config/mixin.rb index 5e04ec7307..1a11ba1900 100644 --- a/logstash-core/lib/logstash/config/mixin.rb +++ b/logstash-core/lib/logstash/config/mixin.rb @@ -122,7 +122,7 @@ def config_init(params) opts = self.class.get_config[name] if opts && opts[:deprecated] extra = opts[:deprecated].is_a?(String) ? opts[:deprecated] : "" - extra.gsub!("%PLUGIN%", self.class.config_name) + extra.gsub!("%PLUGIN%", self.class.config_name.to_s) self.deprecation_logger.deprecated( "You are using a deprecated config setting #{name.inspect} set in " \ "#{self.class.config_name}. Deprecated settings will continue to work, " \ @@ -135,7 +135,7 @@ def config_init(params) if opts && opts[:obsolete] extra = opts[:obsolete].is_a?(String) ? opts[:obsolete] : "" - extra.gsub!("%PLUGIN%", self.class.config_name) + extra.gsub!("%PLUGIN%", self.class.config_name.to_s) raise LogStash::ConfigurationError, I18n.t("logstash.runner.configuration.obsolete", :name => name, :plugin => self.class.config_name, :extra => extra) diff --git a/logstash-core/lib/logstash/outputs/base.rb b/logstash-core/lib/logstash/outputs/base.rb index d427113f25..0a8e82d4fe 100644 --- a/logstash-core/lib/logstash/outputs/base.rb +++ b/logstash-core/lib/logstash/outputs/base.rb @@ -34,6 +34,8 @@ class LogStash::Outputs::Base < LogStash::Plugin # The codec used for output data. Output codecs are a convenient method for encoding your data before it leaves the output, without needing a separate filter in your Logstash pipeline. config :codec, :validate => :codec, :default => "plain" + config :workers, :type => :number, :deprecated => "This parameter will be ignored." + # Set or return concurrency type def self.concurrency(type = nil) if type @@ -70,13 +72,6 @@ def self.plugin_type def initialize(params = {}) super - # Outputs that never declared a concurrency strategy used the now-removed :legacy - # strategy, which accepted a `workers` setting. Strip it so existing configs with - # `workers => 1` don't blow up. - if !self.class.instance_variable_defined?(:@concurrency) && @params.delete("workers") - self.logger.warn("Output plugin #{self.class.name}: the `workers` setting is no longer used and will be removed in a future release.") - end - config_init(@params) # If we're running with a single thread we must enforce single-threaded concurrency by default diff --git a/logstash-core/spec/logstash/outputs/base_spec.rb b/logstash-core/spec/logstash/outputs/base_spec.rb index 3a46a5d266..61ad7bc869 100644 --- a/logstash-core/spec/logstash/outputs/base_spec.rb +++ b/logstash-core/spec/logstash/outputs/base_spec.rb @@ -58,10 +58,9 @@ def multi_receive_encoded(events_and_encoded) context "single" do let(:klass) { LogStash::Outputs::NOOPSingle } + let(:params) { { "dummy_option" => "potatoes", "codec" => "json" } } it "should instantiate cleanly" do - params = { "dummy_option" => "potatoes", "codec" => "json" } - expect { subject }.not_to raise_error end @@ -88,13 +87,6 @@ def multi_receive_encoded(events_and_encoded) it "should accept the deprecated workers setting without error" do expect { klass.new("workers" => 1) }.not_to raise_error end - - it "should log a warning when workers is set" do - logger = klass.logger - expect(logger).to receive(:warn).with(/workers.*no longer used/) - output = klass.new("workers" => 1) - expect(output.params).not_to include("workers") - end end context "execution context" do From 5989c5c6a28d0a56f0b0e311b06e8e83f006bbac Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 30 Apr 2026 14:06:45 -0500 Subject: [PATCH 7/8] Update logstash-core/lib/logstash/outputs/base.rb Add rye's suggestion for improved error message for legacy concurrency plugins Co-authored-by: Rye Biesemeyer --- logstash-core/lib/logstash/outputs/base.rb | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/logstash-core/lib/logstash/outputs/base.rb b/logstash-core/lib/logstash/outputs/base.rb index 0a8e82d4fe..615bfa7595 100644 --- a/logstash-core/lib/logstash/outputs/base.rb +++ b/logstash-core/lib/logstash/outputs/base.rb @@ -43,8 +43,13 @@ def self.concurrency(type = nil) self.logger.warn("Output plugin #{self.name} declares `concurrency :legacy` which is removed. Defaulting to :single. Please update the plugin to use `concurrency :single` or `concurrency :shared`.") type = :single end - if ![:shared, :single].include?(type) - raise ArgumentError, "Invalid concurrency type '#{type}', must be one of :shared, :single" + if !LogStash::OutputDelegatorStrategyRegistry.instance.types.include?(type) + raise ArgumentError, <<~MESSAGE.gsub("\n", " ") + The concurrency type `#{type.inspect}` specified for output plugin `#{config_name}` is not supported + on this version of Logstash. If you installed this plugin specifically on this Logstash version, + it is not compatible. If you are a plugin author, please see update your plugin to use one of + the supported plugin types: #{LogStash::OutputDelegatorStrategyRegistry.instance.types} + MESSAGE end @concurrency = type else From 2dab9bd4c8937c1a0a5a86f922cfaa80935cfe91 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 30 Apr 2026 14:08:30 -0500 Subject: [PATCH 8/8] Apply suggestions from code review Use inspect in the corner case of missing `config_name` Co-authored-by: Rye Biesemeyer --- logstash-core/lib/logstash/config/mixin.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-core/lib/logstash/config/mixin.rb b/logstash-core/lib/logstash/config/mixin.rb index 1a11ba1900..fd558392cd 100644 --- a/logstash-core/lib/logstash/config/mixin.rb +++ b/logstash-core/lib/logstash/config/mixin.rb @@ -122,7 +122,7 @@ def config_init(params) opts = self.class.get_config[name] if opts && opts[:deprecated] extra = opts[:deprecated].is_a?(String) ? opts[:deprecated] : "" - extra.gsub!("%PLUGIN%", self.class.config_name.to_s) + extra.gsub!("%PLUGIN%", self.class.config_name || self.class.inspect) self.deprecation_logger.deprecated( "You are using a deprecated config setting #{name.inspect} set in " \ "#{self.class.config_name}. Deprecated settings will continue to work, " \