
Group sum the value with conditions in MapReduce


Let's review the question from the last page --- what if I want to output a group sum of two values for each player, for the 2017 season only?

In normal SQL, we will do it like:

select playername, sum(kick_score), sum(kick_avg) from player
where year = 2017
group by playername

Seems easy, but how do we do it in MapReduce? Let's write down some ideas:

1 The key is still playername;
2 In the map() method, the new value needs to carry both numbers, something like <kick_score, kick_avg>;
3 The output of map() will look like: {K: playername, V: <kick_score, kick_avg>};
4 We need the Mapper to recognize a new value format that contains two fields, like class(field1, field2);
5 So we need to write our own Writable class so that it works for both the Mapper and the Reducer;
6 For each key in reduce(), we just sum the kick_score and the kick_avg and store them in a new instance of that writable class;
7 reduce() then outputs the same key with the new writable value.

At this point we can see why the Mapper input always involves a LongWritable / IntWritable / NullWritable... Hadoop provides a set of predefined Writable classes for us to use.

The predefined Writable classes in org.apache.hadoop.io include IntWritable, LongWritable, FloatWritable, DoubleWritable, BooleanWritable, BytesWritable, Text and NullWritable.
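
For example, a predefined writable such as Text or LongWritable is just a serializable wrapper around a plain Java value. A minimal sketch (the class name writable_demo is made up for illustration):

package playercount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class writable_demo {
	public static void main(String[] args) {
		Text name = new Text("player_a");            // wraps a String
		LongWritable score = new LongWritable(300);  // wraps a long
		System.out.println(name + "\t" + score.get());
	}
}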

So let's write the new writable class:

package playercount;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class playerbean implements Writable {
	private long kick_score;
	private float kick_avg;

	// Hadoop needs a no-arg constructor so it can instantiate the bean
	// before calling readFields() during deserialization.
	public playerbean() {
	}

	public playerbean(long kick_score, float kick_avg) {
		this.kick_score = kick_score;
		this.kick_avg = kick_avg;
	}

	public long getkick_score() {
		return kick_score;
	}

	public void setkick_score(long kick_score) {
		this.kick_score = kick_score;
	}

	public float getkick_avg() {
		return kick_avg;
	}

	public void setkick_avg(float kick_avg) {
		this.kick_avg = kick_avg;
	}

	// Serialize both fields; the order must match readFields().
	public void write(DataOutput out) throws IOException {
		out.writeLong(kick_score);
		out.writeFloat(kick_avg);
	}

	// Deserialize both fields in the same order they were written.
	public void readFields(DataInput in) throws IOException {
		kick_score = in.readLong();
		kick_avg = in.readFloat();
	}

	// Controls how the value is rendered in the job's text output.
	public String toString() {
		return kick_score + "\t" + kick_avg;
	}
}

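Before wiring the bean into the job, it is worth a quick sanity check that it survives a write()/readFields() round trip. A minimal sketch (the class name playerbean_roundtrip and the sample numbers are made up; it assumes the class lives in the same playercount package as playerbean):

package playercount;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class playerbean_roundtrip {
	public static void main(String[] args) throws IOException {
		playerbean original = new playerbean(300L, 45.5f);

		// Serialize the bean the same way Hadoop would.
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		original.write(new DataOutputStream(bytes));

		// Deserialize into a fresh instance created via the no-arg constructor.
		playerbean copy = new playerbean();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

		System.out.println(copy); // prints "300	45.5"
	}
}
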
Then we write the map() method:

package playercount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class player_select_mapper extends Mapper<LongWritable, Text, Text, playerbean> {

	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Each input value is one comma-separated line of the player file.
		String line = value.toString();
		String[] lineSplit = line.split(",");
		String name = lineSplit[0];
		long kick_score = Long.parseLong(lineSplit[1]);
		float kick_avg = Float.parseFloat(lineSplit[2]);
		long year = Long.parseLong(lineSplit[23]);

		// Only emit records from the 2017 season (the WHERE clause).
		if (year == 2017) {
			context.write(new Text(name), new playerbean(kick_score, kick_avg));
		}
	}
}

As you can see, the new writable class is called playerbean and it contains two fields: kick_score and kick_avg. We also add an if condition in the map() method to apply the 2017 filter.
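
For instance, a hypothetical input line such as player_a,300,45.5,...,2017 (with the intermediate columns omitted here and the year in the 24th column, i.e. lineSplit[23]) would make map() emit {K: player_a, V: <300, 45.5>}; a line from any other year is simply skipped.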

Finally, we write the reduce() method:

package playercount;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class player_select_reducer extends Reducer<Text, playerbean, Text, playerbean> {

	@Override
	public void reduce(Text key, Iterable<playerbean> values, Context context)
			throws IOException, InterruptedException {
		long sum_kick_score = 0;
		float sum_kick_avg = 0;

		// Accumulate both fields across all records for this player.
		for (playerbean a : values) {
			sum_kick_score += a.getkick_score();
			sum_kick_avg += a.getkick_avg();
		}

		context.write(key, new playerbean(sum_kick_score, sum_kick_avg));
	}
}

We just sum kick_score and kick_avg for each key and output the sums in a new instance of our writable class, playerbean.
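
Because the job writes text output, each result line is the key followed by playerbean.toString(), i.e. tab-separated values. With the hypothetical numbers above, two 2017 records of <300, 45.5> and <250, 40.0> for player_a would produce the output line:

player_a	550	85.5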

Then we just need to write a job configuration:

package playercount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class player_select_test {
	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();

		/*
		 * Validate that two arguments were passed from the command line.
		 */
		//String[] otherArgs = {"/home/cloudera/workspace/playercount/player.txt", "/home/cloudera/workspace/playercount/output2"};
		if (args.length != 2) {
			System.out.printf("Usage: player_select_test <input dir> <output dir>\n");
			System.exit(-1);
		}

		/*
		 * Instantiate a Job object for the job's configuration.
		 * The job name will appear in reports and logs.
		 */
		Job job = Job.getInstance(conf, "player_select_test");

		/*
		 * Specify the jar file that contains the driver, mapper, and reducer.
		 * Hadoop will transfer this jar file to the nodes in the cluster that
		 * run the mapper and reducer tasks.
		 */
		job.setJarByClass(player_select_test.class);
		job.setMapperClass(player_select_mapper.class);
		// The reducer doubles as a combiner: summing is associative, so partial sums are safe.
		job.setCombinerClass(player_select_reducer.class);
		job.setReducerClass(player_select_reducer.class);

		// Both the map output and the final output use Text keys and playerbean values.
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(playerbean.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		/*
		 * Alternatively, hard-code the input and output paths:
		 *
		 * for (int i = 0; i < otherArgs.length - 1; ++i) {
		 *     FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
		 * }
		 * FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
		 */

		/*
		 * Start the MapReduce job and wait for it to finish.
		 * If it finishes successfully, return 0. If not, return 1.
		 */
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Sometimes we could hard-code the input and output paths in the driver, but since we are going to run the job on Hadoop, we leave that part as commented-out code.

Now we run it on Hadoop. We could also run it in Eclipse, if Eclipse has been configured for Hadoop.
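
Assuming the three classes are packaged into a jar (the jar name and HDFS paths below are made up for illustration), the job can be launched with the standard hadoop jar command:

hadoop jar playercount.jar playercount.player_select_test /user/cloudera/player.txt /user/cloudera/output2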

We can then find the result in the output directory on HDFS!
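
For example, the reducer output can be inspected with (using the hypothetical output path above):

hdfs dfs -cat /user/cloudera/output2/part-r-00000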