From e72ebad4711e7ae69205930673741633e62ebb6e Mon Sep 17 00:00:00 2001 From: Micah Hunsberger Date: Fri, 6 Mar 2026 13:42:50 -0500 Subject: [PATCH 1/4] add api to restart a pipeline --- logstash-core/lib/logstash/agent.rb | 22 +++++++++++++++++++ .../lib/logstash/api/commands/node.rb | 4 ++++ .../lib/logstash/api/modules/node.rb | 11 ++++++++++ 3 files changed, 37 insertions(+) diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index ab854e8974..273101d0fc 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -262,6 +262,28 @@ def stop_pipeline(pipeline_id) converge_state_with_resolved_actions([action]) end + def reload_pipeline(pipeline_id) + pipeline = get_pipeline(pipeline_id) + return if pipeline.nil? + converge_result = @convergence_lock.synchronize do + pipeline_config = pipeline.pipeline_config + converge_state([LogStash::PipelineAction::Reload.new(pipeline_config, metric)]) + end + + update_metrics(converge_result) + + logger.info( + "Pipelines running", + :count => running_pipelines.size, + :running_pipelines => running_pipelines.keys, + :non_running_pipelines => non_running_pipelines.keys + ) if converge_result.success? && converge_result.total > 0 + + dispatch_events(converge_result) + + converge_result + end + # Calculate the Logstash uptime in milliseconds # # @return [Integer] Uptime in milliseconds diff --git a/logstash-core/lib/logstash/api/commands/node.rb b/logstash-core/lib/logstash/api/commands/node.rb index 8bba5aa486..4b64b8097d 100644 --- a/logstash-core/lib/logstash/api/commands/node.rb +++ b/logstash-core/lib/logstash/api/commands/node.rb @@ -66,6 +66,10 @@ def pipeline(pipeline_id, options = {}) {} # empty end + def reload_pipeline(pipeline_id) + service.agent.reload_pipeline(pipeline_id.to_sym) + end + def os { :name => java.lang.System.getProperty("os.name"), diff --git a/logstash-core/lib/logstash/api/modules/node.rb b/logstash-core/lib/logstash/api/modules/node.rb index cae3db339e..ab94d81dd4 100644 --- a/logstash-core/lib/logstash/api/modules/node.rb +++ b/logstash-core/lib/logstash/api/modules/node.rb @@ -52,6 +52,17 @@ def node respond_with(:pipelines => { pipeline_id => payload }) end + post "/pipelines/:id/_reload" do + pipeline_id = params["id"] + halt(404) if node.pipeline(pipeline_id).empty? + converge_result = node.reload_pipeline(pipeline_id) + respond_with({ + 'success' => converge_result.success?, + 'failed_actions' => converge_result.failed_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}, message: #{r.message}" }, + 'successful_actions' => converge_result.successful_actions.collect { |a, r| { 'id' => a.pipeline_id, 'action_type' => a.class } } + }) + end + get "/pipelines" do opts = {:graph => as_boolean(params.fetch("graph", false)), :vertices => as_boolean(params.fetch("vertices", false))} From ad07bcb3fe7de14a28f9c5348ae35a989cdb1447 Mon Sep 17 00:00:00 2001 From: Micah Hunsberger Date: Fri, 6 Mar 2026 14:06:49 -0500 Subject: [PATCH 2/4] add openapi doc for reload pipeline --- docs/static/spec/openapi/logstash-api.yaml | 57 ++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/docs/static/spec/openapi/logstash-api.yaml b/docs/static/spec/openapi/logstash-api.yaml index 06d55b987f..e18d9fe431 100644 --- a/docs/static/spec/openapi/logstash-api.yaml +++ b/docs/static/spec/openapi/logstash-api.yaml @@ -331,6 +331,63 @@ paths: - content: Logstash name: product_name +/_node/pipelines/{pipeline_name}/_reload: + post: + summary: Reload a pipeline + description: Reloads the specified pipeline. + operationId: nodeReloadPipeline + tags: + - node info + parameters: + - name: pipeline_name + in: path + required: true + schema: + type: string + description: The name of the pipeline to reload. + - $ref: "#/components/parameters/pretty" + responses: + '200': + description: Indicates a successful call + content: + application/json: + schema: + allOf: + - type: object + properties: + success: + type: boolean + description: Whether the pipeline was reloaded successfully. + failed_actions: + type: array + items: + type: object + properties: + id: + type: string + actions_type: + type: string + successful_actions: + type: array + items: + type: object + properties: + id: + type: string + action_type: + type: string + example: + ReloadPipelineExample1: + value: + success: true + failed_actions: [] + successful_actions: + - id: "main" + actionn_type: "Logstash::PipelineAction::Reload" + x-metaTags: + - content: Logstash + name: product_name + /_node/plugins: get: summary: Get plugin info From 475fee40ac2d81c29d8b66a84d760708408de49e Mon Sep 17 00:00:00 2001 From: Micah Hunsberger Date: Fri, 6 Mar 2026 14:08:40 -0500 Subject: [PATCH 3/4] add comment following code style --- logstash-core/lib/logstash/agent.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 273101d0fc..765d8a1c8e 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -262,6 +262,9 @@ def stop_pipeline(pipeline_id) converge_state_with_resolved_actions([action]) end + ## + # Reload a pipeline and wait for it to fully reload. + # @param pipeline_id [String] def reload_pipeline(pipeline_id) pipeline = get_pipeline(pipeline_id) return if pipeline.nil? From 6e34b5c396b515ad4a043849f623c9944b3d74be Mon Sep 17 00:00:00 2001 From: Micah Hunsberger Date: Fri, 6 Mar 2026 14:56:54 -0500 Subject: [PATCH 4/4] add reload_pipeline tests --- logstash-core/spec/logstash/agent_spec.rb | 28 +++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/logstash-core/spec/logstash/agent_spec.rb b/logstash-core/spec/logstash/agent_spec.rb index 55359d2617..a738aa6833 100644 --- a/logstash-core/spec/logstash/agent_spec.rb +++ b/logstash-core/spec/logstash/agent_spec.rb @@ -376,6 +376,34 @@ end end + describe "#reload_pipeline" do + let(:config_string) { "input { generator { id => 'old'} } output { }" } + let(:mock_config_pipeline) { mock_pipeline_config(:main, config_string, pipeline_settings) } + let(:source_loader) { TestSourceLoader.new(mock_config_pipeline) } + subject { described_class.new(agent_settings, source_loader) } + + before(:each) do + expect(subject.converge_state_and_update).to be_a_successful_converge + expect(subject.get_pipeline('main').running?).to be_truthy + end + + after(:each) do + subject.shutdown + end + + context "when agent reloads the pipeline" do + it "should reload successfully", :aggregate_failures do + pipeline_before_reload = subject.get_pipeline('main') + converge_result = subject.reload_pipeline('main') + pipeline_after_reload = subject.get_pipeline('main') + + expect(converge_result).to be_a_successful_converge + expect(pipeline_after_reload.running?).to be_truthy + expect(pipeline_before_reload.object_id).not_to eq(pipeline_after_reload.object_id) + end + end + end + context "#started_at" do it "return the start time when the agent is started" do expect(described_class::STARTED_AT).to be_kind_of(Time)