• day01(Flume)


    简介
    一、概述

    1. Flume是Apache提供的一套用于进行日志收集、汇聚和传输的框架
      2.Flume的版本∶(Flume-ng 和Flume-og 不兼容)
      a.Flume1.x Flume-ng
      b. Flume0.X Flume-og

      https://flume.apache.org/
      Flume是一种分布式、可靠和可用的服务,可以高效地收集、聚合和移动大量的日志数据。它具有基于流数据流的简单灵活的体系结构。它具有鲁棒性和容错性,具有可调的可靠性机制和许多故障转移和恢复机制。它使用了一个简单的、可扩展的数据模型,允许在线分析应用程序。

    二、基本概念

    1. Event
      a.在Flume将收集的日志封装成Event对象来进行传输和汇聚
      b.每一条日志对应一个Event
      c. Event的形式就是一个json串,json包含2个部分: headers和body
    2. Agent
      a. Flume是以一个或者多个Agent来构成的
      b.包含3个部分∶
      i. Source 采集数据
      ii. Channel 缓存数据
      iii. Sink 将数据写到目的地

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    flume下载

     http://www.apache.org/dyn/closer.lua/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz 
    
    • 1

    NetCat Source
    一、概述
    1.一个NetCat Source用来监听一个指定端口,并接收监听到的数据
    2.接收的数据是字符串形式

    [root@hadoop01 apache-flume-1.6.0-bin]# mkdir data
    [root@hadoop01 apache-flume-1.6.0-bin]# ls
    bin        conf  DEVNOTES  lib      NOTICE  RELEASE-NOTES
    CHANGELOG  data  docs      LICENSE  README  tools
    [root@hadoop01 apache-flume-1.6.0-bin]# cd data/
    [root@hadoop01 data]# vim basic.txt
    
    
    
    #给Agent起名
    #随意
    #绑定source ,并给source起名
    a1.sources=s1
    #绑定channel 并且给channel起名
    a1.channels=c1
    #绑定sink 并且给sink起名
    a1.sinks = k1
    
    #配置source
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    
    #配置channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity=100
    
    #配置sink
    a1.sinks.k1.type=logger
    
    #绑定source和channel
    a1.sources.s1.channels=c1
    #将sinks和channel绑定
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    执行命令

    [root@hadoop01 data]# ../bin/flume-ng agent -n a1 -c ../conf -f basic.txt -Dflume.root.logger=INFO,console
    
    • 1

    退出 CTRL+c

    [root@hadoop01 apache-flume-1.6.0-bin]# nc hadoop01 8090
    
    • 1

    NetCat Source
    一、概述
    1.一个NetCat Source用来监听一个指定端口,并接收监听到的数据
    2接收的数据是字符串形式

    Avro Source
    一、概述
    1.监听Avro端口来接收外部avro客户端的事件流
    2. avro-source接收到的是经过avro序列化后的数据,然后反序列化数据继续传输。
    3.源数据必须是经过avro序列化后的数据
    4.利用Avro source可以实现多级流动、扇出流、扇入流等效果
    5.可以接收通过flume提供的avro客户端发送的日志信息

    二、配置项说明

    配置项

    说明

    channels

    绑定的通道

    type

    exec,avro,netcat

    command

    要执行的命令

    selector.*

    选择器配置

    interceptors.*

    拦截器列表配置

    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=avro
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8888
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    
    
    [root@hadoop01 home]# vim a.txt
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这里插入图片描述

    执行

    [root@hadoop01 home]# presoftware/apache-flume-1.6.0-bin/bin/flume-ng avro-client -H 0.0.0.0 -p 8888 -c presoftware/apache-flume-1.6.0-bin/conf  -F a.txt
    
    • 1

    在这里插入图片描述

    exec

    [root@hadoop01 home]# cd presoftware/apache-flume-1.6.0-bin/data/
    [root@hadoop01 data]# vim execresource.txt
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=exec
    a1.sources.s1.command=ls /home
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这里插入图片描述
    Spooling Directory Source
    一、概述

    1. flume会持续监听指定的目录,把放入这个目录中的文件当做source来处理
      2.注意:一旦文件被放到“自动收集”目录中后,便不能修改,如果修改,flume会报错
      3.此外,也不能有重名的文件,如果有,flume也会报错

      [root@hadoop01 data]# cp execresource.txt spolldirsource.conf
      [root@hadoop01 data]# vi spolldirsource.conf

      a1.sources=s1
      a1.channels=c1
      a1.sinks=k1

      a1.sources.s1.type=spooldir
      a1.sources.s1.spoolDir=/home/flumedir

      a1.channels.c1.type=memory
      a1.channels.c1.capacity=1000
      a1.channels.c1.transactionCapacity=100

      a1.sinks.k1.type=logger

      a1.sources.s1.channels=c1
      a1.sinks.k1.channel=c1

      [root@hadoop01 home]# mv a.txt flumedir/

    在这里插入图片描述
    Squence Generator Source
    一、概述
    1.一个简单的序列发生器,不断的产生事件,值是从0开始每次递增1
    2.主要用来测试

    [root@hadoop01 data]# cp execresource.txt seqsource.conf
    [root@hadoop01 data]# vim seqsource.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=seq
    
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    HTTP Source
    一、概述
    1.此Source接受HTTP的GET和POST请求作为Flume的事件
    I
    2. GET方式只用于试验,所以实际使用过程中以POST请求居多
    3.如果想让flume正确解析Http协议信息,比如解析出请求头、请求体等信息,
    4.这个处理器接受一个HttpServletRequest对象,并返回一个Flume Envent对象集合
    二、常用HandlerJSONHandler
    1.可以处理JSON格式的数据,并支持UTF-8 UTF-16 UTF-32字符集
    2该handler接受Event数组,并根据请求头中指定的编码将其转换为Flume Event
    3.如果没有指定编码,默认编码为UTF-8

    BlobHandlerI

    1. BlobHandler是一种将请求中上传文件信息转化为event的处理器2. BlobHandler适合大文件的传输

      [root@hadoop01 data]# cp execresource.txt httpsource.conf
      [root@hadoop01 data]# vim httpsource.conf

      a1.sources=s1
      a1.channels=c1
      a1.sinks=k1

      a1.sources.s1.type=http
      a1.sources.s1.port=8090

      a1.channels.c1.type=memory
      a1.channels.c1.capacity=1000
      a1.channels.c1.transactionCapacity=100

      a1.sinks.k1.type=logger

      a1.sources.s1.channels=c1
      a1.sinks.k1.channel=c1

    测试(curl测试)

    [root@hadoop01 flumedir]# curl -X POST -d '[{"headers":{"class":"bigdata"},"body":"hello bigdata22"}]' http://0.0.0.0:8090
    
    • 1

    在这里插入图片描述

    [root@hadoop01 flumedir]# cd /home
    [root@hadoop01 home]# vim test.log
    
    [10.36.25.162 2019-08-24 19:58:49]
    info: /a.mp3 get
    [10.36.25.162 2019-08-24 19:58:49]
    info: /bg.css get
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    三、Source
    1 AVRO:接收序列化数据
    2 SpoolDir:监听给一个指定的目录,只要目录下的文件发生变化,会自动收集文件中的内容
    3 HTTP:监听HTTP请求,注意只能接收GET和POST,但是GET请求仅用于测试
    4.自定义Source:
    a. Flume中所有的Source的顶级接口就是Source,但是需要注意的是如果自定义Source实现这个接口,Flume并不认这个实现类
    b.如果需要自定义Source,那么需要实现PollableSource或者是EventDrivenSource

    c. PollableSource:拉取型source,这个Source会主动的访问数据源然后去获取数据,所以在PollableSource中已经定义好了获取数据的线程

    d. EventDrivenSource:事件驱动型source,这个Source是在数据源发生变化的时候才会获取数据,EventDrivenSource不会预定义线程

    //resource字处理案例
    package cn.tedu.flume;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDrivenSource;
    import org.apache.flume.channel.ChannelProcessor;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.EventBuilder;
    //import org.apache.flume.PollableSource;
    //import org.apache.flume.channel.ChannelProcessor;
    //import org.apache.flume.lifecycle.LifecycleState;
    import org.apache.flume.source.AbstractSource;
    
    public class AuthSource  extends AbstractSource 
    			implements EventDrivenSource,Configurable{
    
    	
    	private String path;
    	ExecutorService es;
    	//获取flume配置文件中指定属性的值
    		@Override
    		public void configure(Context context) {
    			
    			//表示获取属性怕path的值
    			 path = context.getString("path");
    			
    			
    		}
    	@Override
    	public void start() {
    		
    		//创建线程池
    		es=Executors.newFixedThreadPool(5);
    		//获取到Channel
    		ChannelProcessor cp=super.getChannelProcessor();
    		
    		es.submit(new ReadLog(path,cp));
    		
    	}
    
    	@Override
    	public void stop() {
    		//关闭线程池
    		es.shutdown();
    	}
    
    	
    }
    
    class ReadLog implements Runnable{
    
    	private BufferedReader reader;
    	private ChannelProcessor cp;
    	public ReadLog(String path,ChannelProcessor cp) {
    		File file=new File(path);
    		if(!file.exists()){
    			//判断路径是否存在
    			throw new NullPointerException();
    		}
    		//判断这是否是一个文件
    		if(file.isDirectory()){
    			throw new IllegalArgumentException();
    		}
    		//约定日志文件的类型
    		if(!path.endsWith(".log")){
    			throw new IllegalArgumentException();
    		}
    		//创建流对象,指向要读取的文件
    		try {
    			 reader=new BufferedReader(new FileReader(file));
    		} catch (FileNotFoundException e) {
    			e.printStackTrace();
    		}
    		
    		this.cp=cp;
    	}
    
    	@Override
    	public void run() {
    		while(true){
    			
    			try {
    				//读取的第一行
    				String line1=reader.readLine();
    				if(line1==null){
    					return;
    				}
    				//读取的第二行
    				String line2=reader.readLine();
    				if(line2==null){
    					return;
    				}
    				// line1 = [10.36.25.162 2019-08-24 19:58:49]
    				// line2 = info: / a.mp3 get
    				
    				//封装header
    				String[] arr = line1.split(" ");
    				Map header=new HashMap<>();
    				header.put("host", arr[0].substring(1));
    				header.put("date", arr[1]);
    				header.put("time", arr[2].substring(0,arr[2].length()-1));
    				
    				
    				//创建Event 对象来封装数据
    				
    				Event e=EventBuilder.withBody(line2.getBytes(),header);
    				
    				//将Event对象放入channel
    				cp.processEvent(e);
    				
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    		
    	}
    	
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129

    在这里插入图片描述

    安装sz 和rz

    [root@hadoop01 presoftware]# yum install -y lrzsz
    
    
    [root@hadoop01 lib]# rz
    [root@hadoop01 lib]# ll authsource.jar 
    -rw-r--r--. 1 root root 3982 7月  16 2022 authsource.jar
    [root@hadoop01 lib]# pwd
    /home/presoftware/apache-flume-1.6.0-bin/lib
    
    
    [root@hadoop01 data]# cp seqsource.conf authsource.conf
    [root@hadoop01 data]# vi authsource.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    #如果是自定义source 需要指定这个source的全路径名
    a1.sources.s1.type=cn.tedu.flume.AuthSource
    a1.sources.s1.path=/home/test.log
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    执行

    [root@hadoop01 data]# ../bin/flume-ng agent -n a1 -c ../conf -f authsource.conf -Dflume.root.logger=INFO,console
    
    • 1

    在这里插入图片描述
    Memory Channel
    一、概述
    1.事件将被存储在内存中(指定大小的队列里)
    2.非常适合那些需要高吞吐量且允许数据丢失的场景下

    配置项

    说明

    type

    mergory

    capacity

    100事件存储在信道中的最大数量建议实际工作调节∶10万首先估算出每个event的大小,然后再服务的内

    transactionCapacity

    100每个事务中的最大事件数建议实际工作调节:1000~3000

    File Channel
    一、概述
    1.将数据临时存储到计算机的磁盘的文件中
    2.性能比较低,但是即使程序出错数据不会丢失

    配置项

    说明

    type

    file

    dataDirs

    指定存放的目录,逗号分隔的目录列表,用以存放日志文件。

    [root@hadoop01 home]# mkdir flumechannel
    [root@hadoop01 home]# cd flumechannel/
    
    
    [root@hadoop01 data]# cp basic.txt filechannel.conf
    [root@hadoop01 data]# vi filechannel.conf 
    
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks = k1
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    a1.channels.c1.type=file
    a1.channels.c1.dataDirs=/home/flumechannel     
    a1.sinks.k1.type=logger
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    
    
    [root@hadoop01 ~]# cd /home/flumechannel/
    [root@hadoop01 flumechannel]# ls
    in_use.lock  log-1  log-1.meta  log-2  log-2.meta
    [root@hadoop01 flumechannel]# ls
    log-1  log-1.meta  log-2  log-2.meta
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    其他Channel

    一、JDBC Channel
    1.事件会被持久化(存储)到可靠的数据库里
    2.目前只支持嵌入式Derby数据库。但是Derby数据库不太好用,所以JDBC Channel目前仅用于测试,不能用于生产环境,

    二、内存溢出通道
    1.优先把Event存到内存中,如果存不下,在溢出到文件中
    2.目前处于测试阶段,还未能用于生产环境

    Logger Sink
    一、概述
    1.记录指定级别(比如INFO,DEBUG,ERROR等)的日志,通常用于调试
    2.要求,在–conf ( -c )参数指定的目录下有log4j的配置文件
    3.根据设计,logger sink将body内容限制为16字节,从而避免屏幕充斥着过多的内容。如果想要查使用file_roll sink,它会将日志写到本地文件系统中

    File_roll Sink
    一、概述
    1.在本地系统中存储事件
    2.每隔指定时长生成文件保存这段时间内收集到的日志信息
    二、可配置选项说明

    配置项

    说明

    channel

    绑定通道

    type

    file_roll

    sink.directory

    文件被存储的目录

    sink.rollInterval

    30记录日志到文件里,每隔30秒生成一个新日志文件。如果设置为0,则禁止滚动,从而导致所有数技

    [root@hadoop01 data]# cp execresource.txt  filerollsink.conf
    [root@hadoop01 data]# vi filerollsink.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=file_roll
    a1.sinks.k1.sink.directory=/home/flumedir
    a1.sinks.k1.sink.rollInterval=3600
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    
    
    [root@hadoop01 home]# nc hadoop01 8090
    hi he he
    OK
    nihao
    OK
    hello world 
    OK
    www.baidu.com
    OK
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    HDFS Sink

    一、概述
    1.此Sink特事件写入到Hadoop分布式文件系统HDFS中
    2目前它支持创建文本文件和序列化文件,并且对这两种格式都支持压缩
    3.这些文件可以分卷,按照指定的时间或数据量或事件的数量为基础
    4.它还通过类似时间戳或机器属性对数据进行buckets/partitions操作
    5. HDFS的目录路径可以包含将要由HDFS替换格式的转移序列用以生成存储事件的目录/文件名
    6.使用这个Sink要求haddop必须已经安装好,以便Flume可以通过hadoop提供的jar包与HDFS进行通信

    二、可配置选项说明配置项

    channel

    绑定的通道

    type

    hdfs

    hdfs.path

    HDFS目录路径( hdfs://namenode/flume/webdata/)

    hdfs.inUseSuffix

    .tmpFlume正在处理的文件所加的后缀

    hdfs.rolInterval

    文件生成的间隔事件,默认是30,单位是秒

    hdfs.rollSize

    生成的文件大小,默认是1024个字节,0表示不开启此项

    hdfs.rollCount

    每写几条数据就生成一个新文件,默认数量为10每写几条数据就生成一个新文件,

    hdfs.fileType

    SequenceFile/DataStream/CompressedStream

    hdfs.retryInterval

    80 Time in seconds between consecutive attempts to close a file. Each close call costsNamenode, so setting this too low can cause a lot of load on the name node.

    修改Hadoop完全分布式文件名,还原伪分布式文件名

    [root@hadoop01 presoftware]# mv hadoop-2.7.1 hadoop-dis
    [root@hadoop01 presoftware]# mv hadoop-standalone/ hadoop-2.7.1
    启动伪分布式
    [root@hadoop01 presoftware]# start-all.sh
    
    
    [root@hadoop01 data]# cp filerollsink.conf hdfssink.conf
    [root@hadoop01 data]# vi hdfssink.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=hdfs
    a1.sinks.k1.hdfs.path=hdfs://hadoop01:9000/flume
    a1.sinks.k1.hdfs.rollInterval=3600
    a1.sinks.k1.hdfs.fileType=DataStream
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    执行,nc编辑
    展示
    在这里插入图片描述

    流动模型
    1.单级流动(一个agent)
    2.多级流动
    在这里插入图片描述
    第一个节点

    [root@hadoop01 data]# cp hdfssink.conf  duoji.conf
    [root@hadoop01 data]# vi duoji.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=avro
    a1.sinks.k1.hostname=hadoop02
    a1.sinks.k1.port=8070
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    第二个节点

    [root@hadoop02 apache-flume-1.6.0-bin]# mkdir data
    [root@hadoop02 apache-flume-1.6.0-bin]# vi duoji.conf
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=avro
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8070
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=avro
    a1.sinks.k1.hostname=hadoop03
    a1.sinks.k1.port=8070
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    第三个节点

    [root@hadoop03 apache-flume-1.6.0-bin]# cd data/
    [root@hadoop03 data]# vi duoji.conf
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=avro
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8070
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    #a1.sinks.k1.hdfs.path=hdfs://hadoop01:9000/flume
    #a1.sinks.k1.hdfs.rollInterval=3600
    #a1.sinks.k1.hdfs.fileType=DataStream
    
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    启动顺序,3-2-1

    [root@hadoop03 data]# ../bin/flume-ng agent -n a1 -c ../conf -f duoji.conf -Dflume.root.logger=INFO,console
    
    • 1

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述
    3.扇入流动

    在这里插入图片描述

    第一个节点

    [root@hadoop01 data]# cp duoji.conf shanru.conf
    [root@hadoop01 data]# vi shanru.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=avro
    a1.sinks.k1.hostname=hadoop03
    a1.sinks.k1.port=8070
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    第二个节点(和第一个一样)

    [root@hadoop02 data]# cp duoji.conf shanru.conf
    [root@hadoop02 data]# vi shanru.conf 
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=avro
    a1.sinks.k1.hostname=hadoop03
    a1.sinks.k1.port=8070
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    第三个节点

    [root@hadoop03 data]# cp duoji.conf shanru.conf
    [root@hadoop03 data]# vi shanru.conf 
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=avro
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8070
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    启动顺序3-2-1
    在这里插入图片描述

    扇出
    在这里插入图片描述

    第一个节点

    [root@hadoop01 data]# cp shanru.conf shanchu.conf
    [root@hadoop01 data]# vi shanchu.conf 
    
    
    a1.sources=s1
    a1.channels=c1 c2
    a1.sinks=k1 k2
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.channels.c2.type=memory
    a1.channels.c2.capacity=1000
    a1.channels.c3.transactionCapacity=100
    
    
    a1.sinks.k1.type=avro
    a1.sinks.k1.hostname=hadoop02
    a1.sinks.k1.port=8090
    
    a1.sinks.k2.type=avro
    a1.sinks.k2.hostname=hadoop03
    a1.sinks.k2.port=8090
    
    a1.sources.s1.channels=c1 c2
    a1.sinks.k1.channel=c1 
    a1.sinks.k2.channel=c2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    第二个节点

    [root@hadoop02 data]# cp shanru.conf  shanchu.conf
    [root@hadoop02 data]# vi shanchu.conf 
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=avro
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    第三个节点

    [root@hadoop03 data]# cp shanru.conf shanchu.conf
    [root@hadoop03 data]# vi shanchu.conf 
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks=k1
    
    a1.sources.s1.type=avro
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    启动顺序3-2-1
    在这里插入图片描述
    在这里插入图片描述

    复制模式
    一、概述

    1. Selector默认是复制模式treplicating),即把source复制,然后分发给多个节点

    五、Selector

    1. Selector是Source的子组件
      2.如果不指定,Selector默认是复制模式(replicating),这种默认下所有的扇出节点收到的数据是一样的
      3.如果根据header中指定字段进行分发,那么需要将selector改成路由模式(multiplexing) -多路复用模式
      4.无论是复制模式还是路由模式,都是在扇出流动的基础上进行改动

    改动第一个节点,剩下两个节点还是扇出

    [root@hadoop01 data]# cp shanchu.conf  multi.conf
    [root@hadoop01 data]# vim multi.conf 
    
    
    a1.sources=s1
    a1.channels=c1 c2
    a1.sinks=k1 k2
    
    a1.sources.s1.type=http
    a1.sources.s1.port=8090
    a1.sources.s1.selector.type=multiplexing
    a1.sources.s1.selector.header=class
    a1.sources.s1.selector.mapping.big2022=c1
    a1.sources.s1.selector.mapping.big2023=c2
    a1.sources.s1.selector.default=c2
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.channels.c2.type=memory
    a1.channels.c2.capacity=1000
    a1.channels.c3.transactionCapacity=100
    
    
    a1.sinks.k1.type=avro
    a1.sinks.k1.hostname=hadoop02
    a1.sinks.k1.port=8090
    
    a1.sinks.k2.type=avro
    a1.sinks.k2.hostname=hadoop03
    a1.sinks.k2.port=8090
    
    a1.sources.s1.channels=c1 c2
    a1.sinks.k1.channel=c1
    a1.sinks.k2.channel=c2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    测试数据

    [root@hadoop01 data]# curl -X POST -d '[{"headers":{"class":"big2022"},"body":"hello big2022"}]' http://0.0.0.0:8090
    
    • 1

    六、拦截器(Interceptor)

    1. Interceptor是属于Source的子组件
      2.在Flume中,一个Source可以配置多个Interceptor,需要注意的是配置顺序决定了拦截顺序
    2. timestamp:向数据中添加一个时间戳。在添加的时候,向headers中添加timestamp字段。注意,如果headers已经有timestamp,那么不再添加。结合HDFS Sink可以实现数据进行按天收集的效果

    Timestamp Interceptor
    一、概述
    1.这个拦截器在事件头中插入以毫秒为单位的当前处理时间
    2头的名字为timestamp,值为当前处理的时间戳
    3.如果在之前已经有这个时间戳,则保留原有的时间戳

    [root@hadoop01 data]# cp basic.txt timein.conf
    [root@hadoop01 data]# vi timein.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks = k1
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    a1.sources.s1.interceptors=i1
    a1.sources.s1.interceptors.i1.type=timestamp
    
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    日志按年月日生成

    [root@hadoop01 data]# cp timein.conf timehdfs.conf
    [root@hadoop01 data]# vi timehdfs.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks = k1
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    a1.sources.s1.interceptors=i1
    a1.sources.s1.interceptors.i1.type=timestamp
    
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=hdfs
    a1.sinks.k1.hdfs.path=hdfs://hadoop01:9000/flume/date=%Y-%m-%d
    a1.sinks.k1.hdfs.fileType=DataStream
    a1.sinks.k1.hdfs.rollInterval=3600
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在这里插入图片描述

    1. host:闻headers中添加一个字段host,值是发送数据节点的ip

      [root@hadoop01 data]# cp timein.conf hostin.conf
      [root@hadoop01 data]# vim hostin.conf

      a1.sources=s1
      a1.channels=c1
      a1.sinks = k1

      a1.sources.s1.type=netcat
      a1.sources.s1.bind=0.0.0.0
      a1.sources.s1.port=8090
      a1.sources.s1.interceptors=i1
      a1.sources.s1.interceptors.i1.type=host

      a1.channels.c1.type=memory
      a1.channels.c1.capacity=10000
      a1.channels.c1.transactionCapacity=100

      a1.sinks.k1.type=logger

      a1.sources.s1.channels=c1
      a1.sinks.k1.channel=c1

    在这里插入图片描述

    Static Interceptor
    一、概述
    1.此拦截器允许用户增加静态头信息使用静态的值到所有事件
    2目前的实现中不允许—次指定多个头
    3.如果需要增加多个静态头可以指定多个Static interceptors

    [root@hadoop01 data]# cp hostin.conf staticin.conf
    [root@hadoop01 data]# vim staticin.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks = k1
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    a1.sources.s1.interceptors=i1
    a1.sources.s1.interceptors.i1.type=static
    a1.sources.s1.interceptors.i1.key=serverkinds
    a1.sources.s1.interceptors.i1.value=video
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在这里插入图片描述
    UUID Interceptor
    一、概述
    1.这个拦截器在所有事件头中增加一个全局一致性标志,其实就是UUID

    [root@hadoop01 data]# cp hostin.conf uuidin.conf
    [root@hadoop01 data]# vi uuidin.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks = k1
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    a1.sources.s1.interceptors=i1
    a1.sources.s1.interceptors.i1.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
    
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    在这里插入图片描述

    Search And Replace Interceptor
    一、概述
    1.这个拦截器提供了简单的基于字符串的正则搜索和替换功能

    [root@hadoop01 data]# cp uuidin.conf searchrepin.conf
    [root@hadoop01 data]# vi searchrepin.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks = k1
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    a1.sources.s1.interceptors=i1a1.sources.s1.interceptors.i1.type=search_replace
    a1.sources.s1.interceptors.i1.searchPattern=[0-9]
    a1.sources.s1.interceptors.i1.replaceString=*
    
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    Regex Filtering Interceptor

    一、概述
    1.此拦截器通过解析事件体去匹配给定正则表达式来筛选事件
    2.所提供的正则表达式即可以用来包含或刨除事件

    [root@hadoop01 data]# cp searchrepin.conf regexfilterin.conf
    [root@hadoop01 data]# vi regexfilterin.conf 
    
    
    a1.sources=s1
    a1.channels=c1
    a1.sinks = k1
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    a1.sources.s1.interceptors=i1
    a1.sources.s1.interceptors.i1.type=regex_filter
    a1.sources.s1.interceptors.i1.regex=.*[0-9].*
    a1.sources.s1.interceptors.i1.excludeEvents=true
    
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity=100
    
    a1.sinks.k1.type=logger
    
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    在这里插入图片描述

    Precess概述

    1. Sink Group允许用户将多个Sink组合成一个实体
    2. Flume Sink Processor可以通过切换组内Sink用来实现负载均衡的效果,或在一个Sink故障时切换到另一个Sink

    Failover Sink Processor
    个人总结:1. Failover Sink Processor:需要这是节点的优先级,数据会先发送给高优先级的节点,当高优先级的节点宕机之后才会发送给低优先级的节点

    详细概述
    1.维护一个sink们的优先表。确保只要一个是可用的就事件就可以被处理
    2.失败处理原理是,为失效的sink指定一个冷却时间,在冷却时间到达后再重新使用
    3. sink们可以被配置—个优先级,数字越大优先级越高
    4.如果sink发送事件失败,则下一个最高优先级的sink将会尝试接着发送事件
    5.如果没有指定优先级,则优先级顺序取决于sink们的配置顺序,先配置的默认优先级高于后配置的
    6.在配置的过程中,设置一个group processor,并且为每个sink都指定一个优先级
    7.优先级必须是唯一的
    8.另外可以设置maxpenalty属性指定限定失败时间

    在这里插入图片描述
    改动第一个节点

    [root@hadoop01 data]# cp shanchu.conf filover.conf
    [root@hadoop01 data]# vi filover.conf 
    
    
    a1.sources=s1
    a1.channels=c1 c2
    a1.sinks=k1 k2
    #给sink组起名
    a1.sinkgroups=g1
    #将多个sink绑定在一个组中
    a1.sinkgroups.g1.sinks=k1 k2
    #设置group的类型
    a1.sinkgroups.g1.processor.type=failover
    #给sink指定优先级
    a1.sinkgroups.g1.processor.priority.k1=1
    a1.sinkgroups.g1.processor.priority.k2=4
    
    a1.sources.s1.type=netcat
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=8090
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    
    a1.channels.c2.type=memory
    a1.channels.c2.capacity=1000
    a1.channels.c3.transactionCapacity=100
    
    
    a1.sinks.k1.type=avro
    a1.sinks.k1.hostname=hadoop02
    a1.sinks.k1.port=8090
    
    a1.sinks.k2.type=avro
    a1.sinks.k2.hostname=hadoop03
    a1.sinks.k2.port=8090
    
    a1.sources.s1.channels=c1 c2
    a1.sinks.k1.channel=c1
    a1.sinks.k2.channel=c2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    第三个节点效果
    在这里插入图片描述

    停掉第三个节点,第二个节点效果在这里插入图片描述
    Load Balancing Sink Processor

    一、概述
    1.提供了在多个sink之间实现负载均衡的能力
    2.它维护了一个活动sink的索引列表
    3.它支持轮询或随机方式的负载均衡,默认值是轮询方式,可以通过配置指定
    4.也可以通过实现AbstractSinkSelector接口实现自定义的选择机制

    二、可配置项说明

    配置项

    说明

    processor.sinks

    绑定的sink

    processor.type

    load_balance

    processor.selector

    round_robin(轮叫调度) random (随机)

    八、Flume中的事务

    在这里插入图片描述

    在这里插入图片描述

  • 相关阅读:
    【python基础】变量
    python pandas query用法
    S-Clustr(影子集群)新增Nets3e插件,实现一对多主机拍照
    20.0、C语言——字符串函数的使用和剖析
    学习JAVA的第六天(基础)
    数字化转型导师坚鹏:数字化时代银行网点厅堂营销5大特点分析
    git 把master分支代码合并到自己分支(常用git 命令)
    用这个免费CDN,治愈WordPress网站加载缓慢的大难题
    cmip6数据如何下载?您是利用官方网站手动人工下载还是半自动购物车方式?
    C++模板进阶:SFINAE
  • 原文地址:https://blog.csdn.net/m0_67402013/article/details/126327055