[Big Data Basics] Writing Your First MapReduce Program with Maven

Alan · 2016-02-21

A previous post covered how to install and configure Maven. Next, let's use Maven to write a MapReduce program, following along with a video tutorial.

First, create a class under src/main/java in the datacount project created earlier. The basic configuration is as follows:

(screenshot: the new class's basic configuration in Eclipse)

The following example comes from a 传智播客 video tutorial:

Use a MapReduce program to analyze a mobile-phone access log and compute each number's upstream traffic, downstream traffic, and total traffic. Here is a snippet of the log:

(screenshot: a snippet of the access log)

The log fields are described below:

(screenshot: log field descriptions)

We therefore only need the three columns at index 1, 8, and 9 (the first column is index 0). Since each record carries several values, we wrap them in a DataBean object that serves as the Mapper's output value and as the Reducer's input and output value.
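Since the screenshots above are not reproduced here, for reference this is a made-up line in the shape the mapper expects: tab-separated, phone number at index 1, up and down traffic at indexes 8 and 9. Every value is a placeholder, not real data:

1363000000000	13800000000	00-00-00-00-00-00:CMCC	10.0.0.1	example.com	4	2	7	1000	2000	200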

The two files are as follows:

DataBean.java

package com.alanhou.hadoop.mr.dc;
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
 
public class DataBean implements Writable {
    private String telNo;
    private long upPayLoad;
    private long downPayLoad;
    private long totalPayLoad;
     
    // A no-arg constructor is required: Hadoop instantiates the bean via
    // reflection during deserialization, and the job fails at runtime without it
    public DataBean(){}
     
    public DataBean(String telNo, long upPayLoad, long downPayLoad) {
        this.telNo = telNo;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
        this.totalPayLoad = this.upPayLoad + this.downPayLoad;
    }
    // Serialize: field types and write order must exactly match readFields()
    public void write(DataOutput out) throws IOException {
        out.writeUTF(telNo);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
        out.writeLong(totalPayLoad);
    }
    // Deserialize in the same order the fields were written
    public void readFields(DataInput in) throws IOException {
        this.telNo = in.readUTF();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
        this.totalPayLoad = in.readLong();
    }
     
    @Override
    public String toString() {
        return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
    }
    public String getTelNo() {
        return telNo;
    }
    public void setTelNo(String telNo) {
        this.telNo = telNo;
    }
    public long getUpPayLoad() {
        return upPayLoad;
    }
    public void setUpPayLoad(long upPayLoad) {
        this.upPayLoad = upPayLoad;
    }
    public long getDownPayLoad() {
        return downPayLoad;
    }
    public void setDownPayLoad(long downPayLoad) {
        this.downPayLoad = downPayLoad;
    }
    public long getTotalPayLoad() {
        return totalPayLoad;
    }
    public void setTotalPayLoad(long totalPayLoad) {
        this.totalPayLoad = totalPayLoad;
    }
 
}
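As a quick sanity check of the Writable implementation, a DataBean can be round-tripped through Java's data streams. This is a minimal sketch, not part of the original tutorial:

package com.alanhou.hadoop.mr.dc;
 
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
 
public class DataBeanTest {
    public static void main(String[] args) throws Exception {
        // Serialize a bean into an in-memory buffer
        DataBean original = new DataBean("13800000000", 1000, 2000);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));
 
        // Deserialize into a fresh bean, as Hadoop does between map and reduce
        DataBean copy = new DataBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
 
        // Both lines should read "1000 <tab> 2000 <tab> 3000"
        System.out.println(original);
        System.out.println(copy);
    }
}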

DataCount.java

package com.alanhou.hadoop.mr.dc;
 
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
 
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 
public class DataCount {
 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DataCount.class);
        job.setMapperClass(DCMapper.class);
        // The map output types match the final output types (Text/DataBean),
        // so setMapOutputKeyClass/setMapOutputValueClass can be omitted here:
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(DataBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
         
        job.setReducerClass(DCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
         
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
     
    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{
 
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, DataBean>.Context context)
                throws IOException, InterruptedException {
            // Split the tab-separated record; field 1 is the phone number,
            // fields 8 and 9 are the up and down traffic
            String line = value.toString();
            String[] fields = line.split("\t");
            String telNo = fields[1];
            long up = Long.parseLong(fields[8]);
            long down = Long.parseLong(fields[9]);
            DataBean bean = new DataBean(telNo, up, down);
            context.write(new Text(telNo), bean);
        }
         
    }
     
    public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{
 
        @Override
        protected void reduce(Text key, Iterable<DataBean> v2s, Context context)
                throws IOException, InterruptedException {
            long upSum = 0;
            long downSum = 0;
            for(DataBean bean : v2s){
                upSum += bean.getUpPayLoad();
                downSum += bean.getDownPayLoad();
            }
            DataBean bean = new DataBean("", upSum, downSum);
            context.write(key, bean);
        }
         
    }
 
}
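Because the reduce step is a pure, associative sum, the reducer class could optionally double as a combiner to cut shuffle traffic. This is an optional tweak, not part of the original tutorial; the line would go next to job.setReducerClass(...) in main:

        // Optional: pre-aggregate on the map side; summing is associative,
        // so DCReducer can safely be reused as a combiner
        job.setCombinerClass(DCReducer.class);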

After finishing the code, export a jar package.

Right-click the project in Eclipse, choose Export, and select JAR file in the dialog that pops up:

(screenshot: the Eclipse Export dialog with JAR file selected)

Click Next, then in the following window check Export all output folders for checked projects and specify the name and path of the exported jar. Click Finish to complete the export:

(screenshot: JAR export options and destination)
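Since the project is Maven-based, the jar can also be built from the command line instead of exporting through Eclipse (assuming the pom set up in the earlier Maven post; the jar ends up under target/):

mvn clean package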

Run the program:
Upload HTTP_20130313143750.dat to the server and start the HDFS and YARN services. Then put the log file on HDFS and run the job:

hadoop fs -put HTTP_20130313143750.dat /data.doc
hadoop jar example.jar com.alanhou.hadoop.mr.dc.DataCount /data.doc /dataout

The results can then be viewed with hadoop fs -cat /dataout/part-r-00000. Each line of output is a phone number followed by its up, down, and total traffic, tab-separated (the Text key plus DataBean's toString()):

(screenshot: contents of /dataout/part-r-00000)

Note: the WARN messages do not affect the job; they come from how the bundled Hadoop native libraries were compiled. If you want to get rid of them, see the notes in the post on installing Hadoop locally (本地安装Hadoop完整笔记).
