• 时序数据库 InfluxDB


    目录

    一、介绍

    二、安装

    三、inflxudb保留字

    四、基本语法

    1、客户端操作

    1. 数据库操作

    2. 数据表和数据操作

    3. series 操作

    4.Shard 

    5. 用户操作

    2、API操作

    状态码

    3、Java操作

    五、常用函数 

    六、存储策略

    1.查看策略

    2.创建策略

    3、修改策略

    4. 删除

    七、目录与文件结构

    八、数据备份

    1、备份和恢复DB数据

    一、DB备份

    二、DB恢复

    二、备份和恢复元数据

    1、备份元数据

    2、恢复元数据


    一、介绍

           InfluxDB 是一个时间序列数据库,GO 编写的,旨在处理高写入和查询负载。InfluxDB 旨在用作涉及大量时间戳数据的任何用例的后备存储,包括 DevOps 监控、应用程序指标、物联网传感器数据和实时分析。

    特点:

    • 专门为时间序列数据编写的自定义高性能数据存储。TSM 引擎允许高速摄取和数据压缩
    • 完全用 Go 编写。它编译成一个没有外部依赖性的二进制文件。
    • 简单、高性能的写入和查询 HTTP API。
    • 插件支持其他数据摄取协议,例如 Graphite、collectd 和 OpenTSDB。
    • 专为轻松查询聚合数据而定制的类似 SQL 的表达查询语言。
    • 标签允许为系列建立索引以实现快速高效的查询。
    • 保留策略有效地自动使陈旧数据过期。
    • 连续查询自动计算聚合数据,使频繁查询更高效。

    InfluxDB有三大特性: 
        Time Series (时间序列):你可以使用与时间有关的相关函数(如最大,最小,求和等)        
        Metrics(度量):你可以实时对大量数据进行计算 
        Eevents(事件):它支持任意的事件数据

    InfluxDB 提供三种操作方式:

    1. 客户端命令行方式
    2. HTTP API 接口
    3. 各语言 API 库

    InfluxDB 和传统数据库(如:MySQL)区别:

    InfluxDB传统数据库中的概念
    database数据库
    measurement数据库中的表
    points每个表里某个时刻的某个条件下的一个 field 的数据,因为体现在图表上就是一个点,于是将其称为 point。Point 由时间戳(time)、数据(field)、标签(tags)组成
    series序列,所有在数据库中的数据,都需要通过图表来展示,而这个 series 表示这个表里面的数据,可以在图表上画成几条线。具体可以通过 SHOW SERIES FROM "表名" 进行查询
    Retention policy数据保留策略,可以定义数据保留的时长,每个数据库可以有多个数据保留策略,但只能有一个默认策略

    注意:​​point​​​由3部分组成​​time+fields+tags​​。

    Point 属性传统数据库中的概念
    time每行记录都有一列time,主索引,记录时间戳,单位纳秒,时区UTC(东八区减8小时)
    fields普通列,key-value结构,value数据类型支持型(float、integer、string、boolean)
    tags索引列,key-value结构,value数据类型只支持string

    说明

    • 在插入新数据时,tag、field 和 time之间用空格分隔 。
    • fields和tags key名称严格区分大小写。

    ​fields数据类型:

    类型备注
    floatinfluxdb的fields默认是float浮点型
    integer整型,insert语句如需写入field是整型,需在数值后面加个i
    string字符串,insert语句如需写入field是字符串,需英文双引号包含数值
    boolean布尔型,真可以用 t , T , true , True , TRUE表示;假可以用 f , F , false , False 或者 FALSE表示

    注意

            在influxdb中,字段必须存在。因为字段是没有索引的。如果使用字段作为查询条件,会扫描符合查询条件的所有字段值,性能不及tag。tags是可选的,但是强烈建议你用上它,因为tag是有索引的,tags相当于SQL中的有索引的列。tag value只能是string类型。

    端口

    • 8083:Web admin 管理服务的端口, http://localhost:8083
    • 8086:HTTP API 的端口
    • 8088:集群端口 (目前还不是很清楚, 配置在全局的 bind-address,默认不配置就是开启的)

    二、安装

    docker pull influxdb:1.8.3
    docker run -d -p 18083:8083 -p 18088:8088 -p 18086:8086 --name my_influxdb influxdb:1.8.3
    docker container update --restart=always my_influxdb
    

    可视化工具:

    1、web界面 

    2、influxDB Studio 

    客户端为绿色版,下载解压打开即可。

    下载地址:Releases · CymaticLabs/InfluxDBStudio

    案例数据:

    curl https://s3.amazonaws.com/noaa.water-database/NOAA_data.txt -o NOAA_data.txt 

    导入数据: 

    influx -import -path=NOAA_data.txt -precision=s -database=NOAA_water_database 

    三、inflxudb保留字

     

    四、基本语法

    docker exec -it my_influxdb  bash
    cd /etc/influxdb/
    influx 

    1、客户端操作

    1. 数据库操作

    # 查看所有数据库

    show databases;

    # 建库

    create database dbname;

    # 删库

    drop database daname

    # 切换使用数据库

    use dbname

    2. 数据表和数据操作

    查看所有表

    show measurements

    数据中的时间字段默认显示的是一个纳秒时间戳,改成可读格式。时间默认是UTC时间

    precision rfc3339; # 之后再查询,时间就是rfc3339标准格式 
    # 或可以在连接数据库的时候,直接带该参数
    influx -precision rfc3339

    查看一个measurement中所有的tag key

    show tag keys

     查看一个measurement中所有的field key

    show field keys

    查看一个measurement中所有的保存策略(可以有多个,一个标识为default)

     show retention policies;

    插入数据

            标准格式,注意在写数据的时候如果不添加时间戳,系统会默认添加一个时间。InfluxDB 中没有显式的新建表的语句,只能通过 insert 数据的方式来建立新表。

    insert [,=...] =[,=...] [unix-nano-timestamp]
    
    insert 表名,tags fileds;

    #插入fields,和tags.

    insert student,a=1,b=2,c=3 name="sa",age=12,emial="xxx@163.com";
    #只插入​​fields​​​,可以没有​​tags
    insert student name="sa",age=12,emial="xxx@163.com";

    删表

    drop measurement student

     查询 

    SELECT field keys [time | tag kyes | * ] FROM measurements WHERE conditions GROUP BY [tag keys | time] ORDER BY time [asc | desc] LIMIT number [OFFSET number]

    1. SELECT​​后面查询显示字段​必须至少有一个​​field key​​​,否则会抛异常​​ERR: at least 1 non-time field must be queried​​​。还可显示​​tag keys​​​、​​time​​​,或者​​*​​显示所有字段。
    2. ​​FROM​​​查询数据来源一个或者多个​​measurement​​。
    3. ​​WHERE​​​查询条件可为​​tag keys​​​和​​time​​​,​​field keys​​​ 也可作为查询条件但是不常用,因为不是索引,查询效率比较低。​​tag keys​​​条件操作符支持​​=、!=、<>、正则​​​,​​field keys​​​支持​​=、!=、<>、>、>=、<、<=、正则​​。
    4. ​​GROUP BY​​​只能对​​tag keys​​​和​​time​​​进行合分组,可以多字段排序,如​​group by tag1,tag2,time​​​,也可一个​​*​​​对所有的​​tag​​​进行分组聚合(不包括​​time​​)。
    5. ​​ORDER BY​​​只能对​​time​​​进行排序,​​asc​​​升序,​​desc​​降序。
    6. ​​LIMIT​​​分页,​​LIMIT​​​后面的数字是查询显示多少条,​​OFFSET​​​后面数字代表偏移量(从0开始代表第一条),如​​limit 10 offset 10​​意思为从第11条开始往后10条。

    正则 

    #每个表输出一行(支持 Go 语言的正则表达式、支持类似于 MySQL 中的 limit 语句)

    SELECT * FROM /.*/ LIMIT 1

    limit

    limit​​​可单独使用,也可配合​​offset​​​使用,​​offset​​偏移量的意思

    select * from student limit 2;

    排序

    order by​​​ 只能对​​time​​​进行排序,​​asc​​​升序,​​desc​​降序。

    select * from student  order by time desc;

    分组

    SELECT () FROM_clause WHERE GROUP BY time(time_interval,[)] 

    (1)group by tag

    select * from h2o_pH group by location limit 2

    (2)group by time(1m) 

            对​​time​​​分组时并不是简单的​​group by time​​​,​​time​​​后面还需要加一个分组聚合的持续时间,如​​group by time(1m)​​。支持的持续时间单位有:

    (3)fill(0)填充null
            可以看到​​values​​​结果集中有​​null​​​的情况,可在查询语句中加​​fill(0)​​​,遇到​​null​​​用​​0​​​来填充。​​fill()​​中只能填数字。

    条件查询

    select * from student where a='1' and age=12; # a字段是tag :字符串


    select sum(age) as ageNums from student where time < now() group by a order by time desc;

    格式化显示查询数据

    format json
    select * from student where a='1' and age=12;

    模糊查询(正则表达式

    # location名称中包含coyote的数据
    select * from h2o_quality where location=~/.*coyote.*/ limit 10

    没有in还有or

    select * from h2o_pH where location='coyote_creek' or location='santa_monica' limit 4

    from多表查询 

            多表查询,以时间进行连接,不存在的值用null填充。一般情况连接的两张表tag和field上有一定的联系和比较,毫不相干的两表连接查询没什么价值。

    select * from h2o_feet,h2o_pH limit 1

    注意:

    1.field tag是关键字 如果字段名是这两个,则需要加上单引号。

    2.tags​​​之间用逗号分隔,​​fields​​之间用逗号分隔,​​​tags​​和​​fields​​之间用空格分隔​。

    3.tags​​​都是字符串类型,但是不用双引号括起来;​​fields​​中有字符串类型需要用​英文双引号​括起来,如果不用英文双引号,会报语法错误​​invalid boolean​​,会认为是无效的布尔值,因为布尔类型无需加双引号。

    4.tags​​​中设置布尔值就是字符串,​​fields​​​中有布尔值,可用 ​​t , T , true , True , TRUE,f , F , false , False​​表示。

    3. series 操作

            series 相当于是 InfluxDB 中一些数据的集合,在同一个 database 中,retention policy、measurement、tag sets 完全相同的数据同属于一个 series,同一个 series 的数据在物理上会按照时间顺序排列存储在一起。

    ​​shard​​​主要由4部分组成​​Cache​​​、​​Wal​​​、​​Tsm file​​​、​​Compactor​​。

    #series 表示这个表里面的数据,可以在图表上画成几条线,series 主要通过 tags 排列组合算出来。

    show series from student;

    4.Shard 

            shard 在 InfluxDB 中是一个比较重要的概念,它和 retention policy 相关联。每一个存储策略下会存在许多 shard,每一个 shard 存储一个指定时间段内的数据,并且不重复,例如 7点-8点 的数据落入 shard0 中,8点-9点的数据则落入 shard1 中。每一个 shard 都对应一个底层的 tsm 存储引擎,有独立的 cache、wal、tsm file。

            如果创建一个新的 retention policy 设置数据的保留时间为 1 天,则单个 shard 所存储数据的时间间隔为 1 小时,超过1个小时的数据会被存放到下一个 shard 中。

    5. 用户操作

    # 显示用户
    SHOW USERS
    # 创建用户
    CREATE USER "username" WITH PASSWORD 'password'
    # 创建管理员权限的用户 

    #influxdb 的权限设置比较简单,只有读、写、ALL 几种。
    CREATE USER "username" WITH PASSWORD 'password' WITH ALL PRIVILEGES
    # 删除用户
    DROP USER "username"

    2、API操作

    状态码

    • 2xx:服务请求正常
    • 4xx:代表请求语法有问题
    • 5xx:服务端出问题,导致超时等故障

    官方地址

    3、Java操作

    1. org.influxdb
    2. influxdb-java
    3. 2.23
    1. spring:
    2. application:
    3. name: spring-lean-influxdb
    4. influx:
    5. url: http://192.168.56.22:18086
    6. user:
    7. password:
    8. database: test
    1. import lombok.Data;
    2. import org.springframework.beans.factory.annotation.Value;
    3. import org.springframework.context.annotation.Configuration;
    4. @Data
    5. @Configuration
    6. public class InfluxDBConfig {
    7. @Value("${spring.influx.user}")
    8. public String userName;
    9. @Value("${spring.influx.password}")
    10. public String password;
    11. @Value("${spring.influx.url}")
    12. public String url;
    13. //数据库
    14. @Value("${spring.influx.database}")
    15. public String database;
    16. }
    1. import lombok.Builder;
    2. import lombok.Data;
    3. import org.influxdb.annotation.Column;
    4. import org.influxdb.annotation.Measurement;
    5. import org.influxdb.annotation.TimeColumn;
    6. import java.util.Date;
    7. @Builder
    8. @Data
    9. @Measurement(name = "t_log")
    10. public class LogInfo {
    11. // Column中的name为measurement中的列名
    12. // 此外,需要注意InfluxDB中时间戳均是以UTC时保存,在保存以及提取过程中需要注意时区转换
    13. @Column(name = "time")
    14. private String time;
    15. // 注解中添加tag = true,表示当前字段内容为tag内容
    16. @Column(name = "module", tag = true)
    17. private String module;
    18. @Column(name = "level", tag = true)
    19. private String level;
    20. @Column(name = "device_id", tag = true)
    21. private String deviceId;
    22. @Column(name = "msg")
    23. private String msg;
    24. @TimeColumn
    25. private Date createTime;
    26. }
    1. import com.lean.influxdb.config.InfluxDBConfig;
    2. import org.influxdb.InfluxDB;
    3. import org.influxdb.InfluxDBFactory;
    4. import org.influxdb.dto.BatchPoints;
    5. import org.influxdb.dto.Point;
    6. import org.influxdb.dto.Query;
    7. import org.influxdb.dto.QueryResult;
    8. import org.springframework.beans.factory.annotation.Autowired;
    9. import org.springframework.stereotype.Service;
    10. import org.springframework.util.StringUtils;
    11. import javax.annotation.PostConstruct;
    12. import java.util.ArrayList;
    13. import java.util.HashMap;
    14. import java.util.List;
    15. import java.util.Map;
    16. import java.util.concurrent.TimeUnit;
    17. @Service
    18. public class InfluxDBService {
    19. @Autowired
    20. private InfluxDBConfig influxDBConfig;
    21. @PostConstruct
    22. public void initInfluxDb() {
    23. this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
    24. this.influxDB = influxDbBuild();
    25. }
    26. //保留策略
    27. private String retentionPolicy;
    28. private InfluxDB influxDB;
    29. /**
    30. * 连接时序数据库;获得InfluxDB
    31. **/
    32. private InfluxDB influxDbBuild() {
    33. if (influxDB == null) {
    34. if(StringUtils.isEmpty(influxDBConfig.userName)){
    35. influxDB = InfluxDBFactory.connect(influxDBConfig.url);
    36. }else {
    37. influxDB = InfluxDBFactory.connect(influxDBConfig.url, influxDBConfig.userName, influxDBConfig.password);
    38. }
    39. influxDB.setDatabase(influxDBConfig.database);
    40. }
    41. return influxDB;
    42. }
    43. /**
    44. * 插入
    45. */
    46. public void insertPoint(Point point) {
    47. influxDbBuild();
    48. influxDB.write(point);
    49. }
    50. /**
    51. * 插入
    52. *
    53. * @param measurement 表
    54. * @param tags 标签
    55. * @param fields 字段
    56. */
    57. public void insert(String measurement, Map tags, Map fields) {
    58. influxDbBuild();
    59. Point.Builder builder = Point.measurement(measurement);
    60. builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    61. builder.tag(tags);
    62. builder.fields(fields);
    63. influxDB.write(influxDBConfig.database, "", builder.build());
    64. }
    65. /**
    66. * @param measurement
    67. * @param time
    68. * @param tags
    69. * @param fields
    70. * @return void
    71. * @desc 插入, 带时间time
    72. */
    73. public void insert(String measurement, long time, Map tags, Map fields) {
    74. influxDbBuild();
    75. Point.Builder builder = Point.measurement(measurement);
    76. builder.time(time, TimeUnit.MILLISECONDS);
    77. builder.tag(tags);
    78. builder.fields(fields);
    79. influxDB.write(influxDBConfig.database, "", builder.build());
    80. }
    81. /**
    82. * 查询
    83. *
    84. * @param command 查询语句
    85. * @return
    86. */
    87. public QueryResult query(String command) {
    88. influxDbBuild();
    89. return influxDB.query(new Query(command, influxDBConfig.database));
    90. }
    91. /**
    92. * @param queryResult
    93. * @desc 查询结果处理
    94. */
    95. public List> queryResultProcess(QueryResult queryResult) {
    96. List> mapList = new ArrayList<>();
    97. List resultList = queryResult.getResults();
    98. //把查询出的结果集转换成对应的实体对象,聚合成list
    99. for (QueryResult.Result query : resultList) {
    100. List seriesList = query.getSeries();
    101. if (seriesList != null && seriesList.size() != 0) {
    102. for (QueryResult.Series series : seriesList) {
    103. List columns = series.getColumns();
    104. String[] keys = columns.toArray(new String[columns.size()]);
    105. List> values = series.getValues();
    106. if (values != null && values.size() != 0) {
    107. for (List value : values) {
    108. Map map = new HashMap(keys.length);
    109. for (int i = 0; i < keys.length; i++) {
    110. map.put(keys[i], value.get(i));
    111. }
    112. mapList.add(map);
    113. }
    114. }
    115. }
    116. }
    117. }
    118. return mapList;
    119. }
    120. /**
    121. * @desc InfluxDB 查询 count总条数
    122. */
    123. public long countResultProcess(QueryResult queryResult) {
    124. long count = 0;
    125. List> list = queryResultProcess(queryResult);
    126. if (list != null && list.size() != 0) {
    127. Map map = list.get(0);
    128. double num = (Double) map.get("count");
    129. count = new Double(num).longValue();
    130. }
    131. return count;
    132. }
    133. /**
    134. * 创建数据库
    135. * @param dbName 创建数据库
    136. * @return
    137. */
    138. public void createDB(String dbName) {
    139. influxDbBuild();
    140. influxDB.createDatabase(dbName);
    141. }
    142. /**
    143. * 批量写入测点
    144. * @param batchPoints
    145. */
    146. public void batchInsert(BatchPoints batchPoints) {
    147. influxDbBuild();
    148. influxDB.write(batchPoints);
    149. }
    150. /**
    151. * 批量写入数据
    152. *
    153. * @param database 数据库
    154. * @param retentionPolicy 保存策略
    155. * @param consistency 一致性
    156. * @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)
    157. */
    158. public void batchInsert(final String database, final String retentionPolicy,
    159. final InfluxDB.ConsistencyLevel consistency, final List records) {
    160. influxDbBuild();
    161. influxDB.write(database, retentionPolicy, consistency, records);
    162. }
    163. /**
    164. * @param consistency
    165. * @param records
    166. * @desc 批量写入数据
    167. */
    168. public void batchInsert(final InfluxDB.ConsistencyLevel consistency, final List records) {
    169. influxDbBuild();
    170. influxDB.write(influxDBConfig.database, "", consistency, records);
    171. }
    172. }
      1. import com.lean.influxdb.entity.LogInfo;
      2. import com.lean.influxdb.service.InfluxDBService;
      3. import org.influxdb.dto.Point;
      4. import org.influxdb.dto.QueryResult;
      5. import org.junit.jupiter.api.Test;
      6. import org.springframework.beans.factory.annotation.Autowired;
      7. import org.springframework.boot.test.context.SpringBootTest;
      8. import java.util.Date;
      9. import java.util.HashMap;
      10. import java.util.List;
      11. import java.util.Map;
      12. import java.util.concurrent.TimeUnit;
      13. @SpringBootTest
      14. class SpringLeanInfluxdbApplicationTests {
      15. @Test
      16. void contextLoads() {
      17. }
      18. @Autowired
      19. private InfluxDBService influxDBService;
      20. @Test
      21. public void test() {
      22. LogInfo logInfo = LogInfo.builder()
      23. .level("1")
      24. .module("log")
      25. .deviceId("1")
      26. .msg("消息")
      27. .createTime(new Date())
      28. .build();
      29. Point point = Point.measurementByPOJO(logInfo.getClass())
      30. .addFieldsFromPOJO(logInfo)
      31. .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
      32. .build();
      33. influxDBService.insertPoint(point);
      34. }
      35. @Test
      36. public void test1() {
      37. Integer pageSize = 15;
      38. Integer pageNum = 1;
      39. // InfluxDB支持分页查询,因此可以设置分页查询条件
      40. String pageQuery = " LIMIT " + pageSize + " OFFSET " + ((pageNum - 1) * pageSize);
      41. // 此处查询所有内容,如果
      42. String queryCmd = "SELECT * FROM student ORDER BY time DESC " + pageQuery;
      43. QueryResult queryResult = influxDBService.query(queryCmd);
      44. List> maps = influxDBService.queryResultProcess(queryResult);
      45. maps.stream().forEach(s->{
      46. s.forEach((k,v)->{
      47. System.out.printf("k",k);
      48. System.out.printf("v",v);
      49. });
      50. });
      51. List resultList = queryResult.getResults();
      52. System.out.println(resultList);
      53. }
      54. @Test
      55. public void testSave() {
      56. String measurement = "student";
      57. Map tags = new HashMap<>();
      58. tags.put("code", "1");
      59. tags.put("studentNums", "1");
      60. Map fields = new HashMap<>();
      61. fields.put("name", "张三");
      62. fields.put("age", "11");
      63. influxDBService.insert(measurement, tags, fields);
      64. }
      65. @Test
      66. public void testGetdata() {
      67. String command = "select * from student";
      68. QueryResult queryResult = influxDBService.query(command);
      69. List> result = influxDBService.queryResultProcess(queryResult);
      70. for (Map map : result) {
      71. System.out.println("time:" + map.get("time")
      72. + " code:" + map.get("code")
      73. + " studentNums:" + map.get("studentNums")
      74. + " name:" + map.get("name")
      75. + " age:" + map.get("age"));
      76. }
      77. }
      78. }

      五、常用函数 

      更多查看官网:地址

      六、存储策略

              InfluxDB 每秒可以处理成千上万条数据,要将这些数据全部保存下来会占用大量的存储空间,有时需要自动清理历史数据。

      1.查看策略

               一个database可以有多个保留策略​​retention policy​​​,但是只能有一个默认​​retention policy​​

      show retention policies on databaseName

      2.创建策略

      数据库新建都会分配一个默认的保留策略:

      • name​​,保留策略的名称。
      • ​​duration​​​,数据保留的持续时长,最小为​​1h​​​。如果设置为0,数据永久保存(官方默认RP),否则过期清理。它具有各种时间参数,比如:h(小时),w(星期)
      • ​​shardGroupDuration​​​,数据存储在​​shardGroup​​​的时间跨度。​​shardGroup​​​是​​influxdb​​​的一个逻辑存储结构,其下包含多个​​shard​​。
      • ​​replicaN​​,全称replication,复制因子,它决定在集群中存储多少个数据副本。inflxudb集群中跨N个数据节点复制数据,其中N就是复制因子。复制因子对单个节点实例不起作用,单机版默认是1。
      • ​​default​​,true为默认保留策略。

      基本语法:

      CREATE RETENTION POLICY ON DURATION REPLICATION [SHARD DURATION ] [DEFAULT]

      • retention_policy_name​​: 策略名。
      • ​​database_name​​: 数据库名(db必须存在)。
      • ​​duration​​: 数据保留时长。
      • ​​REPLICATION​​: 复制因子,单机版设置为1即可。
      • ​​SHARD DURATION​​​:设置​​shardGroupDuration​​​时长,表示每个​​shard group​​时间跨度时长。可不填,默认根据RP的duration计算。
      • ​​default​​: true为设置该RP为默认RP。

      CREATE RETENTION POLICY "策略名称" ON "表名称" DURATION 30d REPLICATION 1 DEFAULT

      3、修改策略

      ​ALTER RETENTION POLICY "策略名称" ON "表名称" DURATION 3w DEFAULT

      4. 删除

      DROP RETENTION POLICY "策略名称" ON "表名称"

      七、目录与文件结构

      InfluxDB 的数据存储主要有三个目录。默认情况下是 meta, wal 以及 data 三个目录。

      1. meta 用于存储数据库的一些元数据,meta 目录下有一个 meta.db 文件。
      2. wal 目录存放预写日志文件,以 .wal 结尾。
      3. data 目录存放实际存储的数据文件,以 .tsm 结尾。

       shard:

              ​​influxdb​​​存储引擎​​TSM​​​的具体实现。​​TSM TREE​​​是专门为​​influxdb​​​构建的数据存储格式。与现有的​​B+ tree​​​或​​LSM tree​​​实现相比,​​TSM tree​​具有更好的压缩和更高的读写吞吐量。主要由4部分组成​​Cache​​​、​​Wal​​​、​​Tsm file​​​、​​Compactor​​

      shard group:

              ​​存储​​shard​​​的逻辑容器,每一个​​shard group​​​都有一个不重叠的时间跨度,可根据保留策略​​retention policy​​​的​​duration​​​换算而得。数据根据不同的时间跨度存储在不同的​​shard group​​中。

      cache : 

              在内存中是一个简单的 map 结构,这里的 key 为 seriesKey + 分隔符 + filedName,目前代码中的分隔符为 #!~#,entry 相当于是一个按照时间排序的存放实际值的数组。

              插入数据时,实际上是同时往 cache 与 wal 中写入数据,可以认为 cache 是 wal 文件中的数据在内存中的缓存。当 InfluxDB启动时,会遍历所有的 wal 文件,重新构造 cache,这样即使系统出现故障,也不会导致数据的丢失。

      wal:

               对数据的修改以日志的形式持久化存储在磁盘上。

              内容与内存中的 cache 相同,其作用就是为了持久化数据,当系统崩溃后可以通过 wal 文件恢复还没有写入到 tsm 文件中的数据。

              文件中的一条数据,对应的是一个 key(measument + tags + fieldName) 下的所有 value 数据,按照时间排序。

      tsm file:

              真实存放数据的文件,单个 tsm file 大小最大为 2GB。每隔一段时间,内存中的时序数据就会执行flush操作将数据写入到文件。

      compactor:

              在后台持续运行,每隔 1 秒会检查一次是否有需要压缩合并的数据。主要进行两种操作,一种是 cache 中的数据大小达到阀值后,进行快照,之后转存到一个新的 tsm 文件中。另外一种就是合并当前的 tsm 文件,将多个小的 tsm 文件合并成一个,使每一个文件尽量达到单个文件的最大大小,减少文件的数量,并且一些数据的删除操作也是在这个时候完成。

      数据存储

      LSM-Tree VS B-Tree

      八、数据备份

              两种备份方式:1.db备份 2.元数据备份。元数据备份的备份是整个备份,不能拆分,而db数据的备份,完整的、增量的(从 RFC3339 格式的时间开始),或者针对特定的分片 ID。

      1、备份和恢复DB数据

      一、DB备份

      基本语法:

      通过​​influxd backup -h​​查看​​backup​​有哪些可选参数。

       Usage: influxd backup [options] PATH

      -portable: 默认
      -host # 需要备份的influxdb服务机器地址,可选,Defaults to 127.0.0.1:8088.
      -database # 需要备份的db名称,可选,若没有指定,将备份所有数据库
      -retention # 备份某个保留策略的数据,未指定,则备份所有rp的数据。
      -shard # 需要备份的shard id,可选,若指定了备份shard,必须先选择rp
      -start # 起始时间,日期必须采用 RFC3339格式(例如, 2015-12-24T08:12:23Z). 不能和-since一起使用
      -end # 结束时间,日期必须采用 RFC3339格式(例如, 2015-12-24T08:12:23Z) 不能和-since一起使用
      -since # 备份这个timestamp之后的数据,建议用-start 代替
      -skip-errors # 可选,当备份shards时,跳过备份失败的shard,继续备份其他shard。

      influxd backup -database

      mydatabase是您要备份的数据库的名称,以及path-to-backup备份数据应该存储的位置。

      注意: Metastore 备份也包含在每个数据库的备份中

      案例:

      1.备份所有数据库

      influxd backup -portable /path/to/backup-directory

      2.备份school库的所有数据。

      influxd backup -portable -db school ~/tmp/influx_backup/

      3.备份school 库昨天的数据。

      influxd backup -portable -db school -start 2022-11-14T13:24:52.085243953Z -end 2020-11-15T13:24:52.085243953Z ~/tmp/influx_backup_yesterday/

      4.只备份school ​​​的​​shard=2​​的数据

      influxd backup -portable -db school -shard 2 ~/tmp/influx_backup_2/

      5.远程备份(所有数据库)

      influxd backup -portable -host 1192.168.56.11:8088 /path/to/backup-directory

      二、DB恢复

      使用该influxd restore实用程序将时间序列数据和元数据从 InfluxDB 备份还原到 InfluxDB。 

      ​influxd restore -h​​查看restore可选参数:

      Usage: influxd restore -portable [options] PATH

      Note: Restore using the '-portable' option consumes files in an improved Enterprise-compatible 
        format that includes a file manifest.

      Options:
          -portable 
                  
          -host  
                  
          -db    
                  从备份数据的哪个库恢复数据
          -newdb
                  数据恢复到新库名称,若没有指定,选择-db 的名称。
                  newdb不存在,恢复时会自动创建
          -rp    
                  从备份数据的哪个rp恢复数据,指定了-rp,必须指定-db
          -newrp
                  恢复数据新的rp名称,newrp必须存在。指定了-rp,未指定-newrp则使用-rp
          -shard
                  需要恢复的shard,如果指定了'-db ' and '-rp ',必须-shard
          PATH
                  备份数据文件list 

      案例:

      1.恢复​​~/tmp/influx_backup/​​​下​​school​​​库数据到新库​​new_school。

      influxd restore -portable  -db school -newdb new_school  ~/tmp/influx_backup/

       2.恢复​​~/tmp/influx_backup/​​​下​​school​​​库,​​rp​​​为​​rp_3​​​数据到新库​​new_school1​​。

      influxd restore -portable  -db school -rp rp_3 -newdb new_school1  ~/tmp/influx_backup/

      3.恢复​​~/tmp/influx_backup/​​​下​​school​​​库,​​rp​​​为​​rp_3​​​,​​shard​​​为2的数据到新库​​new_school2​​​,并重命名​​rp​​​的名称为​​rp_3days​​。

      influxd restore -portable  -db school -rp rp_3 -shard 2  -newdb new_school2   -newrp rp_3days ~/tmp/influx_backup/

      注意:

      二、备份和恢复元数据

      1、备份元数据

      influxd backup

      备份元数据,没有任何其他参数,备份将只转移当前状态的系统元数据到​​path-to-backup​​​。​​path-to-backup​​为备份保存的目录,不存在会自动创建。

      influxd backup tmp/mata_backup

       

       

      2、恢复元数据

      ​基本语法:

      influxd restore -metadir

      参考来源: 

      influxdb官方英文文档
      influxdb中文文档
      https://segmentfault.com/a/1190000012385313

    173. 相关阅读:
      图文教程:教你快速生成实体店电子优惠券
      jeecgboot源码下载及启动
      相似对角化和约旦标准型求法(附带Matlab代码)
      苹果转移供应链,促中国手机和中国制造更紧密合作,加速技术升级
      编译器一日一练(DIY系列之词法分析)
      AWVS的简介与安装
      智慧路灯远程智能控制
      JUC第二十九讲:JUC工具类: Phaser详解
      50.宝石补给
      CIE A-Level化学Paper 1真题讲解(3)
    174. 原文地址:https://blog.csdn.net/weixin_43549578/article/details/127861588