• Apache Doris的Colocation join本地join实现


    1. 介绍

    让两个表的join,能够分布式的在本地完成,而不用跨节点数据传输。加速查询

    2. 原理

    假设有这样的一条SQL查询:select * from tb1 join tb2 on tb1.key1 = tb2.key1 and tb1.key2 = tb2.key2。为了能够实现分布式的本地join,则两表相同的key1和key2必须位于同一个BE节点,两表不同的key1和key2可以位于不同的BE节点

    因为Doris的Partition只是一个逻辑上的分区,真正影响数据分布在哪一个BE节点的是由Bucket决定的。所以需要两表分桶列的类型和分桶列的数量相同,同时分桶个数也要相同,这样做hash之后,两表相同分桶列的数据会在同一个BE节点。即tb1.key1和tb2.key1类型相同,tb1.key2和tb2.key2类型相同

    为了保证在同一个BE上拥有这两个表的副本,还需要两个表的副本数相同

    在创建tb1和tb2的同时,还需要将两个表通过colocate_with属性划分到一个组Colocation Group(CG),方便管理,CG的Colocation Group Schema(CGS)会记录分桶列类型、分桶数、副本数等元数据信息

    3. CG的创建和删除、表添加到CG的规则

    CG的创建和删除
    CG不存在会自动创建。当一个表drop后,超过回收站的停留时间(默认一天),就会被彻底删除。当CG中的最好一个表被彻底删除,CG也会被自动删除

    表添加到CG的规则
    如果将一个表添加到一个已经存在的CG,会先判断新添加的表是否符合CGS的要求,不符合则不能添加。一个表只能属于一个CG,添加到新的CG,会从原先的CG移除

    4. 使用

    创建两张表

    mysql> create table click(
        -> user_id bigint,
        -> click_date date,
        -> city varchar(32),
        -> url varchar(512)
        -> ) partition by range(click_date) (
        -> partition p1 values less than ('2022-01-01'),
        -> partition p2 values less than ('2023-01-01')
        -> ) distributed by hash(user_id, city) buckets 8
        -> properties (
        -> 'replication_num' = '3',
        -> 'colocate_with' = 'group1'
        -> );
    Query OK, 0 rows affected (0.12 sec)
    
    mysql>
    mysql> create table user_live(
        -> user_id bigint,
        -> city varchar(32),
        -> name varchar(32)
        -> ) unique key(user_id, city)
        -> distributed by hash(user_id, city) buckets 8
        -> properties(
        -> 'replication_num' = '3',
        -> 'colocate_with' = 'group1'
        -> );
    Query OK, 0 rows affected (0.08 sec)
    
    mysql>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    group的名字需要一个database中唯一

    查看查询语句的执行计划

    mysql> explain select * from click a join user_live b on a.user_id = b.user_id and a.city = b.city;
    +----------------------------------------------------------------------------------------------------------------------+
    | Explain String                                                                                                       |
    +----------------------------------------------------------------------------------------------------------------------+
    ......省略部分......
    |   2:VHASH JOIN                                                                                                       |
    |   |  join op: INNER JOIN(COLOCATE[])[]                                                                               |
    |   |  equal join conjunct: `a`.`user_id` = `b`.`user_id`                                                              |
    |   |  equal join conjunct: `a`.`city` = `b`.`city`                                                                    |
    |   |  runtime filters: RF000[in_or_bloom] <- `b`.`user_id`, RF001[in_or_bloom] <- `b`.`city`                          |
    |   |  cardinality=0                                                                                                   |
    |   |  vec output tuple id: 2  |                                                                                       |
    |   |----1:VOlapScanNode                                                                                               |
    |   |       TABLE: user_live(null), PREAGGREGATION: OFF. Reason: null                                                  |
    |   |       PREDICATES: `b`.`__DORIS_DELETE_SIGN__` = 0                                                                |
    |   |       partitions=0/1, tablets=0/0, tabletList=                                                                   |
    |   |       cardinality=0, avgRowSize=41.0, numNodes=1                                                                 |
    |   |                                                                                                                  |
    |   0:VOlapScanNode                                                                                                    |
    ......省略部分......
    24 rows in set (0.04 sec)
    
    mysql> 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    join op的join为COLOCATE,表示使用的是colocate join

    5. 查看CG和CG数据分布情况

    查看CG

    mysql> show proc '/colocation_group';
    +-------------+--------------+---------------------+------------+-------------------------+-------------------------+----------+----------+
    | GroupId     | GroupName    | TableIds            | BucketsNum | ReplicaAllocation       | DistCols                | IsStable | ErrorMsg |
    +-------------+--------------+---------------------+------------+-------------------------+-------------------------+----------+----------+
    | 12002.17009 | 12002_group1 | 17007, 17076, 17146 | 8          | tag.location.default: 3 | bigint(20), varchar(32) | true     |          |
    +-------------+--------------+---------------------+------------+-------------------------+-------------------------+----------+----------+
    1 row in set (0.00 sec)
    
    mysql>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    说明:

    • GroupId: 一个Group的全集群唯一标识,前半部分为db id,后半部分为group id
    • GroupName: Group的全名
    • TabletIds: 该Group包含的Table的id列表
    • DistCols: 即分桶列类型
    • IsStable: 该Group是否稳定。当IsStable为false时,表示当前Group内有部分表的tablet正在做修复或迁移,此时相关表的Colocation Join将退化为其它Join

    查看CG的数据分布情况

    mysql> show proc '/colocation_group/12002.17009';
    +-------------+--------------------------+
    | BucketIndex | {"location" : "default"} |
    +-------------+--------------------------+
    | 0           | 10002, 11001, 11002      |
    | 1           | 10002, 11001, 11002      |
    | 2           | 10002, 11001, 11002      |
    | 3           | 11002, 11001, 10002      |
    | 4           | 11002, 10002, 11001      |
    | 5           | 11002, 10002, 11001      |
    | 6           | 11001, 11002, 10002      |
    | 7           | 11002, 11001, 10002      |
    +-------------+--------------------------+
    8 rows in set (0.01 sec)
    
    mysql> 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    6. 修改和删除表的colocate_with属性

    修改表的colocate_with属性。这样再join就不能使用colocate join了

    mysql> alter table user_live set ('colocate_with' = 'group2');
    Query OK, 0 rows affected (0.08 sec)
    
    mysql> 
    
    • 1
    • 2
    • 3
    • 4

    删除表的colocate_with属性

    mysql> alter table user_live set ('colocate_with' = '');
    Query OK, 0 rows affected (0.02 sec)
    
    mysql>
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    把Java代理说清楚
    鉴源论坛 · 观辙丨汽车电子架构和CAN网络基础
    使用ICMP协议来判断UDP端口的存活状态
    重建大师创建缓存目录失败,密集匹配失败是什么原因呢
    云端智享——记移动云手写docker-demo
    《虚拟现实技术》教学上机实验报告
    PHP8中获取并删除数组中最后一个元素-PHP8知识详解
    Docker安装Elasticsearch 8.1.3
    21款奔驰GLS400升级电吸门 告别异响
    【Paper】2021_多智能体系统滞后一致性研究_马逸文
  • 原文地址:https://blog.csdn.net/yy8623977/article/details/126159770