Flume是 Cloucera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flune基于流式架构。灵活简单
Console、RPC、Text、Tail、Syslog、Exec等
磁盘,hdfs,hbase, 经过网络传输kafka
data->flume->Kafka->spark streaming/ storm / flink -> HBase, MySQL
Flume中最核心的角色是agent,flume采集系统就是由一个个agent连接起来所形成的一个或简单或复杂的数据传输通道。
对于每一个Agent来说,它就是一个独立的守护进程(JVM),它负责从数据源接收数据,并发往下一个目的地,如下图所示:

每一个agent相当于一个数据(被封装成Event对象)传递员,内部有3个核心组件:
Source: 采集组件,用于跟数据源对接,以获取数据;它有各种各样的内置实现;
Sink: 下沉组件,用于往下一级agent传递数据或者向最终存储系统传递数据
Channel: 传输通道组件,用于从source将数据传递到sink
当数据传输完成之后,该event才从通道中进行移除–(可靠性)
两部分组成:Event是由一个转载数据的字节数组+一个可选头部构成
Event 由零个或者多个header和正文body组成
Header 是 key/value 形式的,可以用来制造路由决策或携带其他结构化信息(如事件的时间戳或事件来源的服务器主机名)。你可以把它想象成和 HTTP 头一样提供相同的功能——通过该方法来传输正文之外的额外信息。
Body是一个字节数组,包含了实际的内容

Flume内部有一个或者多个Agent
每一个Agent是一个独立的守护进程(JVM)
从客户端哪儿接收收集,或者从其他的Agent哪儿接收,然后迅速的将获取的数据传给下一个目的节点Agent
Agent主要由source、channel、sink三个组件组成。

一个Flume源
负责一个外部源(数据发生器),如一个web服务器传递给他的事件
该外部源将它的事件以Flume可以识别的格式发送到Flume中
当一个Flume源接收到一个事件时,其将通过一个或者多个通道存储该事件
通道:采用被动存储的形式,即通道会缓存该事件直到该事件被sink组件处理
所以Channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着桥梁的作用,channel是一个完整的事务,这一点保证了数据在收发的时候的一致性. 并且它可以和任意数量的source和sink链接
可以通过参数设置event的最大个数
Flume通常选择FileChannel,而不使用Memory Channel
Memory Channel: 内存存储事务,吞吐率极高,但存在丢数据风险
File Channel: 本地磁盘的事务实现模式,保证数据不会丢失(WAL实现)write ahead log ( 将日志预写先写到磁盘)
例如:通过Flume HDFS Sink将数据放置到HDFS中,或者放置到下一个Flume的Source,等到下一个Flume处理。
对于缓存在通道中的事件,Source和Sink采用异步处理的方式
Sink成功取出Event后,将Event从Channel中移除
Sink必须作用于一个确切的Channel
不同类型的Sink:
存储Event到最终目的的终端:HDFS、Hbase
自动消耗:Null Sink
用于Agent之间通信:Avro

flume中的事务和关系型数据库中的事务是不同的,只是名称一样!
Flume中的事务代表的是一批要以原子性写入到channel或要从channel中移除的events
put事务指source将封装好的event,交给ChannelProcessor,ChannelProcessor在处理这批events时,
doPut: 将批处理数据先写入到临时缓冲区putList
doCommit : 检查channle内存队列是否足够合并
doRollback : channle内存队列空间不足,回滚数据
先把events放入到putList中,放入完成后,一次性commit(),这批events就可以成功写入到channel!
写入成功后,执行清空putList操作!
如果在过程中,发生任何异常,此时执行rollback(),rollback()会回滚putList,回滚也会直接清空putList!
take事务指sink不断从channel中获取event!,每获取一批event中的一个,都会将这个event放入takeList中!
doTake : 将数据取到临时缓冲区takeList.并将数据发送到HDFS
doCommit : 如果数据全部发送成功,则清除临时缓冲区takeList
doRollback :数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channle内存队列
一般一批evnet全部写入,执行commit()方法,这个方法会清空takeList!如果在此期间,发生了异常,执行rollback(),此时会回滚takeList中的这批event到channel!
假如一个事务中,一部分event已经写入到目的地,但是随着事务的回滚,这些event可能重复写入!
如果一个source对接多个channel , 可能出现,一批数据 某些channelput完了,但是另一些失败了!
此时channel Processor会重试对此批数据的put,包括之前已经写成功的channel!此时,之前写成功的channel,就会出现重复!
//每个source和sink都可以配置batchSize,batchSize代表一批数据的数量。
batchSize:
//putList和takeList的大小!(在channel的参数中配置)
transcationCapacity:
//指channel中存储event的容量!
capacity:
batchSize <= transcationCapacity <= capacity
at least once : 至少一次,在异常时有可能出现数据重复!
at most once : 最多一次,在异常时有可能出现数据丢失!
exactly once : 精准一次,不管是什么情况,数据只精准一次,不会重复,也不会丢失!
https://archive.apache.org/dist/flume/
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = s1
agent.channels = c1
agent.sinks = r1
# 设置 source从指定文件夹读取文件
agent.sources.s1.type =spooldir
agent.sources.s1.spoolDir = 指定文件夹
agent.sources.s1.channels = c1
agent.sources.s1.deserializer.maxLineLength = 8192
# 设置 sink 下游为kafuka ,并设置kafuka 服务ip和端口
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.r1.kafka.bootstrap.servers = IP:port
agent.sinks.r1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.r1.kafka.topic = kafka的topic名称
agent.sinks.r1.request.required.acks=0
agent.sinks.r1.max.message.size=1000
agent.sinks.r1.channel = c1
# Each channel's type is defined.
# 设置渠道类别为内存
agent.channels.c1.type = memory
# 通道中存储的event事件的最大数量
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 10000
nohup /data/software/flume/bin/flume-ng agent --conf conf --conf-file 启动的配置文件 --name agent -Dflume.root.logger=INFO,console