The previous section covered how to set up and configure Maven; now we'll use Maven to write a MapReduce program, following an example from a 传智播客 video tutorial.
First, create a class under src/main/java in the datacount project created earlier. The basic setup is as follows:
We'll use a MapReduce program to analyze a slice of mobile-phone access logs and compute each number's upstream traffic, downstream traffic, and total traffic. A snippet of the log:
The log fields are described below:
We therefore only need the three columns at indexes 1, 8, and 9 (the first column is index 0). Since each record carries several values, we wrap them in a DataBean object that serves as the Mapper's output value and as the Reducer's input and output value.
The source files are as follows:
DataBean.java
package com.alanhou.hadoop.mr.dc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataBean implements Writable {

    private String telNo;
    private long upPayLoad;
    private long downPayLoad;
    private long totalPayLoad;

    // A no-arg constructor is required: Hadoop instantiates Writables
    // reflectively during deserialization, and the job fails without it.
    public DataBean() {
    }

    public DataBean(String telNo, long upPayLoad, long downPayLoad) {
        this.telNo = telNo;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
        this.totalPayLoad = this.upPayLoad + this.downPayLoad;
    }

    // Serialize: field types and order must match readFields() exactly
    public void write(DataOutput out) throws IOException {
        out.writeUTF(telNo);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
        out.writeLong(totalPayLoad);
    }

    // Deserialize: read the fields back in the same order they were written
    public void readFields(DataInput in) throws IOException {
        this.telNo = in.readUTF();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
        this.totalPayLoad = in.readLong();
    }

    @Override
    public String toString() {
        return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
    }

    public String getTelNo() {
        return telNo;
    }

    public void setTelNo(String telNo) {
        this.telNo = telNo;
    }

    public long getUpPayLoad() {
        return upPayLoad;
    }

    public void setUpPayLoad(long upPayLoad) {
        this.upPayLoad = upPayLoad;
    }

    public long getDownPayLoad() {
        return downPayLoad;
    }

    public void setDownPayLoad(long downPayLoad) {
        this.downPayLoad = downPayLoad;
    }

    public long getTotalPayLoad() {
        return totalPayLoad;
    }

    public void setTotalPayLoad(long totalPayLoad) {
        this.totalPayLoad = totalPayLoad;
    }
}
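Before wiring DataBean into a job, it can be worth verifying that write() and readFields() agree with each other. The following is a minimal, self-contained sketch (not part of the tutorial; the phone number and byte counts are made-up values) that round-trips a bean through Hadoop's in-memory buffers:

package com.alanhou.hadoop.mr.dc;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

// Quick serialization round-trip check for DataBean (illustrative only)
public class DataBeanTest {
    public static void main(String[] args) throws Exception {
        // Hypothetical sample values, not taken from the real log
        DataBean original = new DataBean("13800000000", 100L, 200L);

        // Serialize into an in-memory buffer
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // Deserialize from the same bytes
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        DataBean copy = new DataBean();
        copy.readFields(in);

        // Both lines should print the same up/down/total columns
        System.out.println(original);
        System.out.println(copy);
    }
}

If the two printed lines differ, the field order or types in write() and readFields() are out of sync.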
DataCount.java
package com.alanhou.hadoop.mr.dc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DataCount.class);

        job.setMapperClass(DCMapper.class);
        // These two calls can be omitted here because the map output types
        // are the same as the job output types set below:
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(DataBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(DCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is one tab-separated log record
            String line = value.toString();
            String[] fields = line.split("\t");
            String telNo = fields[1];
            long up = Long.parseLong(fields[8]);
            long down = Long.parseLong(fields[9]);
            DataBean bean = new DataBean(telNo, up, down);
            context.write(new Text(telNo), bean);
        }
    }

    public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean> {
        @Override
        protected void reduce(Text key, Iterable<DataBean> v2s, Context context)
                throws IOException, InterruptedException {
            // Sum the upstream and downstream traffic for one phone number
            long upSum = 0;
            long downSum = 0;
            for (DataBean bean : v2s) {
                upSum += bean.getUpPayLoad();
                downSum += bean.getDownPayLoad();
            }
            DataBean bean = new DataBean("", upSum, downSum);
            context.write(key, bean);
        }
    }
}
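Because the aggregation here is a plain sum, the reducer can in principle double as a combiner, so partial sums are computed on the map side and less data crosses the network during the shuffle. This is an optional tweak not present in the tutorial code; it works because DCReducer's input and output key/value types are identical. A single extra line in main(), after setReducerClass(), is enough:

// Optional (not in the tutorial): run DCReducer as a combiner as well,
// so per-phone partial sums are computed before the shuffle.
job.setCombinerClass(DCReducer.class);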
Once the code is written, export the jar.
Right-click the project in Eclipse and choose Export, then select JAR file in the dialog that pops up:
Click Next, and in the following window check Export all output folders for checked projects and specify the name and path of the exported jar. Click Finish to complete the export:
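Since datacount is a Maven project, an alternative to the Eclipse export dialog is to build the jar from the command line (assuming a standard pom.xml):

mvn clean package

Maven writes the jar to the target/ directory; the exact file name (e.g. datacount-0.0.1-SNAPSHOT.jar, depending on your artifactId and version) can then be passed to hadoop jar just like the Eclipse-exported one.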
Running the program:
Upload the HTTP_20130313143750.dat file to the server and start the HDFS and YARN services. Then put the log file into HDFS and run the job:
hadoop fs -put HTTP_20130313143750.dat /data.doc
hadoop jar example.jar com.alanhou.hadoop.mr.dc.DataCount /data.doc /dataout
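One caveat when re-running the job: FileOutputFormat refuses to start if the output path already exists, so delete it first on subsequent runs:

hadoop fs -rm -r /dataout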
The result can be viewed with hadoop fs -cat /dataout/part-r-00000:
Note: the WARN messages do not affect the job; they come from how the bundled native libraries were compiled. For a fix, see the full notes on installing Hadoop locally.