HDFS and MapReduce are arguably the two most important parts of Hadoop. In this article we will walk through how to implement Word Count, the canonical entry-level MapReduce program.
Test Data
Hello Tom
Hello Jerry
Hello Kitty
Hello World
Hello Tom
How do we write the MapReduce Word Count code?
In this example the input and output paths are hard-coded in the program (via setInputPaths and setOutputPath).
Write the main method
package com.alanhou.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // Create a Job object that describes the whole MapReduce job
        Job job = Job.getInstance(new Configuration());

        // Pass the class that contains the main method -- important!
        job.setJarByClass(WordCount.class);

        // Set our custom mapper and its output key/value types
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Specify the input file
        FileInputFormat.setInputPaths(job, new Path("/words.txt"));

        // Set our custom reducer and its output key/value types
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Specify the output path
        FileOutputFormat.setOutputPath(job, new Path("/wcout"));

        // Pass true to print progress and details while the job runs
        job.waitForCompletion(true);
    }
}
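Since the paths above are hard-coded, the JAR only works for one fixed input and output. A common variant (a sketch, not part of the original program) reads them from the command line instead, so the same JAR can be reused for any dataset:

// Sketch: take the input/output paths from command-line arguments
// instead of hard-coding them. Assumes the job is launched as:
//   hadoop jar wordcount.jar <input path> <output path>
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));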
Extend the Mapper class
package com.alanhou.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Receive a line of input (V1)
        String line = value.toString();
        // Split the line into words
        String[] words = line.split(" ");
        // Emit each word with a count of 1
        for (String w : words) {
            context.write(new Text(w), new LongWritable(1));
        }
    }
}
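The map method above allocates a new Text and a new LongWritable on every call. A common Hadoop idiom (an optional sketch, not part of the original code) is to reuse a single pair of Writable objects inside WCMapper, since the framework serializes each value at write time:

// Optional sketch: reuse Writable objects across map() calls to reduce
// garbage-collection pressure. This is safe because context.write()
// serializes the values before map() is invoked again.
private final Text word = new Text();
private final LongWritable one = new LongWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    for (String w : value.toString().split(" ")) {
        word.set(w);
        context.write(word, one);
    }
}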
Extend the Reducer class
package com.alanhou.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
        // Define a counter for this key
        long counter = 0;
        // Sum up all the 1s emitted for this word
        for (LongWritable i : v2s) {
            counter += i.get();
        }
        // Emit the word and its total count
        context.write(key, new LongWritable(counter));
    }
}
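Because summing counts is associative and commutative, WCReducer can also be registered as a combiner so that partial sums are computed on the map side before the shuffle. This is an optional optimization not used in the original program; to enable it, add one line in the main method:

// Optional: run WCReducer as a combiner so each map task pre-aggregates
// its counts before data is shuffled to the reducers.
job.setCombinerClass(WCReducer.class);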
How do we export the JAR?
1. Right-click the project name, choose Export in the pop-up menu, then select JAR file under Java and click Next.
2. In the JAR File Specification window, choose an export path and file name for the JAR, then click Next.
3. In the JAR Packaging Options window that follows, click Next without changing anything. On the JAR Manifest Specification page, select the class containing the main method in the Main class field, then click Finish.
Upload the JAR to the Linux server, then upload the test data to HDFS (hadoop fs -put xx /words.txt). Since the input and output paths are hard-coded in the program and the main class was specified when exporting the JAR, you only need to run hadoop jar wordcount.jar and wait for the job to finish.
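Assuming the JAR is named wordcount.jar and the test data sits in a local file called words.txt (both names as used above), the full sequence might look like this:

# Upload the test data to HDFS
hadoop fs -put words.txt /words.txt
# Run the job; the main class is taken from the JAR manifest
hadoop jar wordcount.jar
# Inspect the result once the job finishes
hadoop fs -cat /wcout/part-r-00000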
While the job runs, progress like the following is printed:
16/02/03 07:47:10 INFO mapreduce.Job: map 0% reduce 0%
16/02/03 07:47:18 INFO mapreduce.Job: map 100% reduce 0%
16/02/03 07:47:28 INFO mapreduce.Job: map 100% reduce 100%
16/02/03 07:47:29 INFO mapreduce.Job: Job job_1454500331908_0001 completed successfully
16/02/03 07:47:30 INFO mapreduce.Job: Counters: 49
... ...
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=56
	File Output Format Counters
		Bytes Written=38
Output
Hello	5
Jerry	1
Kitty	1
Tom	2
World	1