Group sum values with conditions in MapReduce
Let's review the question from the last page: what if I want to output a group sum over two values for each player, restricted to the 2017 season only?
In plain SQL, we would write:
SELECT playername, SUM(kick_score), SUM(kick_avg) FROM player
WHERE year = 2017
GROUP BY playername
Seems easy, but how do we do it in MapReduce? Let's write down some ideas:
1. The key is still playername.
2. In the map() method, the new value needs to carry two numbers, something like <kick_score, kick_avg>.
3. The output of map() will look like {K: playername, V: <kick_score, kick_avg>}.
4. We need the Mapper to recognize a new value format containing two fields, like class(field1, field2).
5. So we need to write our own Writable class that works for both the Mapper and the Reducer.
6. For each key in reduce(), we just sum the kick_score and kick_avg values and store the totals in the new Writable class.
7. reduce() outputs the same key with the new Writable class as the value (see the type flow sketched just below).
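Putting these ideas together, the type flow of the job (matching the classes written below) is:

    Mapper<LongWritable, Text, Text, playerbean>   // reads a line, emits {playername, <kick_score, kick_avg>}
    Reducer<Text, playerbean, Text, playerbean>    // sums the two fields per playername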
Here we can also see why the Mapper input always involves a LongWritable / IntWritable / NullWritable and so on: Hadoop ships with a set of pre-defined Writable classes for us to use.
Common pre-defined Writable classes in org.apache.hadoop.io are listed below:
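    BooleanWritable  - boolean
    ByteWritable     - byte
    IntWritable      - int
    LongWritable     - long
    FloatWritable    - float
    DoubleWritable   - double
    Text             - UTF-8 string
    BytesWritable    - byte array
    NullWritable     - placeholder when no key or value is needed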
So let's write the new Writable class:
package playercount;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class playerbean implements Writable {

    private long kick_score;
    private float kick_avg;

    // Hadoop instantiates the bean via reflection during deserialization,
    // so a no-argument constructor is required.
    public playerbean() {
    }

    public playerbean(long kick_score, float kick_avg) {
        this.kick_score = kick_score;
        this.kick_avg = kick_avg;
    }

    public long getkick_score() {
        return kick_score;
    }

    public void setkick_score(long kick_score) {
        this.kick_score = kick_score;
    }

    public float getkick_avg() {
        return kick_avg;
    }

    public void setkick_avg(float kick_avg) {
        this.kick_avg = kick_avg;
    }

    // Serialize the fields to the binary stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(kick_score);
        out.writeFloat(kick_avg);
    }

    // Deserialize the fields in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        kick_score = in.readLong();
        kick_avg = in.readFloat();
    }

    // Controls how this value is rendered in the job's text output.
    @Override
    public String toString() {
        return kick_score + "\t" + kick_avg;
    }
}
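As a quick sanity check (not part of the original job), here is a minimal sketch that round-trips a playerbean through its own write()/readFields() methods using plain Java streams; the class name and the numbers are made up for illustration:

package playercount;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class playerbean_roundtrip_test {
    public static void main(String[] args) throws IOException {
        // Serialize a bean exactly as Hadoop would between map and reduce.
        playerbean original = new playerbean(78, 5.2f);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance and print it.
        playerbean copy = new playerbean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // prints: 78	5.2
    }
}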
Then we write the map() method:
package playercount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class player_select_mapper extends Mapper<LongWritable, Text, Text, playerbean> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is one comma-separated player record.
        String line = value.toString();
        String[] lineSplit = line.split(",");
        String name = lineSplit[0];
        long kick_score = Long.parseLong(lineSplit[1]);
        float kick_avg = Float.parseFloat(lineSplit[2]);
        long year = Long.parseLong(lineSplit[23]);
        // Only emit records from the 2017 season.
        if (year == 2017) {
            context.write(new Text(name), new playerbean(kick_score, kick_avg));
        }
    }
}
As you can see, the new Writable class is called playerbean and it contains two fields: kick_score and kick_avg. We also add an if condition in the map() method to implement the 2017 filter.
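For example, given a hypothetical input row such as

    Buddy,78,5.2,...,2017

(where the year sits in the 24th column, index 23), map() emits {K: Buddy, V: <78, 5.2>}, while a row from any other season is simply dropped.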
Finally, we write the reduce() method:
package playercount;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class player_select_reducer extends Reducer<Text, playerbean, Text, playerbean> {

    @Override
    public void reduce(Text key, Iterable<playerbean> values, Context context)
            throws IOException, InterruptedException {
        long sum_kick_score = 0;
        float sum_kick_avg = 0;
        // Hadoop reuses the same playerbean instance while iterating,
        // so read the fields out immediately instead of storing references.
        for (playerbean a : values) {
            sum_kick_score += a.getkick_score();
            sum_kick_avg += a.getkick_avg();
        }
        context.write(key, new playerbean(sum_kick_score, sum_kick_avg));
    }
}
We simply sum kick_score and kick_avg for each key and output the totals wrapped in the new Writable class, playerbean.
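For instance, if the key Buddy arrives with the (hypothetical) values <78, 5.2> and <60, 4.1>, reduce() writes Buddy with <138, 9.3>.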
Then we just need to write a job configuration:
package playercount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class player_select_test {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //String[] otherArgs = {"/home/cloudera/workspace/playercount/player.txt", "/home/cloudera/workspace/playercount/output2"};

        /*
         * Validate that two arguments were passed from the command line.
         */
        if (args.length != 2) {
            System.out.printf("Usage: player_select_test <input dir> <output dir>\n");
            System.exit(-1);
        }

        /*
         * Instantiate a Job object for the job's configuration and give it
         * an easily decipherable name; this name appears in reports and logs.
         */
        Job job = Job.getInstance(conf, "player_select_test");

        /*
         * Specify the jar file that contains the driver, mapper, and reducer.
         * Hadoop will ship this jar to the cluster nodes that run the
         * mapper and reducer tasks.
         */
        job.setJarByClass(player_select_test.class);
        job.setMapperClass(player_select_mapper.class);
        // The reducer can safely double as a combiner here because
        // summation is associative and commutative.
        job.setCombinerClass(player_select_reducer.class);
        job.setReducerClass(player_select_reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(playerbean.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        /*
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        */

        /*
         * Start the MapReduce job and wait for it to finish.
         * Exit with 0 on success, 1 otherwise.
         */
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
We could also hard-code the input and output paths (the commented-out lines above), but since we are going to run the job on Hadoop, we leave them commented out.
Now we run it on Hadoop. (We could also run it inside Eclipse, if Eclipse has been configured with Hadoop.)
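A minimal sketch of the commands, assuming the compiled classes are packed into a jar named playercount.jar and the input file is player.txt (the jar name and the HDFS paths are assumptions, not from the original page):

    hadoop fs -put player.txt /user/cloudera/player.txt
    hadoop jar playercount.jar playercount.player_select_test /user/cloudera/player.txt /user/cloudera/output2
    hadoop fs -cat /user/cloudera/output2/part-r-00000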
We can then find the result in the output directory in HDFS!