作者: Zzay
目的: 本文章记录
hadoop 3.x使用过程中常用的配置、命令、脚本、重要图示、实现代码,以方便未来查看使用。
以下是
Hadoop 3.x常用的端口号。
以下是
Hadoop 3.x常用的生产环境参数配置。(root:$HADOOP_HOME$/etc/hadoop/)
core-site.xml:配置 Hadoop 相关的全局信息
fs.defaultFShadoop.tmp.dirhadoop.http.staticuser.userworkers:工作结点的相关配置信息
hdfs-default.xml:HDFS 默认配置信息
dfs.namenode.checkpoint.perioddfs.namenode.checkpoint.txnsdfs.namenode.checkpoint.check.periodhdfs-site.xml:HDFS 相关配置信息
dfs.namenode.http-addressdfs.namenode.secondary.http-addressmapred-site.xml:MapReduce 相关配置信息yarn-site.xml:YARN 相关配置信息
yarn.nodemanager.aux-servicesyarn.resourcemanager.hostnameyarn.nodemanager.env-whitelistyarn.log-aggregation-enableyarn.log.server.urlyarn.log-aggregation.retain-secondsResourceManager 相关的核心参数:
yarn.resourcemanager.scheduler.classyarn.resourcemanager.scheduler.client.thread-countNodeManager 相关的核心参数:
yarn.nodemanager.resource.detect-hardware-capabilitiesyarn.nodemanager.resource.cpu-vcoresyarn.nodemanager.resource.count-logical-processors-as-coresyarn.nodemanager.resource.pcores-vcores-multiplieryarn.nodemanager.resource.memory-mbyarn.nodemanager.resource.system-reserved-memory-mbyarn.nodemanager.pmem-check-enabledyarn.nodemanager.vmem-check-enabledyarn.nodemanager.vmem-pmem-ratioContainer 相关的核心参数:
yarn.scheduler.minimum-allocation-mbyarn.scheduler.maximum-allocation-mbyarn.scheduler.minimum-allocation-vcoresyarn.scheduler.maximum-allocation-vcores以下是
Hadoop 3.x使用过程中一些常用的命令。
启动/停止 Hadoop 集群:
$HADOOP_HOME$/sbin/start-dfs.sh
$HADOOP_HOME$/sbin/start-yarn.sh
$HADOOP_HOME$/sbin/stop-dfs.sh
$HADOOP_HOME$/sbin/stop-yarn.sh
启动历史服务器 historyserver:
$HADOOP_HOME$/bin/mapred --daemon start historyserver
$HADOOP_HOME$/bin/mapred --daemon stop historyserver
$ hdfs dfs COMMAND
-help:查询命令细节
$ hdfs dfs -help 命令名称
-mkdir:创建文件夹
$ hdfs dfs -mkdir 文件夹名称
-ls:显示目录信息
$ hdfs dfs -ls 目录地址
-cat:显示文件内容
$ hdfs dfs -cat 文件地址
-chmod / -chown:修改文件权限 / 所属
$ hdfs dfs -chmod 777 文件地址
$ hdfs dfs -chown zzay:zzay 文件地址
-cp:将文件从 HDFS 的一个位置拷贝到 HDFS 的另一个位置
$ hdfs dfs -cp 源文件地址 目的地址
-mv:在 HDFS 目录中移动文件
$ hdfs dfs -mv 源文件地址 目的地址
-tail:显示一个文件末尾 1KB 的数据
$ hdfs dfs -tail 文件地址
-rm:删除文件或文件夹
$ hdfs dfs -rm 文件地址
-rm -r:递归删除目录及目录内的内容
$ hdfs dfs -rm -r 目录地址
-du -s -h:统计文件夹/文件的大小信息
# 统计所给文件夹的大小信息
$ hdfs dfs -du -s -h 文件夹地址
# 统计所给文件夹内各文件的大小信息
$ hdfs dfs -du -h 文件夹地址
-setrep:设置 HDFS 中文件的副本数量
$ hdfs dfs -setrep 副本数量 文件地址
-copyFromLocal:将文件从本地复制粘贴到 HDFS
$ hdfs dfs -copyFromLocal 源文件地址 目的地址
-put:将文件从本地复制粘贴到 HDFS(等同于copyFromLocal,生产环境更习惯用put)
$ hdfs dfs -put 源文件地址 目的地址
-moveFromLocal:将文件从本地剪切到 HDFS
$ hdfs dfs -moveFromLocal 源文件地址 目的地址
-appendToFile:追加一个文件到已存在的文件末尾
$ hdfs dfs -appendToFile 源文件地址 目的地址
copyToLocal:从 HDFS 下载文件到本地
$ hdfs dfs -copyToLocal 源文件地址 目的地址
get:从 HDFS 下载文件到本地(等同于copyToLocal,生产环境更习惯用get)
$ hdfs dfs -get 源文件地址 目的地址
oiv:查看 Fsimage 镜像文件(正常cat无法查看镜像文件,需要oiv协助转换)
$ hdfs oiv -p 转换后文件类型 -i 镜像文件 -o 转换后文件输出路径
# Example(将镜像文件转换为xml文件 -> 直接查看/下载到本地查看)
$ hdfs oiv -p XML -i fsimage_000000000000000025 -o /opt/module/hadoop-3.1.3/fsimage.xml
$ cat /opt/module/hadoop-3.1.3/fsimage.xml
$ sz /opt/module/hadoop-3.1.3/fsimage.xml
oev:查看 Edits 编辑日志(正常cat无法查看 Edits 编辑日志,需要oev协助转换)
$ hdfs oev -p 转换后文件类型 -e 编辑日志 -o 转换后文件输出路径
# Example(将Edits编辑日志转换为xml文件 -> 直接查看/下载到本地查看)
$ hdfs oev -p XML -i edits_000000000000000012-000000000000000013 -o /opt/module/hadoop-3.1.3/edits.xml
$ cat /opt/module/hadoop-3.1.3/edits.xml
$ sz /opt/module/hadoop-3.1.3/edits.xml
$ yarn COMMAND
yarn application:查看任务相关信息
# 列出所有的Application
$ yarn application -list
# 根据Application状态,列出状态匹配的所有Application
# (ALL, NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED)
$ yarn application -list -appStates
# 杀死对应的Application
$ yarn application -kill
# 修改applicationId对应的Application的优先级
$ yarn application -applicationid -updatePriority
yarn logs:查看日志信息(Application日志、Container日志)
# 查看applicationId对应的Application的日志
$ yarn logs -applicationId
# 查看applicationId和containerId共同对应的Container的日志
$ yarn logs -applicationId -containerId
yarn applicationattempt:查看尝试运行的任务的相关信息
# 查看applicationId对应的Application的所有运行尝试
$ yarn applicationattempt -list
# 查看applicationAttemptId对应的ApplicationAttempt的状态
$ yarn applicationattempt -status
yarn container:查看容器相关信息
# 查看与某次ApplicationAttempt相关的容器的信息,以及这次尝试的开始和结束时间
$ yarn container -list
# 查看某个Container的状态(有任务运行时才能够显示)
$ yarn container -status
yarn node:查看结点相关信息
# 列出所有结点
$ yarn node -list -all
yarn rmadmin:更新配置信息
# 加载队列配置
$ yarn rmadmin -refreshQueues
yarn queue:查看队列相关信息
# 根据所给队列名称,打印对应队列的状态信息
$ yarn queue -status
# 打印默认队列的状态信息
$ yarn queue -status default
以下包含
Hadoop 3.x的一些重要图示(原理、流程)。
以下是
Hadoop 3.x常用的脚本。
xsync:集群分发,向其他结点同步指定的文件(底层通过rsync实现)。
#!/bin/bash
# 1.判断参数个数
if [ $# -lt 1 ]
then
echo "Not Enough Arguement!"
exit;
fi
# 2.遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
echo "==================== $host ===================="
# 3.遍历参数所给的所有目录,逐个分发
for file in $@
do
# 4.判断文件是否存在
if [ -e $file ]
then
# 5.获取父目录
pdir=$(cd -P $(dirname $file); pwd)
# 6.获取当前文件
fname=$(basename $file)
ssh $host "mkdir -p $pdir"
rsync -av $pdir/$fname $host:$pdir/
else
echo "$file does not exist!"
fi
done
done
myhadoop.sh:同时控制集群的启动与关闭。
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Arguements Error..."
exit;
fi
case $1 in
"start")
echo "===================== 启动 hadoop 集群 ===================="
echo "--------------------- 启动 HDFS ---------------------"
ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
echo "--------------------- 启动 YARN ---------------------"
ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
echo "--------------------- 启动 historyserver ---------------------"
ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")
echo "==================== 关闭 hadoop 集群 ===================="
echo "--------------------- 关闭 historyserver ---------------------"
ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
echo "--------------------- 关闭 YARN ---------------------"
ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
echo "--------------------- 关闭 HDFS ---------------------"
ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)
echo "Input Arguements Error..."
;;
esac
jpsall:所有结点同时调用jps,用于获取当前各结点具体信息。
#!/bin/bash
for host in hadoop102 hadoop103 hadoop104
do
echo "==================== $host ===================="
ssh $host jps
done
以下包含对
Hadoop3.x在Java中相关API的调用操作演示实例。
Maven配置:
org.apache.hadoop
hadoop-client
3.1.3
junit
junit
4.12
org.slf4j
slf4j-log4j12
1.7.25
maven-compiler-plugin
3.6.1
1.8
1.8
maven-assembly-plugin
jar-with-dependencies
make-assembly
package
single
log4j.properties配置:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
loq4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
以下包含 HDFS 在Java中相关API的调用操作。
导入包:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
// HDFS client instance
private FileSystem fileSystem;
/**
* Initialization.
*/
@Before
public void init() throws URISyntaxException, IOException, InterruptedException {
// Address of the NameNode of the cluster connected
URI nameNodeURI = new URI("hdfs://hadoop102:8020");
// Hadoop Configuration
Configuration configuration = new Configuration();
// User
String user = "zzay";
// Get a HDFS client instance
fileSystem = FileSystem.get(nameNodeURI, configuration, user);
}
/**
* Close the hadoop file system.
*/
@After
public void close() throws IOException {
fileSystem.close();
}
/**
* Create a directory on HDFS.
*/
@Test
public void testMkdir() throws IOException {
// File path string
String dstPathStr = "hdfs://hadoop102/xiyou/huaguoshan";
// File path
Path dstPath = new Path(dstPathStr);
fileSystem.mkdirs(dstPath);
}
/**
* Put a file onto HDFS.
*
* 参数说明:
* (1) boolean delSrc: 是否删除源数据
* (2) boolean overwrite: 若有目的路径有同名文件,是否覆盖
* (3) Path/Path[] srcs/src: 源文件路径
* (4) Path dst: 目标路径
*/
@Test
public void testPut() throws IOException {
// File path string
String srcPathStr = Objects.requireNonNull(this.getClass().getClassLoader().getResource("./texts/sunwukong.txt")).getPath();
String dstPathStr = "hdfs://hadoop102//xiyou/huaguoshan";
// File path
Path srcPath = new Path(srcPathStr);
Path dstPath = new Path(dstPathStr);
fileSystem.copyFromLocalFile(false, false, srcPath, dstPath);
}
/**
* Get a file from HDFS.
*
* 参数说明:
* (1) boolean delSrc: 是否删除源数据
* (2) Path src: 源文件路径
* (3) Path dst: 目标路径
* (4) boolean useRawLocalFileSystem: 基本用不到
*/
@Test
public void testGet() throws IOException {
// File path string
String srcPathStr = "hdfs://hadoop102//xiyou/huaguoshan";
String dstPathStr = "C:/Users/Dal-Z41/Desktop/sunwukong_get.txt";
// File path
Path srcPath = new Path(srcPathStr);
Path dstPath = new Path(dstPathStr);
fileSystem.copyToLocalFile(false, srcPath, dstPath);
}
/**
* Remove a file on HDFS.
*
* 参数说明:
* (1) Path f: 目标文件路径
* (2) boolean recursive: 是否递归删除
*/
@Test
public void testRm() throws IOException {
// File path string
String filepathStr = "hdfs://hadoop102/jdk-8u212-linux-x64.tar.gz";
// File path
Path filepath = new Path(filepathStr);
fileSystem.delete(filepath, false);
}
/**
* Rename a file/directory; move a file.
*
* 参数说明:
* (1) Path src: 源文件路径
* (2) Path dst: 目标文件路径
* (2) boolean recursive: 是否递归删除
*/
@Test
public void testRenameAndMove() throws IOException {
// File path string
String srcPathStr = "hdfs://hadoop102/input/word.txt";
String newNameStr = "hdfs://hadoop102/input/ss.txt";
String dstPathStr = "hdfs://hadoop102/cls.txt";
// File path
Path srcPath = new Path(srcPathStr);
Path newNamePath = new Path(newNameStr);
Path dstPath = new Path(dstPathStr);
// Rename
fileSystem.rename(srcPath, newNamePath);
// Move
fileSystem.rename(newNamePath, dstPath);
}
查看某个文件的详细信息(权限,所属,所在组,大小,最近修改时间,副本数,块大小,名称,块信息):
/**
* Get detailed information of a file.
*
* 参数说明:
* (1) Path f: 目标文件路径
* (2) boolean recursive: 是否递归查询
*/
@Test
public void getFileDetails() throws IOException {
// File path string
String filepathStr = "hdfs://hadoop102/";
// File path
Path filepath = new Path(filepathStr);
// Get a list of files and traverse them
RemoteIterator listFiles = fileSystem.listFiles(filepath, true);
while (listFiles.hasNext()) {
// Each file stored in HDFS
LocatedFileStatus fileStatus = listFiles.next();
// Block locations of each file
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
System.out.println("==========" + fileStatus.getPath() + "==========");
System.out.println("Permission: " + fileStatus.getPermission());
System.out.println("Owner: " + fileStatus.getOwner());
System.out.println("Group: " + fileStatus.getGroup());
System.out.println("Size: " + fileStatus.getLen());
System.out.println("Last Modified: " + fileStatus.getModificationTime());
System.out.println("Replication: " + fileStatus.getReplication());
System.out.println("Block Size: " + fileStatus.getBlockSize());
System.out.println("Name: " + fileStatus.getPath().getName());
System.out.println("Block locations: " + Arrays.toString(blockLocations));
}
}
判断给定对象是文件还是文件夹:
/**
* Judge whether the given object is a file or a directory.
*
* 参数说明:
* (1) Path/Path[] f/files: 目标文件路径
* (2) PathFilter filter: 路径过滤器
*/
@Test
public void testFileOrDir() throws IOException {
// File path string
String filepathStr = "hdfs://hadoop102/";
// File path
Path filepath = new Path(filepathStr);
// Get file statuses and traverse them
FileStatus[] fileStatuses = fileSystem.listStatus(filepath);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isFile()) {
System.out.println("File: " + fileStatus.getPath().getName());
} else if (fileStatus.isDirectory()) {
System.out.println("Directory: " + fileStatus.getPath().getName());
}
}
}
以下包含 MapReduce 在Java中相关API的调用操作,以举例形式记录。
WordCountMapper: 读取数据文件,对单词进行切割,并以(xxx,1)的形式存储到最终要输出到Reducer的K-V对中。
package com.zzay.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author zzay
* @className WordCountMapper
* @description 读取数据文件,对单词进行切割,并以(xxx,1)的形式存储到最终要输出到Reducer的K-V对中。
* 【Mapper泛型参数说明】
* KEYIN: map阶段输入的key的类型:LongWritable
* VALUEIN: map阶段输入value类型:Text
* KEYOUT: map阶段输出的Key类型:Text
* VALUEOUT: map阶段输出的value类型:IntWritable
* @create 2022/03/29 13:06
*/
public class WordCountMapper extends Mapper {
// Output key of the final K-V pair sent to Reducer
private final Text outKey = new Text();
// Output value of the final K-V pair sent to Reducer
private final IntWritable outValue = new IntWritable(1);
/**
* 读取数据文件,对单词进行切割,并存储到最终要输出到Reducer的K-V对中。
*
* @param key 该单词在所给数据中的偏移量
* @param value 单词字符串
* @param context 关联Mapper和Reducer及系统代码的上下文对象
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// (1) Read a line of data
String lineData = value.toString();
// (2) Split words
String[] words = lineData.split(" ");
// (3) Traverse and write out to the final output K-V pair sent to Reducer
for (String word: words) {
outKey.set(word);
context.write(outKey, outValue);
}
}
}
WordCountReducer: 根据上下文对象,获得Mapper的最终输出。根据输出,统计各个单词的出现次数,并存入到最终输出的K-V对中。
package com.zzay.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author zdz
* @className WordCountReducer
* @description 根据上下文对象,获得Mapper的最终输出。根据输出,统计各个单词的出现次数,并存入到最终输出的K-V对中。
* 【Reducer泛型参数说明】
* KEYIN: reduce阶段输入的key的类型:Text
* VALUEIN: reduce阶段输入value类型:IntWritable
* KEYOUT: reduce阶段输出的Key类型:Text
* VALUEOUT: reduce阶段输出的value类型:IntWritable
* @create 2022/03/29 13:06
*/
public class WordCountReducer extends Reducer {
// Record the occurrence times of each word
IntWritable outValue = new IntWritable();
/**
* 根据上下文对象,获得Mapper的最终输出。根据输出,统计各个单词的出现次数,并存入到最终输出的K-V对中。
*
* @param key 单词字符串
* @param values 输入value固定为1
* @param context 关联Mapper和Reducer及系统代码的上下文对象
*/
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outValue.set(sum);
context.write(key, outValue);
}
}
WordCountDriver: 关联Mapper、Reducer和系统代码,定义输入输出格式,实现业务逻辑。
package com.zzay.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author zdz
* @className WordCountDriver
* @description 关联Mapper、Reducer和系统代码,定义输入输出格式,实现业务逻辑。
* @create 2022/03/29 13:06
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// (1) 获取配置信息,获取job对象实例
Job job = Job.getInstance(new Configuration());
// (2) 指定本程序的jar包所在的本地路径
job.setJarByClass(WordCountDriver.class);
// (3) 关联Mapper/Reducer业务类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// (4) 指定Mapper输出数据的K-V类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// (5) 指定最终输出数据的K-V类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// (6) 指定job的输入原始文件所在目录,job的输出结果所在目录
FileInputFormat.setInputPaths(job, new Path("..."));
FileOutputFormat.setOutputPath(job, new Path("..."));
// (7) 提交作业
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
本例子主要演示:
ProvincePartitioner: 根据省份对信息进行分区。
/**
* @author zzay
* @className ProvincePartitioner
* @description 根据省份对信息进行分区
* [参数说明]
* KEY:根据KEY的内部细节,制定相应的分区策略
* VALUE:该KEY对应的Value数据
* @create 2022/03/29 21:32
*/
public class ProvincePartitioner extends Partitioner {
/**
* 根据所给数据信息,判断该数据应分配的分区,并返回相应的分区ID。
*
* @param flowBean 一条数据
* @param text 手机号
* @param numPartitions 分区总数目
* @return 给这条数据所分配的分区ID
*/
@Override
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
// Original phone number
String phone = text.toString();
// Prefix of the phone number
String phonePrefix = phone.substring(0, 3);
// ID of the partition the current data should be allocated
int partition;
if ("136".equals(phonePrefix)) {
partition = 0;
} else if ("137".equals(phonePrefix)) {
partition = 1;
} else if ("138".equals(phonePrefix)) {
partition = 2;
} else if ("139".equals(phonePrefix)) {
partition = 3;
} else {
partition = 4;
}
return partition;
}
}
FlowBean: 模拟自定义的类实例对象。
1、定义类实现WritableComparable接口,从而能够进行序列化和反序列化,以及自定义比较策略。
2、重写序列化方法write和反序列化方法readFields。
3、重写空参构造。
4、toString方法。
/**
* @author zzay
* @className FlowBean
* @description 模拟需要序列化传输的Bean对象
* 1、定义类实现writable接口
* 2、重写序列化和反序列化方法
* 3、重写空参构造
* 4、toString方法
* @create 2022/03/29 16:06
*/
public class FlowBean implements WritableComparable {
// Up Flow
private long upFlow;
// Down flow
private long downFlow;
// Sum of upFlow and downFlow
private long sumFlow;
// No-args constructor
public FlowBean() {}
/**
* Serialization.
*
* @param out DataOutput
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
/**
* DeSerialization.
*
* @param in DataInput
*/
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public int compareTo(FlowBean o) {
if (this.sumFlow > o.sumFlow) {
return -1;
} else if (this.sumFlow < o.sumFlow) {
return 1;
} else {
if (this.upFlow > o.upFlow) {
return 1;
} else if (this.upFlow < o.upFlow) {
return -1;
} else {
return 0;
}
}
}
@Override
public String toString() {
return upFlow + " " +
downFlow + " " +
sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
// ...
}
FlowMapper: 负责序列化业务的Mapper类。
/**
* @author zzay
* @className FlowMapper
* @description 序列化业务的Mapper类
* @create 2022/03/29 16:11
*/
public class FlowMapper extends Mapper {
// Output value: FlowBean instance
private final FlowBean outKey = new FlowBean();
// Output key: String phone
private final Text outValue = new Text();
/**
* Map过程。
*
* @param key 数据的行ID
* @param value 一行数据
* @param context 关联Mapper,Reducer和系统代码的上下文对象
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// (1) Get a line of data
String lineData = value.toString();
// (2) Split the lineData
// [id, phone, ip, domainName, upFlow, downFlow, status]
String[] data = lineData.split(" ");
// (3) Get data expected (phone, flows)
String phone = data[0];
String upFlow = data[1];
String downFlow = data[2];
// (4) Encapsulate data
outKey.setUpFlow(Long.parseLong(upFlow));
outKey.setDownFlow(Long.parseLong(downFlow));
outKey.setSumFlow();
outValue.set(phone);
// (5) Write out
context.write(outKey, outValue);
}
}
FlowReducer: 负责序列化业务的Reducer类
Mapper 输出的 Key 为FlowBean对象,即一行数据;输出的 Value 为手机号。
通常来说,一个 key 可能对应多个 value 。但是这种情况下,一个 key(一行数据)只能对应一个 value(手机号),因为基本不可能有两行数据完全一样而不属于一个手机号。
所以说,此时的 Reducer 相当于没有做额外操作,只是将 key 和 value 的位置进行反转。
/**
* @author zzay
* @className FlowReducer
* @description 负责序列化业务的Reducer类
* @create 2022/03/29 16:21
*/
public class FlowReducer extends Reducer {
/**
* Reduce过程。
* Mapper输出的Key为FlowBean对象,即一行数据;输出的Value为手机号。
* 通常来说,一个key可能对应多个value。但是这种情况下,一个key(一行数据)只能对应一个value(手机号),因为基本不可能有两行数据完全一样而不属于一个手机号。
* 所以说,此时的Reducer相当于没有做额外操作,只是将key和value的位置进行反转。
*
* @param key 一行数据(手机号,流量)
* @param values Mapper提取出的手机号
* @param context 关联Mapper,Reducer和系统代码的上下文对象
*/
@Override
protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
// 实际上只会进行一轮
context.write(value, key);
}
}
}
FlowDriver: 关联Mapper、Reducer和系统代码,定义最终输入输出格式,实现业务逻辑。
需要注意自定义分区时,需要加入job.setPartitionerClass来设置自定义的分区类配置。
此外,还需要记得设置 ReduceTask 的数目,以合理匹配分区数目。
/**
* @author zzay
* @className FlowDriver
* @description 关联Mapper、Reducer和系统代码,定义最终输入输出格式,实现业务逻辑。
* @create 2022/03/29 16:26
*/
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// ...
// 设置自定义的分区策略
job.setPartitionerClass(ProvincePartitioner.class);
// 设置ReduceTask的数目,以合理匹配分区数目
job.setNumReduceTasks(5);
// ...
}
}
基于先前的WordCount例子延伸。
- 核心:引入Combiner类 / 以 Reducer 类自身作为 CombinerClass。
- 主要功能:对每个 MapTask 的输出做局部汇总,以减轻最终 Reducer 处理输入时所需的磁盘IO。
- 注意点:不影响最终的业务逻辑;在每个 MapTask 所在的结点运行。
WordCountCombiner: WordMapper 和 WordReducer 之间的 Combiner。
/**
* @author zzay
* @className WordCountCombiner
* @description WordMapper和WordReducer之间的Combiner类
* @create 2022/03/30 00:13
* @see com.zzay.mapreduce.combiner.normal.WordCountMapper
* @see com.zzay.mapreduce.combiner.normal.WordCountReducer
*/
public class WordCountCombiner extends Reducer {
private final IntWritable outValue = new IntWritable();
/**
*
* @param key 单词字符串
* @param values 输入value固定为1
* @param context 关联Mapper和Reducer及系统代码的上下文对象
*/
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value: values) {
sum += value.get();
}
outValue.set(sum);
context.write(key, outValue);
}
}
WordCountDriver: 关联Mapper、Reducer和系统代码,定义输入输出格式,实现业务逻辑。
/**
* @author zdz
* @className WordCountDriver
* @description 关联Mapper、Reducer和系统代码,定义输入输出格式,实现业务逻辑。
* @create 2022/03/29 13:06
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// ...
// 配置Combiner类
job.setCombinerClass(WordCountCombiner.class);
// ...
}
}
: