• ClickHouse学习笔记之数据一致性


    背景

    ClickHouse中,即使是对数据一致性支持最好的合并树引擎,也只能保证最终一致性。例如,ReplacingMergeTree对数据的去重只会在数据合并期间进行,合并会在后台一个不确定的时间进行,因此我们不能与先做出计划,从而有一些数据在被读取时可能仍未被处理。尽管我们可以通过optimize语句发起计划外的合并,但那会引发大量的数据IO,因此不要依靠该语句。所以,ReplacingMergeTree适用于后台清除重复数据以节省空间,但不能保证没有重复的数据出现。

    我们在使用ReplacingMergeTreeSummingMergeTree这类表引擎时,会出现短暂的数据不一致的情况。本文介绍一些在某些对一致性非常敏感的场景下,常用的几种解决方法。不过首先,我们要创建测试表和测试数据。

    创建表和测试数据

    创建表:

    CREATE TABLE test_a(
    user_id UInt64,
    score String,
    deleted UInt8 DEFAULT 0,
    create_time DateTime DEFAULT toDateTime(0)
    )ENGINE= ReplacingMergeTree(create_time)
    ORDER BY user_id;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    其中:

    • user_id是数据去重的标识;
    • create_time是版本号字段,每组数据中该字段最大的一行为最新的数据;
    • deleted是自定义的标志位,0表示未删除,1表示待删除或已删除。

    然后写入1000万条测试数据:

    INSERT INTO TABLE test_a(user_id,score)
    WITH(
    SELECT ['A','B','C','D','E','F','G']
    )AS dict
    SELECT number AS user_id, dict[number%7+1] FROM numbers(10000000);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    对前50万行插入新数据,不同之处在于新老数据的namecreate_time

    INSERT INTO TABLE test_a(user_id,score,create_time)
    WITH(
    SELECT ['AA','BB','CC','DD','EE','FF','GG']
    )AS dict
    SELECT number AS user_id, dict[number%7+1], now() AS create_time FROM
    numbers(500000);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    统计总数:

    scentos :) select count() from test_a;
    
    SELECT count()
    FROM test_a
    
    Query id: 9a4b0f6d-2bb4-4a01-a880-6c3be531b0b0
    
    ┌──count()─┐
    │ 10500000 │
    └──────────┘
    
    1 rows in set. Elapsed: 0.001 sec.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    此时没有触发分区合并,因此还没有去重。

    手动optimize

    在写入数据后,可以通过立刻执行optimize强制触发新写入分区的合并操作:

    optimize table test_a final;
    
    • 1

    通过group by去重

    执行去重查询:

    SELECT
    user_id ,
    argMax(score, create_time) AS score,
    argMax(deleted, create_time) AS deleted,
    max(create_time) AS ctime
    FROM test_a
    GROUP BY user_id
    HAVING deleted = 0;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    其中用到的argMax(a, b):按照b的最大值取a,在上例中,argMax(score, create_time)即取create_time最大的score,也就是最新的score
    我们把该查询写成视图,方便测试:

    CREATE VIEW view_test_a AS
    SELECT
    user_id ,
    argMax(score, create_time) AS score,
    argMax(deleted, create_time) AS deleted,
    max(create_time) AS ctime
    FROM test_a
    GROUP BY user_id
    HAVING deleted = 0;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    插入重复数据并查询:

    INSERT INTO TABLE test_a(user_id,score,create_time) VALUES(0,'AAAA',now());
    
    SELECT * FROM view_test_a WHERE user_id = 0;
    
    • 1
    • 2
    • 3

    查询结果:

    SELECT *
    FROM view_test_a
    WHERE user_id = 0
    
    Query id: 79f2ac74-f361-436d-b766-3303cdf5d57c
    
    ┌─user_id─┬─score─┬─deleted─┬───────────────ctime─┐
    │       0 │ AAAA  │       02021-12-12 14:02:53 │
    └─────────┴───────┴─────────┴─────────────────────┘
    ↘ Progress: 16.39 thousand rows, 385.05 KB (3.19 million rows/s., 74.91 MB/s.)  
    1 rows in set. Elapsed: 0.005 sec. Processed 16.39 thousand rows, 385.05 KB (3.15 million rows/s., 73.96 MB/s.)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    删除数据并测试,注意我们删除数据的方式是插入一条deleted字段为1的对应数据:

    INSERT INTO TABLE test_a(user_id,score,deleted,create_time) VALUES(0,'AAAA',1,now());
    
    • 1

    再次查询,发现没有数据,因为最新的数据已经被视图中having deleted = 0过滤掉了(having
    先查询再过滤,where是先过滤再查询,因为视图使用了聚合函数,所以只能用having):

    scentos :) select * from view_test_a where user_id = 0;
    
    SELECT *
    FROM view_test_a
    WHERE user_id = 0
    
    Query id: 220834a5-5d28-4d72-9404-d30f8f4c1d1c
    
    → Progress: 16.39 thousand rows, 385.08 KB (4.53 million rows/s., 106.57 MB/s.) Ok.
    
    0 rows in set. Elapsed: 0.004 sec. Processed 16.39 thousand rows, 385.08 KB (4.44 million rows/s., 104.27 MB/s.)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    实际上,该数据(0, AAAA)显然没有被删除,可以测试一下:再加一层视图,专门保存未被删除的数据:

     create view non_deleted_test_a as
               select user_id as user_id, score as score, deleted as deleted, create_time as create_time
               from test_a
               where deleted = 0;
    
    • 1
    • 2
    • 3
    • 4

    然后查询non_deleted_test_a视图中最新的数据,依旧保存为视图:

    create view latest_non_deleted_test_a as select user_id ,
               argMax(score, create_time) AS score,
               argMax(deleted, create_time) AS deleted,
               max(create_time) AS ctime
               FROM non_deleted_test_a
               GROUP BY user_id;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    最后通过latest_non_deleted_test_a查询即可:

    scentos :) select * from latest_non_deleted_test_a where user_id = 0;
    
    SELECT *
    FROM latest_non_deleted_test_a
    WHERE user_id = 0
    
    Query id: fa7726b2-3d2c-4e80-a4c4-267552363aba
    
    ┌─user_id─┬─score─┬─deleted─┬───────────────ctime─┐
    │       0 │ AAAA  │       02021-12-12 14:02:53 │
    └─────────┴───────┴─────────┴─────────────────────┘
    ↗ Progress: 16.39 thousand rows, 385.05 KB (3.19 million rows/s., 74.99 MB/s.)  
    1 rows in set. Elapsed: 0.005 sec. Processed 16.39 thousand rows, 385.05 KB (3.15 million rows/s., 74.01 MB/s.)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    所以,方才插入的deleted的数据依旧存在,我们可以考虑结合表级别的TTL最终将物理数据删除

    通过FINAL查询

    在查询语句后增加final修饰符,这样在查询的过程中将会执行合并的特殊逻辑,如去重、预聚合等。但这种方法会导致查询变成单线程执行,大幅降低查询速度,所以早期版本中没人用这种方法。但是在20.5.2.7版本及以后,final查询支持多线程执行,并且可以通过max_final_threads参数控制单个查询的线程数,但是目前读取part部分的动作依然是串行的。
    final查询的实际性能和很多因素有关,如列字段大小、分区数量等,因此要结合实际场景取舍,参考链接

    我们先看看设置查询线程后,普通查询的执行计划:

    scentos :) explain pipeline select * from visits_v1 WHERE StartDate = '2014-03-17'
    :-] limit 100 settings max_threads = 2;
    
    EXPLAIN PIPELINE
    SELECT *
    FROM visits_v1
    WHERE StartDate = '2014-03-17'
    LIMIT 100
    SETTINGS max_threads = 2
    
    Query id: d3ecce50-c53e-48fc-abcd-58c0e4feb4da
    
    ┌─explain─────────────────────────┐
    │ (Expression)                    │
    │ ExpressionTransform × 2         │
    │   (SettingQuotaAndLimits)       │
    │     (Limit)                     │
    │     Limit 22                 │
    │       (ReadFromMergeTree)       │
    │       MergeTreeThread × 2 01 │
    └─────────────────────────────────┘
    
    7 rows in set. Elapsed: 0.009 sec.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    MergeTreeThread × 2 0 → 1即表示用两个合并树线程读取part查询。我们再看看final查询:

    scentos :) explain pipeline select * from visits_v1 final WHERE StartDate = '2014-03-17' limit 100 settings max_final_threads = 2;
    
    EXPLAIN PIPELINE
    SELECT *
    FROM visits_v1
    FINAL
    WHERE StartDate = '2014-03-17'
    LIMIT 100
    SETTINGS max_final_threads = 2
    
    Query id: 66700eb5-dbd4-48e4-9920-f5e952105011
    
    ┌─explain──────────────────────────────────┐
    │ (Expression)                             │
    │ ExpressionTransform × 2                  │
    │   (Limit)                                │
    │   Limit 22                            │
    │     (Filter)                             │
    │     FilterTransform × 2                  │
    │       (SettingQuotaAndLimits)            │
    │         (ReadFromMergeTree)              │
    │         ExpressionTransform × 2          │
    │           CollapsingSortedTransform × 2  │
    │             Copy 12                   │
    │               AddingSelector             │
    │                 ExpressionTransform      │
    │                   MergeTreeInOrder 01 │
    └──────────────────────────────────────────┘
    
    14 rows in set. Elapsed: 0.009 sec.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    可知,CollapsingSortedTransform这一步是两个线程并行,但MergeTreeInOrder读取part部分还是单线程。

  • 相关阅读:
    玩转Mysql系列 - 第17篇:存储过程&自定义函数详解
    实现多彩线条摆出心形
    备忘录APP源码和设计报告
    进程的退出
    EasyExcel单元格数据超过32767报错问题处理
    Java基于微信小程序的校园订餐小程序的研究与实现,附源码
    计算文本相似度的几种方法以及实现原理
    论文分享:Generating Playful Palettes from Images
    java:jvm参数设置
    一致性检验评价方法kappa
  • 原文地址:https://blog.csdn.net/qq_37475168/article/details/127459667