配置静态网络:
其他已经配置好
虚拟机里操作:
vi /etc/sysconfig/network-scripts/ifcfg-ens33
#修改
BOOTPROTO='static'
ONBOOT='yes'
#添加
IPADDR=192.168.51.100
NETMASK=225.225.225.0
GATEWAY=192.168.51.1
DNS1=8.8.8.8
然后重启网卡
systemctl restart network
安装常用软件
yum -y install vim
yum -y install net-tools
yum -y install openssl-devel
然后克隆三台,记得各自改ip
关闭防火墙:
systemctl stop firewalld
systemctl disable firewalld
关闭selinux:
vi /etc/sysconfig/selinux
SELINUX=disabled
更改主机名
vi /etc/hostname
node1
node2
node3
做主机名和IP地址的映射
vi /etc/hosts
192.268.51.110 node1 node01
192.268.51.120 node2 node02
192.268.51.130 node3 node03
三台机器时钟同步
必须连接到外网
yum -y install ntpdatei
阿里云时钟同步服务器
ntpdate ntp4.aliyun.com
三天机器定时任务
crontab -e
添加如下内容
*/1 * * * * /usr/sbin/ntpdate ntp4.aliyun.com;
添加普通用户,使其具有sudo权限
useradd hadoop
passwod hadoop
设置123456
添加root权限
visudo
增加内容
hadoop ALL=(ALL) ALL
三台定义统一目录
mkdir -p /kkb/soft #软件压缩包存放目录
mkdir -p /kkb/install #软件解压后存放目录
chown -R hadoop:hadoop /kkb
配置免密码登录
三台机器下各生成公钥私钥
ssh-keygen -t rsa
三台机器在hadoop用户下,执行拷贝公钥到node01服务器
ssh-copy-id node01
将node01的公钥拷贝给node02与node03
cd /home/hadoop/.ssh/
scp authorized_keys node02:$PWD
scp authorized_keys node03:$PWD
验证
ssh node02
解压,配置环境变量
上传文件,解压
配置环境变量
修改hadoop-env.sh
cd /kkb/install/hadoop/etc/hadoop/
vim hadoop-env.sh
export JAVA_HOME=/kkb/insyall/jdk
修改core-site.xml
vim core-site.xml
修改hdfs-site.xml
修改mapred-site.xml
修改yarn-site.xml
修改work文件
第一台机器执行
vim worker
#更换内容
node01
node02
node03
创建文件存放目录
node01机器下创建以下目录
mkdir -p /kkb/install/hadoop/hadoopDatas/tempDatas
mkdir -p /kkb/install/hadoop/hadoopDatas/namenodeDAtas
mkdir -p /kkb/install/hadoop/hadoopDatas/datanodeDatas
mkdir -p /kkb/install/hadoop/hadoopDatas/dfs/nn/edits
mkdir -p /kkb/install/hadoop/hadoopDatas/dfs/snn/name
mkdir -p /kkb/install/hadoop/hadoopDatas/dfs/nn/snn/edits
安装包分发与rsync
scp拷贝
scp -r 文件名 username@host:文件路径
格式化集群
在node01执行一遍就行
hdfs namenode -format
集群启动
start-dfs.sh
start-yarn.sh
mapred --daemon start historyserver
验证是否成功
hdfs集群访问地址Browsing HDFS
yarn集群访问地址 All Applications
jobhistory集群访问地址 JobHistory
Hadoop起源
hadoop的创始者是Doug Cutting,起源于Nutch项目,该项目是作者尝试构建的一个开源的Web搜索引擎。起初该项目遇到了阻碍,因为始终无法将计算分配给多台计算机。后受到谷歌发表的关于GFS和MapReduce相关的论文的启发,最终让Nutch可以在多台计算机上稳定的运行。后来雅虎对这项技术产生了很大的兴趣,并组建了团队开发,从Nutch中剥离出分布式计算模块命名为“Hadoop”。最终Hadoop在雅虎的帮助下能够真正的处理海量的Web数据。
狭义上:指hadoop软件
广义上:指Hadoop生态圈,包含 hadoop,durid,flink,zookeeper,shark,hue,ooize,elasticsearch,solr,phoenix,hive,hbase,flume,storm,sqoop,Kafka,spark,impala
版本区别:
运行模式:
单机版
无需守护进程,所有程序都在一个jvm上执行。在独立模式下调试mr比较方便。一般用于学习或者调试
伪分布式模式
守护进程运行在本地机器上,模拟一个小规模集群,可以配置一台机器的集群。
完全分布式模式
守护进程在一个集群上,需要多台机器
架构介绍:
2.0版本以后Hadoop由三个模块组成:分布式文件存储hdfs,分布式计算mapreduce,资源管理调度引擎yarn
主从架构
hdfs:namenode(管理集群,存元数据信息),SecondaryNameNode,datanode(存数据)
yarn:REsourceManager(主要资源调度分配),NodeManager(提供具体资源)
分块存储
block块:默认128mb,具有三个副本
文件保存到hdfs会按128mb切分为一个个block块
以block块的形式保存在hdfs中
hadoop1.x默认64mb
Hadoop2以后,默认为128mb,也可在hdfs-site.xml中设定
<property>
<name>dfs.blocksizename>
<value>字节数value>
property>
hdfs参考属性:https://hadoop.apache.org/docs/r3.2.3/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
其他属性类似位置
block元数据
抽象成块的好处
机架感知
通过一个机架感知的过程,NameNode可以确定每一个 DataNode所属的机架id(这也是NameNode采用NetworkTopology数据结构来存储数据节点的原因)。
一个简单但没有优化的策略就是将副本存放在不同的机架上,这样可以防止当整个机架失效时数据
的丢失,并且允许读数据的时候充分利用多个机架的带宽。这种策略设置可以将副本均匀分布在集
群中,有利于当组件失效的情况下的均匀负载,但是,因为这种策略的一个写操作需要传输到多个
机架,这增加了写的代价。
HDFS的存放策略是将一个副本存放在本地机架节点上,一个副本存放在同一个机架的另一个节点上,最后一个副本放在不同机 架的节点上。这种策略减少了机架间的数据传输,提高了写操作的效率。机架的错误远远比节点的错误少,所以这种策略不会影响到数据的可靠性和可用性
hdfs架构
包括:
扩展
块缓存:
对于频繁访问的文件,其对应的块可能被缓存子啊datanode的内存中,以堆外块缓存的形式存在
文件权限验证
与linux的文件系统的机制类似。建议使用kerberos或ranger来做权限验证。
俩种风格
hadoop fs
开头hdfs dfs
开头帮助信息
hadoop fs -help ls
查看指定目录的文件列表
hadoop fs -ls /
在hdfs中创建文件
hadoop fs -touchz /a.txt
向文件追加内容
hadoop fs -appenfToFile b.txt /a.txt #将本地磁盘b.txt的内容追加到hdfs中a.txt的末尾
查看文件内容
hadoop fs -cat /a.txt
本地文件上传到hdfs
hadoop fs -put ./shuguo.txt /test
hadoop fs -copyFromLocal ./shuguo.txt /test #复制上去
hadoop fs -moveFromLocal ./shuguo.txt /test #剪切上去
下载文件
hadoop fs -get /test/shuguo.txt ./
hadoop fs -copyToLocal /test/shuguo.txt ./
创建目录
hadoop fs -mkdir /shell
删除文件
hadoop fs -rm /a.txt
hadoop fs -rm -skipTrash /a #彻底删除,不放入hdfs垃圾桶
修改文件名称
hadoop fs -mv /a /b
拷贝文件
hadoop fs -cp /a.txt /b
递归删除
hadoop fs -rm -r /a
列出本地文件的内容(默认是hdfs文件系统)
hadoop fs -ls file:///home/hadoop/
查找文件
hadoop fs -find / -name asd #在hdfs的/目录下,查找asd的文件
组合查询
hdfs和getconf结合
查询帮助信息
hdfs getconf
获取namenode的节点名称
hdfs getconf -namenodes
获取hdfs最小块信息
hdfs getconf -confKey dfs.namenode.fs-limits.min-block-size
相同命令可以获取其他属性值
hdfs的namenode的RPC地址
hdfs getconf -nnRpcAddresses
hdfs和dfsadmin结合使用
查询帮助信息
hdfs dfsadmin
查看当前的模式
hdfs dfsadmin -safemode get
进入/退出安全模式
hdfs dfsadmin -safemode enter
hdfs dfsadmin -safemode leave
hdfs和fsck结合使用
显示hdfs块信息
hdfs fsck /02-041-0029.mp4 -file -blocks -locations #查看文件块信息
其他命令
检查压缩库本地安装情况
hadoop checknative
格式化名称节点(一般只在初始搭建集群后用)
hadoop namenode -format
执行jar包
hadoop jar a.jar pi 3 3
yarn jar a.jar pi 3 3
优点
高容错
自动保存多个副本。都一个副本丢失后,可以自动恢复
适合批处理
把数据位置暴露给计算框架,通过移动计算而不是移动数据,提高效率
是和大数据处理
流式数据访问
一次写入,多次读取,不能随机修改,只能追加,保证数据的一致性
可构建在廉价机器上
多副本机制,提高可靠性。提供容错和回复机制。
缺点
hdfs安全模式
是hdfs的一种特殊状态,只允许读,不允许写
例子
//创建configuration
Configuration configuration= new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.51.110:8020");
//创建文件系统流
FileSystem fileSystem = FileSystem.get(configuration);
//操作
boolean mkdir= fileSystem.mkdirs(new Path("/gao/dir2"));
System.out.println(mkdir);
//关闭文件系统流
fileSystem.close();
datanode工作机制
数据完整性
当客户端向hdfs写数据时候
当DataNode读取Block的时候,
DataNode在其文件创建后周期验证CheckSum。
掉线时限参数设置
需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。
datanode的目录结构
/usr/local/study/hadoop/data/datanodedata/current
可以查看版本号
datanode的多目录配置
datanode也可以配置多个目录,每个目录存储的数据不一样,数据不是副本
修改配置文件hdfs-site.xml
<property>
<name>dfs.datanode.data.dirname>
<value>
file:///${hadoop.tmp.dir}/dfs/data1,
file:///${hadoop.tmp.dir}/dfs/data2
value>
property>
HDFS写流程
客户端通过对FileSystem.create() 对象创建建文件,DistributedFileSystem会创建输出流FSDataOutputStream。
DistributedFileSystem 通过RPC远程调用名称节点,在文件系统的命名空间中创建一个新的文件,此时该文件中还没有相应的数据块。
名称节会执行一些检查,比如文件是否已经存在、客户端是都有权限创建文件等。检查通过后,名称节点会构造一个新文件,并添加文件信息。如果检查不通过,文件创建失败会向客户端抛一个 IOException 异常。
DistributedFileSystem利用DFSOutputStream来实例化FSDataOutputStream,返回给客户端,客户端使用这个输出流写入数据(new DFSDataOutputStream)。
了解:DFSOutputStream负责处理 DataNode 和 NameNode 之间的通信
客户端向输出流FSDataOutputStream中写入的数据会被分成一个个的分包,这些分包被放入DFSOutputStream对象的内部队列“dataQueue”
输出流FSDataOutputStream会向名称节点申请保存文件和副本数据块的若干个数据节点,这些数据节点(DataNode)形成一个数据流管道(Pipeline)。队列中的分包最后被打包成数据包,发往数据流管道的第一个数据节点, 第一个数据节点将数据包发送给第二个节点,第二个数据节点发送给第三个数据节点,数据包流经管道上个各个数据节点(即流水线复制策略)
接受确认包,因为各个数据节点位于不同的机器,数据需要通过网络发送。为了保证所有数据节点的数据都是准确的,接收到数据的数据节点要向发送这发送"确认包"(ACK Packet)。确认包沿着数据流管道逆流而上,从数据流管道一次经过各个数据节点并最终发往客户端(DataStreamer),客户端收到应答时,它将u四月 那个的分包从内部队列移除。不断执行(4)~(7)步,直到数据全部写完。
客户端调用close()方法关闭输出流。
了解:此时,客户端不会再向输出流中写入数据,所有,当DFSOutputStream对象内部队列中的分包都收到应答后,就可以使用ClientProtocal.complete()方法通知名称节点关闭文件,完成一次正常的写文件的过程。
HDFS读流程
注意:在读取数据的过程中,如果客户端与数据节点通信时出现错误,就会尝试连接包含此数据块的下一个数据节点
hdfs 读写容错
2.2 一文了解HDFS数据读、写原理及容错机制 - 知乎 (zhihu.com)
namenode负责集群的元数据管理,因为元素据随机访问,所有元数据需要存放在内存中。但如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的FsImage。
当在内存中的元数据更新时,如果同时更新FsImage,就会导致效率过低,但如果不更新,就会发生一致性问题,一旦NameNode节点断电,就会产生数据丢失。==因此,引入Edits文件(只进行追加操作,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。==这样,一旦NameNode节点断电,可以通过FsImage和Edits的合并,合成元数据。
但是,如果长时间添加数据到Edits中,会导致该文件数据过大,效率降低,而且一旦断电,恢复元数据需要的时间过长。因此,需要定期进行FsImage和Edits的合并,如果这个操作由NameNode节点完成,又会效率过低。因此,引入一个新的节点SecondaryNamenode,专门用于FsImage和Edits的合并。
checkpoint流程
namenode的多目录配置
namenode也可以配置多个目录,每个目录存储的数据不一样,数据不是副本
修改配置文件hdfs-site.xml
<property>
<name>dfs.namenode.data.dirname>
<value>
file:///${hadoop.tmp.dir}/dfs/namenodedata1,
file:///${hadoop.tmp.dir}/dfs/namenodedata2
value>
property>
出现问题原因:
解决方案
(1) HAR文件方案
· 本质启动mr程序,所以需要启动yarn
用法:archive -archiveName
第一步:创建归档文件
注意:归档文件一定要保证yarn集群启动
hadoop archive -archiveName myhar.har -p /user/hadoop /user
第二步:查看归档文件内容
hdfs dfs -ls -R /user/myhar.har
hdfs dfs -ls -R har:///user/myhar.har
第三步:解压归档文件
hdfs dfs -mkdir -p /user/har
hdfs dfs -cp har:///user/myhar.har/* /user/har/
hdfs dfs -R /user/har
(2) Sequence Files方案
SequenceFile文件,主要由一条条record记录组成;
具体结构(如上图):
SequenceFile文件可以作为小文件的存储容器;
一个SequenceFile是可分割的,所以MapReduce可将文件切分成块,每一块独立操作。
不像HAR,SequenceFile支持压缩。记录的结构取决于是否启动压缩
支持两类压缩:
在大多数情况下,以block(注意:指的是SequenceFile中的block)为单位进行压缩是最好的选择
因为一个block包含多条记录,利用record间的相似性进行压缩,压缩效率更高
把已有的数据转存为SequenceFile比较慢。比起先写小文件,再将小文件写入SequenceFile,一个更好的选择是直接将数据写入一个SequenceFile文件,省去小文件作为中间媒介.
向SequenceFile写入数据
package com.kaikeba.hadoop.sequencefile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import java.io.IOException;
import java.net.URI;
public class SequenceFileWriteNewVersion {
//模拟数据源;数组中一个元素表示一个文件的内容
private static final String[] DATA = {
"The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.",
"It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.",
"Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer",
"o delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.",
"Hadoop Common: The common utilities that support the other Hadoop modules."
};
public static void main(String[] args) throws IOException {
//输出路径:要生成的SequenceFile文件名
String uri = "hdfs://node01:8020/writeSequenceFile";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
//向HDFS上的此SequenceFile文件写数据
Path path = new Path(uri);
//因为SequenceFile每个record是键值对的
//指定key类型
IntWritable key = new IntWritable(); //key数字 -> int -> IntWritable
//指定value类型
Text value = new Text();//value -> String -> Text
//创建向SequenceFile文件写入数据时的一些选项
//要写入的SequenceFile的路径
SequenceFile.Writer.Option pathOption = SequenceFile.Writer.file(path);
//record的key类型选项
SequenceFile.Writer.Option keyOption = SequenceFile.Writer.keyClass(IntWritable.class);
//record的value类型选项
SequenceFile.Writer.Option valueOption = SequenceFile.Writer.valueClass(Text.class);
//SequenceFile压缩方式:NONE | RECORD | BLOCK三选一
//方案一:RECORD、不指定压缩算法
// SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD);
// SequenceFile.Writer writer = SequenceFile.createWriter(conf, pathOption, keyOption, valueOption, compressOption);
//方案二:BLOCK、不指定压缩算法
// SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK);
// SequenceFile.Writer writer = SequenceFile.createWriter(conf, pathOption, keyOption, valueOption, compressOption);
//方案三:使用BLOCK、压缩算法BZip2Codec;压缩耗时间
//再加压缩算法
BZip2Codec codec = new BZip2Codec();
codec.setConf(conf);
SequenceFile.Writer.Option compressAlgorithm = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, codec);
//创建写数据的Writer实例
SequenceFile.Writer writer = SequenceFile.createWriter(conf, pathOption, keyOption, valueOption, compressAlgorithm);
for (int i = 0; i < 100000; i++) {
//分别设置key、value值
key.set(100000 - i);
value.set(DATA[i % DATA.length]); //%取模 3 % 3 = 0;
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
//在SequenceFile末尾追加内容
writer.append(key, value);
}
//关闭流
IOUtils.closeStream(writer);
}
}
命令查看SequenceFile内容
hadoop fs -text /writeSequenceFile
读取SequenceFile文件
package com.kaikeba.hadoop.sequencefile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
public class SequenceFileReadNewVersion {
public static void main(String[] args) throws IOException {
//要读的SequenceFile
String uri = "hdfs://node01:8020/writeSequenceFile";
Configuration conf = new Configuration();
Path path = new Path(uri);
//Reader对象
SequenceFile.Reader reader = null;
try {
//读取SequenceFile的Reader的路径选项
SequenceFile.Reader.Option pathOption = SequenceFile.Reader.file(path);
//实例化Reader对象
reader = new SequenceFile.Reader(conf, pathOption);
//根据反射,求出key类型对象
Writable key = (Writable)
ReflectionUtils.newInstance(reader.getKeyClass(), conf);
//根据反射,求出value类型对象
Writable value = (Writable)
ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition();
System.out.println(position);
while (reader.next(key, value)) {
String syncSeen = reader.syncSeen() ? "*" : "";
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
//移动到下一个record开头的位置
position = reader.getPosition(); // beginning of next record
}
} finally {
IOUtils.closeStream(reader);
}
}
}
多个集群之间数据拷贝
在我们实际工作当中,极有可能会遇到将测试集群的数据拷贝到生产环境集群,或者将生产环境集群的数据拷贝到测试集群,那么就需要我们在多个集群之间进行数据的远程拷贝,hadoop自带也有命令可以帮我们实现这个功能
cd /kkb/soft
scp -r jdk-8u141-linux-x64.tar.gz hadoop@node02:/kkb/soft
hadoop distcp hdfs://node01:8020/jdk-8u141-linux-x64.tar.gz hdfs://cluster2:8020/
hdfs快照snapshot管理
快照顾名思义,就是相当于对我们的hdfs文件系统做一个备份,我们可以通过快照对我们指定的文件夹设置备份,但是添加快照之后,并不会立即复制所有文件,而是指向同一个文件。当写入发生时,才会产生新文件。快照的管理一般是运维人员来做。
基本用法:
开启快照权限:
1、开启指定目录的快照功能(创建快照之前要执行次步骤)
hdfs dfsadmin -allowSnapshot 路径
2、禁用指定目录的快照功能(默认就是禁用状态)
hdfs dfsadmin -disallowSnapshot 路径
创建快照:
3、给某个路径创建快照snapshot
hdfs dfs -createSnapshot 路径
4、指定快照名称进行创建快照snapshot
hdfs dfs -createSanpshot 路径 名称
5、给快照重新命名
hdfs dfs -renameSnapshot 路径 旧名称 新名称
6、列出当前用户所有可快照目录
hdfs lsSnapshottableDir
7、比较两个快照的目录不同之处
hdfs snapshotDiff 路径1 路径2
8、删除快照snapshot
hdfs dfs -deleteSnapshot
hdfs回收站
任何一个文件系统,基本上都会有垃圾桶机制,也就是删除的文件,不会直接彻底清掉,我们一把都是将文件放置到垃圾桶当中去,过一段时间之后,自动清空垃圾桶当中的文件,这样对于文件的安全删除比较有保证,避免我们一些误操作,导致误删除文件或者数据
回收站配置俩个参数
fs.trash.interval 默认值为0,0表示禁用回收站,不是0就表示启动了回收站,这个代表回收站的文件的存活时间,过了这个时间文件就会被删掉。
fs.trash.checkpoint.interval=0 默认值也为0,表示检查回收站的间隔时间。
要求fs .interval <=fs.trash.interval。
启用回收站
修改core-site.xml
fs.trash.interval
10080
通过javaAPI删除的数据,不会进入回收站,需要调用moveToTrash()方法才会进入回收站
Trash trash = New Trash(conf);
trash.moveToTrash(path);
查看回收站
回收站在集群的/user/hadoop/.Trash/ 这个路径下
回复回收站数据
hdfs dfs -mv trashFileDir hdfsdir
//trashFileDir :回收站的文件路径
//hdfsdir :将文件移动到hdfs的哪个路径下
清空回收站
hdfs dfs -expunge
定义
Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架。
Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
思想
思想核心为分而治之
Map负责分,把复杂任务分解为简单任务(这些任务可以并行计算,彼此之间几乎没有依赖关系)
Reduce负责合,即对map阶段的结果进行全局汇总
编程模型
map阶段:
map阶段有一个关键的map()函数
此函数的输入是键值对
输出是一系列键值对,输出写入本地磁盘
reduce阶段
Map&Reduce
图
mapreduce开发的八个步骤
hadoop没有沿用java当中的基本的数据类型,而是自己进行了一套数据封装,常用的如下:
java中的类型 | hadoop writable类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
byte[] | BytesWritable |
map:
public class myMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
//获得当前行数据
String line=value.toString();
//获得一个个单词
String[] words = line.split(",");
//每个单词编程kv对
for (String word : words) {
//将kv对输出出去
context.write(new Text(word),new IntWritable(1));
}
}
}
reduce:
public class myReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum=0;
for (IntWritable value : values) {
int count = value.get();
sum+=count;
}
context.write(key,new IntWritable(sum));
}
}
主函数:
public class wordCount extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new wordCount(), args);
System.exit(run);
}
@Override
public int run(String[] args) throws Exception {
Job job=Job.getInstance(super.getConf(),"wordCount");
job.setJarByClass(wordCount.class);
//第一步:读取文件,解析成key,value对
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path(args[0]));
//第二步:自定义map逻辑,对kv转换成新的kv输出
job.setMapperClass(myMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//第三步:分区:相同的k数据发送到同一个reduce里面去,key合并,value形成一个集合
//第四步:排序 对key2排序,字典顺序排序
//第五步:规约 combiner过程,调优步骤 可选
//第六步:分组
//第七步:自定义reduce逻辑,转换kv
job.setReducerClass(myReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//第八步:输出
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path(args[1]));
job.setNumReduceTasks(Integer.parseInt(args[2]));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
}
jar包运行方式
//hadoop jar jar包 主类位置 输入文件位置(hdfs中) 输出文件位置(hdfs中) 进程数目
hadoop jar MR-1.0-SNAPSHOT.jar com.wordcount.wordCount /a.txt /wordcount02 3
在mapreduce中,每个maptask处理一个切片split
MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
分片大小
Math.max(minSize, Math.min(maxSize, blockSize));
如何空值maptask数量
job提交流程源码
waitForCompletion()
submit();
//1建立连接
connect();
//1)创建提交Job的代理
newCluster(getConfiguration());
//(1)判断是本地yarn还是远程
initialize(jobTrackAddr,conf);
//2提交job
submitter.submitJobInternal(Job.this,cluster)
//1)创建给集群提交数据的Stag路径
PathjobStagingArea=
JobSubmissionFiles.getStagingDir(cluster,conf);
//2)获取jobid,并创建Job路径
JobIDjobId=submitClient.getNewJobID();
//3)拷贝jar包到集群
copyAndConfigureFiles(job,submitJobDir);
rUploader.uploadFiles(job,jobSubmitDir);
//4)计算切片,生成切片规划文件
writeSplits(job,submitJobDir);
maps=writeNewSplits(job,jobSubmitDir);
input.getSplits(job);
//5)向Stag路径写XML配置文件
writeConf(conf,submitJobFile);
conf.writeXml(out);
//6)提交Job,返回提交状态
status=submitClient.submitJob(jobId,
submitJobDir.toString(),job.getCredentials());
FileInputFormat切片源码解析
FileInputFormat切片机制
FileInputFormat中默认的切片机制
FileInputFormat切片大小的参数配置
获取切片信息API
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 获取切片的文件名称
String name = inputSplit.getPath().getName();
FileInputFormat子类
输入类FileInputFormat(切片)及其4个实现类(kv)的用法
分区指的是:将MapReduce统计的结果按照条件输出到不同的文件中。
public int getPartition(K key,V value,int numReduceTasks){
return (key.hashCode()&Intger.MAX_VALUE)%numReduceTasks;
}
**默认分区:**是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
自定义Partitioner步骤:
分区总结:
Writble是hadoop的序列化格式
Writabley有一个子接口是WritableComparable
排序是 MR 中非常重要的操作之一,
MapTask 中,
它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率到一定的阈值,再对缓冲区数据进行一次快排,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
ReduceTask 中,
它从每个 MapTask 上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则储存在内存上。
如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件。
如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完后,ReduceTask 统一对内存和磁盘上的所有数据进行一次归并排序。
Combiner
合并也属于Shuffle
机制Combiner
的父类是Reducer
Combiner
是在每一个MapTask
所在的节点运行的Reducer
接收全局所有Mapper
的输出结果Combiner
的意义就是对每一个MapTask
的输出进行局部汇总,主要目的是为了减小网络的传输量Combiner
,一般主要用于求和操作实现Combiner
的步骤就是继承Reducer
,最后在Driver
类通过setCombinerClass(类.class)
设置进去就好了
分组是mapreduce中shuffle组件当中reduce端的一个功能组件,主要的作用是决定哪些数据作 为一组
OutputFormat
可以说是MapReduce
处理过程的最后一步,由它负责把输出的信息输送到哪个文件或者哪个数据库,等等
OutputFormat
是MapReduce
输出的基类,所有实现MapReduce
输出都实现了OutputFormat
接口,下图为OutputFormat
的几个常见实现类(请忽略画横线的,那是我自定义的)
它的默认输出格式为TextOutputFormat
自定义output
FileOutputFormat
,实现RecordWriter
方法,实际上只需要调用RecordWriter
最终返回就可以了RecordWriter
方法,需要实现write
和close
方法压缩的好处和坏处:
压缩方式:
代码中压缩,如链接
修改mapred-site.xml进行压缩
#map数据压缩
<property>
<name>mapreduce.map.output.compressname>
<value>truevalue>
property>
<property>
<name>mapreduce.map.output.compress.codecname>
<value>org.apache.hadoop.io.compress.SnappyCodecvalue>
property>
#reduce端压缩
<property>
<name>mapreduce.output.fileoutputformat.compressname>
<value>truevalue>
property>
<property>
<name>apreduce.output.fileoutputformat.compress.typename>
<value>RECODEvalue>
property>
<property>
<name>apreduce.output.fileoutputformat.compress.codecname>
<value>org.apache.hadoop.io.compress.SnappyCodecvalue>
property>
MapReduce Join应用 | ReduceJoin案例实操
MapReduce Join应用 | MapJoin案例实操
(6条消息) Hadoop MapReduce 内核源码解析 | MapTask与ReduceTask工作机制_lesileqin的博客-CSDN博客