Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/logstash.yml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,14 @@
#
# dead_letter_queue.flush_interval: 5000

# If using dead_letter_queue.enable: true, the interval in milliseconds that the DLQ scheduler checks for stale segments
# to be flushed. A smaller value ensures faster segment rotation at the cost of CPU with more frequent scheduler runs.
# A larger value reduces scheduler overhead but may delay segment sealing.
# Minimum value is 1000 and cannot be greater than dead_letter_queue.flush_interval.
# Default is 1000.
#
# dead_letter_queue.flush_check_interval: 1000

# If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit.
# Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit.
# Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones.
Expand Down
15 changes: 14 additions & 1 deletion docs/static/dead-letter-queues.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,20 @@ This length of time can be set using the `dead_letter_queue.flush_interval` sett
dead_letter_queue.flush_interval: 5000
-------------------------------------------------------------------------------

NOTE: You may not use the same `dead_letter_queue` path for two different
Stale segments files are periodically checked if they need to be flushed.
This period is controlled by the `dead_letter_queue.flush_check_interval` setting.
This setting is in milliseconds, and defaults to 1000ms. A smaller value ensures faster
segment rotation when infrequent writes occur, at the cost of CPU consumption with more
frequent segment checks execution. A larger value reduces checks overhead but delays segment sealing,
for the worst case it will be `dead_letter_queue.flush_interval` + `dead_letter_queue.flush_check_interval`.
This value cannot be set to lower than 1000ms.

[source,yaml]
-------------------------------------------------------------------------------
dead_letter_queue.flush_check_interval: 1000
-------------------------------------------------------------------------------

NOTE: You cannot use the same `dead_letter_queue` path for two different
Logstash instances.

[[file-rotation]]
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ module Environment
Setting::Boolean.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Setting::SettingNumeric.new("dead_letter_queue.flush_interval", 5000),
Setting::SettingNumeric.new("dead_letter_queue.flush_check_interval", 1000),
Setting::SettingString.new("dead_letter_queue.storage_policy", "drop_newer", true, ["drop_newer", "drop_older"]),
Setting::SettingNullableString.new("dead_letter_queue.retain.age"), # example 5d
Setting::TimeValue.new("slowlog.threshold.warn", "-1"),
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def self.included(base)
"config.string",
"dead_letter_queue.enable",
"dead_letter_queue.flush_interval",
"dead_letter_queue.flush_check_interval",
"dead_letter_queue.max_bytes",
"dead_letter_queue.storage_policy",
"dead_letter_queue.retain.age",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,16 @@ private DeadLetterQueueFactory() {
* @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written
* that would make the size of this dlq greater than this value
* @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent.
* @param flushCheckInterval The interval between scheduler checks for stale segments.
* @param storageType overwriting type in case of queue full: drop_older or drop_newer.
* @return write manager for the specific id's dead-letter-queue context
*/
public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType) {
return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType));
public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, Duration flushCheckInterval, QueueStorageType storageType) {
return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, flushCheckInterval, storageType));
}

/**
* Like {@link #getWriter(String, String, long, Duration, QueueStorageType)} but also setting the age duration
* Like {@link #getWriter(String, String, long, Duration, Duration, QueueStorageType)} but also setting the age duration
* of the segments.
*
* @param id The identifier context for this dlq manager
Expand All @@ -93,23 +94,25 @@ public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long ma
* @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written
* that would make the size of this dlq greater than this value
* @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent.
* @param flushCheckInterval The interval between scheduler checks for stale segments.
* @param storageType overwriting type in case of queue full: drop_older or drop_newer.
* @param age the period that DLQ events should be considered as valid, before automatic removal.
* @return write manager for the specific id's dead-letter-queue context
* */
public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType, Duration age) {
return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType, age));
public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, Duration flushCheckInterval, QueueStorageType storageType, Duration age) {
return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, flushCheckInterval, storageType, age));
}

public static DeadLetterQueueWriter release(String id) {
return REGISTRY.remove(id);
}

private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize,
final Duration flushInterval, final QueueStorageType storageType) {
final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType) {
try {
return DeadLetterQueueWriter
.newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval)
.newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, flushCheckInterval)
.pipelineId(id)
.storageType(storageType)
.build();
} catch (IOException e) {
Expand All @@ -119,11 +122,12 @@ private static DeadLetterQueueWriter newWriter(final String id, final String dlq
}

private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize,
final Duration flushInterval, final QueueStorageType storageType,
final Duration flushInterval, final Duration flushCheckInterval, final QueueStorageType storageType,
final Duration age) {
try {
return DeadLetterQueueWriter
.newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval)
.newBuilder(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, flushCheckInterval)
.pipelineId(id)
.storageType(storageType)
.retentionTime(age)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public String toString() {
}
}

