HDFS and MapReduce are arguably the two most important parts of Hadoop. This article walks through implementing Word Count, the introductory MapReduce program.
Test data
Hello Tom
Hello Jerry
Hello Kitty
Hello World
Hello Tom
How do you write the MapReduce Word Count code?
In this example the input and output file paths are hard-coded (setInputPaths, setOutputPath).
Writing the main method
package com.alanhou.hadoop.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Create a Job object that represents this computation
        Job job = Job.getInstance(new Configuration());
        // Pass the class that contains the main method (important!)
        job.setJarByClass(WordCount.class);
        // Set our custom mapper and its output key/value types
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Specify the input file
        FileInputFormat.setInputPaths(job, new Path("/words.txt"));
        // Set our custom reducer and its output key/value types
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Specify the output path
        FileOutputFormat.setOutputPath(job, new Path("/wcout"));
        // Passing true prints progress and details while the job runs;
        // the exit code reflects whether the job succeeded
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
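The driver above hard-codes /words.txt and /wcout. As a minimal sketch, not part of the original program, the paths could instead be taken from the command-line arguments, falling back to the hard-coded values when none are given. The class JobPaths and its resolve method are hypothetical helpers; the Hadoop classes are deliberately left out so the snippet compiles on its own.

```java
// Hypothetical helper (not in the original program): chooses the job's input
// and output paths from the command line, using the article's hard-coded
// values as defaults.
class JobPaths {
    static final String DEFAULT_INPUT = "/words.txt";
    static final String DEFAULT_OUTPUT = "/wcout";

    /** Returns {input, output}; missing arguments fall back to the defaults. */
    public static String[] resolve(String[] args) {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;
        return new String[] { input, output };
    }

    public static void main(String[] args) {
        String[] paths = resolve(args);
        // In the driver, paths[0] would go to
        // FileInputFormat.setInputPaths(job, new Path(paths[0])) and paths[1] to
        // FileOutputFormat.setOutputPath(job, new Path(paths[1])).
        System.out.println("input=" + paths[0] + " output=" + paths[1]);
    }
}
```

The design choice here is simply to keep the defaults identical to the values used in this article, so running the job without arguments would behave the same as before.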
Extending the Mapper class
package com.alanhou.hadoop.mr;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read the input value (V1): one line of text
        String line = value.toString();
        // Split the line into words
        String[] words = line.split(" ");
        // Loop over the words
        for (String w : words) {
            // Each occurrence counts as 1
            context.write(new Text(w), new LongWritable(1));
        }
    }
}
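To see what the mapper emits without a cluster, the sketch below mimics the body of map in plain Java: one (word, 1) pair per token. It is an illustration only; MapSketch and emit are made-up names, and the pairs are collected in a list rather than written to a Hadoop Context. It also splits on runs of whitespace after trimming, an assumption that goes beyond the original split(" "), which would yield empty-string "words" if a line contained consecutive spaces.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative stand-in for WCMapper.map: no Hadoop types involved.
class MapSketch {
    /** Returns one (word, 1) pair for every whitespace-separated token in the line. */
    public static List<Map.Entry<String, Long>> emit(String line) {
        List<Map.Entry<String, Long>> pairs = new ArrayList<>();
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return pairs; // blank line: nothing to emit
        }
        // Assumption: split on any run of whitespace instead of a single space
        for (String word : trimmed.split("\\s+")) {
            pairs.add(new SimpleEntry<>(word, 1L));
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(emit("Hello Tom")); // prints [Hello=1, Tom=1]
    }
}
```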
Extending the Reducer class
package com.alanhou.hadoop.mr;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> v2s,
            Context context) throws IOException, InterruptedException {
        // Receive the key and its grouped values
        // Define a counter for this key
        long counter = 0;
        // Loop over v2s and add up the values
        for (LongWritable i : v2s) {
            counter += i.get();
        }
        // Emit the word and its total
        context.write(key, new LongWritable(counter));
    }
}
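Between map and reduce, the framework groups all values that share a key and hands each group to reduce. The following plain-Java sketch imitates that flow on the test data from this article, with no Hadoop dependency. LocalWordCount and its methods are illustrative names, and a TreeMap stands in for the sorted grouping the framework performs; it is a way to reason about the result, not a substitute for running the job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative local simulation of map -> group by key -> reduce.
class LocalWordCount {
    /** "Map" plus grouping: collect a 1 under each word for every occurrence. */
    public static Map<String, List<Long>> group(List<String> lines) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1L);
            }
        }
        return grouped;
    }

    /** "Reduce": sum the values of one key, like the loop over v2s in WCReducer. */
    public static long reduce(Iterable<Long> values) {
        long counter = 0;
        for (long v : values) {
            counter += v;
        }
        return counter;
    }

    /** Runs both steps and returns word -> count in sorted key order. */
    public static Map<String, Long> run(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, List<Long>> e : group(lines).entrySet()) {
            counts.put(e.getKey(), reduce(e.getValue()));
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Hello Tom", "Hello Jerry", "Hello Kitty", "Hello World", "Hello Tom");
        run(lines).forEach((word, count) -> System.out.println(word + " " + count));
    }
}
```

For the five test lines, "Hello" is grouped with five 1s and "Tom" with two, so the sums are 5 and 2, in line with the job output shown at the end of this article.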
How do you export the jar?
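1. Right-click the project name, click Export in the pop-up menu, then select JAR file under Java and click Next.
2. In the JAR File Specification window, choose an export path and file name for the jar, then click Next.
3. The JAR Packaging Options window comes next; leave it unchanged and click Next. On the following JAR Manifest Specification page, select the class containing the main method in the Main class field, then click Finish.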
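Upload the jar to the Linux server, then upload the test data to HDFS (hadoop fs -put xx /words.txt). Because the input file and output path are written into the program, and the class containing the main method was specified when the jar was exported, you only need to run hadoop jar wordcount.jar and wait for the job to finish.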
Progress like the following is displayed during execution
16/02/03 07:47:10 INFO mapreduce.Job: map 0% reduce 0%
16/02/03 07:47:18 INFO mapreduce.Job: map 100% reduce 0%
16/02/03 07:47:28 INFO mapreduce.Job: map 100% reduce 100%
16/02/03 07:47:29 INFO mapreduce.Job: Job job_1454500331908_0001 completed successfully
16/02/03 07:47:30 INFO mapreduce.Job: Counters: 49
... ...
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=56
File Output Format Counters
Bytes Written=38
Output
Hello 5
Jerry 1
Kitty 1
Tom 2
World 1