diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/Config.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/Config.java index d2fafafa3..757966e7a 100644 --- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/Config.java +++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/Config.java @@ -16,6 +16,8 @@ public class Config { public short replicationFactor; + public boolean deleteTopicsOnClose = false; + public String topicConfig; public String commonConfig; diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java index d506b5b37..dfa6ecef1 100644 --- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java +++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java @@ -28,20 +28,27 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaBenchmarkDriver implements BenchmarkDriver { @@ -52,6 +59,7 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver { private List producers = Collections.synchronizedList(new ArrayList<>()); private List consumers = Collections.synchronizedList(new ArrayList<>()); + private final Set createdTopics = Collections.synchronizedSet(new HashSet<>()); private Properties topicProperties; private Properties producerProperties; @@ -111,7 +119,9 @@ public CompletableFuture createTopics(List topicInfos) { Map topicConfigs = new HashMap<>((Map) topicProperties); KafkaTopicCreator topicCreator = new KafkaTopicCreator(admin, topicConfigs, config.replicationFactor); - return topicCreator.create(topicInfos); + return topicCreator + .create(topicInfos) + .thenRun(() -> topicInfos.forEach(topicInfo -> createdTopics.add(topicInfo.getTopic()))); } @Override @@ -158,9 +168,40 @@ public void close() throws Exception { for (BenchmarkConsumer consumer : consumers) { consumer.close(); } + if (config != null && config.deleteTopicsOnClose) { + deleteCreatedTopics(); + } admin.close(); } + private void deleteCreatedTopics() { + if (createdTopics.isEmpty()) { + return; + } + + final Set topicsToDelete; + synchronized (createdTopics) { + topicsToDelete = new HashSet<>(createdTopics); + } + + try { + DeleteTopicsResult deleteTopicsResult = admin.deleteTopics(topicsToDelete); + deleteTopicsResult.all().get(); + log.info("Deleted {} benchmark topics", topicsToDelete.size()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while deleting benchmark topics {}", topicsToDelete, e); + } catch (ExecutionException e) { + if (e.getCause() instanceof UnknownTopicOrPartitionException) { + log.warn("Some benchmark topics were already deleted: {}", topicsToDelete); + } else { + log.warn("Failed deleting benchmark topics {}", topicsToDelete, e); + } + } catch (Exception e) { + log.warn("Failed deleting benchmark topics {}", topicsToDelete, e); + } + } + private static String applyZoneId(String clientId, String zoneId) { return clientId.replace(ZONE_ID_TEMPLATE, zoneId); } @@ -168,4 +209,6 @@ private static String applyZoneId(String clientId, String zoneId) { private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()) .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + private static final Logger log = LoggerFactory.getLogger(KafkaBenchmarkDriver.class); }