MapReduce Quick Start ③: A Comprehensive MapReduce Exercise in Internet Traffic Statistics
Views: 3,952
Published: 2019-05-24

This article is about 6,674 characters long; reading it takes roughly 22 minutes.

Requirement:

For each phone number, compute the sum of the upstream flow, the downstream flow, the total upstream traffic, and the total downstream traffic.

Analysis: use the phone number as the key and the four fields (upstream flow, downstream flow, total upstream traffic, total downstream traffic) as the value; this key/value pair is the output of the map phase and the input of the reduce phase.
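Before wiring up Hadoop, the grouping-and-summing logic itself can be sketched in plain Java. The phone numbers and flow values below are made up for illustration; the point is that every record for the same key has its four fields accumulated, which is exactly what the reduce phase will do:

```java
import java.util.Map;
import java.util.TreeMap;

public class FlowAggregate {
    // Hypothetical records: phone \t upFlow \t downFlow \t upCountFlow \t downCountFlow
    public static final String[] RECORDS = {
        "13726230503\t24\t27\t2481\t24681",
        "13726230503\t12\t10\t100\t200",
        "13826544101\t4\t0\t264\t0"
    };

    // Group by phone number and sum the four flow fields per key,
    // mirroring what the reduce phase does with the shuffled map output.
    public static Map<String, int[]> aggregate(String[] records) {
        Map<String, int[]> totals = new TreeMap<>();
        for (String line : records) {
            String[] f = line.split("\t");
            int[] sums = totals.computeIfAbsent(f[0], k -> new int[4]);
            for (int i = 0; i < 4; i++) {
                sums[i] += Integer.parseInt(f[i + 1]);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        aggregate(RECORDS).forEach((phone, s) ->
            System.out.println(phone + "\t" + s[0] + "\t" + s[1] + "\t" + s[2] + "\t" + s[3]));
    }
}
```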

1. Create a Maven project and add the dependencies to pom.xml

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-mr1-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>RELEASE</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <minimizeJar>true</minimizeJar>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2. Define FlowBean, the custom map output value

package com.czxy.flow;

import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Lombok generates the getters/setters, toString, and no-arg constructor
@Data
@NoArgsConstructor
public class FlowBean implements Writable {
    private Integer upFlow;
    private Integer downFlow;
    private Integer upCountFlow;
    private Integer downCountFlow;

    // serialize the four fields, in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(upCountFlow);
        out.writeInt(downCountFlow);
    }

    // deserialize in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readInt();
        this.downFlow = in.readInt();
        this.upCountFlow = in.readInt();
        this.downCountFlow = in.readInt();
    }
}

3. Define the Mapper class

package com.czxy.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    // reuse one FlowBean instance across map() calls
    private FlowBean flowBean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // sample line (tab-separated):
        // 136315798****  13726230  00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com  (category)  24  27  2481  24681  200
        String s = value.toString();
        // split on tabs
        String[] split = s.split("\t");
        // populate the bean: fields 6-9 hold the four flow values
        flowBean.setUpFlow(Integer.parseInt(split[6]));
        flowBean.setDownFlow(Integer.parseInt(split[7]));
        flowBean.setUpCountFlow(Integer.parseInt(split[8]));
        flowBean.setDownCountFlow(Integer.parseInt(split[9]));
        // emit: phone number (field 1) as key, the flow bean as value
        context.write(new Text(split[1]), flowBean);
    }
}
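The field indexing is the easiest thing to get wrong here. As a rough, Hadoop-free sketch (the log line below is hypothetical, laid out in the 11-column tab-separated format the mapper expects), field 1 is the phone number and fields 6 through 9 are the four flow values:

```java
public class ParseFlowLine {
    // Hypothetical 11-field log record; layout assumed from the mapper above:
    // [0] serial, [1] phone, [2] MAC, [3] IP, [4] host, [5] category,
    // [6] upFlow, [7] downFlow, [8] upCountFlow, [9] downCountFlow, [10] status
    public static final String SAMPLE = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t"
        + "120.196.100.82\ti02.c.aliimg.com\tgame\t24\t27\t2481\t24681\t200";

    // split exactly as the mapper does
    public static String[] fields(String line) {
        return line.split("\t");
    }

    public static void main(String[] args) {
        String[] f = fields(SAMPLE);
        System.out.println(f[1] + " up=" + f[6] + " down=" + f[7]
            + " upTotal=" + f[8] + " downTotal=" + f[9]);
    }
}
```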

4. Define the Reducer class

package com.czxy.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReduce extends Reducer<Text, FlowBean, Text, FlowBean> {
    // reuse one FlowBean instance across reduce() calls
    private FlowBean flowBean = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // accumulators for the four flow fields
        int upFlow = 0;
        int downFlow = 0;
        int upCountFlow = 0;
        int downCountFlow = 0;
        // sum every record that arrived for this phone number
        for (FlowBean value : values) {
            upFlow += value.getUpFlow();
            downFlow += value.getDownFlow();
            upCountFlow += value.getUpCountFlow();
            downCountFlow += value.getDownCountFlow();
        }
        // store the totals in the reusable bean
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);
        flowBean.setUpCountFlow(upCountFlow);
        flowBean.setDownCountFlow(downCountFlow);
        // emit: phone number, aggregated totals
        context.write(key, flowBean);
    }
}

5. Define the driver class

package com.czxy.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // get the job instance
        Job job = Job.getInstance(new Configuration());
        // allow the job to locate its classes when run from a jar
        job.setJarByClass(FlowDriver.class);
        // set the mapper
        job.setMapperClass(FlowMapper.class);
        // map output key type
        job.setMapOutputKeyClass(Text.class);
        // map output value type
        job.setMapOutputValueClass(FlowBean.class);
        // set the reducer
        job.setReducerClass(FlowReduce.class);
        // reduce output key type
        job.setOutputKeyClass(Text.class);
        // reduce output value type
        job.setOutputValueClass(FlowBean.class);
        // input format and path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("./data/flow/"));
        // output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("./outPut/flow/"));
        // submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new FlowDriver(), args);
    }
}
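The driver's input and output paths are relative, so it can be run directly from the IDE. To submit it to a cluster instead, build with `mvn package` and launch the shaded jar; the jar name below is a placeholder that depends on your project's artifactId and version:

```shell
# jar name is hypothetical; use the one maven-shade-plugin produces in target/
hadoop jar flow-1.0-SNAPSHOT.jar com.czxy.flow.FlowDriver
```

Note that the output directory must not already exist, or the job will fail at startup.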

6. Input file and results

          (extraction code: 0t53)

Execution result: part-r-00000

 


Reposted from: http://uakzi.baihongyu.com/
