getBackOffSleepIncrement()//暂不用
getMaxBackOffSleepInterval()//暂不用
configure(Context context)//初始化context,即读取配置文件中的信息,每个信息对应一个配置项
process()//获取数据,封装成Event并写入Channel,这个方法被循环调用
stop()//关闭相关的资源
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
 * Demo pollable Flume source that emits a small batch of synthetic "hello" events on every
 * call to {@link #process()}.
 *
 * <p>Configuration keys read in {@link #configure(Context)}:
 * <ul>
 *   <li>{@code pre}   - text placed before "hello" (default {@code "pre-"})</li>
 *   <li>{@code sub}   - text placed after "hello" (default: empty)</li>
 *   <li>{@code delay} - milliseconds to pause after each batch (default 2000, must be &gt;= 0)</li>
 * </ul>
 *
 * @program: Flume-MyISS
 * @author: (author)
 * @create: 2022-06-26 21:21
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {

    /** Number of events generated by each call to {@link #process()}. */
    private static final int EVENTS_PER_BATCH = 5;

    /** Extra backoff sleep (ms) the runner adds for each consecutive BACKOFF result. */
    private static final long BACKOFF_SLEEP_INCREMENT_MS = 1000L;

    /** Upper bound (ms) on the runner's backoff sleep. */
    private static final long MAX_BACKOFF_SLEEP_MS = 5000L;

    // Text placed before and after "hello" in every event body.
    private String prefix;
    private String subfix;
    // Pause (ms) after each batch.
    private Long delay;

    /**
     * Reads this source's settings from the agent configuration
     * (e.g. the {@code a1.sources.r1.*} keys of the example .conf).
     *
     * @param context configuration for this source
     * @throws IllegalArgumentException if {@code delay} is negative
     */
    @Override
    public void configure(Context context) {
        prefix = context.getString("pre", "pre-"); // second argument is the default value
        // Default to empty: otherwise an unset "sub" would be rendered as the literal text "null".
        subfix = context.getString("sub", "");
        delay = context.getLong("delay", 2000L);
        if (delay < 0) {
            // Thread.sleep rejects negative values, which would make every process() call fail.
            throw new IllegalArgumentException("delay must be >= 0, got " + delay);
        }
    }

    /**
     * Generates {@link #EVENTS_PER_BATCH} events, hands each one to the channel processor,
     * then pauses for {@code delay} ms.
     *
     * @return {@link Status#READY} after a successful batch; {@link Status#BACKOFF} if delivery
     *     failed or the thread was interrupted
     */
    @Override
    public Status process() throws EventDeliveryException {
        try {
            for (int i = 0; i < EVENTS_PER_BATCH; i++) {
                // Build a fresh Event (and header map) every time: a channel such as the memory
                // channel keeps a reference to each event it accepts, so reusing one instance
                // would alias all queued events to the same mutable object.
                Map<String, String> headers = new HashMap<>();
                Event event = new SimpleEvent();
                event.setHeaders(headers);
                event.setBody(buildBody());
                // processEvent runs the interceptor chain (a null result drops the event), asks
                // the channel selector for the required channel(s), and puts the event into each
                // one inside a begin/commit transaction that is rolled back on failure.
                getChannelProcessor().processEvent(event);
            }
            // The runner calls process() again as soon as READY is returned, so this pause is
            // what paces the output.
            Thread.sleep(delay);
            return Status.READY;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the runner thread can see it (e.g. during shutdown).
            Thread.currentThread().interrupt();
            return Status.BACKOFF;
        } catch (Exception e) {
            // NOTE(review): prefer an SLF4J logger over printStackTrace.
            e.printStackTrace();
            return Status.BACKOFF;
        }
    }

    /** Returns {@code prefix + "hello" + suffix} encoded as UTF-8. */
    private byte[] buildBody() {
        return (prefix + "hello" + subfix).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Backoff step (ms) applied by the runner after each consecutive BACKOFF result.
     * A non-zero value keeps a failing source from being retried in a tight loop.
     */
    @Override
    public long getBackOffSleepIncrement() {
        return BACKOFF_SLEEP_INCREMENT_MS;
    }

    /** Upper bound (ms) on the runner's backoff sleep. */
    @Override
    public long getMaxBackOffSleepInterval() {
        return MAX_BACKOFF_SLEEP_MS;
    }
}
vim mysource-flume-logger.conf
---------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.yc.source.MySource
a1.sources.r1.delay = 4000
#a1.sources.r1.pre = pre-pre
#a1.sources.r1.sub = sub
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Start agent a1 with the custom-source config and log at INFO to the console (a single '=' after -Dflume.root.logger; '==' would make the value "=INFO,console").
bin/flume-ng agent --conf conf --conf-file jobs/t7/mysource-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务
批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务
事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件
configure(Context context)//初始化context,即读取配置文件中的信息,每个信息对应一个配置项
process()//获取数据,封装成Event并写入Channel,这个方法被循环调用
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo Flume sink that logs each event body at INFO level, wrapped in a configurable prefix
 * and suffix.
 *
 * <p>Configuration keys read in {@link #configure(Context)}:
 * <ul>
 *   <li>{@code pre} - text logged before the body (default {@code "pre-hello:"})</li>
 *   <li>{@code sub} - text logged after the body (default: empty)</li>
 * </ul>
 *
 * @program: Flume-MyISS
 * @author: (author)
 * @create: 2022-06-26 22:39
 */
public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    private String prefix;
    private String subfix;

    /**
     * Reads this sink's settings from the agent configuration
     * (e.g. the {@code a1.sinks.k1.*} keys of the example .conf).
     *
     * @param context configuration for this sink
     */
    @Override
    public void configure(Context context) {
        prefix = context.getString("pre", "pre-hello:");
        // Default to empty: otherwise an unset "sub" would be logged as the literal text "null".
        subfix = context.getString("sub", "");
    }

    /**
     * Takes at most one event from the channel inside a channel transaction and logs it.
     *
     * @return {@link Status#READY} if an event was logged; {@link Status#BACKOFF} if the channel
     *     was empty or processing failed (the transaction is rolled back on failure)
     */
    @Override
    public Status process() throws EventDeliveryException {
        // 1. Get the channel and open a transaction.
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            // 2. Take one event; take() returns null when the channel is empty.
            Event event = channel.take();
            if (event == null) {
                // Finish the empty transaction and report BACKOFF so the sink runner can back
                // off, instead of busy-waiting on take() with the transaction held open.
                transaction.commit();
                return Status.BACKOFF;
            }
            // 3. Decode the body bytes (concatenating the byte[] itself would log "[B@...").
            LOG.info("{}{}{}", prefix, decodeBody(event), subfix);
            // 4. Commit so the channel can drop the event.
            transaction.commit();
            return Status.READY;
        } catch (Exception e) {
            transaction.rollback();
            LOG.error("Failed to process event; transaction rolled back", e);
            return Status.BACKOFF;
        } finally {
            transaction.close();
        }
    }

    /** Decodes the event body as UTF-8; a null body is treated as empty. */
    private static String decodeBody(Event event) {
        byte[] body = event.getBody();
        return body == null ? "" : new String(body, java.nio.charset.StandardCharsets.UTF_8);
    }
}
vim netcat-flume-mysink.conf
------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.yc.sink.MySink
#a1.sinks.k1.pre = pre-pre
#a1.sinks.k1.sub = sub
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Start agent a1 with the custom-sink config and log at INFO to the console (a single '=' after -Dflume.root.logger; '==' would make the value "=INFO,console").
bin/flume-ng agent --conf conf --conf-file jobs/t8/netcat-flume-mysink.conf --name a1 -Dflume.root.logger=INFO,console
telnet localhost 44444