• StarRocks 自增ID实现分页优化


    StarRocks 自增ID实现分页优化

    场景介绍

    目前StarRocks在不支持自增ID的情况下,对于明细模型的分页查询场景,由于要保证每一次分页查询出来的数据的唯一性,需要我们人为去指定order by的列,无法利用到StarRocks自身的排序键等特性,造成分页查询场景下,性能并不是很好。

    有没有一种替代方案能够在外部实现一种自增id,保证每个批次提交的数据都比之前批次的数据的ID大,同时,该ID具有唯一性。并且是一个友好的数据类型(数值型),用来做明细模型的第一列,利用StarRocks的排序键来为分页场景加速。

    当然是有的。

    实现方案

    该方案其实就是利用各种etl工具,例如spark connector,flink connector,datax等等,在数据进入StarRocks之前,做一个新增衍生列的操作,新增一个全局自增的ID,放在第一列,写入到StarRocks中去,当成排序键,用来加速。

    测试用例

    spark connector

    中秋节的时候,社区流木大佬,发布了spark connector,对应链接为:spark connector 支持了读写StarRocks的数据,借此机会,我们使用该connector来实现一个我们的案例,具体对比测试一下,分页查询的场景性能提升。

    测试环境

    测试环境为本地部署的虚拟集群。具体配置如下:

    角色数量使用版本CPU内存磁盘
    fe32.3.04C6G40G
    be32.3.04C6G40G

    三台机器为fe be混布。

    数据准备

    数据为本地造的数据,数据格式为JSON数据,数据结构如下所示:

    {"dept":"8","date2":"2020-06-06 00:00:41","id":"8","date1":"2020-08-01 19:19:03","emp_id":"30999482"}
    
    • 1
    • 数据解释:

      emp_id:是一个 0 到 100000000的随机整数

      date1: 是一个 2020-01-01 到 2021-03-11的随机日期

      date2: 是一个 2020-01-01 到 2021-03-11的随机日期

      id: 是 一个 -1 到 10的随机整数

      dept: 是 一个 -1 到 10的随机整数

    • 建表语句

      CREATE TABLE `no_snow` (
        `emp_id` int(11) NOT NULL DEFAULT "-1" COMMENT "",
        `dept` varchar(65533) NOT NULL COMMENT "",
        `id` int(11) NOT NULL COMMENT "",
          `date1` datetime comment "date1",
          `date2` datetime comment "date2"
      ) ENGINE=OLAP 
      DUPLICATE  KEY(`emp_id`,`dept`,`id`)
      COMMENT "OLAP"
      DISTRIBUTED BY HASH(`emp_id`) BUCKETS 8 
      PROPERTIES (
      "replication_num" = "2",
      "in_memory" = "false",
      "storage_format" = "DEFAULT",
      "enable_persistent_index" = "false"
      );
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16

      转换后的表结构为:

      CREATE TABLE `snow` (
          `snow_id` bigint not null comment '',
        `emp_id` int(11) NOT NULL DEFAULT "-1" COMMENT "",
        `dept` varchar(65533) NOT NULL COMMENT "",
        `id` int(11) NOT NULL COMMENT "",
          `date1` datetime comment "date1",
          `date2` datetime comment "date2"
      ) ENGINE=OLAP 
      DUPLICATE KEY(`snow_id`,`emp_id`,`dept`,`id`)
      COMMENT "OLAP"
      DISTRIBUTED BY HASH(`snow_id`,`emp_id`,`dept`,`id`) BUCKETS 8 
      PROPERTIES (
      "replication_num" = "2",
      "in_memory" = "false",
      "storage_format" = "DEFAULT",
      "enable_persistent_index" = "false"
      );
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
    编写代码
    • 数据导入

      这里我们准备了一个共计20000000条数据的结果集,数据大小为:2.37G

      这里我们将这一个文件拆分为 7个行数为4000000行的小文件,进行导入

      导入命令如下:

      curl --location-trusted -u root: -H "label:testcdc005" -H "format: json"  -H "jsonpaths:[\"$.emp_id\",\"$.dept\",\"$.id\",\"$.date1\",\"$.date2\"]" -H "ignore_json_size:true" -T ./data.json.04 http://192.168.110.170:8036/api/test/testcdc/_stream_load
      
      • 1
      {
          "TxnId": 8016,
          "Label": "testcdc005",
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 4000000,
          "NumberLoadedRows": 4000000,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 408356148,
          "LoadTimeMs": 9708,
          "BeginTxnTimeMs": 1,
          "StreamLoadPutTimeMs": 6,
          "ReadDataTimeMs": 311,
          "WriteDataTimeMs": 9638,
          "CommitAndPublishTimeMs": 61
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
    • spark 代码

      spark connector 的依赖加载参考 spark connector这篇文章

      • maven 配置

        <properties>
                <maven.compiler.source>8maven.compiler.source>
                <maven.compiler.target>8maven.compiler.target>
                <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
                <spark.version>2.4.8spark.version>
                <scala.version>2.11scala.version>
                <hadoop.version>2.6.0hadoop.version>
            properties>
            <dependencies>
                <dependency>
                    <groupId>org.apache.sparkgroupId>
                    <artifactId>spark-core_${scala.version}artifactId>
                    <version>${spark.version}version>
                dependency>
        
                <dependency>
                    <groupId>org.apache.sparkgroupId>
                    <artifactId>spark-sql_${scala.version}artifactId>
                    <version>${spark.version}version>
                dependency>
                <dependency>
                    <groupId>com.starrocksgroupId>
                    <artifactId>starrocks-spark2_2.11artifactId>
                    <version>1.0.1version>
                dependency>
                <dependency>
                    <groupId>org.apache.sparkgroupId>
                    <artifactId>spark-streaming_${scala.version}artifactId>
                    <version>${spark.version}version>
                dependency>
                <dependency>
                    <groupId>org.apache.hadoopgroupId>
                    <artifactId>hadoop-clientartifactId>
                    <version>2.7.7version>
                dependency>
                <dependency>
                    <groupId>com.alibabagroupId>
                    <artifactId>fastjsonartifactId>
                    <version>2.0.11version>
                dependency>
                <dependency>
                    <groupId>mysqlgroupId>
                    <artifactId>mysql-connector-javaartifactId>
                    <version>5.1.27version>
                dependency>
            dependencies>
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44
        • 45
        • 46
      • spark 代码

        import org.apache.spark.sql.{Dataset, SparkSession}
        
        
        object SrTest {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder()
              .appName("sparksql")
              .master("local")
              .getOrCreate()
        
        // read data from sr
            val srReader = spark.read.format("starrocks")
              .option("starrocks.fenodes", "192.168.110.170:8036")
              .option("starrocks.benodes", "192.168.110.170:8046")
              .option("user", "root")
              .option("password", "")
              .option("starrocks.table.identifier", "test.testcdc")
              .load()
            // srReader.show(5)
            val flow = new SnowFlow(1, 1, 1)
            import spark.implicits._
            srReader.show(10)
        //etl
            val resDS: Dataset[(Long, Int, String, Int, String, String)] = srReader.map(x => {
              val emp_id: Int = x.getAs[Int]("emp_id")
              val id: Int = x.getAs[Int]("id")
              val date1: String = x.getAs[String]("date1")
              val date2: String = x.getAs[String]("date2")
              val dept = x.getAs[String]("dept")
              val snowId = flow.nextId()
              (snowId, emp_id, dept, id, date1, date2)
            })
        
            resDS.show(5)
              //write data to sr
            resDS.coalesce(5).toDF("snow_id", "emp_id", "dept", "id", "date1", "date2").write.format("starrocks")
              .option("starrocks.fenodes", "192.168.110.170:8036")
              .option("starrocks.benodes", "192.168.110.170:8046")
              .option("user", "root")
              .option("password", "")
              .option("starrocks.table.identifier", "test.testsnow").save()
          }
        }
        
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44

        这里的代码主要是读取sr数据,然后增加了一个衍生列,写回到sr。

      • 雪花算法代码

        import java.io.Serializable;
        public class SnowFlow implements Serializable {
            //因为二进制里第一个 bit 为如果是 1,那么都是负数,但是我们生成的 id 都是正数,所以第一个 bit 统一都是 0。
        
            //机器ID  2进制5位  32位减掉1位 31个
            private long workerId;
            //机房ID 2进制5位  32位减掉1位 31个
            private long datacenterId;
            //代表一毫秒内生成的多个id的最新序号  12位 4096 -1 = 4095 个
            private long sequence;
            //设置一个时间初始值    2^41 - 1   差不多可以用69年
            private long twepoch = 1585644268888L;
            //5位的机器id
            private long workerIdBits = 5L;
            //5位的机房id;。‘
            private long datacenterIdBits = 5L;
            //每毫秒内产生的id数 2 的 12次方
            private long sequenceBits = 2L;
            // 这个是二进制运算,就是5 bit最多只能有31个数字,也就是说机器id最多只能是32以内
            private long maxWorkerId = -1L ^ (-1L << workerIdBits);
            // 这个是一个意思,就是5 bit最多只能有31个数字,机房id最多只能是32以内
            private long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
        
            private long workerIdShift = sequenceBits;
            private long datacenterIdShift = sequenceBits + workerIdBits;
            private long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
        
            // -1L 二进制就是1111 1111  为什么?
            // -1 左移12位就是 1111  1111 0000 0000 0000 0000
            // 异或  相同为0 ,不同为1
            // 1111  1111  0000  0000  0000  0000
            // ^
            // 1111  1111  1111  1111  1111  1111
            // 0000 0000 1111 1111 1111 1111 换算成10进制就是4095
            private long sequenceMask = -1L ^ (-1L << sequenceBits);
            //记录产生时间毫秒数,判断是否是同1毫秒
            private long lastTimestamp = -1L;
        
            public long getWorkerId() {
                return workerId;
            }
        
            public long getDatacenterId() {
                return datacenterId;
            }
        
            public long getTimestamp() {
                return System.currentTimeMillis();
            }
        
            public SnowFlow() {
            }
        
            public SnowFlow(long workerId, long datacenterId, long sequence) {
        
                // 检查机房id和机器id是否超过31 不能小于0
                if (workerId > maxWorkerId || workerId < 0) {
                    throw new IllegalArgumentException(
                            String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
                }
        
                if (datacenterId > maxDatacenterId || datacenterId < 0) {
        
                    throw new IllegalArgumentException(
                            String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
                }
                this.workerId = workerId;
                this.datacenterId = datacenterId;
                this.sequence = sequence;
            }
        
            // 这个是核心方法,通过调用nextId()方法,
            // 让当前这台机器上的snowflake算法程序生成一个全局唯一的id
            public synchronized long nextId() {
                // 这儿就是获取当前时间戳,单位是毫秒
                long timestamp = timeGen();
                // 判断是否小于上次时间戳,如果小于的话,就抛出异常
                if (timestamp < lastTimestamp) {
        
                    System.err.printf("clock is moving backwards. Rejecting requests until %d.", lastTimestamp);
                    throw new RuntimeException(
                            String.format("Clock moved backwards. Refusing to generate id for %d milliseconds",
                                    lastTimestamp - timestamp));
                }
        
                // 下面是说假设在同一个毫秒内,又发送了一个请求生成一个id
                // 这个时候就得把seqence序号给递增1,最多就是4096
                if (timestamp == lastTimestamp) {
        
                    // 这个意思是说一个毫秒内最多只能有4096个数字,无论你传递多少进来,
                    //这个位运算保证始终就是在4096这个范围内,避免你自己传递个sequence超过了4096这个范围
                    sequence = (sequence + 1) & sequenceMask;
                    //当某一毫秒的时间,产生的id数 超过4095,系统会进入等待,直到下一毫秒,系统继续产生ID
                    if (sequence == 0) {
                        timestamp = tilNextMillis(lastTimestamp);
                    }
        
                } else {
                    sequence = 0;
                }
                // 这儿记录一下最近一次生成id的时间戳,单位是毫秒
                lastTimestamp = timestamp;
                // 这儿就是最核心的二进制位运算操作,生成一个64bit的id
                // 先将当前时间戳左移,放到41 bit那儿;将机房id左移放到5 bit那儿;将机器id左移放到5 bit那儿;将序号放最后12 bit
                // 最后拼接起来成一个64 bit的二进制数字,转换成10进制就是个long型
                return ((timestamp - twepoch) << timestampLeftShift) |
                        (datacenterId << datacenterIdShift) |
                        (workerId << workerIdShift) | sequence;
            }
        
            /**
             * 当某一毫秒的时间,产生的id数 超过4095,系统会进入等待,直到下一毫秒,系统继续产生ID
             *
             * @param lastTimestamp
             * @return
             */
            private long tilNextMillis(long lastTimestamp) {
        
                long timestamp = timeGen();
        
                while (timestamp <= lastTimestamp) {
                    timestamp = timeGen();
                }
                return timestamp;
            }
        
            //获取当前时间戳
            private long timeGen() {
                return System.currentTimeMillis();
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44
        • 45
        • 46
        • 47
        • 48
        • 49
        • 50
        • 51
        • 52
        • 53
        • 54
        • 55
        • 56
        • 57
        • 58
        • 59
        • 60
        • 61
        • 62
        • 63
        • 64
        • 65
        • 66
        • 67
        • 68
        • 69
        • 70
        • 71
        • 72
        • 73
        • 74
        • 75
        • 76
        • 77
        • 78
        • 79
        • 80
        • 81
        • 82
        • 83
        • 84
        • 85
        • 86
        • 87
        • 88
        • 89
        • 90
        • 91
        • 92
        • 93
        • 94
        • 95
        • 96
        • 97
        • 98
        • 99
        • 100
        • 101
        • 102
        • 103
        • 104
        • 105
        • 106
        • 107
        • 108
        • 109
        • 110
        • 111
        • 112
        • 113
        • 114
        • 115
        • 116
        • 117
        • 118
        • 119
        • 120
        • 121
        • 122
        • 123
        • 124
        • 125
        • 126
        • 127
        • 128
        • 129
        • 130
        • 131
  • 相关阅读:
    redis-cli客户端中获取数据中文显示xe问题
    [重庆思庄每日技术分享]-ORA-16525 dg broker不可用
    13.Pandas怎么实现DataFrame的Mergee
    Debian11系统简单配置
    java八股文易错点(持续更新......)
    怎样给视频添加自定义的封面
    数值分析思考题(钟尓杰版)参考解答——第五章
    ORB-SLAM2 ---- Initializer::Normalize函数
    -bash: ~/anaconda3/bin/python:Invalid argument 问题解决
    图文并茂的帮助文档你值得拥有
  • 原文地址:https://blog.csdn.net/flyinthesky111/article/details/126832394