diff --git a/.test-infra/kafka/strimzi/02-kafka-persistent/base/v0.33.2/kafka-persistent.yaml b/.test-infra/kafka/strimzi/02-kafka-persistent/base/v0.33.2/kafka-persistent.yaml
index 7ccc1bf7aa37..294c9a2e5153 100644
--- a/.test-infra/kafka/strimzi/02-kafka-persistent/base/v0.33.2/kafka-persistent.yaml
+++ b/.test-infra/kafka/strimzi/02-kafka-persistent/base/v0.33.2/kafka-persistent.yaml
@@ -26,7 +26,11 @@ metadata:
   name: beam-testing-cluster
 spec:
   kafka:
-    version: 3.4.0
+    resources:
+      requests:
+        cpu: 8
+        memory: 64Gi
+    version: 3.6.0
     replicas: 3
     config:
       offsets.topic.replication.factor: 3
@@ -40,9 +44,13 @@ spec:
       volumes:
       - id: 0
         type: persistent-claim
-        size: 100Gi
+        size: 500Gi
         deleteClaim: false
   zookeeper:
+    resources:
+      requests:
+        cpu: 1
+        memory: 2Gi
     replicas: 3
     storage:
       type: persistent-claim
@@ -50,4 +58,4 @@ spec:
       deleteClaim: false
   entityOperator:
     topicOperator: {}
-    userOperator: {}
+    userOperator: {}
\ No newline at end of file
diff --git a/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced/kustomization.yaml b/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced/kustomization.yaml
index 84df24b26a87..556bdf32b27f 100644
--- a/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced/kustomization.yaml
+++ b/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced/kustomization.yaml
@@ -18,9 +18,11 @@
 
 # This overlay patches the base strimzi kafka cluster with a GKE TCP
 # internal load balancer ingress
 
-namespace: strimzi
 resources:
 - ../../base/v0.33.2