static final Duration MIN_FLUSH_PERIOD = Duration.ofMillis(1000);
static final Duration MIN_FLUSH_CHECK_INTERVAL = Duration.ofMillis(1000);

@VisibleForTesting
static final String SEGMENT_FILE_PATTERN = "%d.log";
private static final Logger logger = LogManager.getLogger(DeadLetterQueueWriter.class);
Expand Down Expand Up @@ -174,21 +177,25 @@ interface SchedulerService {
private static class FixedRateScheduler implements SchedulerService {

private final ScheduledExecutorService scheduledExecutor;
private final Duration flushCheckInterval;

FixedRateScheduler() {
FixedRateScheduler(final Duration flushCheckInterval, final String pipelineId) {
//Set the name with pipeline ID for better visibility
final String threadName = pipelineId != null ? "dlq-flush-check[" + pipelineId + "]" : "dlq-flush-check";

this.flushCheckInterval = flushCheckInterval;
scheduledExecutor = Executors.newScheduledThreadPool(1, r -> {
Thread t = new Thread(r);
//Allow this thread to die when the JVM dies
t.setDaemon(true);
//Set the name
t.setName("dlq-flush-check");
t.setName(threadName);
return t;
});
}

@Override
public void repeatedAction(Runnable action) {
scheduledExecutor.scheduleAtFixedRate(action, 1L, 1L, TimeUnit.SECONDS);
scheduledExecutor.scheduleAtFixedRate(action, flushCheckInterval.toMillis(), flushCheckInterval.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -215,21 +222,24 @@ public static final class Builder {
private final long maxSegmentSize;
private final long maxQueueSize;
private final Duration flushInterval;
private final Duration flushCheckInterval;
private boolean startScheduledFlusher;
private QueueStorageType storageType = QueueStorageType.DROP_NEWER;
private Duration retentionTime = null;
private Clock clock = Clock.systemDefaultZone();
private SchedulerService customSchedulerService = null;
private String pipelineId;

private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval) {
this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, true);
private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, final Duration flushInterval, final Duration flushCheckInterval) {
this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, flushCheckInterval, true);
}

private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval, boolean startScheduledFlusher) {
private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, final Duration flushInterval, final Duration flushCheckInterval, boolean startScheduledFlusher) {
this.queuePath = queuePath;
this.maxSegmentSize = maxSegmentSize;
this.maxQueueSize = maxQueueSize;
this.flushInterval = flushInterval;
this.flushCheckInterval = flushCheckInterval;
this.startScheduledFlusher = startScheduledFlusher;
}

Expand All @@ -243,6 +253,11 @@ public Builder retentionTime(Duration retentionTime) {
return this;
}

public Builder pipelineId(final String pipelineId) {
this.pipelineId = pipelineId;
return this;
}

@VisibleForTesting
Builder clock(Clock clock) {
this.clock = clock;
Expand All @@ -259,29 +274,64 @@ public DeadLetterQueueWriter build() throws IOException {
if (customSchedulerService != null && startScheduledFlusher) {
throw new IllegalArgumentException("Both default scheduler and custom scheduler were defined, ");
}

final Duration normalizedFlushInterval = normalizeFlushInterval(flushInterval);

SchedulerService schedulerService;
if (customSchedulerService != null) {
schedulerService = customSchedulerService;
} else {
if (startScheduledFlusher) {
schedulerService = new FixedRateScheduler();
final Duration normalizedFlushCheckInterval = normalizeFlushCheckInterval(flushCheckInterval, normalizedFlushInterval);
schedulerService = new FixedRateScheduler(normalizedFlushCheckInterval, pipelineId);
} else {
schedulerService = new NoopScheduler();
}
}

return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, flushInterval, storageType, retentionTime, clock, schedulerService);
return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, normalizedFlushInterval, storageType, retentionTime, clock, schedulerService);
}

@VisibleForTesting
Duration normalizeFlushInterval(final Duration flushInterval) {
if (!startScheduledFlusher) return flushInterval;

Duration effectiveFlushInterval = flushInterval;
if (flushInterval.compareTo(MIN_FLUSH_PERIOD) < 0) {
logger.warn("dead_letter_queue.flush_interval ({} ms) is below the minimum of {} ms; using {} ms",
flushInterval.toMillis(), MIN_FLUSH_PERIOD.toMillis(), MIN_FLUSH_PERIOD.toMillis());
effectiveFlushInterval = MIN_FLUSH_PERIOD;
}
return effectiveFlushInterval;
}

@VisibleForTesting
Duration normalizeFlushCheckInterval(final Duration flushCheckInterval, final Duration effectiveFlushInterval) {
Duration effectiveFlushCheckInterval = flushCheckInterval;
// can't exceed flush interval
if (effectiveFlushCheckInterval.compareTo(effectiveFlushInterval) > 0) {
logger.warn("dead_letter_queue.flush_check_interval ({} ms) cannot be greater than dead_letter_queue.flush_interval ({} ms); using {} ms",
effectiveFlushCheckInterval.toMillis(), effectiveFlushInterval.toMillis(), effectiveFlushInterval.toMillis());
effectiveFlushCheckInterval = effectiveFlushInterval;
}
// can't be less than 1s
if (effectiveFlushCheckInterval.compareTo(MIN_FLUSH_CHECK_INTERVAL) < 0) {
logger.warn("dead_letter_queue.flush_check_interval ({} ms) is below the minimum of {} ms; using {} ms for the flush check interval",
effectiveFlushCheckInterval.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis(), MIN_FLUSH_CHECK_INTERVAL.toMillis());
effectiveFlushCheckInterval = MIN_FLUSH_CHECK_INTERVAL;
}
return effectiveFlushCheckInterval;
}
}

