• Hive学习(待续)


    1 Hive 基本概念

    1.1 Hive简介

    Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并
    提供类 SQL 查询功能
    用于解决海量结构化日志的数据统计工具

    1.2 Hive本质

    将 HQL 转化成 MapReduce 程序

    • Hive 处理的数据存储在 HDFS
    • Hive 分析数据底层的实现是 MapReduce
    • 执行程序运行在 Yarn 上

    1.3 优缺点

    1.3.1 优点

    1.3.2 缺点

    1.4 Hive架构原理

    2 Hive数据类型

    2.1 基本数据类型

    基本数据类型

    2.2 集合类型

    集合类型

    2.3 代码测试

    需求:
    json 存储 进入 hive

    {
    	 "name": "songsong",
    	 "friends": ["bingbing" , "lili"] , //列表 Array, 
    	 "children": { //键值 Map,
    	 "xiao song": 18 ,
    	 "xiaoxiao song": 19
     	}
    	 "address": { //结构 Struct,
    	 "street": "hui long guan",
    	 "city": "beijing"
     	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    本地 txt:

    songsong,bingbing_lili,xiao song:18_xiaoxiao song:19,hui long 
    guan_beijing
    yangyang,caicai_susu,xiao yang:18_xiaoxiao yang:19,chao yang_beijing
    
    • 1
    • 2
    • 3
    show databases;
    
    create database lya_test;
    
    use lya_test;
    
    show tables;
    
    create table test_create_table(id int,name string);
    
    create table test(
    name string,
    friends array<string>,
    children map<string, int>,
    address struct<street:string, city:string>
    )
    -- 如需用对应的文本文件导入 则加上
    row format delimited fields terminated by ','
    collection items terminated by '_'
    map keys terminated by ':'
    lines terminated by '\n';
    
    load data local inpath '/opt/module/hive/datas/test.txt' into table test;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    字段解释:
    row format delimited fields terminated by ‘,’ – 列分隔符
    collection items terminated by ‘_’ --MAP STRUCT 和 ARRAY 的分隔符(数据分割符号)
    map keys terminated by ‘:’ – MAP 中的 key 与 value 的分隔符
    lines terminated by ‘\n’; – 行分隔符

    2.3.1 分区测试

     create table dept_partition
     (deptno int, dname string, loc string)
    partitioned by (day string)
    row format delimited fields terminated by '\t';
    
    load data local inpath 
    '/hive/datas/dept_20200401.log' 
    into table dept_partition 
    partition(day='20200401');
    
     create table dept_partition_dy(id int, name string) 
    partitioned by (loc int) row format delimited fields terminated by '\t';
    insert into table dept_partition_dy select id ,name,loc from dept
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    3. 函数

    3.1 窗口函数

    • 函数说明:

    OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化。

    CURRENT ROW:当前行
    n PRECEDING:往前n行数据
    n FOLLOWING:往后n行数据
    UNBOUNDED:无边界,

    • UNBOUNDED PRECEDING 表示起点,
    • UNBOUNDEDFOLLOWING 表示终点

    LAG(col,n,default_val):往前第n行数据
    LEAD(col,n,default_val):往后第n行数据

    3.1.1 案例数据

    jack,2022-01-01,10
    tony,2022-01-02,15
    jack,2022-02-03,23
    tony,2022-01-04,29
    jack,2022-01-05,46
    jack,2022-04-06,42
    tony,2022-01-07,50
    jack,2022-01-08,55
    mart,2022-04-08,62
    mart,2022-04-09,68
    neil,2022-05-10,12
    mart,2022-04-11,75
    neil,2022-06-12,80
    mart,2022-04-13,94
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    孙悟空,语文,87
    孙悟空,数学,95
    孙悟空,英语,68
    大海,语文,94
    大海,数学,56
    大海,英语,84
    宋宋,语文,64
    宋宋,数学,86
    宋宋,英语,84
    婷婷,语文,65
    婷婷,数学,85
    婷婷,英语,78
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.1.2 案例代码

    1. 初始化数据
        private static void initData(SparkSession spark) {
           spark.sql("drop table `business`");
           spark.sql("create table `business`(" +
                   "`name` string," +
                   "`orderdate` string," +
                   "`cost` int)" +
                   "row format delimited fields terminated by ','");
           spark.sql("load data local inpath 'datas/business.txt' into table business");
           spark.sql("create table score(" +
                   "name string,subject string, score int) " +
                   "row format delimited fields terminated by ',' ");
           spark.sql("load data local inpath 'datas/rank.txt' into table score");
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    1. 查询数据
            spark.sql("select name , count(*) from business group by name").show();
            spark.sql("select name , count(*) over() from business").show();
    
    • 1
    • 2

    在这里插入图片描述

    // 1 查询在 2022 年 4 月份购买过的顾客及总人数
            spark.sql("select name , count(*)" +
                    "from business " +
                    "where substring(orderdate,1,7) = '2022-04' " +
                    "group by name").show();
    
            spark.sql("select name , count(*)  over()" +
                    "from business " +
                    "where substring(orderdate,1,7) = '2022-04' " +
                    "group by name").show();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在这里插入图片描述
    小结
    group by相同的数据一个组
    over每条数据一个组

            // 2 查询顾客的购买明细及对应每个顾客月购买总额
            spark.sql("select name, orderdate,cost, sum(cost) over(partition by name,month(orderdate))" +
                    "from business").show();
    
    • 1
    • 2
    • 3

    在这里插入图片描述

             // 3 上述的场景, 将每个顾客的 cost 按照日期顺序进行累加
            spark.sql("select name, orderdate,cost, sum(cost) over(partition by name,month(orderdate) order by orderdate) " +
                    "from business").show();
    
    • 1
    • 2
    • 3

    在这里插入图片描述

            // 4 查询每个顾客上次的购买时间
            spark.sql("select name, orderdate, lag(orderdate,1) over (partition by name order by orderdate) " +
                    "from business").show();
    
    • 1
    • 2
    • 3

    在这里插入图片描述

            // 4 查询每个顾客上次的购买时间
            spark.sql("select name, orderdate, lag(orderdate,1) over (partition by name order by orderdate) " +
                    "from business").show();
                
    
    • 1
    • 2
    • 3
    • 4
    		spark.sql("select * ,rank() over(order by score) from score ").show();
    		spark.sql("select * ,dense_rank() over(order by score) from score ").show();
            //5. 每个学科前三名
            spark.sql("select * ,rank() over(partition by subject order by score desc ) rank from score ").createOrReplaceTempView("t");
            spark.sql("select * from t where t.rank <=3").show();
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    3.2 行列转换

    3.2.1 行转列

    • 函数说明
      CONCAT(string A/col, string B/col…):返回输入字符串连接后的结果,支持任意个输入字
      符串(任意一个字段为空整体拼接结果就为空);
      CONCAT_WS(separator, str1, str2,...):它是一个特殊形式的 CONCAT()。第一个参数剩余参
      数间的分隔符。分隔符可以是与剩余参数一样的字符串。如果分隔符是 NULL,返回值也将
      为 NULL。这个函数会跳过分隔符参数后的任何 NULL 和空字符串。分隔符将被加到被连接
      的字符串之间;
      注意: CONCAT_WS must be "string or array\
      COLLECT_SET(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重
      汇总,产生 Array 类型字段。
          //查询每个科目包含哪些人
            spark.sql("use test");
            spark.sql("insert into table score values ('小明',\"语文\",95)");
            spark.sql("select subject ,concat_ws(\",\",collect_set(name)) from score group by subject").show();
            System.out.println("===============");
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    3.2.2 列转行

    • 函数说明
      EXPLODE(col):将 hive 一列中复杂的 Array 或者 Map 结构拆分成多行。
      LATERAL VIEW
      用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias
      解释:用于和 split, explode 等 UDTF 一起使用,它能够将一列数据拆成多行数据,在此
      基础上可以对拆分后的数据进行聚合。
    3.2.2.1 案例数据
    《疑犯追踪》 悬疑,动作,科幻,剧情
    《Lie to me》 悬疑,警匪,动作,心理,剧情
    《战狼 2》 战争,动作,灾难
    
    • 1
    • 2
    • 3
    3.2.2.2 案例代码
            spark.sql("create table movie_info(" +
                    " movie string," +
                    " category string)" +
                    "row format delimited fields terminated by \"\\t\"");
            spark.sql("insert into table movie_info values ('《疑犯追踪》',\"悬疑,动作,科幻,剧情\")," +
                    "('《Lie to me》',\"悬疑,警匪,动作,心理,剧情\"),('《战狼 2》',\"战争,动作,灾难\")");
            spark.sql("select movie, category_name from " +
                    "movie_info " +
                    "lateral view explode(split(category,\",\")) movie_info_tmp as category_name").show();
    
                    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述

    4. Demo

    4.1 数据准备

    表结构

    1. 视频表
      在这里插入图片描述
    2. 用户表
      在这里插入图片描述
      sql语句
    create table gulivideo_ori(
    videoId string, 
     uploader string, 
     age int, 
     category array<string>, 
     length int, 
     views int, 
     rate float, 
     ratings int, 
     comments int,
     relatedId array<string>)
    row format delimited fields terminated by "\t"
    collection items terminated by "&"
    stored as textfile;
    
    
    create table gulivideo_user_ori(
     uploader string,
     videos int,
     friends int)
    row format delimited 
    fields terminated by "\t" 
    stored as textfile;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    4.2 需求实现

    4.2.1 统计视频类别热度 Top10

    分析:

    1. 即统计每个类别有多少个视频,求出出包含视频最多的前 10 个类别
    2. 根据类别group by 统计videoId的个数
    SELECT
    	t.category_name,
    	count( t.videoId ) num 
    FROM
    	( SELECT videoId, category_name 
    	  FROM gulivideo_ori lateral VIEW explode ( category ) gulivideo_ori_tmp AS category_name ) t 
    GROUP BY
    	t.category_name 
    ORDER BY
    	num DESC 
    	LIMIT 10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述

    4.2.2 统计出视频观看数最高的 20 个视频的所属类别以及类别包含Top20 视频的个数

    1. 查询观看数最高的20个视频信息
    2. 行转列category
    3. 查询视频分类名称和该分类下有多少个 Top20 的视频
    SELECT
     t2.category_name,
     COUNT(t2.videoId) video_sum
    FROM 
    ( SELECT
    	t1.videoId,
    	category_name 
    FROM
    ( SELECT videoId, views, category 
      FROM gulivideo_ori ORDER BY views DESC LIMIT 20 ) t1 
    	lateral VIEW explode ( t1.category ) t1_tmp AS category_name
    	) t2 
    GROUP BY
    	t2.category_name
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这里插入图片描述

    4.2.3 统计视频观看数 Top50 所关联视频的所属类别排序

    1. 查询视频观看数 Top50 所关联视频id-行转列
    2. 查询id对应的 种类 将种类行转列
    3. 统计每个种类的 视频数量
    4. 开窗函数排序
    SELECT
    t6.category_name,
    t6.video_sum,
    rank() over ( ORDER BY t6.video_sum DESC ) rk 
    FROM
    	(
    	SELECT
    		t5.category_name,
    		COUNT( t5.relatedid_id ) video_sum 
    	FROM
    		(
    		SELECT
    			t4.relatedid_id,
    			category_name 
    		FROM
    			(
    			SELECT
    				t2.relatedid_id,
    				t3.category 
    			FROM
    				(
    				SELECT
    					relatedid_id 
    				FROM
    					( SELECT videoId, views, relatedid FROM gulivideo_ori ORDER BY views DESC LIMIT 50 ) t1 lateral VIEW explode ( t1.relatedid ) t1_tmp AS relatedid_id 
    				) t2
    				JOIN gulivideo_orc t3 ON t2.relatedid_id = t3.videoId 
    			) t4 lateral VIEW explode ( t4.category ) t4_tmp AS category_name 
    		) t5 
    	GROUP BY
    		t5.category_name 
    	ORDER BY
    	video_sum DESC 
    	) t6
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    在这里插入图片描述

    4.2.4 统计每个类别视频观看数 Top3

    SELECT 
     t2.videoId,
     t2.views,
     t2.category_name,
     t2.rk
    FROM 
    (
    SELECT 
     t1.videoId,
     t1.views,
     t1.category_name,
     rank() over(PARTITION BY t1.category_name ORDER BY t1.views DESC ) rk
    FROM 
    (
    SELECT
     videoId,
     views,
     category_name
    FROM gulivideo_ori
    lateral VIEW explode(category) gulivideo_ori_tmp AS category_name
    )t1
    )t2
    WHERE t2.rk <= 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    在这里插入图片描述

    4.2.5 统计上传视频最多的用户 Top3以及他们上传的视频观看次数在前 3 的视频

    spark.sql("SELECT  " +
                    " uploader, " +
                    " videos " +
                    "FROM gulivideo_user_ori  " +
                    "ORDER BY  " +
                    " videos " +
                    "DESC " +
                    "LIMIT 3  ").createOrReplaceTempView("t1");
            spark.sql("SELECT  " +
                    " t2.videoId, " +
                    " t2.views, " +
                    " t2.uploader " +
                    "FROM " +
                    "t1 " +
                    "JOIN gulivideo_ori t2  " +
                    "ON t1.uploader = t2.uploader " +
                    "ORDER BY t2.views DESC LIMIT 3").show();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    5. 优化

    5.1 in/exists 效率最低

    测试用例大表100w 小表10w

    select t1.id , t1.uid
            from bigtable1 t1
    where t1.id in (select t.id as id from smalltable t limit 10);
    
    • 1
    • 2
    • 3

    最佳替代

    select t1.id from bigtable1 t1
        left semi join (select t.id as id from smalltable t limit 10) as temp on temp.id = t1.id;
        
    
    • 1
    • 2
    • 3

    对比

    select temp.id from
        (select t.id as id from smalltable t limit 10) as temp
        left semi join bigtable1 t1 on temp.id = t1.id;
        
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    5.2 MapJoin

    MapJoin 是将 Join 双方比较小的表直接分发到各个 Map 进程的内存中,在 Map 进程中进行 Join 操 作,这样就不用进行 Reduce 步骤,从而提高了速度。

    1. 开启 MapJoin 参数设置
    • 设置自动选择 MapJoin
    set hive.auto.convert.join=true; #默认为 true
    
    • 1
    • 大表小表的阈值设置(默认 25M 以下认为是小表)
    set hive.mapjoin.smalltable.filesize=25000000
    
    • 1
    1. demo 测试
      insert overwrite table jointable
      select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
      from smalltable s
      left join bigtable b
      on s.id = b.id
      失效了
      原因:此时小表(左连接)作为主表,所有数据都要写出去,因此此时会走 reduce,mapjoin失效

    5.3 大表与大表之间的join

    • 减少不必要的字段筛选
    • 看能否将where提前 减少表与表之间的关联
    • 建立分桶

    建立分桶测试

    建立原始表

    create table bigtable1(
    	 id bigint,
    	 t bigint,
    	 uid string,
    	 keyword string,
    	 url_rank int,
    	 click_num int,
    	 click_url string)
    row format delimited fields terminated by '\t';
    load data local inpath '/opt/module/data/bigtable' into table bigtable1;
    
    create table bigtable2(
    	 id bigint,
    	 t bigint,
    	 uid string,
    	 keyword string,
    	 url_rank int,
    	 click_num int,
    	 click_url string)
    row format delimited fields terminated by '\t';
    load data local inpath '/opt/module/data/bigtable' into table bigtable1;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    建立分桶表

    create table bigtable_buck1(
    	 id bigint,
    	 t bigint,
    	 uid string,
    	 keyword string,
    	 url_rank int,
    	 click_num int,
    	 click_url string)
    	clustered by(id)
    	into 6 buckets
    	row format delimited fields terminated by '\t';
    load data local inpath '/opt/module/data/bigtable' into table 
    bigtable_buck1;
    
    
    create table bigtable_buck2(
    	 id bigint,
    	 t bigint,
    	 uid string,
    	 keyword string,
    	 url_rank int,
    	 click_num int,
    	 click_url string)
    	clustered by(id)
    	into 6 buckets
    	row format delimited fields terminated by '\t';
    load data local inpath '/opt/module/data/bigtable' into table 
    bigtable_buck2;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    原始表速度

    select a.id,a.name,a.age,a.phone,b.id,b.grade,b.address
    from bigtable a
    join bigtable2 b
    on a.id = b.id;
    
    • 1
    • 2
    • 3
    • 4

    分桶表速度

    • 参数设置
    set hive.optimize.bucketmapjoin = true;
    set hive.optimize.bucketmapjoin.sortedmerge = true;
    set 
    hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
    
    • 1
    • 2
    • 3
    • 4
    • sql编写
    insert overwrite table jointable
    select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
    from bigtable_buck1 s
    join bigtable_buck2 b
    on b.id = s.id;
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 相关阅读:
    【网络协议】传输层协议
    【API篇】四、Flink物理分区算子API
    找出重复成员
    8.3 数据结构——交换排序
    xdebug 远程调试 vsCode
    Java SE 学习笔记(十四)—— IO流(3)
    pip 问题
    centos7下编译安装freeswitch及常见编译问题的解决
    自动控制原理7.3---z变换理论
    第五章:GC日志分析
  • 原文地址:https://blog.csdn.net/L_994572281_LYA/article/details/126624684