Replace Source type for BigTable and Kafka stress tests (#30999)
* replace PeriodicImpulse with SyntheticUnboundedSource for BigTableIOST

* replace PeriodicImpulse with SyntheticUnboundedSource for the Kafka stress test

* refactor

* correct the number of records and records per second

* specify correct resources for Kafka servers

* refactor
akashorabek authored Apr 22, 2024
1 parent 516ac97 commit 1a5dc1c
Showing 5 changed files with 45 additions and 50 deletions.
Strimzi Kafka cluster manifest:
@@ -26,7 +26,11 @@ metadata:
name: beam-testing-cluster
spec:
kafka:
version: 3.4.0
resources:
requests:
cpu: 8
memory: 64Gi
version: 3.6.0
replicas: 3
config:
offsets.topic.replication.factor: 3
@@ -40,14 +44,18 @@ spec:
volumes:
- id: 0
type: persistent-claim
size: 100Gi
size: 500Gi
deleteClaim: false
zookeeper:
resources:
requests:
cpu: 1
memory: 2Gi
replicas: 3
storage:
type: persistent-claim
size: 100Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
GKE internal load balancer overlay (kustomization.yaml):
@@ -18,9 +18,11 @@
# This overlay patches the base strimzi kafka cluster with a GKE TCP
# internal load balancer ingress

namespace: strimzi

resources:
- ../../base/v0.33.2
patchesStrategicMerge:
- listeners.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

patches:
- path: listeners.yaml
BigtableResourceManager.java:
@@ -81,7 +81,7 @@ public class BigtableResourceManager implements ResourceManager {

private static final Logger LOG = LoggerFactory.getLogger(BigtableResourceManager.class);
private static final String DEFAULT_CLUSTER_ZONE = "us-central1-b";
private static final int DEFAULT_CLUSTER_NUM_NODES = 1;
private static final int DEFAULT_CLUSTER_NUM_NODES = 10;
private static final StorageType DEFAULT_CLUSTER_STORAGE_TYPE = StorageType.SSD;

private final String projectId;
BigTableIOST.java:
@@ -26,6 +26,8 @@
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -38,15 +40,16 @@
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.IOStressTestBase;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
@@ -144,11 +147,11 @@ public void teardown() {
ImmutableMap.of(
"medium",
Configuration.fromJsonString(
"{\"rowsPerSecond\":25000,\"minutes\":40,\"pipelineTimeout\":120,\"valueSizeBytes\":100,\"runner\":\"DataflowRunner\"}",
"{\"rowsPerSecond\":10000,\"minutes\":40,\"pipelineTimeout\":80,\"numRecords\":1000000,\"valueSizeBytes\":100,\"runner\":\"DataflowRunner\"}",
Configuration.class),
"large",
Configuration.fromJsonString(
"{\"rowsPerSecond\":25000,\"minutes\":130,\"pipelineTimeout\":200,\"valueSizeBytes\":1000,\"runner\":\"DataflowRunner\"}",
"{\"rowsPerSecond\":50000,\"minutes\":60,\"pipelineTimeout\":120,\"numRecords\":5000000,\"valueSizeBytes\":1000,\"runner\":\"DataflowRunner\"}",
Configuration.class));
} catch (IOException e) {
throw new RuntimeException(e);
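The reworked profiles add numRecords because the new source, unlike the time-driven PeriodicImpulse, must be sized explicitly. A minimal sketch of how such a profile binds, assuming only what the diff shows (Configuration extends SyntheticSourceOptions, so numRecords and valueSizeBytes come from the parent options class); the fragment is illustrative, not lifted from the test:

// Illustrative binding of a stress-test profile. numRecords bounds the
// synthetic data set, valueSizeBytes sizes each payload, and
// rowsPerSecond/minutes shape the load ramp in the test harness.
Configuration config =
    Configuration.fromJsonString(
        "{\"rowsPerSecond\":10000,\"minutes\":40,\"pipelineTimeout\":80,"
            + "\"numRecords\":1000000,\"valueSizeBytes\":100,"
            + "\"runner\":\"DataflowRunner\"}",
        Configuration.class);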
@@ -157,7 +160,7 @@ public void teardown() {

/** Run stress test with configurations specified by TestProperties. */
@Test
public void runTest() throws IOException {
public void runTest() throws IOException, ParseException, InterruptedException {
if (configuration.exportMetricsToInfluxDB) {
influxDBSettings =
InfluxDBSettings.builder()
@@ -194,9 +197,9 @@ public void runTest() throws IOException {
readInfo.jobId(),
getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));

// Assert that writeNumRecords equals or greater than readNumRecords since there might be
// Assert that readNumRecords is equal to or greater than writeNumRecords, since there may be
// duplicates when testing large amounts of data
assertTrue(writeNumRecords >= readNumRecords);
assertTrue(readNumRecords >= writeNumRecords);
} finally {
// clean up write streaming pipeline
if (pipelineLauncher.getJobStatus(project, region, writeInfo.jobId())
@@ -230,22 +233,14 @@ public void runTest() throws IOException {
* dynamically over time, with options to use configurable parameters.
*/
private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
// The PeriodicImpulse source will generate an element every this many millis:
int fireInterval = 1;
// Each element from PeriodicImpulse will fan out to this many elements:
int startMultiplier =
Math.max(configuration.rowsPerSecond, DEFAULT_ROWS_PER_SECOND) / DEFAULT_ROWS_PER_SECOND;
long stopAfterMillis =
org.joda.time.Duration.standardMinutes(configuration.minutes).getMillis();
long totalRows = startMultiplier * stopAfterMillis / fireInterval;
List<LoadPeriod> loadPeriods =
getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);

PCollection<org.joda.time.Instant> source =
writePipeline.apply(
PeriodicImpulse.create()
.stopAfter(org.joda.time.Duration.millis(stopAfterMillis - 1))
.withInterval(org.joda.time.Duration.millis(fireInterval)));
PCollection<KV<byte[], byte[]>> source =
writePipeline.apply(Read.from(new SyntheticUnboundedSource(configuration)));

if (startMultiplier > 1) {
source =
source
@@ -257,7 +252,7 @@ private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
source
.apply(
"Map records to BigTable format",
ParDo.of(new MapToBigTableFormat((int) configuration.valueSizeBytes, (int) totalRows)))
ParDo.of(new MapToBigTableFormat((int) configuration.valueSizeBytes)))
.apply(
"Write to BigTable",
BigtableIO.write()
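Pieced together from the fragments above, the rewritten write path looks roughly like the sketch below. MultiplierDoFn, loadPeriods, and the BigtableIO builder calls after write() come from the surrounding harness and are folded out of this hunk, so treat this as an assembled sketch rather than the verbatim test:

// Sketch: synthetic unbounded source -> optional fanout -> BigTable format -> sink.
PCollection<KV<byte[], byte[]>> source =
    writePipeline.apply(Read.from(new SyntheticUnboundedSource(configuration)));
if (startMultiplier > 1) {
  source =
      source.apply(
          "One input to multiple outputs",
          ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)));
}
source
    .apply(
        "Map records to BigTable format",
        ParDo.of(new MapToBigTableFormat((int) configuration.valueSizeBytes)))
    // instance/table builder calls are folded out of the hunk and omitted here
    .apply("Write to BigTable", BigtableIO.write());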
@@ -353,20 +348,18 @@ static class Configuration extends SyntheticSourceOptions {

/** Maps Instant to the BigTable format record. */
private static class MapToBigTableFormat
extends DoFn<org.joda.time.Instant, KV<ByteString, Iterable<Mutation>>>
implements Serializable {
extends DoFn<KV<byte[], byte[]>, KV<ByteString, Iterable<Mutation>>> implements Serializable {

private final int valueSizeBytes;
private final int totalRows;

public MapToBigTableFormat(int valueSizeBytes, int totalRows) {
public MapToBigTableFormat(int valueSizeBytes) {
this.valueSizeBytes = valueSizeBytes;
this.totalRows = totalRows;
}

@ProcessElement
public void processElement(ProcessContext c) {
long index = Objects.requireNonNull(c.element()).getMillis() % totalRows;
ByteBuffer byteBuffer = ByteBuffer.wrap(Objects.requireNonNull(c.element()).getValue());
int index = byteBuffer.getInt();

ByteString key =
ByteString.copyFromUtf8(
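The row index now comes from the synthetic payload itself rather than from the element timestamp, so keys are deterministic across retries and replays. A self-contained sketch of that extraction, assuming (as the diff suggests) that the leading four bytes of the value are read as a big-endian int:

import java.nio.ByteBuffer;

public class IndexFromPayload {
  // Reads the first four bytes of a synthetic record's value as an int,
  // giving each record a stable index independent of processing time.
  static int indexOf(byte[] value) {
    return ByteBuffer.wrap(value).getInt();
  }

  public static void main(String[] args) {
    byte[] value = ByteBuffer.allocate(8).putInt(42).array();
    System.out.println(indexOf(value)); // prints 42
  }
}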
it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java (11 additions, 19 deletions):
@@ -22,33 +22,34 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.text.ParseException;
import java.time.Duration;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.gcp.IOStressTestBase;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -150,11 +151,11 @@ public void setup() {
ImmutableMap.of(
"medium",
Configuration.fromJsonString(
"{\"rowsPerSecond\":25000,\"minutes\":30,\"pipelineTimeout\":60,\"runner\":\"DataflowRunner\"}",
"{\"rowsPerSecond\":10000,\"numRecords\":1000000,\"valueSizeBytes\":1000,\"minutes\":20,\"pipelineTimeout\":60,\"runner\":\"DataflowRunner\"}",
Configuration.class),
"large",
Configuration.fromJsonString(
"{\"rowsPerSecond\":25000,\"minutes\":130,\"pipelineTimeout\":300,\"runner\":\"DataflowRunner\"}",
"{\"rowsPerSecond\":50000,\"numRecords\":5000000,\"valueSizeBytes\":1000,\"minutes\":60,\"pipelineTimeout\":120,\"runner\":\"DataflowRunner\"}",
Configuration.class));
} catch (IOException e) {
throw new RuntimeException(e);
@@ -163,7 +164,7 @@ public void setup() {

/** Run stress test with configurations specified by TestProperties. */
@Test
public void testWriteAndRead() throws IOException {
public void testWriteAndRead() throws IOException, ParseException, InterruptedException {
if (configuration.exportMetricsToInfluxDB) {
influxDBSettings =
InfluxDBSettings.builder()
@@ -240,34 +241,25 @@ public void testWriteAndRead() {
* dynamically over time, with options to use configurable parameters.
*/
private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
// The PeriodicImpulse source will generate an element every this many millis:
int fireInterval = 1;
// Each element from PeriodicImpulse will fan out to this many elements:
int startMultiplier =
Math.max(configuration.rowsPerSecond, DEFAULT_ROWS_PER_SECOND) / DEFAULT_ROWS_PER_SECOND;
long stopAfterMillis =
org.joda.time.Duration.standardMinutes(configuration.minutes).getMillis();
long totalRows = startMultiplier * stopAfterMillis / fireInterval;
List<LoadPeriod> loadPeriods =
getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);

PCollection<byte[]> source =
writePipeline
.apply(
PeriodicImpulse.create()
.stopAfter(org.joda.time.Duration.millis(stopAfterMillis - 1))
.withInterval(org.joda.time.Duration.millis(fireInterval)))
.apply(Read.from(new SyntheticUnboundedSource(configuration)))
.apply(
"Extract values",
MapElements.into(TypeDescriptor.of(byte[].class))
.via(instant -> Longs.toByteArray(instant.getMillis() % totalRows)));
.via(kv -> Objects.requireNonNull(kv).getValue()));

if (startMultiplier > 1) {
source =
source
.apply(
"One input to multiple outputs",
ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)))
.apply("Reshuffle fanout", Reshuffle.viaRandomKey())
.apply("Counting element", ParDo.of(new CountingFn<>(WRITE_ELEMENT_METRIC_NAME)));
}
source.apply(
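A note on the newly added Reshuffle stage: without it, the fanout DoFn and the downstream Kafka write would typically be fused and stay on the worker that produced each input element. Reshuffle.viaRandomKey() breaks that fusion so the multiplied elements can redistribute across workers. A minimal sketch of the pattern (MultiplierDoFn and loadPeriods are harness types from this test):

// Fanout followed by a fusion break: each input becomes startMultiplier
// outputs, then a random-key reshuffle spreads them across workers.
PCollection<byte[]> fannedOut =
    source
        .apply(
            "One input to multiple outputs",
            ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)))
        .apply("Reshuffle fanout", Reshuffle.viaRandomKey());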
@@ -336,7 +328,7 @@ static class Configuration extends SyntheticSourceOptions {
* Determines whether to use Dataflow runner v2. If set to true, it uses SDF mode for reading
* from Kafka. Otherwise, Unbounded mode will be used.
*/
@JsonProperty public boolean useDataflowRunnerV2 = true;
@JsonProperty public boolean useDataflowRunnerV2 = false;

/** Number of workers for the pipeline. */
@JsonProperty public int numWorkers = 20;
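For context, a hypothetical sketch of how such a flag is commonly mapped onto Dataflow experiments; the experiment names are standard Beam/Dataflow knobs, but the actual wiring in this test lives outside the hunk, so the helper below is an assumption, not the test's code:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;

class RunnerV2Wiring {
  // Hypothetical helper: with runner v2, KafkaIO reads via SDF; otherwise
  // the use_deprecated_read experiment selects the legacy UnboundedSource path.
  static void applyReadMode(PipelineOptions options, boolean useDataflowRunnerV2) {
    List<String> experiments = new ArrayList<>();
    experiments.add(useDataflowRunnerV2 ? "use_runner_v2" : "use_deprecated_read");
    options.as(ExperimentalOptions.class).setExperiments(experiments);
  }
}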