Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Attempting to create a new perf test #865

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions perf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<name>shawkins</name>
<url>https://raw.githubusercontent.com/shawkins/repo/master</url>
<snapshots>
<enabled>false</enabled>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
Expand All @@ -34,16 +34,16 @@
<dependency>
<groupId>io.openmessaging.benchmark</groupId>
<artifactId>benchmark-framework</artifactId>
<version>0.0.1</version>
<version>0.0.2-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<!-- introduces a bouncy castle conflict -->
<groupId>io.openmessaging.benchmark</groupId>
<artifactId>driver-pulsar</artifactId>
<groupId>io.openmessaging.benchmark</groupId>
<artifactId>driver-pulsar</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down
24 changes: 17 additions & 7 deletions perf/src/main/java/org/bf2/performance/OMB.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Handles installation and running of OpenMessagingBenchmark
Expand Down Expand Up @@ -114,6 +115,19 @@ public void install(TlsConfig tlsConfig) throws IOException {
}

public void install(String base64EncodedTrustStore) throws IOException {
install(() ->
ombCluster.kubeClient().client().secrets().inNamespace(Constants.OMB_NAMESPACE).create(new SecretBuilder()
.editOrNewMetadata()
.withName("ext-listener-crt")
.withNamespace(Constants.OMB_NAMESPACE)
.endMetadata()
.addToData("listener.jks", base64EncodedTrustStore)
.build())
);

}

