按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。
此处可选择TaildirSource和KafkaChannel,并配置日志校验拦截器。
选择TailDirSource和KafkaChannel的原因如下:
TailDirSource相比ExecSource、SpoolingDirectorySource的优势
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
SpoolingDirectorySource监控目录,支持断点续传。
采用Kafka Channel,省去了Sink,提高了效率。
日志采集Flume关键配置如下:
[summer@hadoop102 flume-1.9.0]$ mkdir job
[summer@hadoop102 flume-1.9.0]$ cd job/
[summer@hadoop102 job]$ vim file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1
#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.summer.gmall.flume.interceptor.ETLInterceptor$Builder
#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
#组装
a1.sources.r1.channels = c1
这个是将拦截器加上了,类名和Builder要用$连接
如果没有ETL清洗,则数据有好多残缺,但是只能是轻度清洗,过于重的话则数据会堵塞到管道里,导致数据传输变慢
<dependencies>
<dependency>
<groupId>org.apache.flumegroupId>
<artifactId>flume-ng-coreartifactId>
<version>1.9.0version>
<scope>providedscope>
dependency>
<dependency>
<groupId>com.alibabagroupId>
<artifactId>fastjsonartifactId>
<version>1.2.62version>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-pluginartifactId>
<version>2.3.2version>
<configuration>
<source>1.8source>
<target>1.8target>
configuration>
plugin>
<plugin>
<artifactId>maven-assembly-pluginartifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependenciesdescriptorRef>
descriptorRefs>
configuration>
<executions>
<execution>
<id>make-assemblyid>
<phase>packagephase>
<goals>
<goal>singlegoal>
goals>
execution>
executions>
plugin>
plugins>
build>
package com.summer.gmall.flume.utils;
import com.alibaba.fastjson.JSONObject;
/**
* @author Redamancy
* @create 2022-10-25 16:38
*/
public class JSONUtil {
public static boolean isJSONValidata(String log) {
//1.JSONObject.parseObject(log);判断json,如果是json,则返回JSON的值,如果不是JSON则会报错。
// 因此使用try,catch来捕捉,如果报错,则该log不是我们想要的JSON格式,就返回false,正确的话就返回true
try {
JSONObject.parseObject(log);
return true;
} catch (Exception e) {
return false;
}
}
}
package com.summer.gmall.flume.interceptor;
import com.summer.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
/**
* @author Redamancy
* @create 2022-10-25 16:38
*/
public class ETLInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//1.获取body当中的数据并转成字符串
byte[] body = event.getBody();
String log = new String(body, StandardCharsets.UTF_8);
//2.判断字符串是否是一个合法的json,是:返回当前的event;不是:返回null
if(JSONUtil.isJSONValidata(log)){
return event;
}else{
return null;
}
}
@Override
public List<Event> intercept(List<Event> list) {
//不能使用for循环来删,因为在list里面删除一个数据,则后面的数据会补上来,index会减少,使用使用这个来remove数据是不可行的,到后面会报错。
//可以使用迭代器来删数据
/*
for (int i = 0; i < list.size(); i++) {
Event event = list.get(i);
if(intercept(event) == null){
list.remove(i);
}
}*/
Iterator<Event> iterator = list.iterator();
while(iterator.hasNext()){
Event next = iterator.next();
if(intercept(next) == null) {
iterator.remove();
}
}
return list;
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new ETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
@Override
public void close() {
}
}
先点击clean,然后点击package。
将带依赖的放到/opt/module/flume/lib文件夹下面