public static Builder newBuilder(final Path queuePath, final long maxSegmentSize, final long maxQueueSize,
final Duration flushInterval) {
return new Builder(queuePath, maxSegmentSize, maxQueueSize, flushInterval);
final Duration flushInterval, final Duration flushCheckInterval) {
return new Builder(queuePath, maxSegmentSize, maxQueueSize, flushInterval, flushCheckInterval);
}

@VisibleForTesting
static Builder newBuilderWithoutFlusher(final Path queuePath, final long maxSegmentSize, final long maxQueueSize) {
return new Builder(queuePath, maxSegmentSize, maxQueueSize, Duration.ZERO, false);
return new Builder(queuePath, maxSegmentSize, maxQueueSize, Duration.ZERO, Duration.ZERO, false);
}

private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, final long maxQueueSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public boolean hasWritten(){
return lastWrite != null;
}

public boolean isStale(Duration flushPeriod){
public boolean isStale(final Duration flushPeriod) {
return hasWritten() && Instant.now().minus(flushPeriod).isAfter(lastWrite);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,15 +401,16 @@ private DeadLetterQueueWriter createDeadLetterQueueWriterFromSettings(ThreadCont
String dlqPath = getSetting(context, "path.dead_letter_queue").asJavaString();
long dlqMaxBytes = getSetting(context, "dead_letter_queue.max_bytes").convertToInteger().getLongValue();
Duration dlqFlushInterval = Duration.ofMillis(getSetting(context, "dead_letter_queue.flush_interval").convertToInteger().getLongValue());
Duration dlqFlushCheckInterval = Duration.ofMillis(getSetting(context, "dead_letter_queue.flush_check_interval").convertToInteger().getLongValue());

if (hasSetting(context, "dead_letter_queue.retain.age") && !getSetting(context, "dead_letter_queue.retain.age").isNil()) {
// convert to Duration
final Duration age = parseToDuration(getSetting(context, "dead_letter_queue.retain.age").convertToString().toString());
return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes,
dlqFlushInterval, storageType, age);
dlqFlushInterval, dlqFlushCheckInterval, storageType, age);
}

return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, dlqFlushInterval, storageType);
return DeadLetterQueueFactory.getWriter(pipelineId.asJavaString(), dlqPath, dlqMaxBytes, dlqFlushInterval, dlqFlushCheckInterval, storageType);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private void writeAnEventIntoDLQ(Path dlqPath, String pluginId, String pluginTyp
RubyString id = RubyString.newString(RubyUtil.RUBY, pluginId);
RubyString classConfigName = RubyString.newString(RubyUtil.RUBY, pluginType);

final DeadLetterQueueWriter javaDlqWriter = DeadLetterQueueFactory.getWriter(dlqName, dlqPath.toString(), 1024 * 1024, Duration.ofHours(1), QueueStorageType.DROP_NEWER);
final DeadLetterQueueWriter javaDlqWriter = DeadLetterQueueFactory.getWriter(dlqName, dlqPath.toString(), 1024 * 1024, Duration.ofHours(1), Duration.ofSeconds(1), QueueStorageType.DROP_NEWER);
IRubyObject dlqWriter = JavaUtil.convertJavaToUsableRubyObject(context.runtime, javaDlqWriter);

final AbstractDeadLetterQueueWriterExt dlqWriterForInstance = new AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt(
Expand Down
Loading
Loading