MapReduce Programming Hello World Job

In this Hadoop and MapReduce tutorial we will see how to create a hello world job and what the steps are for creating a MapReduce program. The steps are as follows.

Step1- Creating a file
J$ cat>file.txt
hi how are you
how is your job
how is your family
how is your brother
how is your sister
what is the time now
what is the strength of hadoop

Step2- loading file.txt from local file system to HDFS
J$ hadoop fs -put file.txt  file

Step3- Writing programs
  1.  DriverCode.java
  2.  MapperCode.java
  3.  ReducerCode.java

Step4- Compiling all above .java files

J$ javac -classpath $HADOOP_HOME/hadoop-core.jar *.java

Step5- Creating jar file

J$ jar cvf job.jar *.class

Step6- Running the above job.jar on file (which is already in HDFS)

J$ hadoop jar job.jar DriverCode file TestOutput

Let's start with the actual code for the steps above.

Hello World Job -> WordCountJob

1. DriverCode (WordCount.java)
package com.doj.hadoop.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver for the word-count job. Configures a {@link JobConf} with
 * {@code WordMapper} as the mapper and {@code WordReducer} as both combiner
 * and reducer, then submits it and waits for it to finish.
 *
 * <p>Launched through {@link ToolRunner}, so Hadoop generic options
 * (for example {@code -D key=value}) are parsed before {@link #run} is called.
 *
 * @author Dinesh Rajput
 */
public class WordCount extends Configured implements Tool {

  /**
   * Configures and runs the word-count job.
   *
   * @param args exactly two entries: the input path and the output path
   * @return {@code 0} if the job succeeded, {@code 1} if it did not,
   *         {@code -1} if the arguments were wrong
   * @throws Exception if job submission or execution fails
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    // Start from getConf() so options parsed by ToolRunner are not discarded.
    JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.setJobName("Word Count");

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setMapperClass(WordMapper.class);
    // The reducer only sums counts, so it is reused as the combiner.
    conf.setCombinerClass(WordReducer.class);
    conf.setReducerClass(WordReducer.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    // JobConf (old mapred API) has no waitForCompletion(); JobClient.runJob
    // submits the job and blocks until it completes.
    return JobClient.runJob(conf).isSuccessful() ? 0 : 1;
  }

  /**
   * Command-line entry point; exits with the status returned by {@link #run}.
   *
   * @param args generic Hadoop options followed by input and output paths
   * @throws Exception if the job cannot be run
   */
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new WordCount(), args);
    System.exit(exitCode);
  }
}
2. MapperCode (WordMapper.java)
package com.doj.hadoop.driver;

import java.io.IOException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Mapper for the word-count job: splits each input line into
 * whitespace-delimited tokens and emits a {@code (token, 1)} pair per token.
 *
 * @author Dinesh Rajput
 */
public class WordMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
{
 /** The count emitted for every token. */
 private static final IntWritable ONE = new IntWritable(1);

 /** Reused output key, overwritten for each token before it is collected. */
 private final Text token = new Text();

 /**
  * Emits {@code (token, 1)} for every token found in {@code value}.
  *
  * @param key      input key (not used by this mapper)
  * @param value    the text to tokenize
  * @param output   collector that receives the emitted pairs
  * @param reporter progress reporter (not used by this mapper)
  * @throws IOException if the collector fails
  */
 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
   throws IOException
 {
  for (StringTokenizer tokens = new StringTokenizer(value.toString()); tokens.hasMoreTokens(); )
  {
   token.set(tokens.nextToken());
   output.collect(token, ONE);
  }
 }
}


3. ReducerCode (WordReducer.java)
package com.doj.hadoop.driver;

/**
 * @author Dinesh Rajput
 *
 */
import java.io.IOException;
import java.util.Iterator;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
 
/**
 * Reducer for the word-count job: adds up all counts received for a key and
 * emits a single {@code (key, total)} pair.
 */
public class WordReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
{
 /**
  * Sums every value supplied for {@code key} and emits the total.
  *
  * @param key      the key whose counts are being combined
  * @param values   the counts associated with {@code key}
  * @param output   collector that receives the {@code (key, total)} pair
  * @param reporter progress reporter (not used by this reducer)
  * @throws IOException if the collector fails
  */
 public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
   Reporter reporter) throws IOException
 {
  int total = 0;
  for (Iterator<IntWritable> counts = values; counts.hasNext(); )
  {
   IntWritable count = counts.next();
   total += count.get();
  }
  IntWritable result = new IntWritable(total);
  output.collect(key, result);
 }
}

No comments:

Post a Comment