
[Big Data Basics] Writing Your First MapReduce Program with Maven

In a previous post we covered how to use and configure Maven; now let's put it to work and write our first MapReduce program.
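
The project needs the Hadoop client libraries on its classpath. A minimal sketch of the relevant pom.xml fragment, assuming a Hadoop 2.x cluster (the version shown is an assumption; match it to your own installation):

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-client</artifactId>
	<version>2.6.0</version>
</dependency>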

Start by creating the classes below under src/main/java in the datacount project we set up earlier; the package used throughout is com.alanhou.hadoop.mr.dc.

The example comes from an itcast (传智播客) video tutorial: use a MapReduce program to analyze a mobile-phone access log (HTTP_20130313143750.dat) and work out the upstream traffic, downstream traffic, and total traffic for each phone number.

Each log line is tab-separated; the phone number sits in column 1 and the upstream and downstream traffic in columns 8 and 9 (columns are numbered from 0), so those three columns are all we need to extract. Because each record carries several values, we wrap them in a DataBean object that serves as the Mapper's output value and as the Reducer's input and output value.

Write the following two files:

DataBean.java

package com.alanhou.hadoop.mr.dc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class DataBean implements Writable {
	private String telNo;
	private long upPayLoad;
	private long downPayLoad;
	private long totalPayLoad;
	
	// a no-argument constructor is required: Hadoop instantiates Writables
	// via reflection, and the job fails at runtime without it
	public DataBean(){}
	
	public DataBean(String telNo, long upPayLoad, long downPayLoad) {
		this.telNo = telNo;
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
		this.totalPayLoad = this.upPayLoad + this.downPayLoad;
	}
	// serialization: the field types and order must mirror readFields() exactly
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(telNo);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
		out.writeLong(totalPayLoad);
	}
	// deserialization: read the fields back in the same order they were written
	@Override
	public void readFields(DataInput in) throws IOException {
		this.telNo = in.readUTF();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
		this.totalPayLoad = in.readLong();
	}
	
	@Override
	public String toString() {
		return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
	}
	public String getTelNo() {
		return telNo;
	}
	public void setTelNo(String telNo) {
		this.telNo = telNo;
	}
	public long getUpPayLoad() {
		return upPayLoad;
	}
	public void setUpPayLoad(long upPayLoad) {
		this.upPayLoad = upPayLoad;
	}
	public long getDownPayLoad() {
		return downPayLoad;
	}
	public void setDownPayLoad(long downPayLoad) {
		this.downPayLoad = downPayLoad;
	}
	public long getTotalPayLoad() {
		return totalPayLoad;
	}
	public void setTotalPayLoad(long totalPayLoad) {
		this.totalPayLoad = totalPayLoad;
	}

}
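
Before moving on, the serialization round trip can be sanity-checked outside of Hadoop with plain java.io streams. A minimal sketch (the phone number and traffic values are placeholders):

package com.alanhou.hadoop.mr.dc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class DataBeanRoundTrip {
	public static void main(String[] args) throws IOException {
		DataBean original = new DataBean("13800000000", 100, 200);

		// serialize with write()
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		original.write(new DataOutputStream(bos));

		// deserialize into a fresh bean; this is exactly why the
		// no-argument constructor matters to Hadoop
		DataBean copy = new DataBean();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

		System.out.println(copy); // prints: 100	200	300
	}
}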

DataCount.java

package com.alanhou.hadoop.mr.dc;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class DataCount {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(DataCount.class);
		job.setMapperClass(DCMapper.class);
		// the map output types match the job's final output types (Text, DataBean),
		// so setMapOutputKeyClass/setMapOutputValueClass may be omitted here
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		job.setReducerClass(DCReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DataBean.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	
	public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			// the log is tab-separated: column 1 holds the phone number,
			// columns 8 and 9 the upstream and downstream traffic
			String[] fields = line.split("\t");
			String telNo = fields[1];
			long up = Long.parseLong(fields[8]);
			long down = Long.parseLong(fields[9]);
			DataBean bean = new DataBean(telNo, up, down);
			context.write(new Text(telNo), bean);
		}
		
	}
	
	public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{

		@Override
		protected void reduce(Text key, Iterable<DataBean> v2s, Context context)
				throws IOException, InterruptedException {
			long upSum = 0;
			long downSum = 0;
			// accumulate the upstream and downstream traffic for this phone number
			for(DataBean bean : v2s){
				upSum += bean.getUpPayLoad();
				downSum += bean.getDownPayLoad();
			}
			DataBean bean = new DataBean("", upSum, downSum);
			context.write(key, bean);
		}
		
	}

}

After finishing the code, export it as a jar package.

Right-click the project in Eclipse, choose Export, and select JAR file in the dialog that pops up:

Click Next, tick Export all output folders for checked projects in the next window, and specify the name and path for the exported jar. Click Finish to complete the export.
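
Since this is a Maven project, the jar can also be built without Eclipse; assuming the standard Maven layout, running

mvn clean package

from the project root drops the jar under target/. Substitute its path for example.jar in the commands below.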

Running the program:
Upload HTTP_20130313143750.dat to the server and make sure the HDFS and YARN services are running.
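On a typical installation the daemons can be started with the stock sbin scripts (a sketch; this assumes $HADOOP_HOME/sbin is on your PATH):

start-dfs.sh
start-yarn.sh

Next, put the log file on HDFS and run the job: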

hadoop fs -put HTTP_20130313143750.dat /data.doc
hadoop jar example.jar com.alanhou.hadoop.mr.dc.DataCount /data.doc /dataout

Once the job finishes, the results can be viewed with hadoop fs -cat /dataout/part-r-00000. Note that /dataout must not exist before the run, since Hadoop refuses to overwrite an existing output directory.
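
Each output line pairs the phone-number key with DataBean's toString() value, so the tab-separated columns are, schematically (placeholders, not actual results):

telNo	upPayLoad	downPayLoad	totalPayLoad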

Note: any WARN messages in the output do not affect the run; they stem from how the shipped Hadoop binaries were compiled. If you want to resolve them, see the complete notes on installing Hadoop locally.