public void install(Runnable... runnables) {
LOGGER.info("Installing OMB in namespace {}", Constants.OMB_NAMESPACE);

pullAndHoldWorkerImageToAllNodesUsingDaemonSet();
Expand All @@ -124,13 +138,8 @@ public void install(String base64EncodedTrustStore) throws IOException {
nsAnnotations.put(Constants.ORG_BF2_KAFKA_PERFORMANCE_COLLECTPODLOG, "true");
}
ombCluster.createNamespace(Constants.OMB_NAMESPACE, nsAnnotations, Map.of());
ombCluster.kubeClient().client().secrets().inNamespace(Constants.OMB_NAMESPACE).create(new SecretBuilder()
.editOrNewMetadata()
.withName("ext-listener-crt")
.withNamespace(Constants.OMB_NAMESPACE)
.endMetadata()
.addToData("listener.jks", base64EncodedTrustStore)
.build());

Stream.of(runnables).forEachOrdered(Runnable::run);

LOGGER.info("Done installing OMB in namespace {}", Constants.OMB_NAMESPACE);
}
Expand Down Expand Up @@ -266,6 +275,7 @@ private void createWorker(String jvmOpts, String name, Node node) {
.withName("ca")
.editOrNewSecret()
.withSecretName("ext-listener-crt")
.withOptional(true)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When targeting an instance that has a trusted cert, there won't be a need for a truststore.

.endSecret()
.endVolume()
.endSpec()
Expand Down
155 changes: 155 additions & 0 deletions perf/src/test/java/org/bf2/performance/ConnectionCreationRateTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package org.bf2.performance;

import io.fabric8.kubernetes.api.model.Quantity;
import io.openmessaging.benchmark.TestResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bf2.performance.framework.KubeClusterResource;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvFileSource;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionCreationRateTest extends TestBase {
private static final Logger LOGGER = LogManager.getLogger(ConnectionCreationRateTest.class);

static ManagedKafkaProvisioner kafkaProvisioner;
static KubeClusterResource kafkaCluster;
static OMB omb;

List<String> workers;

@BeforeAll
void beforeAll() throws Exception {
omb = new OMB(KubeClusterResource.connectToKubeCluster(PerformanceEnvironment.OMB_KUBECONFIG));
}

@AfterAll
void afterAll() throws Exception {
omb.uninstall();
}

@AfterEach
void afterEach() throws Exception {
omb.deleteWorkers();
}

/*
* This test expects a CSV file at src/test/resources/test-inputs/ConnectionCreationRateTestInput.csv
* See src/test/resources/test-inputs/ExampleConnectionCreationRateTestInput.csv for examples.
*/
@ParameterizedTest(name = "testConnectionCreationRate: [{index}] {0}, {1}, {3}, {7}")
@CsvFileSource(resources = "/test-inputs/ConnectionCreationRateTestInput.csv", useHeadersInDisplayName = true)
void testConnectionCreationRate(int numProducers, int numConsumers, int replicationFactor, int numWorkers, String ombWorkerMem, String ombWorkerCpu, String bootstrapURL, String saslMechanism, String clientId, String clientSecret, String tokenEndpointURL, String trustStoreFileName, String trustStorePassword, int testDurationMinutes, TestInfo info) throws Exception {
int messageSize = 1024;
int targetRate = 100;
String instanceName = bootstrapURL.split("\\.", 2)[0];

if (trustStoreFileName != null) {
omb.install(Base64.getEncoder().encodeToString(Files.readAllBytes(Paths.get(trustStoreFileName))));
} else {
omb.install();
}

omb.setWorkerContainerMemory(Quantity.parse(ombWorkerMem));
omb.setWorkerCpu(Quantity.parse(ombWorkerCpu));
workers = omb.deployWorkers(numWorkers);

ExecutorService executorService = Executors.newFixedThreadPool(1);
AtomicInteger timeout = new AtomicInteger();
List<TestResult> testResults = new ArrayList<>();

try {
File ombDir = new File(instanceDir, instanceName);
Files.createDirectories(ombDir.toPath());

StringBuilder driverCommonConfig = new StringBuilder("sasl.mechanism=")
.append(saslMechanism.toUpperCase())
.append("\n")
.append("security.protocol=SASL_SSL")
.append("\n")
.append("sasl.oauthbearer.token.endpoint.url=")
.append(tokenEndpointURL)
.append("\n")
.append("bootstrap.servers=")
.append(bootstrapURL)
.append("\n");

LOGGER.info(trustStoreFileName);
LOGGER.info(trustStorePassword);
if (trustStoreFileName != null && trustStorePassword != null) {
driverCommonConfig.append("ssl.truststore.location=/cert/listener.jks\n")
.append("ssl.truststore.password=")
.append(trustStorePassword)
.append("\n");
}

if ("OAUTHBEARER".equals(saslMechanism.toUpperCase())) {
driverCommonConfig.append("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required scope=\"openid\" clientId=\"")
.append(clientId)
.append("\" ")
.append("clientSecret=\"")
.append(clientSecret)
.append("\" ;")
.append("\n")
.append("sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler")
.append("\n");
} else {
driverCommonConfig.append("sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"")
.append(clientId)
.append("\" ")
.append("password=\"")
.append(clientSecret)
.append("\" ;")
.append("\n");
}

OMBDriver driver = new OMBDriver()
.setReplicationFactor(replicationFactor)
.setTopicConfig("")
.setCommonConfig(driverCommonConfig.toString())
.setProducerConfig("acks=all\n")
.setConsumerConfig("auto.offset.reset=earliest\nenable.auto.commit=false\n");

OMBWorkload workload = new OMBWorkload()
.setName(String.format("Kafka Cluster: %s", instanceName))
.setTopics(1)
.setPartitionsPerTopic(1)
.setWarmupDurationMinutes(0)
.setTestDurationMinutes(Math.max(testDurationMinutes, 1))
.setMessageSize(messageSize)
.setPayloadFile("src/test/resources/payload/payload-1Kb.data")
.setSubscriptionsPerTopic(numConsumers)
.setConsumerPerSubscription(1)
.setProducersPerTopic(numProducers)
.setProducerRate(targetRate)
.setConsumerBacklogSizeGB(0);
timeout.set(Math.max(workload.getTestDurationMinutes() + workload.getWarmupDurationMinutes(), timeout.get()) * 10);

Future<OMBWorkloadResult> resultFuture = executorService.submit(() -> {
OMBWorkloadResult result = omb.runWorkload(ombDir, driver, workers, workload);
LOGGER.info("Result stored in {}", result.getResultFile().getAbsolutePath());
return result;
});
testResults.add(resultFuture.get(timeout.get() * 5L, TimeUnit.MINUTES).getTestResult());
} finally {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Producers, Consumers, Replication Factor, OMB Workers, OMB Worker Memory, OMB Worker CPU, Bootstrap URL, Sasl Mechanism, Client ID, Client Secret, Token Endpoint URL, Truststore file, Truststore Password, Test Duration in minutes
1, 10, 1, 2, 3Gi, 2000m, REPLACE_WITH_BOOTSTRAP_URL, OAUTHBEARER, REPLACE_WITH_CLIENT_ID, REPLACE_WITH_CLIENT_SECRET, REPLACE_WITH_TOKEN_ENDPOINT_URL,,, 1
1, 10, 1, 2, 3Gi, 2000m, REPLACE_WITH_BOOTSTRAP_URL, OAUTHBEARER, REPLACE_WITH_CLIENT_ID, REPLACE_WITH_CLIENT_SECRET, REPLACE_WITH_TOKEN_ENDPOINT_URL, OPTIONALLY_REPLACE_WITH_ABSOLUTE_PATH_TO_LOCAL_TRUSTSTORE_FILE, OPTIONALLY_REPLACE_WITH_TRUSTSTORE_PASSWORD, 1