Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
public class Config {
public short replicationFactor;

public boolean deleteTopicsOnClose = false;

public String topicConfig;

public String commonConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,27 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.Set;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBenchmarkDriver implements BenchmarkDriver {

Expand All @@ -52,6 +59,7 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver {

// Producers/consumers opened by this driver; wrapped in synchronized lists —
// presumably registered concurrently by benchmark worker threads (TODO confirm).
private List<BenchmarkProducer> producers = Collections.synchronizedList(new ArrayList<>());
private List<BenchmarkConsumer> consumers = Collections.synchronizedList(new ArrayList<>());
// Names of topics created via createTopics(); consulted by close() to delete
// them when config.deleteTopicsOnClose is set.
private final Set<String> createdTopics = Collections.synchronizedSet(new HashSet<>());

// Parsed sub-sections of the driver configuration; populated during initialization.
private Properties topicProperties;
private Properties producerProperties;
Expand Down Expand Up @@ -111,7 +119,9 @@ public CompletableFuture<Void> createTopics(List<TopicInfo> topicInfos) {
Map<String, String> topicConfigs = new HashMap<>((Map) topicProperties);
KafkaTopicCreator topicCreator =
new KafkaTopicCreator(admin, topicConfigs, config.replicationFactor);
return topicCreator.create(topicInfos);
return topicCreator
.create(topicInfos)
.thenRun(() -> topicInfos.forEach(topicInfo -> createdTopics.add(topicInfo.getTopic())));
}

@Override
Expand Down Expand Up @@ -158,14 +168,47 @@ public void close() throws Exception {
for (BenchmarkConsumer consumer : consumers) {
consumer.close();
}
if (config != null && config.deleteTopicsOnClose) {
deleteCreatedTopics();
}
admin.close();
}

/**
 * Best-effort deletion of every topic this driver created during the benchmark.
 * Invoked from close() when {@code config.deleteTopicsOnClose} is enabled;
 * failures are logged and never propagated to the caller.
 */
private void deleteCreatedTopics() {
    // Fast path: nothing was created through this driver instance.
    if (createdTopics.isEmpty()) {
        return;
    }

    // Snapshot under the set's own monitor so the admin request is built from
    // a stable view even if other threads are still registering topics.
    final Set<String> snapshot;
    synchronized (createdTopics) {
        snapshot = new HashSet<>(createdTopics);
    }

    try {
        admin.deleteTopics(snapshot).all().get();
        log.info("Deleted {} benchmark topics", snapshot.size());
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can observe it.
        Thread.currentThread().interrupt();
        log.warn("Interrupted while deleting benchmark topics {}", snapshot, e);
    } catch (ExecutionException e) {
        final boolean alreadyGone = e.getCause() instanceof UnknownTopicOrPartitionException;
        if (alreadyGone) {
            // A topic vanished before we got to it — benign during teardown.
            log.warn("Some benchmark topics were already deleted: {}", snapshot);
        } else {
            log.warn("Failed deleting benchmark topics {}", snapshot, e);
        }
    } catch (Exception e) {
        // Deliberately broad: cleanup during close() must never throw.
        log.warn("Failed deleting benchmark topics {}", snapshot, e);
    }
}

/** Expands the zone-id placeholder inside a configured client-id string. */
private static String applyZoneId(String clientId, String zoneId) {
    final String resolved = clientId.replace(ZONE_ID_TEMPLATE, zoneId);
    return resolved;
}

// Shared YAML mapper used to parse driver configuration files; unknown
// properties are tolerated so config files may carry extra keys.
private static final ObjectMapper mapper =
new ObjectMapper(new YAMLFactory())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

// Class-wide SLF4J logger.
private static final Logger log = LoggerFactory.getLogger(KafkaBenchmarkDriver.class);
}
Loading