本文档主要介绍 Doris 的建表和数据划分,以及建表操作中可能遇到的问题和解决方法。
数据分片(Sharding)是分布式数据库分而治之 (Divide And Conquer) 这一设计思想的体现。过去的单机数据库在大数据量下往往面临存储和 IO 的限制,而分布式数据库则通过数据划分的规则,将数据打散分布至不同的机器或节点上,形成分布式存储,因此突破了单机存储空间和 IO 的瓶颈、使库表数据量可以无限拓展。
数据分片主要有范围分片或哈希分片这两种方式,而在实际数据库的实现中,往往呈现为分区和分桶两种形式。分区一般是按照时间或其他连续值对数据进行划分,在执行查询操作时可以通过分区裁剪过滤不必要的范围扫描,提升执行效率,同时也使得对分区数据的增删改等管理操作更为便捷。而分桶则是按照某个关键字执行哈希运算,将相同哈希值的数据放到一起,这样可以有效定位数据、避免数据倾斜。
在 Doris 中,数据都以表(Table)的形式进行逻辑上的描述。
一张表包括行(Row)和列(Column):
Row:即用户的一行数据;
Column: 用于描述一行数据中不同的字段。
Column 可以分为两大类:Key 和 Value。从业务角度看,Key 和 Value 可以分别对应维度列和指标列。Doris的key列是建表语句中指定的列,建表语句中的关键字’unique key’或’aggregate key’或’duplicate key’后面的列就是key列,除了key列剩下的就是value列。从聚合模型的角度来说,Key 列相同的行,会聚合成一行。其中 Value 列的聚合方式由用户在建表时指定。关于更多聚合模型的介绍,可以参阅 Doris 数据模型。
在 Doris 的存储引擎中,用户数据被水平划分为若干个数据分片(Tablet,也称作数据分桶)。每个 Tablet 包含若干数据行。各个 Tablet 之间的数据没有交集,并且在物理上是独立存储的。
多个 Tablet 在逻辑上归属于不同的分区(Partition)。一个 Tablet 只属于一个 Partition。而一个 Partition 包含若干个 Tablet。因为 Tablet 在物理上是独立存储的,所以可以视为 Partition 在物理上也是独立。Tablet 是数据移动、复制等操作的最小物理存储单元。
若干个 Partition 组成一个 Table。Partition 可以视为是逻辑上最小的管理单元。数据的导入与删除,仅能针对一个 Partition 进行。
我们以一个建表操作来说明 Doris 的数据划分。
Doris 的建表是一个同步命令,SQL执行完成即返回结果,命令返回成功即表示建表成功。也可以通过 HELP CREATE TABLE; 查看更多帮助。
本小节通过一个例子,来介绍 Doris 的建表方式。
-- Range Partition
CREATE TABLE IF NOT EXISTS example_db.example_range_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
ENGINE=OLAP
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY RANGE(`date`)
(
PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
PARTITION `p201703` VALUES LESS THAN ("2017-04-01")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
"replication_num" = "3",
"storage_medium" = "SSD",
"storage_cooldown_time" = "2018-01-01 12:00:00"
);
这里我们只以 AGGREGATE KEY 数据模型为例进行说明。更多数据模型参阅Doris 数据模型
列的基本类型,可以通过在 mysql-client 中执行 HELP CREATE TABLE; 查看。
AGGREGATE KEY 数据模型中,所有没有指定聚合方式(SUM、REPLACE、MAX、MIN)的列视为 Key 列。而其余则为 Value 列。
定义列时,可参照如下建议:
Doris 支持两层的数据划分。第一层是 Partition,支持 Range 和 List 的划分方式。第二层是 Bucket(Tablet),支持 Hash 和 Random 的划分方式。
也可以仅使用一层分区,建表时如果不写分区的语句即可,此时Doris会生成一个默认的分区,对用户是透明的。使用一层分区时,只支持 Bucket 划分。
VALUES LESS THAN (...) 仅指定上界,系统会将前一个分区的上界作为该分区的下界,生成一个左闭右开的区间。也支持通过 VALUES [...) 指定上下界,生成一个左闭右开的区间。 同时,也支持通过FROM(...) TO (...) INTERVAL ... 来批量创建分区。
通过 VALUES [...) 同时指定上下界比较容易理解。这里举例说明,当使用 VALUES LESS THAN (...) 语句进行分区的增删操作时,分区范围的变化情况:
如上 example_range_tbl 示例,当建表完成后,会自动生成如下3个分区:
p201701: [MIN_VALUE, 2017-02-01)
p201702: [2017-02-01, 2017-03-01)
p201703: [2017-03-01, 2017-04-01)
当我们增加一个分区 p201705 VALUES LESS THAN (“2017-06-01”),分区结果如下:
p201701: [MIN_VALUE, 2017-02-01)
p201702: [2017-02-01, 2017-03-01)
p201703: [2017-03-01, 2017-04-01)
p201705: [2017-04-01, 2017-06-01)
此时我们删除分区 p201703,则分区结果如下:
p201701: [MIN_VALUE, 2017-02-01)
p201702: [2017-02-01, 2017-03-01)
p201705: [2017-04-01, 2017-06-01)
注意到 p201702 和 p201705 的分区范围并没有发生变化,而这两个分区之间,出现了一个空洞:[2017-03-01, 2017-04-01)。即如果导入的数据范围在这个空洞范围内,是无法导入的。
继续删除分区 p201702,分区结果如下:
p201701: [MIN_VALUE, 2017-02-01)
p201705: [2017-04-01, 2017-06-01)
空洞范围变为:[2017-02-01, 2017-04-01)
现在增加一个分区 p201702new VALUES LESS THAN (“2017-03-01”),分区结果如下:
p201701: [MIN_VALUE, 2017-02-01)
p201702new: [2017-02-01, 2017-03-01)
p201705: [2017-04-01, 2017-06-01)
可以看到空洞范围缩小为:[2017-03-01, 2017-04-01)
现在删除分区 p201701,并添加分区 p201612 VALUES LESS THAN (“2017-01-01”),分区结果如下:
p201612: [MIN_VALUE, 2017-01-01)
p201702new: [2017-02-01, 2017-03-01)
p201705: [2017-04-01, 2017-06-01)
即出现了一个新的空洞:[2017-01-01, 2017-02-01)
综上,分区的删除不会改变已存在分区的范围。删除分区可能出现空洞。通过 VALUES LESS THAN 语句增加分区时,分区的下界紧接上一个分区的上界。
Range分区除了上述我们看到的单列分区,也支持多列分区,示例如下:
PARTITION BY RANGE(`date`, `id`)
(
PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01")
)
在以上示例中,我们指定 date(DATE 类型) 和 id(INT 类型) 作为分区列。以上示例最终得到的分区如下:
p201701_1000: [(MIN_VALUE, MIN_VALUE), ("2017-02-01", "1000") )
p201702_2000: [("2017-02-01", "1000"), ("2017-03-01", "2000") )
p201703_all: [("2017-03-01", "2000"), ("2017-04-01", MIN_VALUE))
注意,最后一个分区用户缺省只指定了 date 列的分区值,所以 id 列的分区值会默认填充 MIN_VALUE。当用户插入数据时,分区列值会按照顺序依次比较,最终得到对应的分区。举例如下:
数据 --> 分区
2017-01-01, 200 --> p201701_1000
2017-01-01, 2000 --> p201701_1000
2017-02-01, 100 --> p201701_1000
2017-02-01, 2000 --> p201702_2000
2017-02-15, 5000 --> p201702_2000
2017-03-01, 2000 --> p201703_all
2017-03-10, 1 --> p201703_all
2017-04-01, 1000 --> 无法导入
2017-05-01, 1000 --> 无法导入
Range分区同样支持批量分区, 通过语句 FROM ("2022-01-03") TO ("2022-01-06") INTERVAL 1 DAY 批量创建按天划分 的分区:2022-01-03到2022-01-06(不含2022-01-06日),分区结果如下:
p20220103: [2022-01-03, 2022-01-04)
p20220104: [2022-01-04, 2022-01-05)
p20220105: [2022-01-05, 2022-01-06)
Range 除了支持Range 分区之外,也支持List 分区,我们通过使 VALUES IN 进行确定
-- List Partition
CREATE TABLE IF NOT EXISTS example_db.example_list_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳",
`city` VARCHAR(20) NOT NULL COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
ENGINE=olap
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY LIST(`city`)
(
PARTITION `p_cn` VALUES IN ("Beijing", "Shanghai", "Hong Kong"),
PARTITION `p_usa` VALUES IN ("New York", "San Francisco"),
PARTITION `p_jp` VALUES IN ("Tokyo")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
"replication_num" = "3",
"storage_medium" = "SSD",
"storage_cooldown_time" = "2018-01-01 12:00:00"
);
BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE, DATETIME, CHAR, VARCHAR 数据类型,分区值为枚举值。只有当数据为目标分区枚举值其中之一时,才可以命中分区。VALUES IN (...) 来指定每个分区包含的枚举值。下面通过示例说明,进行分区的增删操作时,分区的变化。
如上 example_list_tbl 示例,当建表完成后,会自动生成如下3个分区:
p_cn: ("Beijing", "Shanghai", "Hong Kong")
p_usa: ("New York", "San Francisco")
p_jp: ("Tokyo")
当我们增加一个分区 p_uk VALUES IN (“London”),分区结果如下:
p_cn: ("Beijing", "Shanghai", "Hong Kong")
p_usa: ("New York", "San Francisco")
p_jp: ("Tokyo")
p_uk: ("London")
当我们删除分区 p_jp,分区结果如下:
p_cn: ("Beijing", "Shanghai", "Hong Kong")
p_usa: ("New York", "San Francisco")
p_uk: ("London")
List分区也支持多列分区,示例如下:
PARTITION BY LIST(`id`, `city`)
(
PARTITION `p1_city` VALUES IN (("1", "Beijing"), ("1", "Shanghai")),
PARTITION `p2_city` VALUES IN (("2", "Beijing"), ("2", "Shanghai")),
PARTITION `p3_city` VALUES IN (("3", "Beijing"), ("3", "Shanghai"))
)
在以上示例中,我们指定 id(INT 类型) 和 city(VARCHAR 类型) 作为分区列。以上示例最终得到的分区如下:
p1_city: [("1", "Beijing"), ("1", "Shanghai")]
p2_city: [("2", "Beijing"), ("2", "Shanghai")]
p3_city: [("3", "Beijing"), ("3", "Shanghai")]
当用户插入数据时,分区列值会按照顺序依次比较,最终得到对应的分区。举例如下:
数据 ---> 分区
1, Beijing ---> p1_city
1, Shanghai ---> p1_city
2, Shanghai ---> p2_city
3, Beijing ---> p3_city
1, Tianjin ---> 无法导入
4, Beijing ---> 无法导入
以下场景推荐使用复合分区
用户也可以不使用复合分区,即使用单分区。则数据只做 HASH 分布。
上述静态的分区需要手动指定边界,分区个数太多使用起来也不方便。动态 Range Partition 帮助我们解决了这个问题,只需指定一些分区的参数即可动态创建,PARTITION_DESC 相对更简单,只需指定哪个列作为分区列即可:
PARTITION BY RANGE(`date`)
剩余参数需要在PROPERTIES进行配置:
PROPERTIES (
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.create_history_partition"="true",
"replication_num" = "1"
);

