数据采集一般指的是将数据采集到大数据环境下进行持久化、海量化的保存,目的主要是为了我们后期的大数据处理(数据统计分析、数据挖掘等等)沉底数据基础。
不同的来源的数据我们一般有不同的数据采集方式
1、数据来源于我们的RDBMS关系型数据库:Sqoop数据迁移工具实现数据的采集
2、数据来源于我们系统运行产生的日志文件:日志文件记录的数据量特别庞大,但是日志文件不属于大数据存储系统中东西,因此日志文件记录不了海量的数据,日志文件都会有一个定期清理规则。采集日志文件数据到大数据环境中。
一般采集日志文件数据到大数据环境使用的就是Flume技术
3、数据来源于其他网站:开发一个电影网站,电影网站应该具备哪些功能,哪些类型的电影能受用户的欢迎。分析竞品数据,这种情况竟品数据都是人家别人家网站的数据,但是我们需要分析,但是人家不给你数据,通过爬虫获取数据(一不留神就犯法)。
4、数据来源于各种传感器设备:不需要我们管
5、第三方提供、购买的第三方数据、开源数据集平台提供的(阿里云的天池数据集、kaggle数据集平台、飞浆数据集平台、各个地区的政府公开数据集平台)
sqoop技术:采集RDBMS的数据到大数据环境中
Flume技术:采集系统/网站产生的日志文件数据、端口数据等等到大数据环境中
爬虫技术:采集第三方的数据,爬虫一般是把采集的数据放到一个文件或者RDBMS数据库当中
爬虫技术就是通过读取网页/网站的界面结构,获取网页中嵌套的数据
爬虫目前主要有两种类型的爬虫
Flume也是Apache开源的顶尖项目,专门用来采集海量的日志数据到指定的目的地。
Flume采集数据采用一种流式架构思想,只要数据源有数据,就可以源源不断的采集数据源的数据到目的地
Flume的组成架构
Flume的采集数据的工作流程
Flume安装部署:三部曲
1、上传解压
2、配置环境变量
export FLUME_HOME=/opt/app/flume-1.11.0
export PATH=$PATH:$FLUME_HOME/bin
3、修改配置文件
Flume支持多种数据源、管道、目的地,我们采集数据的时候,并不是所有的数据源和目的地都要使用,而是使用我们需要的源头和目的地。但是Flume不知道你需要什么数据源、需要什么目的地。
通过脚本文件指定我们采集的数据源、目的地、管道
脚本文件主要由五部分组成:
1、分析案例的组件类型
2、编写脚本文件portToConsole.conf
# 1、配置agent、source、channel、sink的别名
demo.sources=s1
demo.channels=c1
demo.sinks=k1
# 2、配置source组件连接的数据源--不同数据源的配置项都不一样 监听netcat type bind port
demo.sources.s1.type=netcat
demo.sources.s1.bind=localhost
demo.sources.s1.port=44444
# 3、配置channel组件的类型--不同类型的管道配置项也不一样 基于内存memory的管道
demo.channels.c1.type=memory
demo.channels.c1.capacity=1000
demo.channels.c1.transactionCapacity=200
# 4、配置sink组件连接的目的地--不同类型的sink配置项不一样 基于logger的下沉地
demo.sinks.k1.type=logger
# 5、配置source channel sink之间的连接 source 连接channel sink也要连接channel
# 一个source的数据可以发送给多个channel 一个sink只能拉去一个channel的数据
demo.sources.s1.channels=c1
demo.sinks.k1.channel=c1
3、根据脚本文件启动Flume采集程序
4、测试
1、案例需求
2、案例分析
3、编写脚本文件
# 1、起别名
demo01.sources=s1
demo01.channels=c1
demo01.sinks=k1
# 2、定义数据源 exec linux命令 监听一个文件 tail -f|-F 文件路径
demo01.sources.s1.type=exec
demo01.sources.s1.command=tail -F /root/a.log
# 3、定义管道
demo01.channels.c1.type=memory
demo01.channels.c1.capacity=1000
demo01.channels.c1.transactionCapacity=200
# 4、配置sink目的地 logger
demo01.sinks.k1.type=logger
# 5、关联组件
demo01.sources.s1.channels=c1
demo01.sinks.k1.channel=c1
4、启动
flume-ng agent -n demo01 -f /root/fileToConsole.cong -Dflume.root.logger=INFO,console
echo "zs" >> a.log
5、测试
1、案例需求
2、案例分析
3、编写配置文件
# 1、起别名
demo01.sources=s1
demo01.channels=c1
demo01.sinks=k1
# 2、定义数据源 Spooling Directory Source
demo01.sources.s1.type=spooldir
demo01.sources.s1.spoolDir=/root/demo
# 3、定义管道
demo01.channels.c1.type=memory
demo01.channels.c1.capacity=1000
demo01.channels.c1.transactionCapacity=200
# 4、配置sink目的地 logger
demo01.sinks.k1.type=logger
# 5、关联组件
demo01.sources.s1.channels=c1
demo01.sinks.k1.channel=c1
4、运行
5、测试
1、案例需求
2、案例分析
3、编写脚本文件
# 1、配置agent、source、channel、sink的别名
demo.sources=s1
demo.channels=c1
demo.sinks=k1
# 2、配置source组件连接的数据源--不同数据源的配置项都不一样 监听netcat type bind port
demo.sources.s1.type=netcat
demo.sources.s1.bind=localhost
demo.sources.s1.port=44444
# 3、配置channel组件的类型--不同类型的管道配置项也不一样 基于内存memory的管道
demo.channels.c1.type=memory
demo.channels.c1.capacity=1000
demo.channels.c1.transactionCapacity=200
# 4、配置sink组件连接的目的地--基于HDFS的
demo.sinks.k1.type=hdfs
# 配置采集到HDFS上的目录 数据在目录下以文件的形式进行存放 文件的格式 FlumeData.时间戳
demo.sinks.k1.hdfs.path=hdfs://single:9000/flume
# 目录下生成的文件的前缀 如果没有配置 默认就是FlumeData
demo.sinks.k1.hdfs.filePrefix=collect
# 指定生成的文件的后缀 默认是没有后缀的 生成的文件的格式collect.时间戳.txt
demo.sinks.k1.hdfs.fileSuffix=txt
# 目录下采集的数据并不是记录到一个文件中,文件是会滚动生成新的文件的
# 滚动的规则有三种:1、基于时间 2、基于文件的容量滚动 3、基于文件的记录的event数量进行滚动
# 默认值: 时间30s 容量 1024b event 10
# 时间滚动规则 如果值设置为0 那么就代表不基于时间生成新的文件
demo.sinks.k1.hdfs.rollInterval=60
# 文件容量的滚动规则 单位b 如果设置为0 代表不基于容量滚动生成新的文件
demo.sinks.k1.hdfs.rollSize=100
# event数量的滚动规则 一般设置为0 代表不急于event数量滚动生成新的文件
demo.sinks.k1.hdfs.rollCount=0
# 文件在HDFS上的默认的存储格式是SequenceFile文件格式
demo.sinks.k1.hdfs.fileType=DataStream
# 设置event的头部使用本地时间戳作为header
demo.sinks.k1.hdfs.useLocalTimeStamp=true
# 5、配置source channel sink之间的连接 source 连接channel sink也要连接channel
# 一个source的数据可以发送给多个channel 一个sink只能拉去一个channel的数据
demo.sources.s1.channels=c1
demo.sinks.k1.channel=c1
4、启动采集进程(必须先启动HDFS)
【注意】flume的依赖的guava和hadoop的guava有冲突,需要将flume的lib目录下的guava依赖删除,同时将hadoop的share/common/lib/guava依赖复制到flume的lib目录下
1、案例需求
2、案例分析
3、编写脚本文件
# 1、起别名 三个数据源 两个管道 两个sink
more.sources=s1 s2 s3
more.channels=c1 c2
more.sinks=k1 k2
# 2、定义数据源 三个
# 定义s1数据源 s1连接的是网络端口
more.sources.s1.type=netcat
more.sources.s1.bind=localhost
more.sources.s1.port=44444
# 定义s2的数据源 s2连接的是文件 /root/more.log文件
more.sources.s2.type=exec
more.sources.s2.command=tail -F /root/more.log
# 定义s3的数据源 s3监控的是一个文件夹 /root/more
more.sources.s3.type=spooldir
more.sources.s3.spoolDir=/root/more
# 3、定义channel管道 两个 基于内存的
# 定义c1管道 c1管道需要接受三个数据源的数据
more.channels.c1.type=memory
more.channels.c1.capacity=20000
more.channels.c1.transactionCapacity=5000
# 定义c2管道 c2管道只需要接收一个数据源 s2的数据
more.channels.c2.type=memory
more.channels.c2.capacity=5000
more.channels.c2.transactionCapacity=500
# 4、定义sink 两个 HDFS logger
# 定义k1这个sink 基于hdfs
more.sinks.k1.type=hdfs
# hdfs支持生成动态目录--基于时间的 /more/2023-08-25
more.sinks.k1.hdfs.path=hdfs://single:9000/more/%Y-%m-%d
# 如果设置了动态目录,那么必须指定动态目录的滚动规则-多长时间生成一个新的目录
more.sinks.k1.hdfs.round=true
more.sinks.k1.hdfs.roundValue=24
more.sinks.k1.hdfs.roundUnit=hour
more.sinks.k1.hdfs.filePrefix=collect
more.sinks.k1.hdfs.fileSuffix=.txt
more.sinks.k1.hdfs.rollInterval=0
more.sinks.k1.hdfs.rollSize=134217728
more.sinks.k1.hdfs.rollCount=0
more.sinks.k1.hdfs.fileType=DataStream
more.sinks.k1.hdfs.useLocalTimeStamp=true
# 定义k2 logger
more.sinks.k2.type=logger
# 5、组合agent的组件
more.sources.s1.channels=c1
more.sources.s2.channels=c1 c2
more.sources.s3.channels=c1
more.sinks.k1.channel=c1
more.sinks.k2.channel=c2
1、案例需求
2、案例分析
3、编写脚本文件
第一个脚本监听端口到avro的
first.sources=s1
first.channels=c1
first.sinks=k1
first.sources.s1.type=netcat
first.sources.s1.bind=localhost
first.sources.s1.port=44444
first.channels.c1.type=memory
first.channels.c1.capacity=1000
first.channels.c1.transactionCapacity=500
first.sinks.k1.type=avro
first.sinks.k1.hostname=localhost
first.sinks.k1.port=60000
first.sources.s1.channels=c1
first.sinks.k1.channel=c1
第二脚本文件监听文件数据到avro的
second.sources=s1
second.channels=c1
second.sinks=k1
second.sources.s1.type=exec
second.sources.s1.command=tail -F /root/second.txt
second.channels.c1.type=memory
second.channels.c1.capacity=1000
second.channels.c1.transactionCapacity=500
second.sinks.k1.type=avro
second.sinks.k1.hostname=localhost
second.sinks.k1.port=60000
second.sources.s1.channels=c1
second.sinks.k1.channel=c1
第三个脚本文件监听avro汇总的数据到logger的
third.sources=s1
third.channels=c1
third.sinks=k1
# avro类型当作source 需要bind和port参数 如果当作sink使用 需要hostname port
third.sources.s1.type=avro
third.sources.s1.bind=localhost
third.sources.s1.port=60000
third.channels.c1.type=memory
third.channels.c1.capacity=1000
third.channels.c1.transactionCapacity=500
third.sinks.k1.type=logger
third.sources.s1.channels=c1
third.sinks.k1.channel=c1
4、启动脚本程序
先启动第三个脚本,再启动第一个和第二脚本
flume-ng agent -n third -f /root/third.conf -Dflume.root.logger=INFO,console
flume-ng agent -n first -f /root/first.conf -Dflume.root.logger=INFO,console
flume-ng agent -n second -f /root/second.conf -Dflume.root.logger=INFO,console