Skip to content

Commit

Permalink
finished WordCount with KafkaSource and KafkaSink
Browse files Browse the repository at this point in the history
  • Loading branch information
kamir committed Sep 2, 2024
1 parent a7afeee commit dfe309a
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,8 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
topicName,
new TransformationDescriptor(formatterUdf, basicDataUnitType[Out], basicDataUnitType[String], udfLoad)
)
sink.setName(s"Write to KafkaTopic $topicName")
sink.setName(s"*#-> Write to KafkaTopic $topicName")
println(s"*#-> Write to KafkaTopic $topicName")
this.connectTo(sink, 0)

// Do the execution.
Expand All @@ -991,6 +992,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
new TransformationDescriptor(formatterUdf, basicDataUnitType[Out], basicDataUnitType[String], udfLoad)
)
sink.setName(s"Write to $url")

this.connectTo(sink, 0)

// Do the execution.
Expand Down
52 changes: 52 additions & 0 deletions wayang-applications/bin/bootstrap.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@

#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Bootstrap the Kafka topics (and Schema Registry access) for the demo.
#
# Prerequisites (install once):
#brew install confluentinc/tap/cli
#brew install jq
#brew install git-lfs

# Confluent Cloud cluster hosting the demo topics (was repeated inline 12x).
CLUSTER_ID=lkc-m2kpj2

# Logical topic names used by the demo applications.
export topic_l1_a=region_emea_counts
export topic_l1_b=region_apac_counts
export topic_l1_c=region_uswest_counts
export topic_l2_a=global_contribution
export topic_l2_b=global_averages

#confluent login

# NOTE(review): the exported topic_* variables above are NOT used below — the
# delete/create commands operate on the literal topic names "topic_l1_a" etc.
# Confirm whether "$topic_l1_a" (= region_emea_counts) was intended instead.

# Recreate the demo topics from scratch: delete (ignoring "not found"), then create.
confluent kafka topic delete topic_l1_a --cluster $CLUSTER_ID
confluent kafka topic delete topic_l1_b --cluster $CLUSTER_ID
confluent kafka topic delete topic_l1_c --cluster $CLUSTER_ID
confluent kafka topic delete topic_l2_a --cluster $CLUSTER_ID
confluent kafka topic delete topic_l2_b --cluster $CLUSTER_ID

confluent kafka topic create topic_l1_a --cluster $CLUSTER_ID
confluent kafka topic create topic_l1_b --cluster $CLUSTER_ID
confluent kafka topic create topic_l1_c --cluster $CLUSTER_ID
confluent kafka topic create topic_l2_a --cluster $CLUSTER_ID
confluent kafka topic create topic_l2_b --cluster $CLUSTER_ID

######################################################################################################
# https://docs.confluent.io/cloud/current/sr/schema_registry_ccloud_tutorial.html
#
# SECURITY(review): hard-coded Schema Registry credentials committed to source
# control — rotate this API key and load it from the environment or a secrets
# store instead of keeping it in the repository.
export SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="ZZUZ3HASNNFGE2DF:EQNB0QpzDd868qlW0Nz49anodp7JjeDkoCaZelCJiUhfTX7BhuRPhNlDA/swx/Fa"
export SCHEMA_REGISTRY_URL="https://psrc-lo5k9.eu-central-1.aws.confluent.cloud"

confluent kafka topic list --cluster $CLUSTER_ID

###
# List the schemas currently registered in the Schema Registry.
curl --silent -X GET -u "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" "$SCHEMA_REGISTRY_URL/subjects" | jq .
25 changes: 25 additions & 0 deletions wayang-applications/bin/cleanup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Delete the demo Kafka topics from the Confluent Cloud cluster,
# then list the remaining topics as a sanity check.

CLUSTER=lkc-m2kpj2

for topic in topic_l1_a topic_l1_b topic_l1_c topic_l2_a topic_l2_b; do
    confluent kafka topic delete "$topic" --cluster "$CLUSTER"
done

confluent kafka topic list --cluster "$CLUSTER"
2 changes: 1 addition & 1 deletion wayang-applications/bin/run_wordcount.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export JAVA_HOME=/Users/kamir/.sdkman/candidates/java/current
cd ..
cd ..

#mvn clean compile package install -pl :wayang-assembly -Pdistribution -DskipTests
mvn clean compile package install -pl :wayang-assembly -Pdistribution -DskipTests

cd wayang-applications
mvn compile package install -DskipTests
Expand Down
2 changes: 1 addition & 1 deletion wayang-applications/bin/run_wordcount_kafka.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ mvn compile package install -DskipTests

cd ..

