From f56d7fc852147efe91676236ab3be187c45370bc Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Thu, 7 May 2026 09:10:32 -0700 Subject: [PATCH 1/3] `dead_letter_queue.flush_check_interval` new config for flushing staled segment files. (#19036) * Validates to be min 1s to keep consistency with the docs. Introduces new config for flushing staled segment files. * Add pipeline name to the DLQ flush thread name for better visibility in the threads API results. Add suggestions from the docs review. Re-organize the duration clam logic in a way for better maintainable and fix the unit tests. * Update logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java Remove unused method. Co-authored-by: Andrea Selva * Move the flush chech interval to the DeadLetterQueueWriter.Builder. Remove confusing scheduler from the docs explanations. unit tests for the only newly introduced conditions. * Apply suggestions from code review Doc consistency and test rename suggestions accepted. Co-authored-by: Andrea Selva * Keep the interval type as a Duration, rename and simplify test suites. --------- Co-authored-by: Andrea Selva (cherry picked from commit f2f0d3fde99787c888ff8a6849a60e8a8bd7ddc0) # Conflicts: # docs/reference/dead-letter-queues.md # logstash-core/lib/logstash/environment.rb # logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java # logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java --- config/logstash.yml | 8 + docs/reference/dead-letter-queues.md | 264 ++++++++++++++++++ logstash-core/lib/logstash/environment.rb | 28 ++ logstash-core/lib/logstash/settings.rb | 1 + .../common/DeadLetterQueueFactory.java | 22 +- .../common/io/DeadLetterQueueWriter.java | 74 ++++- .../logstash/common/io/RecordIOWriter.java | 2 +- .../execution/AbstractPipelineExt.java | 10 +- .../AbstractDeadLetterQueueWriterExtTest.java | 2 +- .../common/DeadLetterQueueFactoryTest.java | 8 +- .../common/io/DeadLetterQueueReaderTest.java | 26 +- ...DeadLetterQueueWriterAgeRetentionTest.java | 24 +- .../common/io/DeadLetterQueueWriterTest.java | 118 +++++--- 13 files changed, 502 insertions(+), 85 deletions(-) create mode 100644 docs/reference/dead-letter-queues.md diff --git a/config/logstash.yml b/config/logstash.yml index 23b4ed47a3..c1924f6139 100644 --- a/config/logstash.yml +++ b/config/logstash.yml @@ -278,6 +278,14 @@ # # dead_letter_queue.flush_interval: 5000 +# If using dead_letter_queue.enable: true, the interval in milliseconds that the DLQ scheduler checks for stale segments +# to be flushed. A smaller value ensures faster segment rotation at the cost of CPU with more frequent scheduler runs. +# A larger value reduces scheduler overhead but may delay segment sealing. +# Minimum value is 1000 and cannot be greater than dead_letter_queue.flush_interval. +# Default is 1000. +# +# dead_letter_queue.flush_check_interval: 1000 + # If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit. # Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit. # Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones. diff --git a/docs/reference/dead-letter-queues.md b/docs/reference/dead-letter-queues.md new file mode 100644 index 0000000000..7752e37d13 --- /dev/null +++ b/docs/reference/dead-letter-queues.md @@ -0,0 +1,264 @@ +--- +mapped_pages: + - https://www.elastic.co/guide/en/logstash/current/dead-letter-queues.html +applies_to: + stack: ga + serverless: ga +--- + +# Dead letter queues (DLQ) [dead-letter-queues] + +The dead letter queue (DLQ) is designed as a place to temporarily write events that cannot be processed. The DLQ gives you flexibility to investigate problematic events without blocking the pipeline or losing the events. Your pipeline keeps flowing, and the immediate problem is averted. But those events still need to be addressed. + +You can [process events from the DLQ](#es-proc-dlq) with the [`dead_letter_queue` input plugin](logstash-docs-md://lsr/plugins-inputs-dead_letter_queue.md) . + +Processing events does not delete items from the queue, and the DLQ sometimes needs attention. See [Track dead letter queue size](#dlq-size) and [Clear the dead letter queue](#dlq-clear) for more info. + +## How the dead letter queue works [dead-letter-how] + +By default, when Logstash encounters an event that it cannot process because the data contains a mapping error or some other issue, the Logstash pipeline either hangs or drops the unsuccessful event. In order to protect against data loss in this situation, you can [configure Logstash](#configuring-dlq) to write unsuccessful events to a dead letter queue instead of dropping them. + +::::{note} +The dead letter queue is currently supported only for the [{{es}} output](logstash-docs-md://lsr/plugins-outputs-elasticsearch.md) and [conditional statements evaluation](/reference/event-dependent-configuration.md#conditionals). The dead letter queue is used for documents with response codes of 400 or 404, both of which indicate an event that cannot be retried. It’s also used when a conditional evaluation encounter an error. +:::: + + +Each event written to the dead letter queue includes the original event, metadata that describes the reason the event could not be processed, information about the plugin that wrote the event, and the timestamp when the event entered the dead letter queue. + +To process events in the dead letter queue, create a Logstash pipeline configuration that uses the [`dead_letter_queue` input plugin](logstash-docs-md://lsr/plugins-inputs-dead_letter_queue.md) to read from the queue. See [Processing events in the dead letter queue](#processing-dlq-events) for more information. + +![Diagram showing pipeline reading from the dead letter queue](images/dead_letter_queue.png) + + +## {{es}} processing and the dead letter queue [es-proc-dlq] + +**HTTP request failure.** If the HTTP request fails (because {{es}} is unreachable or because it returned an HTTP error code), the {{es}} output retries the entire request indefinitely. In these scenarios, the dead letter queue has no opportunity to intercept. + +**HTTP request success.** The [{{es}} Bulk API](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk) can perform multiple actions using the same request. If the Bulk API request is successful, it returns `200 OK`, even if some documents in the batch have [failed](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk#bulk-failures-ex). In this situation, the `errors` flag for the request will be `true`. + +The response body can include metadata indicating that one or more specific actions in the bulk request could not be performed, along with an HTTP-style status code per entry to indicate why the action could not be performed. If the DLQ is configured, individual indexing failures are routed there. + +Even if you regularly process events, events remain in the dead letter queue. The dead letter queue requires [manual intervention](#dlq-clear) to clear it. + + +## Conditional statements and the dead letter queue [conditionals-dlq] + +When a conditional statement reaches an error in processing an event, such as comparing string and integer values, the event, as it is at the time of evaluation, is inserted into the dead letter queue. + + +## Configuring {{ls}} to use dead letter queues [configuring-dlq] + +Dead letter queues are disabled by default. To enable dead letter queues, set the `dead_letter_queue_enable` option in the `logstash.yml` [settings file](/reference/logstash-settings-file.md): + +```yaml +dead_letter_queue.enable: true +``` + +Dead letter queues are stored as files in the local directory of the Logstash instance. By default, the dead letter queue files are stored in `path.data/dead_letter_queue`. Each pipeline has a separate queue. For example, the dead letter queue for the `main` pipeline is stored in `LOGSTASH_HOME/data/dead_letter_queue/main` by default. The queue files are numbered sequentially: `1.log`, `2.log`, and so on. + +You can set `path.dead_letter_queue` in the `logstash.yml` file to specify a different path for the files: + +```yaml +path.dead_letter_queue: "path/to/data/dead_letter_queue" +``` + +::::{tip} +Use the local filesystem for data integrity and performance. Network File System (NFS) is not supported. +:::: + + +Dead letter queue entries are written to a temporary file, which is then renamed to a dead letter queue segment file, which is then eligible for ingestion. The rename happens either when this temporary file is considered *full*, or when a period of time has elapsed since the last dead letter queue eligible event was written to the temporary file. + +This length of time can be set using the `dead_letter_queue.flush_interval` setting. This setting is in milliseconds, and defaults to 5000ms. A low value here will mean in the event of infrequent writes to the dead letter queue more, smaller, queue files may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and being made available for reading by the dead_letter_queue input. + +``` +Note that this value cannot be set to lower than 1000ms. +``` +```yaml +dead_letter_queue.flush_interval: 5000 +``` + +Stale segments files are periodically checked if they need to be flushed. This period is controlled by the `dead_letter_queue.flush_check_interval` setting. This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster segment rotation when infrequent writes occur, at the cost of CPU consumption with more frequent segment checks execution. A larger value reduces checks overhead but delays segment sealing, for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. This value cannot be set to lower than 1000ms. + +```yaml +dead_letter_queue.flush_check_interval: 1000 +``` + +::::{note} +You cannot use the same `dead_letter_queue` path for two different Logstash instances. +:::: + + +### File rotation [file-rotation] + +Dead letter queues have a built-in file rotation policy that manages the file size of the queue. When the file size reaches a preconfigured threshold, a new file is created automatically. + + +### Size management [size-management] + +By default, the maximum size of each dead letter queue is set to 1024mb. To change this setting, use the `dead_letter_queue.max_bytes` option. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. Use the `dead_letter_queue.storage_policy` option to control which entries should be dropped to avoid exceeding the size limit. Set the value to `drop_newer` (default) to stop accepting new values that would push the file size over the limit. Set the value to `drop_older` to remove the oldest events to make space for new ones. + +#### Age policy [age-policy] + +You can use the age policy to automatically control the volume of events in the dead letter queue. Use the `dead_letter_queue.retain.age` setting (in `logstash.yml` or `pipelines.yml`) to have {{ls}} remove events that are older than a value you define. Available time units are `d`, `h`, `m`, `s` respectively for days, hours, minutes and seconds. There is no default time unit, so you need to specify it. + +```yaml +dead_letter_queue.retain.age: 2d +``` + +The age policy is verified and applied on event writes and during pipeline shutdown. For that reason, your dead-letter-queue folder may store expired events for longer than specified, and the reader pipeline could possibly encounter outdated events. + + + +### Automatic cleaning of consumed events [auto-clean] + +By default, the dead letter queue input plugin does not remove the events that it consumes. Instead, it commits a reference to avoid re-processing events. Use the `clean_consumed` setting in the dead letter queue input plugin in order to remove segments that have been fully consumed, freeing space while processing. + +```yaml +input { + dead_letter_queue { + path => "/path/to/data/dead_letter_queue" + pipeline_id => "main" + clean_consumed => true + } +} +``` + + + +## Processing events in the dead letter queue [processing-dlq-events] + +When you are ready to process events in the dead letter queue, you create a pipeline that uses the [`dead_letter_queue` input plugin](logstash-docs-md://lsr/plugins-inputs-dead_letter_queue.md) to read from the dead letter queue. The pipeline configuration that you use depends, of course, on what you need to do. For example, if the dead letter queue contains events that resulted from a mapping error in Elasticsearch, you can create a pipeline that reads the "dead" events, removes the field that caused the mapping issue, and re-indexes the clean events into Elasticsearch. + +The following example shows a simple pipeline that reads events from the dead letter queue and writes the events, including metadata, to standard output: + +```yaml +input { + dead_letter_queue { + path => "/path/to/data/dead_letter_queue" <1> + commit_offsets => true <2> + pipeline_id => "main" <3> + } +} + +output { + stdout { + codec => rubydebug { metadata => true } + } +} +``` + +1. The path to the top-level directory containing the dead letter queue. This directory contains a separate folder for each pipeline that writes to the dead letter queue. To find the path to this directory, look at the `logstash.yml` [settings file](/reference/logstash-settings-file.md). By default, Logstash creates the `dead_letter_queue` directory under the location used for persistent storage (`path.data`), for example, `LOGSTASH_HOME/data/dead_letter_queue`. However, if `path.dead_letter_queue` is set, it uses that location instead. +2. When `true`, saves the offset. When the pipeline restarts, it will continue reading from the position where it left off rather than reprocessing all the items in the queue. You can set `commit_offsets` to `false` when you are exploring events in the dead letter queue and want to iterate over the events multiple times. +3. The ID of the pipeline that’s writing to the dead letter queue. The default is `"main"`. + + +For another example, see [Example: Processing data that has mapping errors](#dlq-example). + +When the pipeline has finished processing all the events in the dead letter queue, it will continue to run and process new events as they stream into the queue. This means that you do not need to stop your production system to handle events in the dead letter queue. + +::::{note} +Events emitted from the [`dead_letter_queue` input plugin](logstash-docs-md://lsr/plugins-inputs-dead_letter_queue.md) plugin will not be resubmitted to the dead letter queue if they cannot be processed correctly. +:::: + + + +## Reading from a timestamp [dlq-timestamp] + +When you read from the dead letter queue, you might not want to process all the events in the queue, especially if there are a lot of old events in the queue. You can start processing events at a specific point in the queue by using the `start_timestamp` option. This option configures the pipeline to start processing events based on the timestamp of when they entered the queue: + +```yaml +input { + dead_letter_queue { + path => "/path/to/data/dead_letter_queue" + start_timestamp => "2017-06-06T23:40:37" + pipeline_id => "main" + } +} +``` + +For this example, the pipeline starts reading all events that were delivered to the dead letter queue on or after June 6, 2017, at 23:40:37. + + +## Example: Processing data that has mapping errors [dlq-example] + +In this example, the user attempts to index a document that includes geo_ip data, but the data cannot be processed because it contains a mapping error: + +```json +{"geoip":{"location":"home"}} +``` + +Indexing fails because the Logstash output plugin expects a `geo_point` object in the `location` field, but the value is a string. The failed event is written to the dead letter queue, along with metadata about the error that caused the failure: + +```json +{ + "@metadata" => { + "dead_letter_queue" => { + "entry_time" => #, + "plugin_id" => "fb80f1925088497215b8d037e622dec5819b503e-4", + "plugin_type" => "elasticsearch", + "reason" => "Could not index event to Elasticsearch. status: 400, action: [\"index\", {:_id=>nil, :_index=>\"logstash-2017.06.22\", :_type=>\"doc\", :_routing=>nil}, 2017-06-22T01:29:29.804Z My-MacBook-Pro-2.local {\"geoip\":{\"location\":\"home\"}}], response: {\"index\"=>{\"_index\"=>\"logstash-2017.06.22\", \"_type\"=>\"doc\", \"_id\"=>\"AVzNayPze1iR9yDdI2MD\", \"status\"=>400, \"error\"=>{\"type\"=>\"mapper_parsing_exception\", \"reason\"=>\"failed to parse\", \"caused_by\"=>{\"type\"=>\"illegal_argument_exception\", \"reason\"=>\"illegal latitude value [266.30859375] for geoip.location\"}}}}" + } + }, + "@timestamp" => 2017-06-22T01:29:29.804Z, + "@version" => "1", + "geoip" => { + "location" => "home" + }, + "host" => "My-MacBook-Pro-2.local", + "message" => "{\"geoip\":{\"location\":\"home\"}}" +} +``` + +To process the failed event, you create the following pipeline that reads from the dead letter queue and removes the mapping problem: + +```json +input { + dead_letter_queue { + path => "/path/to/data/dead_letter_queue/" <1> + } +} +filter { + mutate { + remove_field => "[geoip][location]" <2> + } +} +output { + elasticsearch{ + hosts => [ "localhost:9200" ] <3> + } +} +``` + +1. The [`dead_letter_queue` input](logstash-docs-md://lsr/plugins-inputs-dead_letter_queue.md) reads from the dead letter queue. +2. The `mutate` filter removes the problem field called `location`. +3. The clean event is sent to Elasticsearch, where it can be indexed because the mapping issue is resolved. + + + +## Track dead letter queue size [dlq-size] + +Monitor the size of the dead letter queue before it becomes a problem. By checking it periodically, you can determine the maximum queue size that makes sense for each pipeline. + +The size of the DLQ for each pipeline is available in the node stats API. + +```txt +pipelines.${pipeline_id}.dead_letter_queue.queue_size_in_bytes. +``` + +Where `{{pipeline_id}}` is the name of a pipeline with DLQ enabled. + + +## Clear the dead letter queue [dlq-clear] + +The dead letter queue cannot be cleared with the upstream pipeline running. + +The dead letter queue is a directory of pages. To clear it, stop the pipeline and delete location/. + +```txt +${path.data}/dead_letter_queue/${pipeline_id} +``` + +Where `{{pipeline_id}}` is the name of a pipeline with DLQ enabled. + +The pipeline creates a new dead letter queue when it starts again. diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 3a86cf73a4..c41901d7c1 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -90,6 +90,7 @@ module Environment Setting::ExistingFilePath.new("api.ssl.keystore.path", nil, false).nullable, Setting::Password.new("api.ssl.keystore.password", nil, false).nullable, Setting::StringArray.new("api.ssl.supported_protocols", nil, true, %w[TLSv1 TLSv1.1 TLSv1.2 TLSv1.3]), +<<<<<<< HEAD Setting::SettingString.new("queue.type", "memory", true, ["persisted", "memory"]), Setting::Boolean.new("queue.drain", false), Setting::Bytes.new("queue.page_capacity", "64mb"), @@ -112,6 +113,33 @@ module Environment Setting::SettingString.new("keystore.file", ::File.join(::File.join(LogStash::Environment::LOGSTASH_HOME, "config"), "logstash.keystore"), false), # will be populated on Setting::SettingNullableString.new("monitoring.cluster_uuid"), Setting::SettingString.new("pipeline.buffer.type", nil, false, ["direct", "heap"]) +======= + Setting::StringSetting.new("pipeline.batch.metrics.sampling_mode", "minimal", true, ["disabled", "minimal", "full"]), + Setting::StringSetting.new("queue.type", "memory", true, ["persisted", "memory"]), + Setting::BooleanSetting.new("queue.drain", false), + Setting::BytesSetting.new("queue.page_capacity", "64mb"), + Setting::BytesSetting.new("queue.max_bytes", "1024mb"), + Setting::NumericSetting.new("queue.max_events", 0), # 0 is unlimited + Setting::NumericSetting.new("queue.checkpoint.acks", 1024), # 0 is unlimited + Setting::NumericSetting.new("queue.checkpoint.writes", 1024), # 0 is unlimited + Setting::NumericSetting.new("queue.checkpoint.interval", 1000), # remove it for #17155 + Setting::BooleanSetting.new("queue.checkpoint.retry", true), + Setting::StringSetting.new("queue.compression", "none", true, %w(none speed balanced size disabled)), + Setting::BooleanSetting.new("dead_letter_queue.enable", false), + Setting::BytesSetting.new("dead_letter_queue.max_bytes", "1024mb"), + Setting::NumericSetting.new("dead_letter_queue.flush_interval", 5000), + Setting::NumericSetting.new("dead_letter_queue.flush_check_interval", 1000), + Setting::StringSetting.new("dead_letter_queue.storage_policy", "drop_newer", true, ["drop_newer", "drop_older"]), + Setting::NullableStringSetting.new("dead_letter_queue.retain.age"), # example 5d + Setting::TimeValueSetting.new("slowlog.threshold.warn", "-1"), + Setting::TimeValueSetting.new("slowlog.threshold.info", "-1"), + Setting::TimeValueSetting.new("slowlog.threshold.debug", "-1"), + Setting::TimeValueSetting.new("slowlog.threshold.trace", "-1"), + Setting::StringSetting.new("keystore.classname", "org.logstash.secret.store.backend.JavaKeyStore"), + Setting::StringSetting.new("keystore.file", ::File.join(::File.join(LogStash::Environment::LOGSTASH_HOME, "config"), "logstash.keystore"), false), # will be populated on + Setting::NullableStringSetting.new("monitoring.cluster_uuid"), + Setting::StringSetting.new("pipeline.buffer.type", "heap", true, ["direct", "heap"]) +>>>>>>> f2f0d3fde (`dead_letter_queue.flush_check_interval` new config for flushing staled segment files. (#19036)) # post_process ].each {|setting| SETTINGS.register(setting) } diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb index c92c8f67d0..60b714f738 100644 --- a/logstash-core/lib/logstash/settings.rb +++ b/logstash-core/lib/logstash/settings.rb @@ -49,6 +49,7 @@ def self.included(base) "config.string", "dead_letter_queue.enable", "dead_letter_queue.flush_interval", + "dead_letter_queue.flush_check_interval", "dead_letter_queue.max_bytes", "dead_letter_queue.storage_policy", "dead_letter_queue.retain.age", diff --git a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java index 16fbc0c5b2..2b1a476a96 100644 --- a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java +++ b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java @@ -76,15 +76,16 @@ private DeadLetterQueueFactory() { * @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written * that would make the size of this dlq greater than this value * @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent. + * @param flushCheckInterval The interval between scheduler checks for stale segments. * @param storageType overwriting type in case of queue full: drop_older or drop_newer. * @return write manager for the specific id's dead-letter-queue context */ - public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType) { - return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType)); + public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, Duration flushCheckInterval, QueueStorageType storageType) { + return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, flushCheckInterval, storageType)); } /** - * Like {@link #getWriter(String, String, long, Duration, QueueStorageType)} but also setting the age duration + * Like {@link #getWriter(String, String, long, Duration, Duration, QueueStorageType)} but also setting the age duration * of the segments. * * @param id The identifier context for this dlq manager @@ -93,12 +94,13 @@ public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long ma * @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written * that would make the size of this dlq greater than this value * @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent. + * @param flushCheckInterval The interval between scheduler checks for stale segments. * @param storageType overwriting type in case of queue full: drop_older or drop_newer. * @param age the period that DLQ events should be considered as valid, before automatic removal. * @return write manager for the specific id's dead-letter-queue context * */ - public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType, Duration age) { - return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType, age)); + public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, Duration flushCheckInterval, QueueStorageType storageType, Duration age) { + return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, flushCheckInterval, storageType, age)); } public static DeadLetterQueueWriter release(String id) { @@ -106,10 +108,11 @@ public static DeadLetterQueueWriter release(String id) { } private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize, - final Duration flushInterval, final QueueStorageType storageType) { + final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType) { try { return DeadLetterQueueWriter - .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval) + .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, flushCheckInterval) + .pipelineId(id) .storageType(storageType) .build(); } catch (IOException e) { @@ -119,11 +122,12 @@ private static DeadLetterQueueWriter newWriter(final String id, final String dlq } private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize, - final Duration flushInterval, final QueueStorageType storageType, + final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType, final Duration age) { try { return DeadLetterQueueWriter - .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval) + .newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, flushCheckInterval) + .pipelineId(id) .storageType(storageType) .retentionTime(age) .build(); diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index 23c4e7e8f2..d4dfc3af67 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -124,6 +124,9 @@ public String toString() { } } + static final Duration MIN_FLUSH_PERIOD = Duration.ofMillis(1000); + static final Duration MIN_FLUSH_CHECK_INTERVAL = Duration.ofMillis(1000); + @VisibleForTesting static final String SEGMENT_FILE_PATTERN = "%d.log"; private static final Logger logger = LogManager.getLogger(DeadLetterQueueWriter.class); @@ -174,21 +177,25 @@ interface SchedulerService { private static class FixedRateScheduler implements SchedulerService { private final ScheduledExecutorService scheduledExecutor; + private final Duration flushCheckInterval; - FixedRateScheduler() { + FixedRateScheduler(final Duration flushCheckInterval, final String pipelineId) { + //Set the name with pipeline ID for better visibility + final String threadName = pipelineId != null ? "dlq-flush-check[" + pipelineId + "]" : "dlq-flush-check"; + + this.flushCheckInterval = flushCheckInterval; scheduledExecutor = Executors.newScheduledThreadPool(1, r -> { Thread t = new Thread(r); //Allow this thread to die when the JVM dies t.setDaemon(true); - //Set the name - t.setName("dlq-flush-check"); + t.setName(threadName); return t; }); } @Override public void repeatedAction(Runnable action) { - scheduledExecutor.scheduleAtFixedRate(action, 1L, 1L, TimeUnit.SECONDS); + scheduledExecutor.scheduleAtFixedRate(action, flushCheckInterval.toMillis(), flushCheckInterval.toMillis(), TimeUnit.MILLISECONDS); } @Override @@ -215,21 +222,24 @@ public static final class Builder { private final long maxSegmentSize; private final long maxQueueSize; private final Duration flushInterval; + private final Duration flushCheckInterval; private boolean startScheduledFlusher; private QueueStorageType storageType = QueueStorageType.DROP_NEWER; private Duration retentionTime = null; private Clock clock = Clock.systemDefaultZone(); private SchedulerService customSchedulerService = null; + private String pipelineId; - private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval) { - this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, true); + private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, final Duration flushInterval, final Duration flushCheckInterval) { + this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, flushCheckInterval, true); } - private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval, boolean startScheduledFlusher) { + private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, final Duration flushInterval, final Duration flushCheckInterval, boolean startScheduledFlusher) { this.queuePath = queuePath; this.maxSegmentSize = maxSegmentSize; this.maxQueueSize = maxQueueSize; this.flushInterval = flushInterval; + this.flushCheckInterval = flushCheckInterval; this.startScheduledFlusher = startScheduledFlusher; } @@ -243,6 +253,11 @@ public Builder retentionTime(Duration retentionTime) { return this; } + public Builder pipelineId(final String pipelineId) { + this.pipelineId = pipelineId; + return this; + } + @VisibleForTesting Builder clock(Clock clock) { this.clock = clock; @@ -259,29 +274,64 @@ public DeadLetterQueueWriter build() throws IOException { if (customSchedulerService != null && startScheduledFlusher) { throw new IllegalArgumentException("Both default scheduler and custom scheduler were defined, "); } + + final Duration normalizedFlushInterval = normalizeFlushInterval(flushInterval); + SchedulerService schedulerService; if (customSchedulerService != null) { schedulerService = customSchedulerService; } else { if (startScheduledFlusher) { - schedulerService = new FixedRateScheduler(); + final Duration normalizedFlushCheckInterval = normalizeFlushCheckInterval(flushCheckInterval, normalizedFlushInterval); + schedulerService = new FixedRateScheduler(normalizedFlushCheckInterval, pipelineId); } else { schedulerService = new NoopScheduler(); } } - return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, flushInterval, storageType, retentionTime, clock, schedulerService); + return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, normalizedFlushInterval, storageType, retentionTime, clock, schedulerService); + } + + @VisibleForTesting + Duration normalizeFlushInterval(final Duration flushInterval) { + if (!startScheduledFlusher) return flushInterval; + + Duration effectiveFlushInterval = flushInterval; + if (flushInterval.compareTo(MIN_FLUSH_PERIOD) < 0) { + logger.warn("dead_letter_queue.flush_interval ({} ms) is below the minimum of {} ms; using {} ms", + flushInterval.toMillis(), MIN_FLUSH_PERIOD.toMillis(), MIN_FLUSH_PERIOD.toMillis()); + effectiveFlushInterval = MIN_FLUSH_PERIOD; + } + return effectiveFlushInterval; + } + + @VisibleForTesting + Duration normalizeFlushCheckInterval(final Duration flushCheckInterval, final Duration effectiveFlushInterval) { + Duration effectiveFlushCheckInterval = flushCheckInterval; + // can't exceed flush interval + if (effectiveFlushCheckInterval.compareTo(effectiveFlushInterval) > 0) { + logger.warn("dead_letter_queue.flush_check_interval ({} ms) cannot be greater than dead_letter_queue.flush_interval ({} ms); using {} ms", + effectiveFlushCheckInterval.toMillis(), effectiveFlushInterval.toMillis(), effectiveFlushInterval.toMillis()); + effectiveFlushCheckInterval = effectiveFlushInterval; + } + // can't be less than 1s + if (effectiveFlushCheckInterval.compareTo(MIN_FLUSH_CHECK_INTERVAL) < 0) { + logger.warn("dead_letter_queue.flush_check_interval ({} ms) is below the minimum of {} ms; using {} ms for the flush check interval", + effectiveFlushCheckInterval.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis()); + effectiveFlushCheckInterval = MIN_FLUSH_CHECK_INTERVAL; + } + return effectiveFlushCheckInterval; } } public static Builder newBuilder(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, - final Duration flushInterval) { - return new Builder(queuePath, maxSegmentSize, maxQueueSize, flushInterval); + final Duration flushInterval, final Duration flushCheckInterval) { + return new Builder(queuePath, maxSegmentSize, maxQueueSize, flushInterval, flushCheckInterval); } @VisibleForTesting static Builder newBuilderWithoutFlusher(final Path queuePath, final long maxSegmentSize, final long maxQueueSize) { - return new Builder(queuePath, maxSegmentSize, maxQueueSize, Duration.ZERO, false); + return new Builder(queuePath, maxSegmentSize, maxQueueSize, Duration.ZERO, Duration.ZERO, false); } private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, diff --git a/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java b/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java index 34d3b5024f..d529e97e34 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/RecordIOWriter.java @@ -173,7 +173,7 @@ public boolean hasWritten(){ return lastWrite != null; } - public boolean isStale(Duration flushPeriod){ + public boolean isStale(final Duration flushPeriod) { return hasWritten() && Instant.now().minus(flushPeriod).isAfter(lastWrite); } diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index ec26b9b073..c76d3c5a2f 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -399,17 +399,23 @@ private DeadLetterQueueWriter createDeadLetterQueueWriterFromSettings(ThreadCont final QueueStorageType storageType = QueueStorageType.parse(getSetting(context, "dead_letter_queue.storage_policy").asJavaString()); String dlqPath = getSetting(context, "path.dead_letter_queue").asJavaString(); +<<<<<<< HEAD long dlqMaxBytes = getSetting(context, "dead_letter_queue.max_bytes").convertToInteger().getLongValue(); Duration dlqFlushInterval = Duration.ofMillis(getSetting(context, "dead_letter_queue.flush_interval").convertToInteger().getLongValue()); +======= + long dlqMaxBytes = org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.max_bytes").convertToInteger()); + Duration dlqFlushInterval = Duration.ofMillis(org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.flush_interval").convertToInteger())); + Duration dlqFlushCheckInterval = Duration.ofMillis(org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.flush_check_interval").convertToInteger())); +>>>>>>> f2f0d3fde (`dead_letter_queue.flush_check_interval` new config for flushing staled segment files. (#19036)) if (hasSetting(context, "dead_letter_queue.retain.age") && !getSetting(context, "dead_letter_queue.retain.age").isNil()) { // convert to Duration final Duration age = parseToDuration(getSetting(context, "dead_letter_queue.retain.age").convertToString().toString()); return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, - dlqFlushInterval, storageType, age); + dlqFlushInterval, dlqFlushCheckInterval, storageType, age); } - return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, dlqFlushInterval, storageType); + return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, dlqFlushInterval, dlqFlushCheckInterval, storageType); } /** diff --git a/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java b/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java index 335dec971f..8e87bf7eb1 100644 --- a/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java +++ b/logstash-core/src/test/java/org/logstash/common/AbstractDeadLetterQueueWriterExtTest.java @@ -60,7 +60,7 @@ private void writeAnEventIntoDLQ(Path dlqPath, String pluginId, String pluginTyp RubyString id = RubyString.newString(RubyUtil.RUBY, pluginId); RubyString classConfigName = RubyString.newString(RubyUtil.RUBY, pluginType); - final DeadLetterQueueWriter javaDlqWriter = DeadLetterQueueFactory.getWriter(dlqName, dlqPath.toString(), 1024 * 1024, Duration.ofHours(1), QueueStorageType.DROP_NEWER); + final DeadLetterQueueWriter javaDlqWriter = DeadLetterQueueFactory.getWriter(dlqName, dlqPath.toString(), 1024 * 1024, Duration.ofHours(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); IRubyObject dlqWriter = JavaUtil.convertJavaToUsableRubyObject(context.runtime, javaDlqWriter); final AbstractDeadLetterQueueWriterExt dlqWriterForInstance = new AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt( diff --git a/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java b/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java index cf85824d7a..6802d97849 100644 --- a/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java +++ b/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java @@ -69,9 +69,9 @@ public void setUp() throws Exception { public void testSameBeforeRelease() throws IOException { try { Path pipelineA = dir.resolve(PIPELINE_NAME); - DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); - DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertSame(writer, writer2); writer.close(); } finally { @@ -83,11 +83,11 @@ public void testSameBeforeRelease() throws IOException { public void testOpenableAfterRelease() throws IOException { try { Path pipelineA = dir.resolve(PIPELINE_NAME); - DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); writer.close(); DeadLetterQueueFactory.release(PIPELINE_NAME); - writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); + writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); writer.close(); }finally{ diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java index d6b2365a78..82cde713b2 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java @@ -157,7 +157,7 @@ public void testRereadFinalBlock() throws Exception { long startTime = System.currentTimeMillis(); int messageSize = 0; try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 2; i++) { DLQEntry entry = new DLQEntry(event, "", "", String.valueOf(i), constantSerializationLengthTimestamp(startTime++)); @@ -214,7 +214,7 @@ private void writeSegmentSizeEntries(int count) throws IOException { DLQEntry templateEntry = new DLQEntry(event, "1", "1", String.valueOf(0), constantSerializationLengthTimestamp(startTime)); int size = templateEntry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE + VERSION_SIZE; try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, size, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, size, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i <= count; i++) { writeManager.writeEntry(new DLQEntry(event, "1", "1", String.valueOf(i), constantSerializationLengthTimestamp(startTime++))); @@ -248,7 +248,7 @@ public void testBlockBoundary() throws Exception { Timestamp timestamp = constantSerializationLengthTimestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 2; i++) { DLQEntry entry = new DLQEntry(event, "", "", "", timestamp); @@ -273,7 +273,7 @@ public void testBlockBoundaryMultiple() throws Exception { long startTime = System.currentTimeMillis(); int messageSize = 0; try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i <= 5; i++) { DLQEntry entry = new DLQEntry(event, "", "", "", constantSerializationLengthTimestamp(startTime++)); @@ -298,7 +298,7 @@ public void testFlushAfterWriterClose() throws Exception { Timestamp timestamp = new Timestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 6; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -320,7 +320,7 @@ public void testFlushAfterSegmentComplete() throws Exception { event.setField("T", generateMessageContent(PAD_FOR_BLOCK_SIZE_EVENT)); Timestamp timestamp = new Timestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE * EVENTS_BEFORE_FLUSH, defaultDlqSize, Duration.ofHours(1)) + .newBuilder(dir, BLOCK_SIZE * EVENTS_BEFORE_FLUSH, defaultDlqSize, Duration.ofHours(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i < EVENTS_BEFORE_FLUSH; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -355,7 +355,7 @@ public void testMultiFlushAfterSegmentComplete() throws Exception { Timestamp timestamp = new Timestamp(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE * eventsInSegment, defaultDlqSize, Duration.ofHours(1)) + .newBuilder(dir, BLOCK_SIZE * eventsInSegment, defaultDlqSize, Duration.ofHours(1), Duration.ofSeconds(1)) .build()) { for (int i = 1; i < totalEventsToWrite; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -398,7 +398,7 @@ public void testFlushAfterDelay() throws Exception { System.out.println("events per block= " + eventsPerBlock); try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(2)) + .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(2), Duration.ofSeconds(1)) .build()) { for (int i = 1; i < eventsToWrite; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); @@ -432,7 +432,7 @@ public void testBlockAndSegmentBoundary() throws Exception { Timestamp timestamp = constantSerializationLengthTimestamp(); try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < 2; i++) { DLQEntry entry = new DLQEntry(event, "", "", "", timestamp); @@ -455,7 +455,7 @@ public void testWriteReadRandomEventSize() throws Exception { long startTime = System.currentTimeMillis(); try(DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < eventCount; i++) { event.setField("message", generateMessageContent((int)(Math.random() * (maxEventSize)))); @@ -575,7 +575,7 @@ public void testConcurrentWriteReadRandomEventSize() throws Exception { final Event event = new Event(); long startTime = System.currentTimeMillis(); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(10)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(10), Duration.ofSeconds(1)) .build()) { for (int i = 0; i < eventCount; i++) { event.setField( @@ -920,7 +920,7 @@ private void seekReadAndVerify(final Timestamp seekTarget, final String expected private void writeEntries(final Event event, int offset, final int numberOfEvents, long startTime) throws IOException { try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { for (int i = offset; i <= offset + numberOfEvents; i++) { DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(startTime++)); @@ -943,7 +943,7 @@ private int prepareFilledSegmentFiles(int segments, long start) throws IOExcepti final int maxSegmentSize = 10 * MB; final int loopPerSegment = (int) Math.floor((maxSegmentSize - 1.0) / BLOCK_SIZE); try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, maxSegmentSize, defaultDlqSize, Duration.ofSeconds(1)) + .newBuilder(dir, maxSegmentSize, defaultDlqSize, Duration.ofSeconds(1), Duration.ofSeconds(1)) .build()) { final int loops = loopPerSegment * segments; for (int i = 0; i < loops; i++) { diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java index 6edcf46b92..940f1b41d0 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java @@ -94,6 +94,9 @@ void executeAction() { private SynchronizedScheduledService synchScheduler; + private static final long MAX_SEGMENT_SIZE = 10 * MB; + private static final long MAX_QUEUE_SIZE = 1 * GB; + @Before public void setUp() throws Exception { dir = temporaryFolder.newFolder().toPath(); @@ -102,6 +105,11 @@ public void setUp() throws Exception { synchScheduler = new SynchronizedScheduledService(); } + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath) { + return DeadLetterQueueWriter + .newBuilder(queuePath, MAX_SEGMENT_SIZE, MAX_QUEUE_SIZE, Duration.ofSeconds(1), Duration.ofSeconds(1)); + } + @Test public void testRemovesOlderSegmentsWhenWriteOnReopenedDLQContainingExpiredSegments() throws IOException { final Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); @@ -115,8 +123,7 @@ public void testRemovesOlderSegmentsWhenWriteOnReopenedDLQContainingExpiredSegme // Exercise final long prevQueueSize; final long beheadedQueueSize; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(Duration.ofDays(2)) .clock(fakeClock) .build()) { @@ -139,8 +146,7 @@ private void prepareDLQWithFirstSegmentOlderThanRetainPeriod(Event event, Forwar final Duration littleMoreThanRetainedPeriod = retainedPeriod.plusMinutes(1); long startTime = fakeClock.instant().minus(littleMoreThanRetainedPeriod).toEpochMilli(); int messageSize = 0; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retainedPeriod) .clock(fakeClock) .build()) { @@ -166,8 +172,7 @@ public void testRemovesOlderSegmentsWhenWritesIntoDLQContainingExpiredSegments() int messageSize = 0; final Duration retention = Duration.ofDays(2); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retention) .clock(fakeClock) .build()) { @@ -209,8 +214,7 @@ public void testRemoveMultipleOldestSegmentsWhenRetainedAgeIsExceeded() throws I int messageSize = 0; final Duration retention = Duration.ofDays(2); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retention) .clock(fakeClock) .build()) { @@ -304,9 +308,7 @@ public void testDLQWriterFlusherRemovesExpiredSegmentWhenCurrentWriterIsStale() final ForwardableClock fakeClock = new ForwardableClock(pointInTimeFixedClock); Duration retainedPeriod = Duration.ofDays(1); - Duration flushInterval = Duration.ofSeconds(1); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 1 * GB, flushInterval) + try (DeadLetterQueueWriter writeManager = newBuilder(dir) .retentionTime(retainedPeriod) .clock(fakeClock) .build()) { diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java index 040a06dbd2..511a541d7e 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java @@ -72,11 +72,26 @@ public void setUp() throws Exception { private static long EMPTY_DLQ = VERSION_SIZE; // Only the version field has been written + private static final long DEFAULT_MAX_SEGMENT_SIZE = 1_000; + private static final long DEFAULT_MAX_QUEUE_SIZE = 100_000; + + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath) { + return newBuilder(queuePath, DEFAULT_MAX_SEGMENT_SIZE, DEFAULT_MAX_QUEUE_SIZE); + } + + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath, final long maxSegmentSize, final long maxQueueSize) { + return newBuilder(queuePath, maxSegmentSize, maxQueueSize, Duration.ofSeconds(1)); + } + + private static DeadLetterQueueWriter.Builder newBuilder(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, final Duration flushInterval) { + return DeadLetterQueueWriter + .newBuilder(queuePath, maxSegmentSize, maxQueueSize, flushInterval, Duration.ofSeconds(1)); + } + @Test public void testLockFileManagement() throws Exception { Path lockFile = dir.resolve(".lock"); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); assertTrue(Files.exists(lockFile)); writer.close(); @@ -85,12 +100,10 @@ public void testLockFileManagement() throws Exception { @Test public void testFileLocking() throws Exception { - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); try { - DeadLetterQueueWriter - .newBuilder(dir, 100, 1_000, Duration.ofSeconds(1)) + newBuilder(dir, 100, 1_000) .build(); fail(); } catch (LockException e) { @@ -103,8 +116,7 @@ public void testFileLocking() throws Exception { public void testUncleanCloseOfPreviousWriter() throws Exception { Path lockFilePath = dir.resolve(".lock"); boolean created = lockFilePath.toFile().createNewFile(); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); FileChannel channel = FileChannel.open(lockFilePath, StandardOpenOption.WRITE); @@ -120,8 +132,7 @@ public void testUncleanCloseOfPreviousWriter() throws Exception { @Test public void testWrite() throws Exception { - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); DLQEntry entry = new DLQEntry(new Event(), "type", "id", "reason"); writer.writeEntry(entry); @@ -135,8 +146,7 @@ public void testDoesNotWriteMessagesAlreadyRoutedThroughDLQ() throws Exception { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "reason"); DLQEntry dlqEntry = new DLQEntry(dlqEvent, "type", "id", "reason"); - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir) .build()) { writer.writeEntry(entry); long dlqLengthAfterEvent = dlqLength(); @@ -156,8 +166,7 @@ public void testDoesNotWriteBeyondLimit() throws Exception { long MAX_QUEUE_LENGTH = payloadLength * MESSAGE_COUNT; - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, payloadLength, MAX_QUEUE_LENGTH, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, payloadLength, MAX_QUEUE_LENGTH) .build()) { for (int i = 0; i < MESSAGE_COUNT; i++) @@ -174,8 +183,7 @@ public void testDoesNotWriteBeyondLimit() throws Exception { @Test public void testSlowFlush() throws Exception { - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 1_000_000, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, 1_000, 1_000_000) .build()) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); writer.writeEntry(entry); @@ -196,8 +204,7 @@ public void testSlowFlush() throws Exception { @Test public void testNotFlushed() throws Exception { - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, BLOCK_SIZE, 1_000_000_000, Duration.ofSeconds(5)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, BLOCK_SIZE, 1_000_000_000, Duration.ofSeconds(5)) .build()) { for (int i = 0; i < 4; i++) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); @@ -219,8 +226,7 @@ public void testNotFlushed() throws Exception { @Test public void testCloseFlush() throws Exception { - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 1_000_000, Duration.ofHours(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, 1_000, 1_000_000, Duration.ofHours(1)) .build()) { DLQEntry entry = new DLQEntry(new Event(), "type", "id", "1"); writer.writeEntry(entry); @@ -255,8 +261,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE long startTime = System.currentTimeMillis(); int messageSize = 0; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) .build()) { // 320 generates 10 Mb of data @@ -287,8 +292,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE final long prevQueueSize; final long beheadedQueueSize; long droppedEvent; - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) .storageType(QueueStorageType.DROP_OLDER) .build()) { prevQueueSize = writeManager.getCurrentQueueSize(); @@ -325,8 +329,7 @@ public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsE @Test public void testRemoveSegmentsOrder() throws IOException { - try (DeadLetterQueueWriter sut = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter sut = newBuilder(dir, 10 * MB, 20 * MB) .build()) { // create some segments files Files.createFile(dir.resolve("9.log")); @@ -495,8 +498,7 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I Event bigEvent = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); bigEvent.setField("message", DeadLetterQueueReaderTest.generateMessageContent(2 * BLOCK_SIZE)); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) .build()) { // enqueue a record with size smaller than BLOCK_SIZE DLQEntry entry = new DLQEntry(blockAlmostFullEvent, "", "", "00001", DeadLetterQueueReaderTest.constantSerializationLengthTimestamp(System.currentTimeMillis())); @@ -512,9 +514,14 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I // fill the queue to push out the segment with the 2 previous events Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); +<<<<<<< HEAD event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479)); try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) +======= + event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500)); + try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) +>>>>>>> f2f0d3fde (`dead_letter_queue.flush_check_interval` new config for flushing staled segment files. (#19036)) .storageType(QueueStorageType.DROP_NEWER) .build()) { @@ -543,16 +550,14 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I public void testInitializeWriterWith1ByteEntry() throws Exception { Files.write(dir.resolve("1.log"), "1".getBytes()); - DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1_000, 100_000, Duration.ofSeconds(1)) + DeadLetterQueueWriter writer = newBuilder(dir) .build(); writer.close(); } @Test public void givenDLQWriterCreatedSomeSegmentsWhenReaderWithCleanConsumedNotifyTheDeletionOfSomeThenWriterUpdatesItsMetricsSize() throws IOException, InterruptedException { - try (DeadLetterQueueWriter writer = DeadLetterQueueWriter - .newBuilder(dir, 1 * MB, 100 * MB, Duration.ofSeconds(1)) + try (DeadLetterQueueWriter writer = newBuilder(dir, 1 * MB, 100 * MB) .build()) { // fill at least 3 segments @@ -616,4 +621,53 @@ private long countDlqSegments(Path dir) throws IOException { } } + @Test + public void givenFlushIntervalGreaterThanMinimumWhenNormalizedThenRemainsUnmodified() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration result = builder.normalizeFlushInterval(Duration.ofSeconds(10)); + assertEquals("Valid flush interval should remain unchanged", Duration.ofSeconds(10), result); + } + + @Test + public void givenFlushIntervalBelowTheMinimumWhenNormalizedThenIsClampedToMinimum() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration result = builder.normalizeFlushInterval(Duration.ofMillis(100)); + assertEquals("Flush interval below 1s should be clamped to 1s", Duration.ofSeconds(1), result); + } + + @Test + public void testNormalizeFlushCheckIntervalWithinLimits() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration flushInterval = Duration.ofSeconds(5); + Duration flushCheckInterval = Duration.ofSeconds(2); + Duration result = builder.normalizeFlushCheckInterval(flushCheckInterval, flushInterval); + assertEquals("Valid flush check interval should remain unchanged", flushCheckInterval, result); + } + + @Test + public void givenFlushCheckIntervalBelowMinimumWhenNormalizedThenClampedToMinimum() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration flushInterval = Duration.ofSeconds(5); + Duration belowMinimum = Duration.ofMillis(500); + Duration result = builder.normalizeFlushCheckInterval(belowMinimum, flushInterval); + assertEquals("Flush check interval below 1s should be clamped to 1s", Duration.ofSeconds(1), result); + } + + @Test + public void givenFlushCheckIntervalExceedsFlushIntervalWhenNormalizedThenClampedToFlushInterval() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration flushInterval = Duration.ofSeconds(3); + Duration aboveFlushInterval = Duration.ofSeconds(5); + Duration result = builder.normalizeFlushCheckInterval(aboveFlushInterval, flushInterval); + assertEquals("Flush check interval exceeding flush interval should be clamped to flush interval", flushInterval, result); + } + + @Test + public void givenFlushCheckIntervalJustBelowFlushIntervalWhenNormalizedThenAccepted() { + DeadLetterQueueWriter.Builder builder = newBuilder(dir); + Duration flushInterval = Duration.ofSeconds(5); + Duration justBelowFlushInterval = Duration.ofMillis(4900); + Duration result = builder.normalizeFlushCheckInterval(justBelowFlushInterval, flushInterval); + assertEquals("Flush check interval just below flush interval should be accepted", justBelowFlushInterval, result); + } } From faeac1ba7dfb7a3073c8e0f2d7e5c2a2d7039033 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 7 May 2026 11:52:15 -0700 Subject: [PATCH 2/3] Convert MD format to ASCII docs in 8.19 --- docs/reference/dead-letter-queues.md | 264 ------------------------ docs/static/dead-letter-queues.asciidoc | 15 +- 2 files changed, 14 insertions(+), 265 deletions(-) delete mode 100644 docs/reference/dead-letter-queues.md diff --git a/docs/reference/dead-letter-queues.md b/docs/reference/dead-letter-queues.md deleted file mode 100644 index 7752e37d13..0000000000 --- a/docs/reference/dead-letter-queues.md +++ /dev/null @@ -1,264 +0,0 @@ ---- -mapped_pages: - - https://www.elastic.co/guide/en/logstash/current/dead-letter-queues.html -applies_to: - stack: ga - serverless: ga ---- - -# Dead letter queues (DLQ) [dead-letter-queues] - -The dead letter queue (DLQ) is designed as a place to temporarily write events that cannot be processed. The DLQ gives you flexibility to investigate problematic events without blocking the pipeline or losing the events. Your pipeline keeps flowing, and the immediate problem is averted. But those events still need to be addressed. - -You can [process events from the DLQ](#es-proc-dlq) with the [`dead_letter_queue` input plugin](logstash-docs-md://lsr/plugins-inputs-dead_letter_queue.md) . - -Processing events does not delete items from the queue, and the DLQ sometimes needs attention. See [Track dead letter queue size](#dlq-size) and [Clear the dead letter queue](#dlq-clear) for more info. - -## How the dead letter queue works [dead-letter-how] - -By default, when Logstash encounters an event that it cannot process because the data contains a mapping error or some other issue, the Logstash pipeline either hangs or drops the unsuccessful event. In order to protect against data loss in this situation, you can [configure Logstash](#configuring-dlq) to write unsuccessful events to a dead letter queue instead of dropping them. - -::::{note} -The dead letter queue is currently supported only for the [{{es}} output](logstash-docs-md://lsr/plugins-outputs-elasticsearch.md) and [conditional statements evaluation](/reference/event-dependent-configuration.md#conditionals). The dead letter queue is used for documents with response codes of 400 or 404, both of which indicate an event that cannot be retried. It’s also used when a conditional evaluation encounter an error. -:::: - - -Each event written to the dead letter queue includes the original event, metadata that describes the reason the event could not be processed, information about the plugin that wrote the event, and the timestamp when the event entered the dead letter queue. - -To process events in the dead letter queue, create a Logstash pipeline configuration that uses the [`dead_letter_queue` input plugin](logstash-docs-md://lsr/plugins-inputs-dead_letter_queue.md) to read from the queue. See [Processing events in the dead letter queue](#processing-dlq-events) for more information. - -![Diagram showing pipeline reading from the dead letter queue](images/dead_letter_queue.png) - - -## {{es}} processing and the dead letter queue [es-proc-dlq] - -**HTTP request failure.** If the HTTP request fails (because {{es}} is unreachable or because it returned an HTTP error code), the {{es}} output retries the entire request indefinitely. In these scenarios, the dead letter queue has no opportunity to intercept. - -**HTTP request success.** The [{{es}} Bulk API](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk) can perform multiple actions using the same request. If the Bulk API request is successful, it returns `200 OK`, even if some documents in the batch have [failed](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk#bulk-failures-ex). In this situation, the `errors` flag for the request will be `true`. - -The response body can include metadata indicating that one or more specific actions in the bulk request could not be performed, along with an HTTP-style status code per entry to indicate why the action could not be performed. If the DLQ is configured, individual indexing failures are routed there. - -Even if you regularly process events, events remain in the dead letter queue. The dead letter queue requires [manual intervention](#dlq-clear) to clear it. - - -## Conditional statements and the dead letter queue [conditionals-dlq] - -When a conditional statement reaches an error in processing an event, such as comparing string and integer values, the event, as it is at the time of evaluation, is inserted into the dead letter queue. - - -## Configuring {{ls}} to use dead letter queues [configuring-dlq] - -Dead letter queues are disabled by default. To enable dead letter queues, set the `dead_letter_queue_enable` option in the `logstash.yml` [settings file](/reference/logstash-settings-file.md): - -```yaml -dead_letter_queue.enable: true -``` - -Dead letter queues are stored as files in the local directory of the Logstash instance. By default, the dead letter queue files are stored in `path.data/dead_letter_queue`. Each pipeline has a separate queue. For example, the dead letter queue for the `main` pipeline is stored in `LOGSTASH_HOME/data/dead_letter_queue/main` by default. The queue files are numbered sequentially: `1.log`, `2.log`, and so on. - -You can set `path.dead_letter_queue` in the `logstash.yml` file to specify a different path for the files: - -```yaml -path.dead_letter_queue: "path/to/data/dead_letter_queue" -``` - -::::{tip} -Use the local filesystem for data integrity and performance. Network File System (NFS) is not supported. -:::: - - -Dead letter queue entries are written to a temporary file, which is then renamed to a dead letter queue segment file, which is then eligible for ingestion. The rename happens either when this temporary file is considered *full*, or when a period of time has elapsed since the last dead letter queue eligible event was written to the temporary file. - -This length of time can be set using the `dead_letter_queue.flush_interval` setting. This setting is in milliseconds, and defaults to 5000ms. A low value here will mean in the event of infrequent writes to the dead letter queue more, smaller, queue files may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and being made available for reading by the dead_letter_queue input. - -``` -Note that this value cannot be set to lower than 1000ms. -``` -```yaml -dead_letter_queue.flush_interval: 5000 -``` - -Stale segments files are periodically checked if they need to be flushed. This period is controlled by the `dead_letter_queue.flush_check_interval` setting. This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster segment rotation when infrequent writes occur, at the cost of CPU consumption with more frequent segment checks execution. A larger value reduces checks overhead but delays segment sealing, for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. This value cannot be set to lower than 1000ms. - -```yaml -dead_letter_queue.flush_check_interval: 1000 -``` - -::::{note} -You cannot use the same `dead_letter_queue` path for two different Logstash instances. -:::: - - -### File rotation [file-rotation] - -Dead letter queues have a built-in file rotation policy that manages the file size of the queue. When the file size reaches a preconfigured threshold, a new file is created automatically. - - -### Size management [size-management] - -By default, the maximum size of each dead letter queue is set to 1024mb. To change this setting, use the `dead_letter_queue.max_bytes` option. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. Use the `dead_letter_queue.storage_policy` option to control which entries should be dropped to avoid exceeding the size limit. Set the value to `drop_newer` (default) to stop accepting new values that would push the file size over the limit. Set the value to `drop_older` to remove the oldest events to make space for new ones. - -#### Age policy [age-policy] - -You can use the age policy to automatically control the volume of events in the dead letter queue. Use the `dead_letter_queue.retain.age` setting (in `logstash.yml` or `pipelines.yml`) to have {{ls}} remove events that are older than a value you define. Available time units are `d`, `h`, `m`, `s` respectively for days, hours, minutes and seconds. There is no default time unit, so you need to specify it. - -```yaml -dead_letter_queue.retain.age: 2d -``` - -The age policy is verified and applied on event writes and during pipeline shutdown. For that reason, your dead-letter-queue folder may store expired events for longer than specified, and the reader pipeline could possibly encounter outdated events. - - - -### Automatic cleaning of consumed events [auto-clean] - -By default, the dead letter queue input plugin does not remove the events that it consumes. Instead, it commits a reference to avoid re-processing events. Use the `clean_consumed` setting in the dead letter queue input plugin in order to remove segments that have been fully consumed, freeing space while processing. - -```yaml -input { - dead_letter_queue { - path => "/path/to/data/dead_letter_queue" - pipeline_id => "main" - clean_consumed => true - } -} -``` - - - -## Processing events in the dead letter queue [processing-dlq-events] - -When you are ready to process events in the dead letter queue, you create a pipeline that uses the [`dead_letter_queue` input plugin](logstash-docs-md://lsr/plugins-inputs-dead_letter_queue.md) to read from the dead letter queue. The pipeline configuration that you use depends, of course, on what you need to do. For example, if the dead letter queue contains events that resulted from a mapping error in Elasticsearch, you can create a pipeline that reads the "dead" events, removes the field that caused the mapping issue, and re-indexes the clean events into Elasticsearch. - -The following example shows a simple pipeline that reads events from the dead letter queue and writes the events, including metadata, to standard output: - -```yaml -input { - dead_letter_queue { - path => "/path/to/data/dead_letter_queue" <1> - commit_offsets => true <2> - pipeline_id => "main" <3> - } -} - -output { - stdout { - codec => rubydebug { metadata => true } - } -} -``` - -1. The path to the top-level directory containing the dead letter queue. This directory contains a separate folder for each pipeline that writes to the dead letter queue. To find the path to this directory, look at the `logstash.yml` [settings file](/reference/logstash-settings-file.md). By default, Logstash creates the `dead_letter_queue` directory under the location used for persistent storage (`path.data`), for example, `LOGSTASH_HOME/data/dead_letter_queue`. However, if `path.dead_letter_queue` is set, it uses that location instead. -2. When `true`, saves the offset. When the pipeline restarts, it will continue reading from the position where it left off rather than reprocessing all the items in the queue. You can set `commit_offsets` to `false` when you are exploring events in the dead letter queue and want to iterate over the events multiple times. -3. The ID of the pipeline that’s writing to the dead letter queue. The default is `"main"`. - - -For another example, see [Example: Processing data that has mapping errors](#dlq-example). - -When the pipeline has finished processing all the events in the dead letter queue, it will continue to run and process new events as they stream into the queue. This means that you do not need to stop your production system to handle events in the dead letter queue. - -::::{note} -Events emitted from the [`dead_letter_queue` input plugin](logstash-docs-md://lsr/plugins-inputs-dead_letter_queue.md) plugin will not be resubmitted to the dead letter queue if they cannot be processed correctly. -:::: - - - -## Reading from a timestamp [dlq-timestamp] - -When you read from the dead letter queue, you might not want to process all the events in the queue, especially if there are a lot of old events in the queue. You can start processing events at a specific point in the queue by using the `start_timestamp` option. This option configures the pipeline to start processing events based on the timestamp of when they entered the queue: - -```yaml -input { - dead_letter_queue { - path => "/path/to/data/dead_letter_queue" - start_timestamp => "2017-06-06T23:40:37" - pipeline_id => "main" - } -} -``` - -For this example, the pipeline starts reading all events that were delivered to the dead letter queue on or after June 6, 2017, at 23:40:37. - - -## Example: Processing data that has mapping errors [dlq-example] - -In this example, the user attempts to index a document that includes geo_ip data, but the data cannot be processed because it contains a mapping error: - -```json -{"geoip":{"location":"home"}} -``` - -Indexing fails because the Logstash output plugin expects a `geo_point` object in the `location` field, but the value is a string. The failed event is written to the dead letter queue, along with metadata about the error that caused the failure: - -```json -{ - "@metadata" => { - "dead_letter_queue" => { - "entry_time" => #, - "plugin_id" => "fb80f1925088497215b8d037e622dec5819b503e-4", - "plugin_type" => "elasticsearch", - "reason" => "Could not index event to Elasticsearch. status: 400, action: [\"index\", {:_id=>nil, :_index=>\"logstash-2017.06.22\", :_type=>\"doc\", :_routing=>nil}, 2017-06-22T01:29:29.804Z My-MacBook-Pro-2.local {\"geoip\":{\"location\":\"home\"}}], response: {\"index\"=>{\"_index\"=>\"logstash-2017.06.22\", \"_type\"=>\"doc\", \"_id\"=>\"AVzNayPze1iR9yDdI2MD\", \"status\"=>400, \"error\"=>{\"type\"=>\"mapper_parsing_exception\", \"reason\"=>\"failed to parse\", \"caused_by\"=>{\"type\"=>\"illegal_argument_exception\", \"reason\"=>\"illegal latitude value [266.30859375] for geoip.location\"}}}}" - } - }, - "@timestamp" => 2017-06-22T01:29:29.804Z, - "@version" => "1", - "geoip" => { - "location" => "home" - }, - "host" => "My-MacBook-Pro-2.local", - "message" => "{\"geoip\":{\"location\":\"home\"}}" -} -``` - -To process the failed event, you create the following pipeline that reads from the dead letter queue and removes the mapping problem: - -```json -input { - dead_letter_queue { - path => "/path/to/data/dead_letter_queue/" <1> - } -} -filter { - mutate { - remove_field => "[geoip][location]" <2> - } -} -output { - elasticsearch{ - hosts => [ "localhost:9200" ] <3> - } -} -``` - -1. The [`dead_letter_queue` input](logstash-docs-md://lsr/plugins-inputs-dead_letter_queue.md) reads from the dead letter queue. -2. The `mutate` filter removes the problem field called `location`. -3. The clean event is sent to Elasticsearch, where it can be indexed because the mapping issue is resolved. - - - -## Track dead letter queue size [dlq-size] - -Monitor the size of the dead letter queue before it becomes a problem. By checking it periodically, you can determine the maximum queue size that makes sense for each pipeline. - -The size of the DLQ for each pipeline is available in the node stats API. - -```txt -pipelines.${pipeline_id}.dead_letter_queue.queue_size_in_bytes. -``` - -Where `{{pipeline_id}}` is the name of a pipeline with DLQ enabled. - - -## Clear the dead letter queue [dlq-clear] - -The dead letter queue cannot be cleared with the upstream pipeline running. - -The dead letter queue is a directory of pages. To clear it, stop the pipeline and delete location/. - -```txt -${path.data}/dead_letter_queue/${pipeline_id} -``` - -Where `{{pipeline_id}}` is the name of a pipeline with DLQ enabled. - -The pipeline creates a new dead letter queue when it starts again. diff --git a/docs/static/dead-letter-queues.asciidoc b/docs/static/dead-letter-queues.asciidoc index 70afd8337a..264c2fd83b 100644 --- a/docs/static/dead-letter-queues.asciidoc +++ b/docs/static/dead-letter-queues.asciidoc @@ -114,7 +114,20 @@ This length of time can be set using the `dead_letter_queue.flush_interval` sett dead_letter_queue.flush_interval: 5000 ------------------------------------------------------------------------------- -NOTE: You may not use the same `dead_letter_queue` path for two different +Stale segments files are periodically checked if they need to be flushed. +This period is controlled by the `dead_letter_queue.flush_check_interval` setting. +This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster +segment rotation when infrequent writes occur, at the cost of CPU consumption with more +frequent segment checks execution. A larger value reduces checks overhead but delays segment sealing, +for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`. +This value cannot be set to lower than 1000ms. + +[source,yaml] +------------------------------------------------------------------------------- +dead_letter_queue.flush_check_interval: 1000 +------------------------------------------------------------------------------- + +NOTE: You cannot use the same `dead_letter_queue` path for two different Logstash instances. [[file-rotation]] From bc2ff0f5c7d03de5b96782bc6d76abefb2023255 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 7 May 2026 12:04:02 -0700 Subject: [PATCH 3/3] Adapt the flush check interval to the 8.19 specific SettingNumeric type. --- logstash-core/lib/logstash/environment.rb | 29 +------------------ .../execution/AbstractPipelineExt.java | 7 +---- .../common/io/DeadLetterQueueWriterTest.java | 6 ---- 3 files changed, 2 insertions(+), 40 deletions(-) diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index c41901d7c1..f1bd9c499a 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -90,7 +90,6 @@ module Environment Setting::ExistingFilePath.new("api.ssl.keystore.path", nil, false).nullable, Setting::Password.new("api.ssl.keystore.password", nil, false).nullable, Setting::StringArray.new("api.ssl.supported_protocols", nil, true, %w[TLSv1 TLSv1.1 TLSv1.2 TLSv1.3]), -<<<<<<< HEAD Setting::SettingString.new("queue.type", "memory", true, ["persisted", "memory"]), Setting::Boolean.new("queue.drain", false), Setting::Bytes.new("queue.page_capacity", "64mb"), @@ -103,6 +102,7 @@ module Environment Setting::Boolean.new("dead_letter_queue.enable", false), Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"), Setting::SettingNumeric.new("dead_letter_queue.flush_interval", 5000), + Setting::SettingNumeric.new("dead_letter_queue.flush_check_interval", 1000), Setting::SettingString.new("dead_letter_queue.storage_policy", "drop_newer", true, ["drop_newer", "drop_older"]), Setting::SettingNullableString.new("dead_letter_queue.retain.age"), # example 5d Setting::TimeValue.new("slowlog.threshold.warn", "-1"), @@ -113,33 +113,6 @@ module Environment Setting::SettingString.new("keystore.file", ::File.join(::File.join(LogStash::Environment::LOGSTASH_HOME, "config"), "logstash.keystore"), false), # will be populated on Setting::SettingNullableString.new("monitoring.cluster_uuid"), Setting::SettingString.new("pipeline.buffer.type", nil, false, ["direct", "heap"]) -======= - Setting::StringSetting.new("pipeline.batch.metrics.sampling_mode", "minimal", true, ["disabled", "minimal", "full"]), - Setting::StringSetting.new("queue.type", "memory", true, ["persisted", "memory"]), - Setting::BooleanSetting.new("queue.drain", false), - Setting::BytesSetting.new("queue.page_capacity", "64mb"), - Setting::BytesSetting.new("queue.max_bytes", "1024mb"), - Setting::NumericSetting.new("queue.max_events", 0), # 0 is unlimited - Setting::NumericSetting.new("queue.checkpoint.acks", 1024), # 0 is unlimited - Setting::NumericSetting.new("queue.checkpoint.writes", 1024), # 0 is unlimited - Setting::NumericSetting.new("queue.checkpoint.interval", 1000), # remove it for #17155 - Setting::BooleanSetting.new("queue.checkpoint.retry", true), - Setting::StringSetting.new("queue.compression", "none", true, %w(none speed balanced size disabled)), - Setting::BooleanSetting.new("dead_letter_queue.enable", false), - Setting::BytesSetting.new("dead_letter_queue.max_bytes", "1024mb"), - Setting::NumericSetting.new("dead_letter_queue.flush_interval", 5000), - Setting::NumericSetting.new("dead_letter_queue.flush_check_interval", 1000), - Setting::StringSetting.new("dead_letter_queue.storage_policy", "drop_newer", true, ["drop_newer", "drop_older"]), - Setting::NullableStringSetting.new("dead_letter_queue.retain.age"), # example 5d - Setting::TimeValueSetting.new("slowlog.threshold.warn", "-1"), - Setting::TimeValueSetting.new("slowlog.threshold.info", "-1"), - Setting::TimeValueSetting.new("slowlog.threshold.debug", "-1"), - Setting::TimeValueSetting.new("slowlog.threshold.trace", "-1"), - Setting::StringSetting.new("keystore.classname", "org.logstash.secret.store.backend.JavaKeyStore"), - Setting::StringSetting.new("keystore.file", ::File.join(::File.join(LogStash::Environment::LOGSTASH_HOME, "config"), "logstash.keystore"), false), # will be populated on - Setting::NullableStringSetting.new("monitoring.cluster_uuid"), - Setting::StringSetting.new("pipeline.buffer.type", "heap", true, ["direct", "heap"]) ->>>>>>> f2f0d3fde (`dead_letter_queue.flush_check_interval` new config for flushing staled segment files. (#19036)) # post_process ].each {|setting| SETTINGS.register(setting) } diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index c76d3c5a2f..a6f6f9dfea 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -399,14 +399,9 @@ private DeadLetterQueueWriter createDeadLetterQueueWriterFromSettings(ThreadCont final QueueStorageType storageType = QueueStorageType.parse(getSetting(context, "dead_letter_queue.storage_policy").asJavaString()); String dlqPath = getSetting(context, "path.dead_letter_queue").asJavaString(); -<<<<<<< HEAD long dlqMaxBytes = getSetting(context, "dead_letter_queue.max_bytes").convertToInteger().getLongValue(); Duration dlqFlushInterval = Duration.ofMillis(getSetting(context, "dead_letter_queue.flush_interval").convertToInteger().getLongValue()); -======= - long dlqMaxBytes = org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.max_bytes").convertToInteger()); - Duration dlqFlushInterval = Duration.ofMillis(org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.flush_interval").convertToInteger())); - Duration dlqFlushCheckInterval = Duration.ofMillis(org.jruby.RubyNumeric.num2long(getSetting(context, "dead_letter_queue.flush_check_interval").convertToInteger())); ->>>>>>> f2f0d3fde (`dead_letter_queue.flush_check_interval` new config for flushing staled segment files. (#19036)) + Duration dlqFlushCheckInterval = Duration.ofMillis(getSetting(context, "dead_letter_queue.flush_check_interval").convertToInteger().getLongValue()); if (hasSetting(context, "dead_letter_queue.retain.age") && !getSetting(context, "dead_letter_queue.retain.age").isNil()) { // convert to Duration diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java index 511a541d7e..b8710cdd59 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java @@ -514,14 +514,8 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I // fill the queue to push out the segment with the 2 previous events Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); -<<<<<<< HEAD event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479)); - try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter - .newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1)) -======= - event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500)); try (DeadLetterQueueWriter writeManager = newBuilder(dir, 10 * MB, 20 * MB) ->>>>>>> f2f0d3fde (`dead_letter_queue.flush_check_interval` new config for flushing staled segment files. (#19036)) .storageType(QueueStorageType.DROP_NEWER) .build()) {