-patchesStrategicMerge:
-- listeners.yaml
\ No newline at end of file
+apiVersion: kustomize.config.k8s.io/v1beta1
+kind: Kustomization
+
+patches:
+  - path: listeners.yaml
\ No newline at end of file
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
index 311ce9575c2e..772f81062a8a 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
@@ -81,7 +81,7 @@ public class BigtableResourceManager implements ResourceManager {
   private static final Logger LOG = LoggerFactory.getLogger(BigtableResourceManager.class);
 
   private static final String DEFAULT_CLUSTER_ZONE = "us-central1-b";
-  private static final int DEFAULT_CLUSTER_NUM_NODES = 1;
+  private static final int DEFAULT_CLUSTER_NUM_NODES = 10;
   private static final StorageType DEFAULT_CLUSTER_STORAGE_TYPE = StorageType.SSD;
 
   private final String projectId;
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java
index 4abcee8e6d59..f8cf2511aaeb 100644
--- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java
@@ -26,6 +26,8 @@
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.text.ParseException;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -38,15 +40,16 @@
 import org.apache.beam.it.common.utils.ResourceManagerUtils;
 import org.apache.beam.it.gcp.IOStressTestBase;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
 import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.PeriodicImpulse;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
@@ -144,11 +147,11 @@ public void teardown() {
           ImmutableMap.of(
               "medium",
               Configuration.fromJsonString(
-                  "{\"rowsPerSecond\":25000,\"minutes\":40,\"pipelineTimeout\":120,\"valueSizeBytes\":100,\"runner\":\"DataflowRunner\"}",
+                  "{\"rowsPerSecond\":10000,\"minutes\":40,\"pipelineTimeout\":80,\"numRecords\":1000000,\"valueSizeBytes\":100,\"runner\":\"DataflowRunner\"}",
                   Configuration.class),
               "large",
               Configuration.fromJsonString(
-                  "{\"rowsPerSecond\":25000,\"minutes\":130,\"pipelineTimeout\":200,\"valueSizeBytes\":1000,\"runner\":\"DataflowRunner\"}",
+                  "{\"rowsPerSecond\":50000,\"minutes\":60,\"pipelineTimeout\":120,\"numRecords\":5000000,\"valueSizeBytes\":1000,\"runner\":\"DataflowRunner\"}",
                   Configuration.class));
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -157,7 +160,7 @@ public void teardown() {
 
   /** Run stress test with configurations specified by TestProperties. */
   @Test
-  public void runTest() throws IOException {
+  public void runTest() throws IOException, ParseException, InterruptedException {
     if (configuration.exportMetricsToInfluxDB) {
       influxDBSettings =
           InfluxDBSettings.builder()
@@ -194,9 +197,9 @@ public void runTest() throws IOException {
               readInfo.jobId(),
               getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));
 
-      // Assert that writeNumRecords equals or greater than readNumRecords since there might be
+      // Assert that readNumRecords equals or greater than writeNumRecords since there might be
       // duplicates when testing big amount of data
-      assertTrue(writeNumRecords >= readNumRecords);
+      assertTrue(readNumRecords >= writeNumRecords);
     } finally {
       // clean up write streaming pipeline
       if (pipelineLauncher.getJobStatus(project, region, writeInfo.jobId())
@@ -230,22 +233,14 @@ public void runTest() throws IOException {
    * dynamically over time, with options to use configurable parameters.
    */
   private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
-    // The PeriodicImpulse source will generate an element every this many millis:
-    int fireInterval = 1;
-    // Each element from PeriodicImpulse will fan out to this many elements:
     int startMultiplier =
         Math.max(configuration.rowsPerSecond, DEFAULT_ROWS_PER_SECOND) / DEFAULT_ROWS_PER_SECOND;
-    long stopAfterMillis =
-        org.joda.time.Duration.standardMinutes(configuration.minutes).getMillis();
-    long totalRows = startMultiplier * stopAfterMillis / fireInterval;
     List<LoadPeriod> loadPeriods = getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);
 
-    PCollection<Instant> source =
-        writePipeline.apply(
-            PeriodicImpulse.create()
-                .stopAfter(org.joda.time.Duration.millis(stopAfterMillis - 1))
-                .withInterval(org.joda.time.Duration.millis(fireInterval)));
+    PCollection<KV<byte[], byte[]>> source =
+        writePipeline.apply(Read.from(new SyntheticUnboundedSource(configuration)));
+
     if (startMultiplier > 1) {
       source =
           source
@@ -257,7 +252,7 @@ private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
     source
         .apply(
             "Map records to BigTable format",
-            ParDo.of(new MapToBigTableFormat((int) configuration.valueSizeBytes, (int) totalRows)))
+            ParDo.of(new MapToBigTableFormat((int) configuration.valueSizeBytes)))
         .apply(
             "Write to BigTable",
             BigtableIO.write()
@@ -353,20 +348,18 @@ static class Configuration extends SyntheticSourceOptions {
 
   /** Maps Instant to the BigTable format record. */
   private static class MapToBigTableFormat
-      extends DoFn<Instant, KV<ByteString, Iterable<Mutation>>>
-          implements Serializable {
+      extends DoFn<KV<byte[], byte[]>, KV<ByteString, Iterable<Mutation>>> implements Serializable {
 
     private final int valueSizeBytes;
-    private final int totalRows;
 
-    public MapToBigTableFormat(int valueSizeBytes, int totalRows) {
+    public MapToBigTableFormat(int valueSizeBytes) {
       this.valueSizeBytes = valueSizeBytes;
-      this.totalRows = totalRows;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) {
-      long index = Objects.requireNonNull(c.element()).getMillis() % totalRows;
+      ByteBuffer byteBuffer = ByteBuffer.wrap(Objects.requireNonNull(c.element()).getValue());
+      int index = byteBuffer.getInt();
       ByteString key =
           ByteString.copyFromUtf8(
diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
index 505b51cec04a..4e7c0d428cba 100644
--- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
+++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
@@ -22,33 +22,34 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.IOException;
+import java.text.ParseException;
 import java.time.Duration;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import org.apache.beam.it.common.PipelineLauncher;
 import org.apache.beam.it.common.PipelineOperator;
 import org.apache.beam.it.common.TestProperties;
 import org.apache.beam.it.gcp.IOStressTestBase;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.PeriodicImpulse;
-import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -150,11 +151,11 @@ public void setup() {
           ImmutableMap.of(
               "medium",
               Configuration.fromJsonString(
-                  "{\"rowsPerSecond\":25000,\"minutes\":30,\"pipelineTimeout\":60,\"runner\":\"DataflowRunner\"}",
+                  "{\"rowsPerSecond\":10000,\"numRecords\":1000000,\"valueSizeBytes\":1000,\"minutes\":20,\"pipelineTimeout\":60,\"runner\":\"DataflowRunner\"}",
                   Configuration.class),
               "large",
               Configuration.fromJsonString(
-                  "{\"rowsPerSecond\":25000,\"minutes\":130,\"pipelineTimeout\":300,\"runner\":\"DataflowRunner\"}",
+                  "{\"rowsPerSecond\":50000,\"numRecords\":5000000,\"valueSizeBytes\":1000,\"minutes\":60,\"pipelineTimeout\":120,\"runner\":\"DataflowRunner\"}",
                   Configuration.class));
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -163,7 +164,7 @@ public void setup() {
 
   /** Run stress test with configurations specified by TestProperties. */
   @Test
-  public void testWriteAndRead() throws IOException {
+  public void testWriteAndRead() throws IOException, ParseException, InterruptedException {
     if (configuration.exportMetricsToInfluxDB) {
       influxDBSettings =
           InfluxDBSettings.builder()
@@ -240,34 +241,25 @@ public void testWriteAndRead() throws IOException {
    * dynamically over time, with options to use configurable parameters.
    */
   private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
-    // The PeriodicImpulse source will generate an element every this many millis:
-    int fireInterval = 1;
-    // Each element from PeriodicImpulse will fan out to this many elements:
     int startMultiplier =
         Math.max(configuration.rowsPerSecond, DEFAULT_ROWS_PER_SECOND) / DEFAULT_ROWS_PER_SECOND;
-    long stopAfterMillis =
-        org.joda.time.Duration.standardMinutes(configuration.minutes).getMillis();
-    long totalRows = startMultiplier * stopAfterMillis / fireInterval;
     List<LoadPeriod> loadPeriods = getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);
 
     PCollection<byte[]> source =
         writePipeline
-            .apply(
-                PeriodicImpulse.create()
-                    .stopAfter(org.joda.time.Duration.millis(stopAfterMillis - 1))
-                    .withInterval(org.joda.time.Duration.millis(fireInterval)))
+            .apply(Read.from(new SyntheticUnboundedSource(configuration)))
             .apply(
                 "Extract values",
                 MapElements.into(TypeDescriptor.of(byte[].class))
-                    .via(instant -> Longs.toByteArray(instant.getMillis() % totalRows)));
+                    .via(kv -> Objects.requireNonNull(kv).getValue()));
+
     if (startMultiplier > 1) {
       source =
           source
               .apply(
                   "One input to multiple outputs",
                   ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)))
-              .apply("Reshuffle fanout", Reshuffle.viaRandomKey())
               .apply("Counting element", ParDo.of(new CountingFn<>(WRITE_ELEMENT_METRIC_NAME)));
     }
     source.apply(
@@ -336,7 +328,7 @@ static class Configuration extends SyntheticSourceOptions {
      * Determines whether to use Dataflow runner v2. If set to true, it uses SDF mode for reading
      * from Kafka. Otherwise, Unbounded mode will be used.
      */
-    @JsonProperty public boolean useDataflowRunnerV2 = true;
+    @JsonProperty public boolean useDataflowRunnerV2 = false;
 
     /** Number of workers for the pipeline. */
     @JsonProperty public int numWorkers = 20;