From 9a708ddb19fe1a4e843ee9eb9af13cc25b12d2b4 Mon Sep 17 00:00:00 2001 From: mudit-saxena Date: Fri, 11 Apr 2025 14:02:59 +0530 Subject: [PATCH] [MINOR] Added config flag to fetch disruption --- .../com/github/ambry/config/ReplicationConfig.java | 7 +++++++ .../ReplicationPrioritizationManager.java | 10 ++++++++++ 2 files changed, 17 insertions(+) diff --git a/ambry-api/src/main/java/com/github/ambry/config/ReplicationConfig.java b/ambry-api/src/main/java/com/github/ambry/config/ReplicationConfig.java index 774038c98b..cd71b58043 100644 --- a/ambry-api/src/main/java/com/github/ambry/config/ReplicationConfig.java +++ b/ambry-api/src/main/java/com/github/ambry/config/ReplicationConfig.java @@ -374,6 +374,11 @@ public class ReplicationConfig { @Default("false") public final boolean enableReplicationPrioritization; + public static final String ENABLE_DISRUPTION_SERVICE = "enable.disruption.service"; + @Config(ENABLE_DISRUPTION_SERVICE) + @Default("true") + public final boolean enableDisruptionService; + /** * The factory class the disruption service */ @@ -473,5 +478,7 @@ public ReplicationConfig(VerifiableProperties verifiableProperties) { verifiableProperties.getBoolean(ENABLE_REPLICATION_PRIORITIZATION, false); disruptionServiceFactory = verifiableProperties.getString(DISRUPTION_SERVICE_FACTORY, DEFAULT_DISRUPTION_FACTORY); + enableDisruptionService = + verifiableProperties.getBoolean(ENABLE_DISRUPTION_SERVICE, true); } } diff --git a/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/ReplicationPrioritizationManager.java b/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/ReplicationPrioritizationManager.java index 00615e3c8c..607c98337e 100644 --- a/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/ReplicationPrioritizationManager.java +++ b/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/ReplicationPrioritizationManager.java @@ -75,6 +75,7 @@ public class ReplicationPrioritizationManager implements Runnable { private final DisruptionService disruptionService; private Map> prioritizedPartitions; private final ScheduledExecutorService scheduler; + private final ReplicationConfig replicationConfig; /** * Creates a new ReplicationPrioritizationManager. * @@ -108,6 +109,7 @@ public ReplicationPrioritizationManager(ReplicationEngine replicationEngine, Clu this.disruptionService = disruptionService; this.prioritizedPartitions = new EnumMap<>(PriorityTier.class); this.scheduler = scheduler; + this.replicationConfig = replicationConfig; // Schedule periodic runs for prioritization run this.scheduler.scheduleAtFixedRate(this, 0, scheduleIntervalMinutes, TimeUnit.MINUTES); @@ -237,6 +239,14 @@ void startPrioritizationCycle() { */ private Map> fetchDisruptions(List partitionIds) { Map> disruptionsByPartition = new HashMap<>(); + + if (!replicationConfig.enableDisruptionService) { + for (PartitionId partitionId : partitionIds) { + disruptionsByPartition.computeIfAbsent(partitionId, k -> new ArrayList<>()); + } + return disruptionsByPartition; + } + try { // 3. Fetch disruptions for all partitions disruptionsByPartition =