• 数仓日记 - 数据采集平台


    世间万事,风云变幻,苍黄翻覆。纵使波谲云诡,但制心一处,便无事不办👨‍💻

    目录

    一、埋点数据生成模块
    1. 事件日志格式及字段含义
    2. 启动日志格式及字段含义
    3. 说明

    二、采集平台准备
    1. 框架版本选型
    2. 集群部署规划
    3. 数据流程图

    三、用户行为数据采集模块
    1. 环境准备
    2. JDK安装
    3. Hadoop安装
       • 项目经验之HDFS存储多目录
       • 项目经验之支持LZO压缩配置
       • 项目经验之LZO创建索引
       • 项目经验之Hadoop基准测试
       • 项目经验之Hadoop参数调优
    4. Zookeeper安装
    5. 日志生成
    6. 采集日志Flume
    7. kafka安装
       • 项目经验之Kafka压力测试
       • 项目经验之Kafka机器数量计算
    8. Flume消费Kafka数据到HDFS
       • 项目经验之Flume组件详解
       • 项目经验之Flume内存优化

    9. 采集通道启动/停止脚本

    四、业务数据采集模块
    1. MySQL安装
    2. Sqoop安装
    3. 业务数据生成
    4. 业务数据导入HDFS
       • 项目经验
    5. Hive安装部署

    一、埋点数据生成模块

    1.事件日志格式:
    1667544719686 | {
    	"cm": {		//公共字段
    		"ln": "-35.5",	// (double) lng经度
    		"sv": "V2.3.0",	// (String) sdkVersion sdk版本
    		"os": "8.2.6",	// (String) Android系统版本
    		"g": "J4025Y72@gmail.com",	// (String) gmail
    		"mid": "994",	// (String) 设备唯一标识
    		"nw": "3G",	// (String) 网络模式
    		"l": "pt",	// (String) language系统语言
    		"vc": "13",	// (String) versionCode,程序版本号
    		"hw": "640*1136",	// (String) heightXwidth,屏幕宽高
    		"ar": "MX",	// (String) area区域
    		"uid": "994",	// (String) 用户标识
    		"t": "1667508769684",	// (String) 客户端日志产生时的时间
    		"la": "-34.3",	// (double) lat 纬度
    		"md": "sumsung-15",	// (String) model手机型号
    		"vn": "1.0.1",	// (String) versionName,程序版本名
    		"ba": "Sumsung",	// (String) brand手机品牌
    		"sr": "G"	// (String) 渠道号,应用从哪个渠道来的。
    	},
    	"ap": "app",	//项目数据来源 app pc
    	"et": [{	//事件
    		"ett": "1667527012297",	//客户端事件产生时间
    		"en": "ad",	//事件名称
    		"kv": {	//事件结果,以key-value形式自行定义
    			"activityId": "1",
    			"displayMills": "96469",
    			"entry": "2",
    			"action": "1",
    			"contentType": "0"
    		}
    	}, {
    		"ett": "1667504023634",
    		"en": "notification",
    		"kv": {
    			"ap_time": "1667542746000",
    			"action": "2",
    			"type": "3",
    			"content": ""
    		}
    	}, {
    		"ett": "1667514981776",
    		"en": "active_background",
    		"kv": {
    			"active_source": "3"
    		}
    	}, {
    		"ett": "1667500071675",
    		"en": "error",
    		"kv": {
    			//errorDetail	错误详情
    			"errorDetail": "java.lang.NullPointerException\\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound",
    			//errorBrief	错误摘要
    			"errorBrief": "at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"
    		}
    	}, {
    		"ett": "1667515331033",
    		"en": "favorites",
    		"kv": {
    			"course_id": 2,
    			"id": 0,
    			"add_time": "1667486897821",
    			"userid": 4
    		}
    	}]
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    事件类型:商品列表页(loading)、商品点击(display)、商品详情页(newsdetail)、广告(ad)、消息通知(notification)、用户后台活跃(active_background)、评论(comment)、收藏(favorites)、点赞(praise)、错误(error)

    2.启动日志格式:
    {
    	"action": "1",	//状态:成功=1  失败=2
    	"ar": "MX",
    	"ba": "Sumsung",
    	"detail": "",	//失败码(没有则上报空)
    	"en": "start",	//日志类型start
    	"entry": "3",	//入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5
    	"extend1": "",	//失败的message(没有则上报空)
    	"g": "H488C631@gmail.com",
    	"hw": "640*960",
    	"l": "es",
    	"la": "-4.7",
    	"ln": "-45.0",
    	"loading_time": "16",	//加载时长:计算下拉开始到接口返回数据的时间,(开始加载报0,加载成功或加载失败才上报时间)
    	"md": "sumsung-13",
    	"mid": "995",
    	"nw": "3G",
    	"open_ad_type": "2",	//开屏广告类型:  开屏原生广告=1, 开屏插屏广告=2
    	"os": "8.1.7",
    	"sr": "M",
    	"sv": "V2.6.4",
    	"t": "1667455282969",
    	"uid": "995",
    	"vc": "18",
    	"vn": "1.0.2"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    3.说明:
    用Java生成上述格式的日志,并存储在 /tmp/logs/目录下,日志文件名为: app-年-月-日.log,单个日志文件最大大小为10MB,日志默认保留30天,30天后自动删除。

    程序已打包,上传到资源。
    logcollector-1.0-SNAPSHOT.jar
    logcollector-1.0-SNAPSHOT-jar-with-dependencies.jar

    二、采集平台准备

    1. 框架版本选型

    在这里插入图片描述

    2. 集群部署规划

    请添加图片描述

    3. 数据流程图

    请添加图片描述

    三、用户行为数据采集模块

    1. 环境准备

    1. 安装必要环境

      sudo yum install -y epel-release
      sudo yum install -y psmisc nc net-tools rsync vim lrzsz ntp libzstd openssl-static tree iotop git
      
      • 1
      • 2
    2. 修改静态IP

      sudo vim /etc/sysconfig/network-scripts/ifcfg-ens33
      
      将BOOTPROTO修改为static
      BOOTPROTO=static
      最后一行ONBOOT改为yes
      ONBOOT=yes
      添加如下内容:
      IPADDR=填IP地址
      NETMASK=子网掩码
      GATEWAY=网关IP
      DNS1=8.8.8.8
      DNS2=8.8.4.4
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    3. 修改主机名及映射

      修改主机名:将文件内容修改为主机名
      sudo vim /etc/hostname
      
      添加映射:
      sudo vim /etc/hosts
      添加如下内容:
      192.168.176.101 hadoop101
      192.168.176.102 hadoop102
      192.168.176.103 hadoop103
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    4. 关闭防火墙

      关闭防火墙:
      sudo systemctl stop firewalld
      永久关闭防火墙:
      sudo systemctl disable firewalld
      查看防火墙状态:
      systemctl status firewalld
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    5. 创建普通用户

      sudo useradd atguigu
      sudo passwd atguigu
      
      • 1
      • 2
    6. 重启虚拟机后,配置普通用户具有root权限。

      sudo vim /etc/sudoers
      
      在root所在的行(100行)后,添加一行
      ## Allow root to run any commands anywhere
      root    ALL=(ALL)       ALL
      atguigu ALL=(ALL)       NOPASSWD:ALL
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    7. /opt目录下创建软件安装文件夹和存放安装包的文件夹并修改所有者。

      sudo mkdir module
      sudo mkdir software
      sudo mkdir /opt/module /opt/software
      sudo chown atguigu:atguigu /opt/module /opt/software
      
      • 1
      • 2
      • 3
      • 4

    2. JDK安装

    1. 解压JDK并配置环境变量

      tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
      sudo vim /etc/profile.d/my_env.sh
      
      添加如下内容:
      #JAVA_HOME
      export JAVA_HOME=/opt/module/jdk1.8.0_212
      export PATH=$PATH:$JAVA_HOME/bin
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
    2. 测试JDK是否安装成功

      java -version
      
      • 1

      看到如下结果就证明安装成功:

      java version "1.8.0_212"
      Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
      Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)
      
      • 1
      • 2
      • 3

    3. Hadoop安装

    1. 解压

      tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/
      
      • 1
    2. 添加环境变量

      sudo vim /etc/profile.d/my_env.sh
      
      添加如下内容:
      ##HADOOP_HOME
      export HADOOP_HOME=/opt/module/hadoop-3.1.3
      export PATH=$PATH:$HADOOP_HOME/bin
      export PATH=$PATH:$HADOOP_HOME/sbin
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
    3. 让修改后的文件生效

      source /etc/profile.d/my_env.sh
      
      • 1
    4. 测试是否安装成功

      hadoop version
      
      • 1

      出现如下结果证明安装成功:

      Hadoop 3.1.3
      Source code repository https://gitbox.apache.org/repos/asf/hadoop.git -r 		ba631c436b806728f8ec2f54ab1e289526c90579
      Compiled by ztang on 2019-09-12T02:47Z
      Compiled with protoc 2.5.0
      From source with checksum ec785077c385118ac91aadde5ec9799
      This command was run using /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-common-3.1.3.jar
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    5. 集群配置
      core-site.xml

      <configuration>
      <property>
          <name>fs.defaultFSname>
          <value>hdfs://hadoop101:8020value>
      property>
      <property>
          <name>hadoop.tmp.dirname>
          <value>/opt/module/hadoop-3.1.3/datavalue>
      property>
      <property>
          <name>hadoop.proxyuser.atguigu.hostsname>
          <value>*value>
      property>
      <property>
          <name>hadoop.proxyuser.atguigu.groupsname>
          <value>*value>
      property>
      <property>
          <name>hadoop.http.staticuser.username>
          <value>atguiguvalue>
      property>
      
      <property>
          <name>io.compression.codecsname>
          <value>
          org.apache.hadoop.io.compress.GzipCodec,
          org.apache.hadoop.io.compress.DefaultCodec,
          org.apache.hadoop.io.compress.BZip2Codec,
          org.apache.hadoop.io.compress.SnappyCodec,
          com.hadoop.compression.lzo.LzoCodec,
          com.hadoop.compression.lzo.LzopCodec
          value>
      property>
      
      <property>
      	<name>io.compression.codec.lzo.classname>
      	<value>com.hadoop.compression.lzo.LzoCodecvalue>
      property>
      
      configuration>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40

      hdfs-site.xml

      <configuration>
      <property>
          <name>dfs.namenode.secondary.http-addressname>
          <value>hadoop103:9868value>
      property>
      
      <property>
      		<name>dfs.replicationname>
      		<value>1value>
      property>
      configuration>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11

      mapred-site.xml

      <configuration>
      <property>
          <name>mapreduce.framework.namename>
          <value>yarnvalue>
      property>
      configuration>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

      yarn-site.xml

      <configuration>
      
      	<property>
      		<name>yarn.nodemanager.aux-servicesname>
      		<value>mapreduce_shufflevalue>
      	property>
      	<property>
      		<name>yarn.resourcemanager.hostnamename>
      		<value>hadoop102value>
      	property>
      	<property>
      		<name>yarn.nodemanager.env-whitelistname>
      		<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOMEvalue>
      	property>
      	<property>
      		<name>yarn.scheduler.minimum-allocation-mbname>
      		<value>512value>
      	property>
      	<property>
      		<name>yarn.scheduler.maximum-allocation-mbname>
      		<value>4096value>
      	property>
      	<property>
      		<name>yarn.nodemanager.resource.memory-mbname>
      		<value>4096value>
      	property>
      	<property>
      		<name>yarn.nodemanager.pmem-check-enabledname>
      		<value>falsevalue>
      	property>
      	<property>
      		<name>yarn.nodemanager.vmem-check-enabledname>
      		<value>falsevalue>
      	property>
      
      configuration>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36

      workers

      hadoop101
      hadoop102
      hadoop103
      
      • 1
      • 2
      • 3
    项目经验之HDFS存储多目录

    当HDFS存储空间紧张的时候,需要对DataNode进行磁盘扩展

    1)在DataNode节点增加磁盘并进行挂载

    挂载:fdisk -l | grep FAT32
    在mnt目录下建立挂载目录:mkdir /mnt/usb
    挂载:mount -t vfat /dev/sdb1 /mnt/usb/
    卸载:umount /mnt/usb/
    
    • 1
    • 2
    • 3
    • 4

    2)在hdfs-site.xml文件中配置多目录,注意新挂载磁盘的访问权限问题

    <property>
    	<name>dfs.datanode.data.dirname>
    	<value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4value>
    property>
    
    • 1
    • 2
    • 3
    • 4

    3)增加磁盘后,保证每个目录数据均衡

    开启数据均衡命令:
    bin/start-balancer.sh -threshold 10
    对于参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况进行调整。
    
    停止数据均衡:
    bin/stop-balancer.sh
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    项目经验之支持LZO压缩配置

    1)hadoop本身并不支持lzo压缩,故需要使用twitter提供的hadoop-lzo开源组件。hadoop-lzo需依赖hadoop和lzo进行编译。

    2)将编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-3.1.3/share/hadoop/common/

    3)同步hadoop-lzo-0.4.20.jar到hadoop102、hadoop103

    4)core-site.xml增加配置支持LZO压缩

    <configuration>
    	<property>
    		<name>io.compression.codecsname>
    		<value>
    		org.apache.hadoop.io.compress.GzipCodec,
    		org.apache.hadoop.io.compress.DefaultCodec,
    		org.apache.hadoop.io.compress.BZip2Codec,
    		org.apache.hadoop.io.compress.SnappyCodec,
    		com.hadoop.compression.lzo.LzoCodec,
    		com.hadoop.compression.lzo.LzopCodec
    		value>
    	property>
    
    	<property>
        	<name>io.compression.codec.lzo.classname>
        	<value>com.hadoop.compression.lzo.LzoCodecvalue>
    	property>
    configuration>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    5)同步core-site.xml到hadoop102、hadoop103

    6)启动及查看集群

    项目经验之LZO创建索引

    1)创建LZO文件的索引,LZO压缩文件的可切片特性依赖于其索引,故我们需要手动为LZO压缩文件创建索引。若无索引,则LZO文件的切片只有一个。

    hadoop jar /path/to/your/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer big_file.lzo
    
    • 1

    2)测试

    (1)将bigtable.lzo(150M)上传到集群的根目录

    (2)执行wordcount程序

    hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output1
    
    • 1

    (3)对上传的LZO文件建索引

    hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar  com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo
    
    • 1

    (4)再次执行WordCount程序

    项目经验之Hadoop基准测试

    1) 测试HDFS写性能

    测试内容:向HDFS集群写10个128M的文件

    hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB
    
    • 1

    2)测试HDFS读性能

    测试内容:读取HDFS集群10个128M的文件

    hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB
    
    • 1

    测试生成的数据在HDFS中存在,要记得删除,不然占地方。

    3)使用Sort程序评测MapReduce
    (1)使用RandomWriter来产生随机数,每个节点运行10个Map任务,每个Map产生大约1G大小的二进制随机数

    hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar randomwriter random-data
    
    • 1

    (2)执行Sort程序

    hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar sort random-data sorted-data
    
    • 1

    (3)验证数据是否真正排好序了

    hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data
    
    • 1
    项目经验之Hadoop参数调优

    1)HDFS参数调优hdfs-site.xml

    dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为8台时,此参数设置为60

    The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.
    NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。设置该值的一般原则是将其设置为集群大小的自然对数乘以20,即20logN,N为集群大小。
    
    • 1
    • 2

    2)YARN参数调优yarn-site.xml
    (1)情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive
    面临问题:数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。
    (2)解决办法:
       内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。
      (a)yarn.nodemanager.resource.memory-mb
    表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。
      (b)yarn.scheduler.maximum-allocation-mb
    单个任务可申请的最多物理内存量,默认是8192(MB)。

      备注:mapreduce.map.memory.mb一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。
    mapreduce.reduce.memory.mb一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。

    单任务内存怎么调:根据输入端数据的大小

      128m数据对应 1g内存(maptask)
      比如,有1G数据,那么1G/128m=8 ,也就是需要8个maptask = 8g,就将yarn.scheduler.maximum-allocation-mb设置为8;如果2G数据那就是单任务需要16G

    3)Hadoop宕机
      (1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)
      (2)如果写入文件过量造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。高峰期的时候用Kafka进行缓存,高峰期过去数据同步会自动跟上。或者修改flume的bathsize大小,默认一次拉取100个/s,快的话就减小,控制写入过快,导致的宕机。再不行,就加机器。

    4. Zookeeper安装

    1. 安装步骤

    2. zookeeper群起脚本

      zk.sh

      #!/bin/bash
      
      case $1 in
      "start"){
      
              for i in hadoop101 hadoop102 hadoop103
              do
                      echo "--------------- $i zookeeper启动---------------"
                      ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
              done
      };;
      "stop"){
      
              for i in hadoop101 hadoop102 hadoop103
              do
                      echo "--------------- $i zookeeper停止---------------"
                      ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
              done
      };;
      "status"){
      
              for i in hadoop101 hadoop102 hadoop103
              do
                      echo "--------------- $i zookeeper状态---------------"
                      ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
              done
      };;
      esac
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
    3. 增加执行权限

      chmod 777 zk.sh
      
      • 1
    4. Zookeeper集群启动、停止

      集群启动:
      zk.sh start
      
      集群停止:
      zk.sh stop
      
      • 1
      • 2
      • 3
      • 4
      • 5

    5. 日志生成

      说明:如果jar包用到的环境在集群上有,那就选不带环境的,如果没有,那就选带环境的将jar包上传到集群

    1. 第一种执行方式:

      这种执行方式会把运行日志打印到控制台
      java -classpath logcollector-1.0-SNAPSHOT-jar-with-dependencies.jar com.qcln.appclient.AppMain
      这种执行方式会把运行日志收集起来,存到当前目录的test.log文件中
      java -classpath logcollector-1.0-SNAPSHOT-jar-with-dependencies.jar com.qcln.appclient.AppMain > /opt/module/test.log
      
      • 1
      • 2
      • 3
      • 4

        运行后生成的日志文件在/tmp/logs目录下,文件名字为app-2022-10-02.log,这个都在代码的logback.xml中配置的

    2. 第二种执行方式:

      java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar  >/opt/module/test.log 
      
      • 1

        这种执行方式的前提是,你解压后看你的jar包META-INF/MANIFEST.MF文件中Main-Class是否有全类名,如果有那就可以,否则只能用第一种方式指定主类名

    3. 企业中一般用这种写法:

      java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar  >/dev/null 2>&1
      
      • 1

      标准输入0:从键盘获得输入 /proc/self/fd/0

      标准输出1:输出到屏幕(即控制台) /proc/self/fd/1

      错误输出2:输出到屏幕(即控制台) /proc/self/fd/2

      这种写法的含义是:往黑洞里面扔,把错误输出2扔到标准输出1里面,再把1扔到黑洞里面,他是下面这种的简写:

      java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar 2>/dev/null 1>/dev/null
      
      • 1

      集群日志生成启动脚本

      #!/bin/bash
      
      for i in hadoop101 hadoop102
      do
              echo "---------- $i 生成日志 ----------"
              ssh $i "java -jar /opt/module/logcollector-1.0-SNAPSHOT-jar-with-dependencies.jar >/dev/null 2>&1"
      done
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

      集群时间同步修改脚本(仅作测试用)

      注意:该脚本仅仅是测试使用,生产环境勿用!!!

      #!/bin/bash
      	
      for i in hadoop101 hadoop102 hadoop103
      do
      	echo "---------- $i ----------"
      	ssh -t $i "sudo date -s $1"
      done
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

      说明 -t参数是解决sudo报错:没有终端存在,且未指定askpass程序。用的,含义是创建一个终端

      集群同步执行命令脚本

      #!/bin/bash
      	
      for i in hadoop101 hadoop102 hadoop103
      do
      	echo "---------- $i ----------"
      	ssh $i "$*"
      done
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

      先用时间同步修改脚本统一把集群时间修改为2020-xx-xx,然后运行日志生成脚本生成当天的用户行为数据

    6. 采集日志Flume

    Flume安装
    1. 解压、重命名

      tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
      mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
      
      • 1
      • 2
    2. 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

      rm /opt/module/flume/lib/guava-11.0.2.jar
      
      • 1
    3. 将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件

      mv flume-env.sh.template flume-env.sh
      vim flume-env.sh
      
      export JAVA_HOME=/opt/module/jdk1.8.0_212
      
      • 1
      • 2
      • 3
      • 4

    类型选择
    1)Source
      (1)source选择 TailDir Source,他的优点是:支持断点续传、多目录。flume1.6后支持
      (2)batchSize大小如何设置?这个就是Kafka读取数据的数据,当Event1k左右的时候,500-1000合适(默认为100)

    2)Channel
      采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中

      Flume1.7以前Kafka Channel很少有人使用,因为 每一行数据都有个前缀(topic+数据内容),而parseAsFlumeEvent 设置为false去不掉这个前缀,但是1.7之后就修改好了

    请添加图片描述

    复习回忆:Channel Selectors,可以让不同的项目日志通过不同的Channel到不同的Sink中去。官方文档上Channel Selectors有两种类型:Replicating Channel Selector (default)和Multiplexing Channel selector
    这两种selector的区别是:Replicating 会将source过来的events发往所有channel,而Multiplexing可以选择该发往哪些channel 。

    flume配置文件file-flume-kafka.conf

    a1.sources=r1
    a1.channels=c1 c2
    
    # configure source
    a1.sources.r1.type = TAILDIR
    # 断点续传的时候持久化到磁盘的时候的索引位置
    a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
    # 支持多文件目录的读取,定义第一个目录f1
    a1.sources.r1.filegroups = f1
    # .+是正则表达式,.是任意单个字符,+是前面的子表达式出现一次或多次
    a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
    # 添加一个头部,为文件的绝对路径
    a1.sources.r1.fileHeader = true
    # 这个source发往c1和c2
    a1.sources.r1.channels = c1 c2
    
    #interceptor
    # 定义两个拦截器,需要根据用户的逻辑自己定义
    a1.sources.r1.interceptors =  i1 i2
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
    a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder
    
    a1.sources.r1.selector.type = multiplexing
    # 一个Event是有header和body,就是靠头区分数据发往那个channel
    a1.sources.r1.selector.header = topic
    a1.sources.r1.selector.mapping.topic_start = c1
    a1.sources.r1.selector.mapping.topic_event = c2
    
    # configure channel
    # channel c1的配置,topic类型是start
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
    a1.channels.c1.kafka.topic = topic_start
    a1.channels.c1.parseAsFlumeEvent = false
    # 定义一个消费者组
    a1.channels.c1.kafka.consumer.group.id = flume-consumer
    
    # channel c2的配置,topic类型是event
    a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
    a1.channels.c2.kafka.topic = topic_event
    a1.channels.c2.parseAsFlumeEvent = false
    # 定义一个消费者组
    a1.channels.c2.kafka.consumer.group.id = flume-consumer
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    flume自定义拦截器步骤:定义类、实现interceptor接口、重写四个方法(初始化、单Event、多Event、关闭)

    Java知识:将字节数组转换成字符串:
    String s = new String(byte[],Charset.forName("UTF-8"));
    
    • 1
    • 2

    ETL拦截器

    LogETLInterceptor类

    package com.qcln.flume.interceptor;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.List;
    	
    public class LogETLInterceptor implements Interceptor {
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
         // 将event 转换为string 方便处理
         byte[] body = event.getBody();
    
         String log = new String(body, Charset.forName("UTF-8"));
    
         if(log.contains("start")){
             // 清洗启动日志
             if(LogUtils.vaildateStart(log)){
                return event;
             }
    
         }else{
             // 清洗事件日志
             if(LogUtils.vaildateEvent(log)){
                 return event;
             }
         }
    
         return null;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
    
         ArrayList<Event> interceptors = new ArrayList<>();
    
         // 遍历event
         for (Event event : events) {
             // 调用上面的单event方法进行清洗
             Event intercept1 = intercept(event);
             if(intercept1 != null){
                 interceptors.add(intercept1);
             }
         }
         return interceptors;
        }
    
        @Override
        public void close() {
    
        }
    
        // 静态内部类
        public static class Builder implements Interceptor.Builder{
    
         @Override
         public Interceptor build() {
             // new 一个自己
             return new LogETLInterceptor();
         }
    
         @Override
         public void configure(Context context) {
    
         }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    LogUtils类

    package com.qcln.flume.interceptor;
    
    import org.apache.commons.lang.math.NumberUtils;
    
    public class LogUtils {
    	public static boolean vaildateStart(String log) {
    
    		if(log == null){
    			return false;
    		}
    
    		// 是否是大括号开头和结尾,不是的话就干掉
    		if(!log.trim().startsWith("{") || !log.trim().endsWith("}")){
    			return false;
    		}
    
    		return true;
    	}
    
    	public static boolean vaildateEvent(String log) {
    
    		if(log == null){
    			return false;
    		}
    
    		// 时间 | json
    		// 切割
    		String[] logConents = log.split("\\|");  //正则表达式中 \| 表示 | ,所以要以|分隔的话就转义一下 \\|
    
    		// 判断长度
    		if(logConents.length != 2){
    			return false;
    		}
    
    		// 判断服务器时间  长度和都是数字,工具类,不等于13位和不全是数字就干掉
    		if(logConents[0].length() != 13 || !NumberUtils.isDigits(logConents[0])){
    			return false;
    		}
    
    		// 判断json完整性
    		if(!logConents[1].trim().startsWith("{") || !logConents[1].trim().endsWith("}")){
    			return false;
    		}
    
    		return true;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    日志类型拦截器

    LogTypeInterceptor类

    package com.qcln.flume.interceptor;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class LogTypeInterceptor implements Interceptor {
    	@Override
    	public void initialize() {
    
    	}
    
    	@Override
    	public Event intercept(Event event) {
    
    		// 去除body数据
    		byte[] body = event.getBody();
    		String log = new String(body, Charset.forName("UTF-8"));
    
    		// 取出header
    		Map<String, String> headers = event.getHeaders();
    
    		if(log.contains("start")){
    			headers.put("topic","topic_start");
    		}else{
    			headers.put("topic","topic_event");
    		}
    		return event;
    	}
    
    	@Override
    	public List<Event> intercept(List<Event> events) {
    
    		ArrayList<Event> resultEvents = new ArrayList<>();
    
    		for (Event event : events) {
    			// 不用判断因为只是添加了一个标记
    			resultEvents.add(event);
    		}
    		return resultEvents;
    	}
    
    	@Override
    	public void close() {
    
    	}
    
    	public static class Builder implements 	Interceptor.Builder{
    
    		@Override
    		public Interceptor build() {
    			return new LogTypeInterceptor();
    		}
    
    		@Override
    		public void configure(Context context) {
    
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    完成后打包上传到服务器,flume目录下的lib包下。
    注意配置文件中拦截器的定义和选择器的定义,一定要和代码中的相对应

    7. kafka安装

    kafka安装
    1. 解压、重命名
      tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
      mv kafka_2.11-2.4.1/ kafka
      
      • 1
      • 2
    2. 在/opt/module/kafka目录下创建logs文件夹
      mkdir logs
      
      • 1
    3. 修改配置文件
      cd config/
      vim server.properties
      
      修改以下内容:
      #broker的全局唯一编号,不能重复
      broker.id=0
      #增加删除topic功能
      delete.topic.enable=true
      #kafka运行日志存放的路径
      log.dirs=/opt/module/kafka/logs
      #配置连接Zookeeper集群地址
      zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    4. 配置环境变量
      sudo vim /etc/profile.d/my_env.sh
      
      添加如下内容:
      #KAFKA_HOME
      export KAFKA_HOME=/opt/module/kafka
      export PATH=$PATH:$KAFKA_HOME/bin
      
      刷新使环境变量生效:
      source /etc/profile.d/my_env.sh
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

    kafka群起脚本

    kf.sh

    1. #!/bin/bash
      
      case $1 in
      "start"){
          for i in hadoop101 hadoop102 hadoop103
          do
              echo "---------- $i Kafka启动----------"
              ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
          done
      };;
      "stop"){
          for i in hadoop101 hadoop102 hadoop103
          do
              echo "---------- $i Kafka停止----------"
              ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
          done
      };;
      esac
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18

    先启动zookeeper,然后启动Kafka。然后在Hadoop101上执行命令bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf启动flume,然后执行命令bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --from-beginning --topic topic_start启动一个Kafka消费者,消费topic_start中的数据。最后执行日志生成启动脚本生成日志,可以看到Kafka消费到了topic_start中的数据。

    zk.sh
    
    bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf
    
    bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --from-beginning --topic topic_start
    
    • 1
    • 2
    • 3
    • 4
    • 5

    flume群起脚本

    f1.sh

    1. #!/bin/bash
      
      case $1 in
      "start"){
      
              for i in hadoop101 hadoop102
              do
                      echo "---------- 启动 $i 采集flume"
                      ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent --conf-file /opt/module/flume-1.9.0/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume-1.9.0/test1 2>&1  &"
              done
      
      };;
      
      "stop"){
      
              for i in hadoop101 hadoop102
              do
                      echo "---------- 停止 $i 采集flume"
                      ssh $i "ps -ef | grep flume | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
              done
      };;
      esac
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22

      备注:

      grep -v grep:意思是去掉grep那个进程

      awk ‘{print $2}’:取出第二列,awk的默认分割符就是空格,也可以修改,反斜线的含义是转义,因为在shell中$2含义是第二个参数,而这里的含义是前面输出结果的第二列,所以需要转义

      xargs -n1 kill -9:xargs将前面的运行结果作为下一个命令的参数传递过去,-n1是因为有时候前面截取到的那一列有空行,而我们只想要第一行,所以加个-n1

    项目经验之Kafka压力测试

    使用官方自带的脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)

    1. 测试命令(往Kafka写):

      kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
      
      • 1

      说明:
      record-size是一条信息有多大,单位是字节。

      num-records是总共发送多少条信息。

      throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。

      请添加图片描述

    2. 测试命令(从Kafka读):

      bin/kafka-consumer-perf-test.sh --broker-list hadoop101:9092,hadoop102:9092,hadoop103:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
      
      • 1

      参数说明:
      –zookeeper 指定zookeeper的链接信息

      –topic 指定topic的名称

      –fetch-size 指定每次fetch的数据的大小

      –messages 总共要消费的消息个数

      请添加图片描述

    项目经验之Kafka机器数量计算

    Kafka机器数量(经验公式)=2*(峰值生产速度副本数/100)+1
    先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。
    比如我们的峰值生产速度是50M/s。副本数为2。
    Kafka机器数量=2
    (50*2/100)+ 1=3台

    8. Flume消费Kafka数据到HDFS

    flume配置-channel技术选型

      (1)file Channel基于磁盘速度慢可靠性高100万event

      (2)memory channel基于内存速度快可靠性差100个event

    生产环境怎么选择?

      如果是普通的日志﹐追求效率,丢一点数据不影响大局,选memory channel
      如果是金融的数据或者和钱有关系的数据,数据比较重要不允许丢,只能牺牲速度换取安全性,选file Channel

    kafka-flume-hdfs.conf配置文件
    因为用户行为日志分为两类,一类启动日志,一类事件日志,要分别存到HDFS上的不同路径下,所以要两个source、channel、sinks,分别采集启动日志和事件日志。在这个项目中我们选用KafkaSource、file channel、hdfs sink

    ## 组件
    a1.sources=r1 r2
    a1.channels=c1 c2
    a1.sinks=k1 k2
    
    ## source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    # sources每次拉取多少个event
    a1.sources.r1.batchSize = 5000
    # 延迟时间,条数没够,时间够了也会拉取
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
    a1.sources.r1.kafka.topics=topic_start
    
    ## source2
    a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r2.batchSize = 5000
    a1.sources.r2.batchDurationMillis = 2000
    a1.sources.r2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
    a1.sources.r2.kafka.topics=topic_event
    
    ## channel1
    a1.channels.c1.type = file
    # 检查点
    a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1
    # 数据存储目录
    a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1/
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6
    
    ## channel2
    a1.channels.c2.type = file
    a1.channels.c2.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior2
    a1.channels.c2.dataDirs = /opt/module/flume-1.9.0/data/behavior2/
    a1.channels.c2.maxFileSize = 2146435071
    a1.channels.c2.capacity = 1000000
    a1.channels.c2.keep-alive = 6
    
    ## sink1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
    # 存储文件的前缀
    a1.sinks.k1.hdfs.filePrefix = logstart-
    
    ##sink2
    a1.sinks.k2.type = hdfs
    a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
    a1.sinks.k2.hdfs.filePrefix = logevent-
    
    ## 不要产生大量小文件
    # 10秒滚动下一个文件,企业中常用3600,一个小时
    a1.sinks.k1.hdfs.rollInterval = 10
    # 当文件的大小到达128m的时候滚动
    a1.sinks.k1.hdfs.rollSize = 134217728
    # 不按照event的个数滚动
    a1.sinks.k1.hdfs.rollCount = 0
    
    a1.sinks.k2.hdfs.rollInterval = 10
    a1.sinks.k2.hdfs.rollSize = 134217728
    a1.sinks.k2.hdfs.rollCount = 0
    
    ## 控制输出文件是原生文件。
    # 是否启用压缩流
    a1.sinks.k1.hdfs.fileType = CompressedStream 
    a1.sinks.k2.hdfs.fileType = CompressedStream 
    # 压缩的方式 lzo plus
    a1.sinks.k1.hdfs.codeC = lzop
    a1.sinks.k2.hdfs.codeC = lzop
    
    ## 拼装
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel= c1
    
    a1.sources.r2.channels = c2
    a1.sinks.k2.channel= c2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    启动消费flume命令:

    bin/flume-ng agent --conf-file /opt/module/flume-1.9.0/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE
    
    • 1

    消费flume启动脚本

    f2.sh

    后续启动消费flume总不能每次都敲那么长一个命令吧,直接搞个脚本一键启动停止,一劳永逸。

    #! /bin/bash
    
    case $1 in
    "start"){
            for i in hadoop103
            do
                    echo " --------启动 $i 消费flume-------"
                    ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent --conf-file /opt/module/flume-1.9.0/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume-1.9.0/log.txt   2>&1 &"
            done
    };;
    "stop"){
            for i in hadoop103
            do
                    echo " --------停止 $i 消费flume-------"
                    ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
            done
    
    };;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    项目经验之Flume组件详解
    1)FileChannel和MemoryChannel区别

      Memory Channel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

      File Channel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。

    2)File Channel优化

      通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
    官方说明如下:

    Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
    
    • 1

      checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

    3)Sink:HDFS Sink

    (1)HDFS存入大量小文件,有什么影响?

      元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
      计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

    (2)HDFS小文件处理

      官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
    基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:

    (1)文件在达到128M时会滚动生成新文件

    (2)文件创建超3600秒时会滚动生成新文件

    (3)hdfs.rollCount=0是不启用的意思,因为每个event的大小不一样,不好控制。

    项目经验之Flume内存优化

    1)问题描述:如果启动消费Flume抛出如下异常

    ERROR hdfs.HDFSEventSink: process failed
    java.lang.OutOfMemoryError: GC overhead limit exceeded
    
    • 1
    • 2

    2)解决方案步骤:

    1. 在hadoop101服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

      export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
      
      • 1

      -Xms:启动flume所需要的内存,内存上限是100m
      -Xmx:flume正常运行后,能使用的内存上限是2000m

    2. 同步配置到hadoop102、hadoop103服务器

      [atguigu@hadoop102 conf]$ xsync flume-env.sh
      
      • 1
    3. Flume内存参数设置及优化

      JVM heap一般设置为4G或更高,部署在单独的服务器上(4核8线程16G内存)
      -Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
      -Xms表示JVM Heap(堆内存)最小尺寸,初始分配。
      -Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。
      如果设置不一致,容易在初始化时,由于内存不够,频繁触发fullgc。

    9. 采集通道启动/停止脚本

    zookeeper集群启动脚本:zk.sh
    flume采集集群启动脚本:f1.sh
    Kafka集群启动脚本:kf.sh
    flume消费集群启动脚本:f2.sh

    #!/bin/bash
    
    case $1 in
    "start"){
            echo " -------- 启动 集群 -------"
    
            echo " -------- 启动 hadoop集群 -------"
            /opt/module/hadoop-3.1.3/sbin/start-dfs.sh
            ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
    
            #启动 Zookeeper集群
            zk.sh start
    
            sleep 6s;
    
            #启动 Flume采集集群
            f1.sh start
    
            #启动 Kafka采集集群
            kf.sh start
    
            sleep 8s;
    
            #启动 Flume消费集群
            f2.sh start
    
            };;
    "stop"){
        echo " -------- 停止 集群 -------"
    
    
        #停止 Flume消费集群
            f2.sh stop
    
            #停止 Kafka采集集群
            kf.sh stop
    
        sleep 8s;
    
            #停止 Flume采集集群
            f1.sh stop
    
            #停止 Zookeeper集群
            zk.sh stop
    
            echo " -------- 停止 hadoop集群 -------"
            ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
            /opt/module/hadoop-3.1.3/sbin/stop-dfs.sh
    };;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    四、业务数据采集模块

    1. MySQL安装

    1)卸载自带的MySQL-libs

    rpm -qa | grep -i -E mysql\|mariadb | xargs -n1 sudo rpm -e --nodeps
    
    • 1
    grep -i 不区分大小写    -E 给grep增加and语义,a或b
    
    • 1

    2)将安装包和JDBC驱动上传到服务器,一共6个

    01_mysql-community-common-5.7.29-1.el7.x86_64.rpm
    02_mysql-community-libs-5.7.29-1.el7.x86_64.rpm
    03_mysql-community-libs-compat-5.7.29-1.el7.x86_64.rpm
    04_mysql-community-client-5.7.29-1.el7.x86_64.rpm
    05_mysql-community-server-5.7.29-1.el7.x86_64.rpm
    mysql-connector-java-5.1.48.jar
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3)安装mysql依赖

    sudo rpm -ivh 01_mysql-community-common-5.7.29-1.el7.x86_64.rpm
    sudo rpm -ivh 02_mysql-community-libs-5.7.29-1.el7.x86_64.rpm
    sudo rpm -ivh 03_mysql-community-libs-compat-5.7.29-1.el7.x86_64.rpm
    
    • 1
    • 2
    • 3

    4)安装mysql-client

    sudo rpm -ivh 04_mysql-community-client-5.7.29-1.el7.x86_64.rpm
    
    • 1

    5)安装mysql-server

    sudo rpm -ivh 05_mysql-community-server-5.7.29-1.el7.x86_64.rpm
    
    • 1

    6)启动mysql

    sudo systemctl start mysqld
    
    • 1

    7)查看mysql密码

    sudo cat /var/log/mysqld.log | grep password
    
    • 1

    配置MySQL

    配置只要是root用户+密码,在任何主机上都能登录MySQL数据库。

    1)用刚刚查到的密码进入mysql

    mysql -uroot -p’password’
    
    • 1

    3)更改mysql密码策略

    set global validate_password_length=4;
    set global validate_password_policy=0;
    
    • 1
    • 2

    4)设置简单好记的密码

    set password=password("000000");
    
    • 1

    5)进入msyql库

    use mysql
    
    • 1

    6)查询user表

    select user, host from user;
    
    • 1

    7)修改user表,把Host表内容修改为%

    update user set host="%" where user="root";
    
    • 1

    8)刷新

    flush privileges;
    
    • 1

    9)退出

    quit;
    
    • 1

    2. Sqoop安装

    1. 进入到/opt/module/sqoop/conf目录,重命名配置文件
    mv sqoop-env-template.sh sqoop-env.sh
    
    • 1
    1. 修改配置文件
    vim sqoop-env.sh 
    
    增加如下内容
    export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3
    export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3
    export HIVE_HOME=/opt/module/hive
    export ZOOKEEPER_HOME=/opt/module/zookeeper-3.5.7
    export ZOOCFGDIR=/opt/module/zookeeper-3.5.7/conf
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3)拷贝JDBC驱动

    因为sqoop要将MySQL中的数据导入到hdfs,所以要将MySQL驱动jar包拷贝到sqoop的lib目录下

    cp mysql-connector-java-5.1.48.jar /opt/module/sqoop/lib/
    
    • 1

    4)验证Sqoop

    sqoop help
    
    • 1

    出现一些Warning警告,并伴随有帮助命令的输出。

    5)测试Sqoop是否能够成功连接数据库

    sqoop list-databases --connect jdbc:mysql://hadoop101:3306/ --username root --password 000000
    
    • 1

    3. 业务数据生成

    1)通过MySQL可视化工具连接MySQL
    请添加图片描述

    2)创建gmall数据库
    请添加图片描述
    3)运行数据库结构脚本(gmall2020-03-16.sql)
      这个脚本会生成数据库的结构和一点数据

    4)把gmall-mock-db-2020-03-16-SNAPSHOT.jar和 application.properties上传到服务器的/opt/module/db_log路径上

    5)修改application.properties相关配置
      主要是检查下jdbc链接、用户名、密码、业务数据的时间、是否重置,其他参数都已经差不多调到最优了。

    logging.level.root=info
    
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.datasource.url=jdbc:mysql://hadoop102:3306/gmall?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
    spring.datasource.username=root
    spring.datasource.password=000000
    
    logging.pattern.console=%m%n
    
    mybatis-plus.global-config.db-config.field-strategy=not_null
    
    #业务日期
    mock.date=2020-03-10
    #是否重置,1是重置的意思
    mock.clear=1
    
    #是否生成新用户
    mock.user.count=50
    #男性比例
    mock.user.male-rate=20
    
    #收藏取消比例
    mock.favor.cancel-rate=10
    #收藏数量
    mock.favor.count=100
    
    #购物车数量
    mock.cart.count=10
    #每个商品最多购物个数
    mock.cart.sku-maxcount-per-cart=3
    
    #用户下单比例
    mock.order.user-rate=80
    #用户从购物中购买商品比例
    mock.order.sku-rate=70
    #是否参加活动
    mock.order.join-activity=1
    #是否使用购物券
    mock.order.use-coupon=1
    #购物券领取人数
    mock.coupon.user-count=10
    
    #支付比例
    mock.payment.rate=70
    #支付方式 支付宝:微信 :银联
    mock.payment.payment-type=30:60:10
    
    #评价比例 好:中:差:自动
    mock.comment.appraise-rate=30:10:10:50
    
    #退款原因比例:质量问题 商品描述与实际描述不一致 缺货 号码不合适 拍错 不想买了 其他
    mock.refund.reason-rate=30:10:20:5:15:5:5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    6)生成2020-03-10日期数据

    java -jar gmall-mock-db-2020-03-16-SNAPSHOT.jar
    
    • 1

    7)在配置文件application.properties中修改

    mock.date=2020-03-11
    mock.clear=0
    
    • 1
    • 2

    8)再次执行命令,生成2020-03-11日期数据:

     java -jar gmall-mock-db-2020-03-16-SNAPSHOT.jar
    
    • 1

    4. 业务数据导入HDFS

    1)脚本编写

    #! /bin/bash
    
    sqoop=/opt/module/sqoop/bin/sqoop
    do_date=`date -d '-1 day' +%F`
    
    if [[ -n "$2" ]]; then
    	do_date=$2
    fi
    
    import_data(){
    $sqoop import \
    --connect jdbc:mysql://hadoop101:3306/gmall \
    --username root \
    --password 000000 \
    --target-dir /origin_data/gmall/db/$1/$do_date \
    --delete-target-dir \
    --query "$2 and  \$CONDITIONS" \
    --num-mappers 1 \
    --fields-terminated-by '\t' \
    --compress \
    --compression-codec lzop \
    --null-string '\\N' \
    --null-non-string '\\N'
    
    hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/gmall/db/$1/$do_date
    }
    
    import_order_info(){
      import_data order_info "select
    							id, 
    							final_total_amount, 
    							order_status, 
    							user_id, 
    							out_trade_no, 
    							create_time, 
    							operate_time,
    							province_id,
    							benefit_reduce_amount,
    							original_total_amount,
    							feight_fee      
    						from order_info
    						where (date_format(create_time,'%Y-%m-%d')='$do_date' 
    						or date_format(operate_time,'%Y-%m-%d')='$do_date')"
    }
    
    import_coupon_use(){
      import_data coupon_use "select
    						  id,
    						  coupon_id,
    						  user_id,
    						  order_id,
    						  coupon_status,
    						  get_time,
    						  using_time,
    						  used_time
    						from coupon_use
    						where (date_format(get_time,'%Y-%m-%d')='$do_date'
    						or date_format(using_time,'%Y-%m-%d')='$do_date'
    						or date_format(used_time,'%Y-%m-%d')='$do_date')"
    }
    
    import_order_status_log(){
      import_data order_status_log "select
    								  id,
    								  order_id,
    								  order_status,
    								  operate_time
    								from order_status_log
    								where date_format(operate_time,'%Y-%m-%d')='$do_date'"
    }
    
    import_activity_order(){
      import_data activity_order "select
    								id,
    								activity_id,
    								order_id,
    								create_time
    							  from activity_order
    							  where date_format(create_time,'%Y-%m-%d')='$do_date'"
    }
    
    import_user_info(){
      import_data "user_info" "select 
    							id,
    							name,
    							birthday,
    							gender,
    							email,
    							user_level, 
    							create_time,
    							operate_time
    						  from user_info 
    						  where (DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date' 
    						  or DATE_FORMAT(operate_time,'%Y-%m-%d')='$do_date')"
    }
    
    import_order_detail(){
      import_data order_detail "select 
    							  od.id,
    							  order_id, 
    							  user_id, 
    							  sku_id,
    							  sku_name,
    							  order_price,
    							  sku_num, 
    							  od.create_time  
    							from order_detail od
    							join order_info oi
    							on od.order_id=oi.id
    							where DATE_FORMAT(od.create_time,'%Y-%m-%d')='$do_date'"
    }
    
    import_payment_info(){
      import_data "payment_info"  "select 
    								id,  
    								out_trade_no, 
    								order_id, 
    								user_id, 
    								alipay_trade_no, 
    								total_amount,  
    								subject, 
    								payment_type, 
    								payment_time 
    							  from payment_info 
    							  where DATE_FORMAT(payment_time,'%Y-%m-%d')='$do_date'"
    }
    
    import_comment_info(){
      import_data comment_info "select
    							  id,
    							  user_id,
    							  sku_id,
    							  spu_id,
    							  order_id,
    							  appraise,
    							  comment_txt,
    							  create_time
    							from comment_info
    							where date_format(create_time,'%Y-%m-%d')='$do_date'"
    }
    
    import_order_refund_info(){
      import_data order_refund_info "select
    								id,
    								user_id,
    								order_id,
    								sku_id,
    								refund_type,
    								refund_num,
    								refund_amount,
    								refund_reason_type,
    								create_time
    							  from order_refund_info
    							  where date_format(create_time,'%Y-%m-%d')='$do_date'"
    }
    
    import_sku_info(){
      import_data sku_info "select 
    						  id,
    						  spu_id,
    						  price,
    						  sku_name,
    						  sku_desc,
    						  weight,
    						  tm_id,
    						  category3_id,
    						  create_time
    						from sku_info where 1=1"
    }
    
    import_base_category1(){
      import_data "base_category1" "select 
    								  id,
    								  name 
    								from base_category1 where 1=1"
    }
    
    import_base_category2(){
      import_data "base_category2" "select
    								  id,
    								  name,
    								  category1_id 
    								from base_category2 where 1=1"
    }
    
    import_base_category3(){
      import_data "base_category3" "select
    								  id,
    								  name,
    								  category2_id
    								from base_category3 where 1=1"
    }
    
    import_base_province(){
      import_data base_province "select
    							  id,
    							  name,
    							  region_id,
    							  area_code,
    							  iso_code
    							from base_province
    							where 1=1"
    }
    
    import_base_region(){
      import_data base_region "select
    							  id,
    							  region_name
    							from base_region
    							where 1=1"
    }
    
    import_base_trademark(){
      import_data base_trademark "select
    								tm_id,
    								tm_name
    							  from base_trademark
    							  where 1=1"
    }
    
    import_spu_info(){
      import_data spu_info "select
    							id,
    							spu_name,
    							category3_id,
    							tm_id
    						  from spu_info
    						  where 1=1"
    }
    
    import_favor_info(){
      import_data favor_info "select
    						  id,
    						  user_id,
    						  sku_id,
    						  spu_id,
    						  is_cancel,
    						  create_time,
    						  cancel_time
    						from favor_info
    						where 1=1"
    }
    
    import_cart_info(){
      import_data cart_info "select
    						id,
    						user_id,
    						sku_id,
    						cart_price,
    						sku_num,
    						sku_name,
    						create_time,
    						operate_time,
    						is_ordered,
    						order_time
    					  from cart_info
    					  where 1=1"
    }
    
    import_coupon_info(){
      import_data coupon_info "select
    						  id,
    						  coupon_name,
    						  coupon_type,
    						  condition_amount,
    						  condition_num,
    						  activity_id,
    						  benefit_amount,
    						  benefit_discount,
    						  create_time,
    						  range_type,
    						  spu_id,
    						  tm_id,
    						  category3_id,
    						  limit_num,
    						  operate_time,
    						  expire_time
    						from coupon_info
    						where 1=1"
    }
    
    import_activity_info(){
      import_data activity_info "select
    							  id,
    							  activity_name,
    							  activity_type,
    							  start_time,
    							  end_time,
    							  create_time
    							from activity_info
    							where 1=1"
    }
    
    import_activity_rule(){
    	import_data activity_rule "select
    									id,
    									activity_id,
    									condition_amount,
    									condition_num,
    									benefit_amount,
    									benefit_discount,
    									benefit_level
    								from activity_rule
    								where 1=1"
    }
    
    import_base_dic(){
    	import_data base_dic "select
    							dic_code,
    							dic_name,
    							parent_code,
    							create_time,
    							operate_time
    						  from base_dic
    						  where 1=1" 
    }
    
    case $1 in
      "order_info")
    	 import_order_info
    ;;
      "base_category1")
    	 import_base_category1
    ;;
      "base_category2")
    	 import_base_category2
    ;;
      "base_category3")
    	 import_base_category3
    ;;
      "order_detail")
    	 import_order_detail
    ;;
      "sku_info")
    	 import_sku_info
    ;;
      "user_info")
    	 import_user_info
    ;;
      "payment_info")
    	 import_payment_info
    ;;
      "base_province")
    	 import_base_province
    ;;
      "base_region")
    	 import_base_region
    ;;
      "base_trademark")
    	 import_base_trademark
    ;;
      "activity_info")
    	  import_activity_info
    ;;
      "activity_order")
    	  import_activity_order
    ;;
      "cart_info")
    	  import_cart_info
    ;;
      "comment_info")
    	  import_comment_info
    ;;
      "coupon_info")
    	  import_coupon_info
    ;;
      "coupon_use")
    	  import_coupon_use
    ;;
      "favor_info")
    	  import_favor_info
    ;;
      "order_refund_info")
    	  import_order_refund_info
    ;;
      "order_status_log")
    	  import_order_status_log
    ;;
      "spu_info")
    	  import_spu_info
    ;;
      "activity_rule")
    	  import_activity_rule
    ;;
      "base_dic")
    	  import_base_dic
    ;;
    
    "first")
       import_base_category1
       import_base_category2
       import_base_category3
       import_order_info
       import_order_detail
       import_sku_info
       import_user_info
       import_payment_info
       import_base_province
       import_base_region
       import_base_trademark
       import_activity_info
       import_activity_order
       import_cart_info
       import_comment_info
       import_coupon_use
       import_coupon_info
       import_favor_info
       import_order_refund_info
       import_order_status_log
       import_spu_info
       import_activity_rule
       import_base_dic
    ;;
    "all")
       import_base_category1
       import_base_category2
       import_base_category3
       import_order_info
       import_order_detail
       import_sku_info
       import_user_info
       import_payment_info
       import_base_trademark
       import_activity_info
       import_activity_order
       import_cart_info
       import_comment_info
       import_coupon_use
       import_coupon_info
       import_favor_info
       import_order_refund_info
       import_order_status_log
       import_spu_info
       import_activity_rule
       import_base_dic
    ;;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363
    • 364
    • 365
    • 366
    • 367
    • 368
    • 369
    • 370
    • 371
    • 372
    • 373
    • 374
    • 375
    • 376
    • 377
    • 378
    • 379
    • 380
    • 381
    • 382
    • 383
    • 384
    • 385
    • 386
    • 387
    • 388
    • 389
    • 390
    • 391
    • 392
    • 393
    • 394
    • 395
    • 396
    • 397
    • 398
    • 399
    • 400
    • 401
    • 402
    • 403
    • 404
    • 405
    • 406
    • 407
    • 408
    • 409
    • 410
    • 411
    • 412
    • 413
    • 414
    • 415
    • 416
    • 417
    • 418
    • 419
    • 420
    • 421
    • 422
    • 423
    • 424
    • 425
    • 426
    • 427
    • 428
    • 429
    • 430
    • 431
    • 432
    • 433
    • 434
    • 435
    • 436
    • 437

    脚本说明:

    1.  [ -n 变量值 ] 变量值不为空返回true,否则返回false
    
    2.  [ -z 变量值 ] 变量值长度为0返回true,否则返回false
    
    3.  如果日期是传进来的就直接赋值给他,如果没有传进来那就用当前日期减一
    
    4.  (` )反引号(esc键下方的那个键),当在脚本中需要执行一些指令并且将执行的结果赋给变量的时候需要使用“反引号”。
    
    5.  date +%F 提取时间,提取出来的格式为 年-月-日     date -d '-1 day' 系统当前时间减1
    
    6.	mr的输出目录必须不存在
    		--delete-target-dir \
    	
    7.  为啥全表导有where 1=1 ,因为参数2是SQL 为了语法正确  select * from 表名 where 1=1 and $CONDITIONS
    	    --query "$2 and  \$CONDITIONS" \
    	
    8.  底层是mr,map的数量1,默认四个
    	    --num-mappers 1 \
    	
    9.  列分割符号
    	    --fields-terminated-by '\t' \
    	
    10. 压缩流
    	    --compress \
    	
    11. 编码方式loz压缩
    	    --compression-codec lzop \
    	
    12. MySQL中空是null,而hive中空是\n,为了解决歧义
    	    --null-string '\\N' \
    	    --null-non-string '\\N'
    	
    13. 落盘到hdfs后立马生成loz索引文件
    	    hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/gmall/db/$1/$do_date
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    2)修改脚本权限

    chmod 777 gmall_mysql_to_hdfs.sh
    
    • 1

    3)初次导入

    gmall_mysql_to_hdfs.sh first 2020-03-10
    
    • 1

    将所有的表一次性都导入HDFS

    4)每日导入

    gmall_mysql_to_hdfs.sh all 2020-03-11
    
    • 1

    地区表和省份表没必要每次都导入HDFS,所以第一个参数为all的时间除了地区表和省份表,将其他的表都导入HDFS

    项目经验
    Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。
            在导出数据时增加如下配置:
    		        --input-null-string '\\N' \
    		        --input-null-non-string '\\N'
            导入数据时增加如下配置:
    		        --null-string
    		        --null-non-string
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    5. Hive安装部署

    1)修改/etc/profile.d/my_env.sh,添加环境变量

    sudo vim /etc/profile.d/my_env.sh
    
    #HIVE_HOME
    export HIVE_HOME=/opt/module/hive
    export PATH=$PATH:$HIVE_HOME/bin
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2)解决日志Jar包冲突,进入/opt/module/hive/lib目录

    mv log4j-slf4j-impl-2.10.0.jar log4j-slf4j-impl-2.10.0.jar.bak
    
    • 1

    Hive元数据配置到MySql

    1)将MySQL的JDBC驱动拷贝到Hive的lib目录下

    cp /opt/software/mysql-connector-java-5.1.48.jar /opt/module/hive/lib/
    
    • 1

    2)在$HIVE_HOME/conf目录下新建hive-site.xml文件

      内容如下:

    
    
    <configuration>
    	<property>
    		<name>javax.jdo.option.ConnectionURLname>
    		<value>jdbc:mysql://hadoop101:3306/metastore?useSSL=falsevalue>
    	property>
    
    	<property>
    		<name>javax.jdo.option.ConnectionDriverNamename>
    		<value>com.mysql.jdbc.Drivervalue>
    	property>
    
    	<property>
    		<name>javax.jdo.option.ConnectionUserNamename>
    		<value>rootvalue>
    	property>
    
    	<property>
    		<name>javax.jdo.option.ConnectionPasswordname>
    		<value>000000value>
    	property>
    
    	<property>
    		<name>hive.metastore.warehouse.dirname>
    		<value>/user/hive/warehousevalue>
    	property>
    
    	<property>
    		<name>hive.metastore.schema.verificationname>
    		<value>falsevalue>
    	property>
    
    	<property>
    		<name>hive.metastore.urisname>
    		<value>thrift://hadoop101:9083value>
    	property>
    
    	<property>
    	<name>hive.server2.thrift.portname>
    	<value>10000value>
    	property>
    
    	<property>
    		<name>hive.server2.thrift.bind.hostname>
    		<value>hadoop101value>
    	property>
    
    	<property>
    		<name>hive.metastore.event.db.notification.api.authname>
    		<value>falsevalue>
    	property>
    	
    	<property>
    		<name>hive.cli.print.headername>
    		<value>truevalue>
    	property>
    
    	<property>
    		<name>hive.cli.print.current.dbname>
    		<value>truevalue>
    	property>
    configuration>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    启动Hive

    1)初始化元数据库

    1. 登陆MySQL
    	mysql -uroot -p000000
    
    
    2. 新建Hive元数据库
    	create database metastore;
    	quit;
    
    
    3. 初始化Hive元数据库
    	schematool -initSchema -dbType mysql -verbose
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2)启动metastore和hiveserver2

      Hive 2.x以上版本,要先启动这两个服务,否则会报错

      在/opt/module/hive/bin目录编写hive服务启动脚本

      hiveservices.sh内容如下:

    #!/bin/bash
    HIVE_LOG_DIR=$HIVE_HOME/logs
    
    mkdir -p $HIVE_LOG_DIR
    
    #检查进程是否运行正常,参数1为进程名,参数2为进程端口
    function check_process()
    {
    	pid=$(ps -ef 2>/dev/null | grep -v grep | grep -i $1 | awk '{print $2}')
    	ppid=$(netstat -nltp 2>/dev/null | grep $2 | awk '{print $7}' | cut -d '/' -f 1)
    	echo $pid
    	[[ "$pid" =~ "$ppid" ]] && [ "$ppid" ] && return 0 || return 1
    }
    
    function hive_start()
    {
    	metapid=$(check_process HiveMetastore 9083)
    	cmd="nohup hive --service metastore >$HIVE_LOG_DIR/metastore.log 2>&1 &"
    	cmd=$cmd" sleep 4; hdfs dfsadmin -safemode wait >/dev/null 2>&1"
    	[ -z "$metapid" ] && eval $cmd || echo "Metastroe服务已启动"
    	server2pid=$(check_process HiveServer2 10000)
    	cmd="nohup hive --service hiveserver2 >$HIVE_LOG_DIR/hiveServer2.log 2>&1 &"
    	[ -z "$server2pid" ] && eval $cmd || echo "HiveServer2服务已启动"
    }
    
    function hive_stop()
    {
    	metapid=$(check_process HiveMetastore 9083)
    	[ "$metapid" ] && kill $metapid || echo "Metastore服务未启动"
    	server2pid=$(check_process HiveServer2 10000)
    	[ "$server2pid" ] && kill $server2pid || echo "HiveServer2服务未启动"
    }
    
    case $1 in
    "start")
    	hive_start
    	;;
    "stop")
    	hive_stop
    	;;
    "restart")
    	hive_stop
    	sleep 2
    	hive_start
    	;;
    "status")
    	check_process HiveMetastore 9083 >/dev/null && echo "Metastore服务运行正常" || echo "Metastore服务运行异常"
    	check_process HiveServer2 10000 >/dev/null && echo "HiveServer2服务运行正常" || echo "HiveServer2服务运行异常"
    	;;
    *)
    	echo Invalid Args!
    	echo 'Usage: '$(basename $0)' start|stop|restart|status'
    	;;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    3)添加执行权限

    chmod +x hiveservices.sh
    
    • 1

    4)启动Hive后台服务

    hiveservices.sh start
    
    • 1

    5)查看Hive后台服务运行情况

    hiveservices.sh status
    
    • 1

    6)启动Hive客户端

    bin/hive
    
    • 1
  • 相关阅读:
    C++动态内存管理
    GD32F4xx适配OpenHarmony问题踩坑记录
    SqlUtils 使用
    基于pythonGUI的图形绘图及图元编辑系统
    python 练习题——1.数字组合
    常用类05:String类(面试中用的最多)
    【AcWing16】【LeetCode】并查集Union Find-128/130/*1020-学完广度优先/深度优先要回来再看
    预约小程序新选择:强大后端管理功能一览
    基于php的宠物领养系统
    会员生日提前了一天
  • 原文地址:https://blog.csdn.net/qq_45796486/article/details/127417768