• ClickHouse数据一致性


    查询CK手册发现,即便对数据一致性支持最好的Mergetree,也只是保证最终一致性

    我们在使用 ReplacingMergeTree、SummingMergeTree 这类表引擎的时候,会出现短暂数据不一致的情况。

    在某些对一致性非常敏感的场景,通常有以下几种解决方案。

    准备测试表和数据

    (1)创建表

    1. CREATE TABLE test_a(
    2.   user_id UInt64,
    3.   score String,
    4.   deleted UInt8 DEFAULT 0,
    5.   create_time DateTime DEFAULT toDateTime(0)
    6. )ENGINE= ReplacingMergeTree(create_time)
    7. ORDER BY user_id;

    其中:

    user_id 是数据去重更新的标识;

    create_time 是版本号字段,每组数据中 create_time 最大的一行表示最新的数据;

    deleted 是自定的一个标记位,比如 0 代表未删除,1  代表删除数据。

    (2)写入 1000万 测试数据

    1. INSERT INTO TABLE test_a(user_id,score)
    2. WITH(
    3. SELECT ['A','B','C','D','E','F','G']
    4. )AS dict
    5. SELECT number AS user_id, dict[number%7+1FROM numbers(10000000);

    (3)修改前 50万 行数据,修改内容包括 name 字段和 create_time 版本号字段

    1. INSERT INTO TABLE test_a(user_id,score,create_time)
    2. WITH(
    3. SELECT ['AA','BB','CC','DD','EE','FF','GG']
    4. )AS dict
    5. SELECT number AS user_id, dict[number%7+1], now() AS create_time FROM numbers(500000);

    (4)统计总数

    1. SELECT COUNT() FROM test_a;
    2. 10500000

    还未触发分区合并,所以还未去重。

    手动 OPTIMIZE

    在写入数据后,立刻执行OPTIMIZE强制触发新写入分区的合并动作。

    1. OPTIMIZE TABLE test_a FINAL;
    2. 语法:OPTIMIZE TABLE [db.]name [ON CLUSTER cluster] [PARTITION partition | PARTITION ID 'partition_id'] [FINAL] [DEDUPLICATE [BY expression]]

    通过 Group by 去重

    (1)执行去重的查询

    1. SELECT
    2.   user_id ,
    3. argMax(score, create_timeAS score,
    4. argMax(deleted, create_timeAS deleted,
    5. max(create_timeAS ctime 
    6. FROM test_a 
    7. GROUP BY user_id
    8. HAVING deleted = 0;

    函数说明:

    • argMax(field1,field2):按照 field2 的最大值取 field1 的值。

    当我们更新数据时,会写入一行新的数据,例如上面语句中,通过查询最大的 create_time 得到修改后的score字段值。

    (2)创建视图,方便测试

    1. CREATE VIEW view_test_a AS
    2. SELECT
    3.   user_id ,
    4. argMax(score, create_timeAS score,
    5. argMax(deleted, create_timeAS deleted,
    6. max(create_timeAS ctime 
    7. FROM test_a 
    8. GROUP BY user_id
    9. HAVING deleted = 0;

    3)插入重复数据,再次查询

    1. #再次插入一条数据
    2. INSERT INTO TABLE test_a(user_id,score,create_time) VALUES(0,'AAAA',now())
    3. #再次查询
    4. SELECT *
    5. FROM view_test_a
    6. WHERE user_id = 0;

    4)删除数据测试

    1. #再次插入一条标记为删除的数据
    2. INSERT INTO TABLE test_a(user_id,score,deleted,create_time) VALUES(0,'AAAA',1,now());
    3. #再次查询,刚才那条数据看不到了
    4. SELECT *
    5. FROM view_test_a
    6. WHERE user_id = 0;

    这行数据并没有被真正的删除,而是被过滤掉了。在一些合适的场景下,可以结合 表级别的 TTL 最终将物理数据删除。

    通过 FINAL 查询

    在查询语句后增加FINAL修饰符这样在查询的过程中将会执行Merge的特殊逻辑例如数据去重预聚合等

    但是这种方法在早期版本基本没有人使用,因为在增加 FINAL之后,我们的查询将会变成一个单线程的执行过程,查询速度非常慢。

    v20.5.2.7-stable版本中,FINAL查询支持多线程执行,并且可以通过max_final_threads 参数控制单个查询的线程数。但是目前读取part部分的动作依然是串行的。

    FINAL查询最终的性能和很多因素相关,列字段的大小、分区的数量等等都会影响到最终的查询时间,所以还要结合实际场景取舍。

    参考链接:https://github.com/ClickHouse/ClickHouse/pull/10463

    使用hits_v1表进行测试:

    分别安装了20.4.5.36  21.7.3.14 两个版本的ClickHouse进行对比。

    4.1 老版本测试

    1普通查询语句

    select * from visits_v1 WHERE StartDate = '2014-03-17' limit 100;

    2)FINAL查询

    select * from visits_v1 FINAL WHERE StartDate = '2014-03-17' limit 100;

    先前的并行查询变成了单线程。

    4.2 新版本测试

    1)普通语句查询

    select * from visits_v1 WHERE StartDate = '2014-03-17' limit 100 settings max_threads = 2;

    查看执行计划

    explain pipeline select * from visits_v1 WHERE StartDate = '2014-03-17' limit 100 settings max_threads = 2;

    (Expression)                   

    ExpressionTransform × 2        

      (SettingQuotaAndLimits)      

        (Limit)                    

        Limit 2 → 2

          (ReadFromMergeTree)      

    MergeTreeThread × 2 0 → 1

    明显将由2个线程并行读取 part 查询。

    2)FINAL查询

    select * from visits_vfinal WHERE StartDate = '2014-03-17' limit 100  settings max_final_threads = 2;

    查询速度没有普通的查询快,但是相比之前已经有了一些提升,查看 FINAL 查询的执行计划:

    explain pipeline select * from visits_v1 final WHERE StartDate = '2014-03-17' limit 100  settings max_final_threads = 2;

    (Expression)                          

    ExpressionTransform × 2               

      (SettingQuotaAndLimits)             

        (Limit)                           

        Limit 2 → 2                       

          (ReadFromMergeTree)             

          ExpressionTransform × 2         

    CollapsingSortedTransform × 2

              Copy 1 → 2                  

                AddingSelector            

    ExpressionTransform

                    MergeTree 0 → 1       

    CollapsingSortedTransform这一步开始已经是多线程执行,但是读取 part 部分的动作还是串行。

  • 相关阅读:
    char[]、char*与string相互转换——独家原创
    在Node.js中,什么是中间件(middleware)?它们的作用是什么?
    ios 给UIView类控件添加渐变颜色的注意事项
    laravel系列(三) Dcat admin框架工具表单以及普通表单的使用
    图像的梯度
    apache配置https证书问题记录
    海外服务器AS4837和AS4134S线路含义?怎么测试?
    IPv4数据报格式
    Cross-species regulatory sequence activity prediction
    Polygon ID架构
  • 原文地址:https://blog.csdn.net/shangjg03/article/details/134478749