批量创建分区功能在前期充分调研了用户的需求,本着简洁、强大、易用的设计目标,将设计核心锁定在几个要素中:
假设其数据包含从几年前直到现在的全量信息,想要将十年内的数据按每一天一个分区进行创建。在批量分区功能中,PARTITION_DESC只需要一句,并且不用在PARTITION中设置分区相关参数:
-- 当然,分区创建个数受到max_multi_partition_num参数控制,该值默认为4096,有需求可以修改
PARTITION BY RANGE(`date`)
(
FROM ("2013-01-01") TO ("2023-01-01") INTERVAL 1 DAY
)
从这个 case 来看,批量分区功能的语法更为简洁,但该功能的易用性和灵活性远不止于此。
假设有另一批数据:公司前几年的数据量较大且为冷数据,故可以将一年的数据合到一个分区里面;而后来因为业务迅速发展,需要将每一月的数据作为一个分区;随着公司业务进一步发展,按月分区已经不能满足快速增长的数据需求,需要按周进行分区;……;时至今日,公司每天产生海量数据,可能需要按小时分区才能符合需求。根据这个场景,不难写出批量分区创建的 PARTITION_DESC:
-- 此处需要注意,如果要使用小时级别的分区,则分区列必须是datetime类型
-- 同样的,分区创建个数也受到max_multi_partition_num参数控制
PARTITION BY RANGE(sdate)
(
FROM ("2000-01-01") TO ("2021-01-01") INTERVAL 1 YEAR,
FROM ("2021-01-01") TO ("2022-01-01") INTERVAL 1 MONTH,
FROM ("2022-01-01") TO ("2023-01-01") INTERVAL 1 WEEK,
FROM ("2023-01-01") TO ("2023-02-01") INTERVAL 1 DAY,
FROM ("2023-02-01 00") TO ("2099-12-31 23") INTERVAL 1 HOUR
)
除了上述不同时间粒度的分区可以灵活组合外,还可以将静态 Range Partition 和批量分区功能结合起来。例如需要将该公司 2022-01-01 到 2023-01-01 的数据按天创建分区,2022-01-01 之前的数据归到一个名为"pold"分区中,我们可以将静态分区和批量分区组合起来,PARTITION_DESC如下:
PARTITION BY RANGE(sdate)
(
PARTITION pold VALUES LESS THAN ("2022-01-01"),
FROM ("2022-01-01") TO ("2023-01-01") INTERVAL 1 DAY
)
批量分区创建功能支持不同时间粒度,其语法简洁有力,且各种类型分区可以灵活组合,在面对大量历史分区和部分特殊分区的需求时,该功能显得游刃有余,可以极大提高开发效率。
CREATE TABLE `dwd_user_portrait_media_df` (
`t_date` DATE NOT NULL COMMENT '日期',
`user_id` BIGINT NOT NULL COMMENT 'user_id',
`opus_cnt` int NULL COMMENT '发帖数',
`last_opus_created_time` datetime NULL COMMENT '最后一次发帖时间',
`first_opus_created_time` datetime NULL COMMENT '首次发帖时间',
`first_access_created_time` datetime NULL COMMENT '首次访问时间',
`last_access_created_time` datetime NULL COMMENT '最好一次访问时间',
`avg_duration_time` int NULL COMMENT '平均访问时长',
`total_duration_time` int NULL COMMENT '总共访问时长',
`access_dates` int NULL COMMENT '访问日期数'
) ENGINE=OLAP
UNIQUE KEY(`t_date`,`user_id`)
COMMENT '用户画像基础表'
PARTITION BY RANGE(`t_date`)()
DISTRIBUTED BY HASH(`t_date`,`user_id`) BUCKETS 1
PROPERTIES (
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-100",
"dynamic_partition.end" = "3",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.history_partition_num" = "10",
"dynamic_partition.prefix" = "p"
)
如果在较长的建表语句中出现语法错误,可能会出现语法错误提示不全的现象。这里罗列可能的语法错误供手动纠错:
HELP CREATE TABLE;,检查相关语法结构。Failed to create partition [xxx] . Timeout
Doris 建表是按照 Partition 粒度依次创建的。当一个 Partition 创建失败时,可能会报这个错误。即使不使用 Partition,当建表出现问题时,也会报 Failed to create partition,因为如前文所述,Doris 会为没有指定 Partition 的表创建一个不可更改的默认的 Partition。
当遇到这个错误是,通常是 BE 在创建数据分片时遇到了问题。可以参照以下步骤排查:
Failed to create partition 日志。在该日志中,会出现一系列类似 {10001-10010} 字样的数字对。数字对的第一个数字表示 Backend ID,第二个数字表示 Tablet ID。如上这个数字对,表示 ID 为 10001 的 Backend 上,创建 ID 为 10010 的 Tablet 失败了。Too many open files。打开的文件句柄数超过了 Linux 系统限制。需修改 Linux 系统的句柄数限制。如果创建数据分片时超时,也可以通过在 fe.conf 中设置 tablet_create_timeout_second=xxx 以及 max_create_table_timeout_second=xxx 来延长超时时间。其中 tablet_create_timeout_second 默认是1秒, max_create_table_timeout_second 默认是60秒,总体的超时时间为min(tablet_create_timeout_second * replication_num, max_create_table_timeout_second),具体参数设置可参阅 FE配置项
建表命令长时间不返回结果。
Doris 的建表命令是同步命令。该命令的超时时间目前设置的比较简单,即(tablet num * replication num)秒。如果创建较多的数据分片,并且其中有分片创建失败,则可能导致等待较长超时后,才会返回错误。
正常情况下,建表语句会在几秒或十几秒内返回。如果超过一分钟,建议直接取消掉这个操作,前往 FE 或 BE 的日志查看相关错误。