Skip to content

Commit

Permalink
WIP: Attempting to create a new perf test
Browse files Browse the repository at this point in the history
  • Loading branch information
grdryn committed Feb 13, 2023
1 parent 6bfd420 commit 78629e7
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 11 deletions.
8 changes: 4 additions & 4 deletions perf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<name>shawkins</name>
<url>https://raw.githubusercontent.com/shawkins/repo/master</url>
<snapshots>
<enabled>false</enabled>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
Expand All @@ -34,16 +34,16 @@
<dependency>
<groupId>io.openmessaging.benchmark</groupId>
<artifactId>benchmark-framework</artifactId>
<version>0.0.1</version>
<version>0.0.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<!-- introduces a bouncy castle conflict -->
<groupId>io.openmessaging.benchmark</groupId>
<artifactId>driver-pulsar</artifactId>
<groupId>io.openmessaging.benchmark</groupId>
<artifactId>driver-pulsar</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down
24 changes: 17 additions & 7 deletions perf/src/main/java/org/bf2/performance/OMB.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Handles installation and running of OpenMessagingBenchmark
Expand Down Expand Up @@ -114,6 +115,19 @@ public void install(TlsConfig tlsConfig) throws IOException {
}

public void install(String base64EncodedTrustStore) throws IOException {
install(() ->
ombCluster.kubeClient().client().secrets().inNamespace(Constants.OMB_NAMESPACE).create(new SecretBuilder()
.editOrNewMetadata()
.withName("ext-listener-crt")
.withNamespace(Constants.OMB_NAMESPACE)
.endMetadata()
.addToData("listener.jks", base64EncodedTrustStore)
.build())
);

}

public void install(Runnable... runnables) {
LOGGER.info("Installing OMB in namespace {}", Constants.OMB_NAMESPACE);

pullAndHoldWorkerImageToAllNodesUsingDaemonSet();
Expand All @@ -124,13 +138,8 @@ public void install(String base64EncodedTrustStore) throws IOException {
nsAnnotations.put(Constants.ORG_BF2_KAFKA_PERFORMANCE_COLLECTPODLOG, "true");
}
ombCluster.createNamespace(Constants.OMB_NAMESPACE, nsAnnotations, Map.of());
ombCluster.kubeClient().client().secrets().inNamespace(Constants.OMB_NAMESPACE).create(new SecretBuilder()
.editOrNewMetadata()
.withName("ext-listener-crt")
.withNamespace(Constants.OMB_NAMESPACE)
.endMetadata()
.addToData("listener.jks", base64EncodedTrustStore)
.build());

Stream.of(runnables).forEachOrdered(Runnable::run);

LOGGER.info("Done installing OMB in namespace {}", Constants.OMB_NAMESPACE);
}
Expand Down Expand Up @@ -266,6 +275,7 @@ private void createWorker(String jvmOpts, String name, Node node) {
.withName("ca")
.editOrNewSecret()
.withSecretName("ext-listener-crt")
.withOptional(true)
.endSecret()
.endVolume()
.endSpec()
Expand Down
129 changes: 129 additions & 0 deletions perf/src/test/java/org/bf2/performance/ConnectionCreationRateTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package org.bf2.performance;

import io.fabric8.kubernetes.api.model.Quantity;
import io.openmessaging.benchmark.TestResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bf2.performance.framework.KubeClusterResource;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvFileSource;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionCreationRateTest extends TestBase {
private static final Logger LOGGER = LogManager.getLogger(ConnectionCreationRateTest.class);

static ManagedKafkaProvisioner kafkaProvisioner;
static KubeClusterResource kafkaCluster;
static OMB omb;

List<String> workers;

@BeforeAll
void beforeAll() throws Exception {
omb = new OMB(KubeClusterResource.connectToKubeCluster(PerformanceEnvironment.OMB_KUBECONFIG));
}

@AfterAll
void afterAll() throws Exception {
omb.uninstall();
}

@AfterEach
void afterEach() throws Exception {
omb.deleteWorkers();
}

@ParameterizedTest(name = "testConnectionCreationRate: [{index}] {0}, {1}, {3}")
@CsvFileSource(resources = "/test-inputs/ConnectionCreationRateTestInput.csv", useHeadersInDisplayName = true)
void testConnectionCreationRate(int numProducers, int numConsumers, int replicationFactor, int numWorkers, String ombWorkerMem, String ombWorkerCpu, String bootstrapURL, String clientId, String clientSecret, String tokenEndpointURL, String trustStoreFileName, String trustStorePassword, TestInfo info) throws Exception {
int messageSize = 1024;
int targetRate = 100;
String instanceName = bootstrapURL.split("\\.", 2)[0];

omb.install(Base64.getEncoder().encodeToString(Files.readAllBytes(Paths.get(trustStoreFileName))));
omb.setWorkerContainerMemory(Quantity.parse(ombWorkerMem));
omb.setWorkerCpu(Quantity.parse(ombWorkerCpu));
workers = omb.deployWorkers(numWorkers);

ExecutorService executorService = Executors.newFixedThreadPool(1);
AtomicInteger timeout = new AtomicInteger();
List<TestResult> testResults = new ArrayList<>();

try {
File ombDir = new File(instanceDir, instanceName);
Files.createDirectories(ombDir.toPath());

// OAuthBearer connection details
String driverCommonConfig = new StringBuilder("sasl.mechanism=OAUTHBEARER")
.append("\n")
.append("security.protocol=SASL_SSL")
.append("\n")
.append("sasl.oauthbearer.token.endpoint.url=")
.append(tokenEndpointURL)
.append("\n")
.append("ssl.truststore.location=/cert/listener.jks\n")
.append("ssl.truststore.password=password\n")
.append("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required scope=\"openid\" clientId=\"")
.append(clientId)
.append("\" ")
.append("clientSecret=\"")
.append(clientSecret)
.append("\" ;")
.append("\n")
.append("sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler")
.append("\n")
.append("bootstrap.servers=")
.append(bootstrapURL)
.append("\n")
.toString();

OMBDriver driver = new OMBDriver()
.setReplicationFactor(replicationFactor)
.setTopicConfig("")
.setCommonConfig(driverCommonConfig)
.setProducerConfig("acks=all\n")
.setConsumerConfig("auto.offset.reset=earliest\nenable.auto.commit=false\n");

OMBWorkload workload = new OMBWorkload()
.setName(String.format("Kafka Cluster: %s", instanceName))
.setTopics(1)
.setPartitionsPerTopic(1)
.setWarmupDurationMinutes(0)
.setTestDurationMinutes(1)
.setMessageSize(messageSize)
.setPayloadFile("src/test/resources/payload/payload-1Kb.data")
.setSubscriptionsPerTopic(numConsumers)
.setConsumerPerSubscription(1)
.setProducersPerTopic(numProducers)
.setProducerRate(targetRate)
.setConsumerBacklogSizeGB(0);
timeout.set(Math.max(workload.getTestDurationMinutes() + workload.getWarmupDurationMinutes(), timeout.get()) * 10);

Future<OMBWorkloadResult> resultFuture = executorService.submit(() -> {
OMBWorkloadResult result = omb.runWorkload(ombDir, driver, workers, workload);
LOGGER.info("Result stored in {}", result.getResultFile().getAbsolutePath());
return result;
});
testResults.add(resultFuture.get(timeout.get() * 5L, TimeUnit.MINUTES).getTestResult());
} finally {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}

}
}

0 comments on commit 78629e7

Please sign in to comment.