source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCount java file://$(pwd)/wayang-applications/data/case-study/DATA_REPO_001/README.md
source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCountOnKafkaTopic
4 changes: 2 additions & 2 deletions wayang-applications/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-api-scala-java_2.12</artifactId>
<version>${WAYANG_VERSION}</version>
<!--scope>system</scope-->
<!--systemPath>/Users/mkaempf/GITHUB.private/kamir-incubator-wayang/wayang-assembly/target/apache-wayang-assembly-0.7.1-SNAPSHOT-incubating-dist/wayang-0.7.1-SNAPSHOT/jars/wayang-api-scala-java_2.12-0.7.1-SNAPSHOT.jar</systemPath-->
<scope>system</scope>
<systemPath>/Users/kamir/GITHUB.merge/incubator-wayang/wayang-assembly/target/apache-wayang-assembly-0.7.1-incubating-dist/wayang-0.7.1/jars/wayang-api-scala-java-0.7.1.jar</systemPath>
</dependency>
<!--dependency>
<groupId>org.apache.wayang</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.applications;

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.java.Java;

import java.util.Arrays;
import java.util.Collection;

// Import the Logger class
import org.apache.log4j.Logger;


public class WordCountOnKafkaTopic {

    // Logger for this application.
    // NOTE(review): currently unused — all diagnostics below go to System.out;
    // either route them through the logger or drop this field.
    private static final Logger logger = Logger.getLogger(WordCountOnKafkaTopic.class);

    // Formats a (word, count) tuple as "word: count".
    // NOTE(review): currently unused — the Kafka sink below uses its own inline formatter.
    private static final FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> udf =
            tuple -> tuple.getField0() + ": " + tuple.getField1();

    /**
     * Runs a word count over a Kafka topic on the Wayang Java platform and
     * writes "&lt;count&gt;, &lt;word&gt;" lines to an output Kafka topic.
     *
     * @param args optional; {@code args[0]} overrides the input topic name
     */
    public static void main(String[] args) {

        System.out.println( ">>> Apache Wayang Test #02");
        System.out.println( "    Process data from a Kafka topic using a 'Java Context'.");

        // Default topic names; the input topic can be overridden via args[0].
        String inputTopicName = "banking-tx-small-csv";
        String outputTopicName = "word_count_contribution___banking-tx-small-csv";

        if (args.length > 0) {
            // First argument overrides the input topic name.
            inputTopicName = args[0];

            // Echo all arguments for traceability.
            for (int i = 0; i < args.length; i++) {
                System.out.println(String.format("  %d - %s", i, args[i]));
            }
        } else {
            System.out.println( "*** Use default topic name: " + inputTopicName );
        }

        // Fix: report the topic AFTER argument parsing — the original printed it
        // before reading args[0], so an override was never reflected in the banner.
        System.out.println( "    Topic: " + inputTopicName );

        Configuration configuration = new Configuration();

        // Build a Wayang context on the Java platform.
        WayangContext wayangContext = new WayangContext(configuration)
                .withPlugin(Java.basicPlugin());

        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName(String.format("WordCount using Java Context on Kafka topic (%s)", inputTopicName))
                .withUdfJarOf(WordCountOnKafkaTopic.class);

        // Build and execute the WayangPlan: read -> tokenize -> filter -> count -> write.
        planBuilder
                // Read the records from the input topic.
                .readKafkaTopic(inputTopicName).withName("Load data from topic")

                // Split each line by non-word characters.
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .withSelectivity(10, 100, 0.9)
                .withName("Split words")

                // Filter empty tokens.
                .filter(token -> !token.isEmpty())
                .withSelectivity(0.99, 0.99, 0.99)
                .withName("Filter empty words")

                // Attach a counter to each lower-cased word.
                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

                // Sum up the counters per word.
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
                .withName("Add counters")

                // Write the results as "<count>, <word>" lines to the output topic;
                // the write triggers plan execution.
                .writeKafkaTopic(outputTopicName, d -> String.format("%d, %s", d.getField1(), d.getField0()), "job_test_1",
                        LoadProfileEstimators.createFromSpecification("wayang.java.kafkatopicsink.load", configuration));

        System.out.println( "### Done. ***" );
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public KafkaProducer<String, String> getProducer( Properties props ){
if ( props == null ) {
props = getDefaultProperties();
System.out.println(">>> Create producer from DEFAULT PROPERTIES.");
props.list( System.out );
}
else {
System.out.println(">>> Create producer from PROPERTIES: " + props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
System.out.println("### 7 ... ");
this.initConsumer( (KafkaTopicSource) this );

ConsumerRecords<String, String> records = this.getConsumer().poll(Duration.ofMillis(15000));
ConsumerRecords<String, String> records = this.getConsumer().poll(Duration.ofMillis(100));

List<String> collectedRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
Expand Down

0 comments on commit dfe309a

Please sign in to comment.