We have already covered how to install and configure Maven; next we will use it to write a MapReduce program, following along with a video tutorial.
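Before writing any code, the project needs the Hadoop client libraries on its classpath. A minimal sketch of the dependency section of pom.xml, assuming Hadoop 2.2.0 (the version is an assumption; match it to your cluster):

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>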
First, create a class under src/main/java in the datacount project created earlier; the code below uses the package com.alanhou.hadoop.mr.dc.
The following example comes from a 传智播客 video tutorial. It uses a MapReduce program to analyze a mobile-phone access log and compute each number's upstream traffic, downstream traffic, and total traffic. Of the log's tab-separated fields we only need the three columns at indexes 1, 8 and 9 (the first column has index 0). Since each record carries several values, we wrap them in a DataBean object that serves as the Mapper's output value and the Reducer's input and output value.
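Pictured schematically, a record must therefore be tab-separated with at least ten columns, of which the code below reads three (the field names come from the DataBean class; the other columns are ignored):

col 0   col 1   cols 2-7   col 8       col 9
...     telNo   ...        upPayLoad   downPayLoad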
The two files are written as follows:
DataBean.java
package com.alanhou.hadoop.mr.dc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataBean implements Writable {

    private String telNo;
    private long upPayLoad;
    private long downPayLoad;
    private long totalPayLoad;

    // A no-arg constructor is required: Hadoop instantiates the bean via
    // reflection during deserialization, and the job fails at runtime without it
    public DataBean() {}

    public DataBean(String telNo, long upPayLoad, long downPayLoad) {
        this.telNo = telNo;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
        this.totalPayLoad = this.upPayLoad + this.downPayLoad;
    }

    // serialize: field types and order must mirror readFields() exactly
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(telNo);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
        out.writeLong(totalPayLoad);
    }

    // deserialize
    @Override
    public void readFields(DataInput in) throws IOException {
        this.telNo = in.readUTF();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
        this.totalPayLoad = in.readLong();
    }

    // Used by TextOutputFormat when writing the bean as the output value
    @Override
    public String toString() {
        return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
    }

    public String getTelNo() {
        return telNo;
    }

    public void setTelNo(String telNo) {
        this.telNo = telNo;
    }

    public long getUpPayLoad() {
        return upPayLoad;
    }

    public void setUpPayLoad(long upPayLoad) {
        this.upPayLoad = upPayLoad;
    }

    public long getDownPayLoad() {
        return downPayLoad;
    }

    public void setDownPayLoad(long downPayLoad) {
        this.downPayLoad = downPayLoad;
    }

    public long getTotalPayLoad() {
        return totalPayLoad;
    }

    public void setTotalPayLoad(long totalPayLoad) {
        this.totalPayLoad = totalPayLoad;
    }
}
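Because write() and readFields() must agree on field type and order, a quick local round-trip check is useful before running anything on a cluster. A minimal sketch (a hypothetical helper class with a made-up phone number, not part of the job):

package com.alanhou.hadoop.mr.dc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class DataBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        // Serialize a bean into a byte buffer, just as Hadoop would
        DataBean original = new DataBean("13500000000", 100L, 200L);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean created via the no-arg constructor
        DataBean copy = new DataBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // Expected output: 13500000000 100 200 300 (tab-separated)
        System.out.println(copy.getTelNo() + "\t" + copy);
    }
}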
DataCount.java
package com.alanhou.hadoop.mr.dc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DataCount.class);

        job.setMapperClass(DCMapper.class);
        // Optional here: the map output types match the job output types set below
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(DataBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(DCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is one tab-separated log record
            String line = value.toString();
            String[] fields = line.split("\t");
            String telNo = fields[1];
            long up = Long.parseLong(fields[8]);
            long down = Long.parseLong(fields[9]);
            DataBean bean = new DataBean(telNo, up, down);
            context.write(new Text(telNo), bean);
        }
    }

    public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean> {

        @Override
        protected void reduce(Text key, Iterable<DataBean> v2s, Context context)
                throws IOException, InterruptedException {
            // Sum upstream/downstream traffic across all records for this phone number
            long upSum = 0;
            long downSum = 0;
            for (DataBean bean : v2s) {
                upSum += bean.getUpPayLoad();
                downSum += bean.getDownPayLoad();
            }
            DataBean bean = new DataBean("", upSum, downSum);
            context.write(key, bean);
        }
    }
}
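One optional tweak not in the original tutorial: because the Mapper's output types match the Reducer's and the sums are associative, DCReducer can also be registered as a combiner to pre-aggregate traffic on the map side and shrink the shuffle. Added in main() before waitForCompletion():

// Optional: pre-aggregate per phone number on the map side
job.setCombinerClass(DCReducer.class);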
After finishing the code, export it as a jar.
Right-click the project in Eclipse and choose Export, then select JAR file in the dialog that appears:
Click Next, check Export all output folders for checked projects in the next window, and specify the name and path for the exported jar. Click Finish to complete the export:
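Alternatively, since this is a Maven project, the jar can also be built from the command line; the jar then lands under target/ with a name derived from the artifactId and version in pom.xml, so adjust the jar path used below accordingly:

mvn clean package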
Run the program:
Upload the HTTP_20130313143750.dat file to the server and start the HDFS and YARN services. Then put the log file onto HDFS and run the job:
hadoop fs -put HTTP_20130313143750.dat /data.doc
hadoop jar example.jar com.alanhou.hadoop.mr.dc.DataCount /data.doc /dataout
The results can then be viewed with hadoop fs -cat /dataout/part-r-00000:
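Each output line is the phone-number key followed by DataBean.toString(), i.e. four tab-separated columns: telNo, upPayLoad, downPayLoad, totalPayLoad. With illustrative values (not actual results):

13500000000	100	200	300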
Note: the WARN messages do not affect the run; they stem from how the installed Hadoop binaries were compiled. If you want to eliminate them, refer to the complete notes on installing Hadoop locally.