• influxdb2的使用


    1. Influxdb2****
      1. Influxdb2入门****

    在进行influxdb2的真正学习之前,我们先完成必须的下载安装,和基础的添加、查询、删除数据。

        1. 下载安装

    wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.8.x86_64.rpm

    sudo yum localinstall influxdb2-2.0.8.x86_64.rpm

      1. 2 配置初始化信息

    安装好influxdb2后,直接启用运行influxd命令启动。

    然后使用influxsetup命令初始化influxdb数据库,如图完成数据库配置

    配置清单

    Username

    root

    Password

    2342@@#@#

    Organizationname

    abc

    Bucketname

    test

    如上就完成了数据库的初始化,我们可以使用auth命令查看配置信息是否已经生效,查看token信息,如果不存在任何auth信息,使用influx命令是无法连接到influxd,完成数据的操作。通过token信息我们也可以直接在程序中利用该token信息直接对influxdb进行操作

        1. 基本操作

    对influxdb操作,最基本的是通过内置的influx命令完成,或者在程序中使用httpapi完成操作,这里以infux为例,简单介绍下读数据、写数据、和删除数据的基础操作,形成基本认识。

    Influxquery‘query_command’

    查看数据,使用内置的flux语法进行查询

    Influxwrite

    写数据,可以通过命令行的形式或者导入文件的方式进行数据的写入

    Influxdelete

    删除数据,最常用的是删除指定时间范围之内的数据

    –bucket参数指定待删除数据归属的bucket

    –start指定删除数据的起始时间

    –stop指定删除数据的结束时间

    –predicate对被删除的数据进行过滤

          1. 概念解析

    在经过这一节,我们必须理解几个相对抽象的概念,才能更好的应用操作,这里仅给出便于理解的概念

    bucket

    桶,influxdb中存储时间序列数据的指定位置,每个bucket都有一个数据存储周期配置(可配置为无限),超过该保留时间的数据自动被清除,每个bucket必须属于一个organizations。

    Bucket类比于关系数据库中的database

    organization

    组织,org是一组用户、任务、bucket、dashboard的抽象容器概念

    measurement

    Measurement类似于关系数据库中表的概念

    Point

    Point类似于关系数据库中的一行数据 ,一个point由tag 、timestamp、field组合的数据

          1. 插入数据

    通过write命令写入如下数据

    influx write -b test’

    mem,host=host1 used_percent=44.23 1577836800000000000

    mem,host=host2 used_percent=72.01 1557836800000000000

    mem,host=host1 used_percent=42.61 1537836810000000000

    mem,host=host2 used_percent=82.98 1477844810000000000

    mem,host=host1 used_percent=46.40 157783880000000000

    mem,host=host2 used_percent=43.77 157678367820000000000

    写入的bucket为test中的measument为mem,其中tag为host,field数据为used_percent。

    类比到mysql,即往数据库test中的表men中写入内存占用数据6条,且通过host列代表不同的主机。

          1. 查询数据
            1. 查询1

    influx query ’

    from(bucket:“test”)

    |> range(start:0) ’

    from函数指定查询的bucket

    range指定查询的时间范围,这里根据插入的时间戳得到对应的秒数,stop不填写为到当前时间点

            1. 查询2

    influx query ’

    from(bucket:“test”)

    |> range(start:0)

    |> filter( fn: ® => r.host == “host1”) ’

    通过filter进行过滤host1的内存占用情况,fn为自定义的函数fn:®中的r为传入给fn的参数,代表一行数据,=>之后为定义的fn函数内容,和如下的查询等价:

    influx query ’

    fn****=********® => r.host == “host1”****

    from(bucket:“test”)

    |> range(start:0)

    |> filter(fn) ’

    需要注意filter参数只能接受fn的参数为r的函数,如果将r修改成其他形参则会报错。

    以上为使用filter函数,指定measurement为mem且host为host2

          1. 删除数据

    influx delete

    -o abc

    –bucket test

    –start 1970-01-01T00:00:00Z

    –stop $(date +“%Y-%m-%dT%H:%M:%SZ”)

    –predicate ‘_measurement=“mem”’

      1. Influxdb2基础****
        1. 配置文件
          1. 启动参数

    Influxdb2不需要特定的配置参数即可以启动,可以完全做到开箱可用,非常方便,但是在实际项目中,我们还是需要对inflxdb2进行一些项目定制化的配置,以适配具体的业务。

    Influxdb2中的启动参数,可以有三种方式配置,三种方式从上到下依次优先级降低:

    1. Influxd的启动参数
    2. 设置系统环境变量
    3. 配置文件加载

    在系统中配置环境变量INFLUXD_CONFIG_PATH,可以将我们的个人配置文件放到指定目录,并且配置文件支持yaml、json、toml三种格式,在配置文件中我们可以指定请求的长度、加解密信息,以及其他数据存储路径等内容。

    export INFLUXD_CONFIG_PATH=/path/to/custom/config/directory

          1. 数据配置

    Influxdb2中的配置数据相关的存储文件在不同的系统下可能具有一定的差异,在linux下默认配置和数据保存在~/.influxdbv2/目录下

    ~/.influxdbv2/engine/

    Influxdb时间序列化数据存储的位置,

    data目录存储时间结构合并树文件(time-Structured Merge

    Tree ,TSM)

    Wal(Write Ahead Log)存储预写日志的存储目录

    通过export INFLUXD_ENGINE_PATH=~/.influxdbv2/engine迁移该目录

    或者在json配置文件中添加:

    “engine-path”: “/users/user/.influxdbv2/engine”

    ~/.influxdbv2/influxd.bolt

    非时间序列数据的基于文件的键值存储,例如 InfluxDB user、oards、task等

    在json配置文件中指定

    “bolt-path”: “/users/user/.influxdbv2/influxd.bolt”

    ~/.influxdbv2/configs

    授权信息存储等

        1. 数据概念
          1. Bucket****

    Bucket用于存储数据的结构,它必须属于某个org,且Bucket必须配置保留周期(可以设置为无限期)。Bucket类似于sql中的database概念

          1. Measurement****

    InfluxDB基础数据结构,用于描述存储在相关字段中的数据。类似于sql中的表的概念。

          1. Tag****

    作为key的一部分

          1. Field****

    作为value

          1. Serial****

    Serialkey是能够通过bucket、measurement、tagset和fieldkey表示出的唯一对象,我忍叫做序列键,能够唯一

          1. Point****

    一个point代表一条数据,类似于sql中的一条数据,具有以下特点:

    1. 由serial(measurement,tagsetfieldkeyfilevalue)和时间戳timestamp组成 measurefield和时间戳为必选
    2. 由serial和timestamp功能决定其唯一性

    如果两个point的serial和时间戳一致,在influxdb中存储时会认为其key相同,将两个point存储在一起,且所有信息都会指向后添加的point

        1. 引擎实现

    Influxdb设计之初就是为了应对日志、监控、告警的业务场景,所以引擎的实现以此为出发点,主要有三个原则

    1. 确保数据安全地写入磁盘;
    2. 查询的数据返回的正确性;
    3. 在保证数据准确的情况下,保证高性能;

    ,influxdb引入了以下的概念来达到以上目的

          1. Wal****

    Write Ahead Log ,预写日志。在influxdb引擎重启的时候进行加载构建高速缓存信息,wal确保数据在系统发生故障时能够正确的持久化存储。其存储目录为~/.influxdbv2/engine/wal,每个wal文件以命名格式为*.wal结尾。

    需要注意wal文件可以看作是运行程序的内存数据的对应,需要根据情况触发从内存将数据保存到wal文件中,或者通过定时刷新写入到wal文件中。一旦influxdb引擎故障则cache中未写入到数据将丢失。恢复过程中直接读取wal文件数据进行恢复。

          1. Cache****

    该高速缓存是当前存储的WAL文件中的Points在内存中的一个数据副本。该缓存信息主要包括如下内容:

    1. 按关键字(measurement、tagset、唯一field)组织Point数据,存储该key自己时间序列的数据
    2. Cache中的数据为WAL未被压缩的数据
    3. Influxdb存储引擎重启通过WAL进行加载。在查询cache时,与TSM文件中的数据进行合并

    对存储引擎进行查询时,其结果来自高速缓存和TSM文件这两部分。当执行查询时其实时在缓存的数据副本上执行,这种查询方式不会影响到同一时间的写入操作。而删除操作只需要发送给cache被清除指定的键或者指定的时间范围即可,同时该操作会在wal文件中记录删除操作。

          1. TSM

    TSM Tree ,Time-Structured Merge Tree,时间结构合并树。是 InfluxDB 根据实际需求在 LSM Tree 的基础上稍作修改优化而来。

    为了有效地压缩和存储数据,存储引擎按序列键(serialkey)对字段值进行分组,然后按时间对这些字段值进行排序。一个序列key,由measurement、tagset和值,还有field key组成。

    influxdb存储引擎会按照内存的使用情况将cache中的数据写入到tsm文件中,写入到tsm成功后会将对应的wal文件进行清除。

    TSM文件在~/.influxdbv2/engine/data/之下的目录,按照tsm所属的bucket、tag等信息保存在不同的目录。

          1. TSI

    在数据读取过程中,越来越多的数据会导致influxdb查询的性能降低,这里influxdb引入了TSI,时间序列指数的概念,解决key的查找性能问题。

    TSI存储measurement、tag、field的序列索引,这可以使得数据库可以快速的查询到哪些measurement、tag、filed存在,查询给定一个measurement、tag、filed的key信息,存在哪些serisalkey。

    https://docs.influxdata.com/influxdb/v2.0/reference/internals/storage-engine/

      1. Influx基础命令****

    在使用influx执行命令之前,必须经过auth认证,一种是手动配置另外就是在安装好inflxdb之后使用influxsetup命令完成配置,如果想重新配置,请把默认配置目录/root/.influxdbv2/目录删除,再重新运行influxd,再次使用influxsetup完成配置即可。最简单的配置清单(influx setup既可完成):

    1. 建立最少一个用户,否则无法完成认证,无法进行操作
    2. 建立最少一个organization,否则无法创建bucket
    3. 建立最少一个bucket存储数据

    以下为常用命令清单:

    Influxsetup

    初始化influxdb配置,删除/root/.influxdbv2/配置可以再次进行配置

    Influx auth

    进行用户认证信息的管理,该信息存储默认存储在/root/.influxdbv2/configs文件中

    Influxbucket

    对用户的bucket进行增删查改

    Influxconfig

    对/root/.influxdbv2/configs进行管理,也可以通过$INFLUX_CONFIGS_PATH命令重新指定目录

    Influxdelete

    在influxdb中删除一条数据

    Influx org

    对Organization进行增删查改

    Influxquery

    通过flux语法对数据进行查询,写入等操作

    Influxtask

    对influx任务进行管理,增删查改

    Influxwrite

    写数据到influxdb中

    influxbackup

    备份数据到指定目录

    influx restore

    从指定目录恢复数据

        1. setup
        1. auth**-**配置权限

    influx auth list

    查看当前inflxudb程序的权限信息列表,包括用户id用户名 用户token信息、用户的权限列表。该信息默认保存在/root/.influxdbv2/configs

    influx auth create

    为当前用户创建一个权限配置,生成唯一的token信息

    Influxauthdelete

    通过id删除对应的权限配置

    Influx auth active

    使对应的权限是否生效

          1. 实例
    1. 创建读写权限token

    这样创建的token对所属的bucket具有全部的读写权限,通过influxsetup创建的用户默认具有全部权限

    influx auth create

    –read-buckets

    –read-checks

    –read-dashboards

    –read-dbrps

    –read-notificationEndpoints

    –read-notificationRules

    –read-orgs

    –read-tasks

    –read-telegrafs

    –read-user

    –write-buckets

    –write-checks

    –write-dashboards

    –write-dbrps

    –write-notificationEndpoints

    –write-notificationRules

    –write-orgs

    –write-tasks

    –write-telegrafs

    –write-user

    1. 对特定bucket具有读写访问权限

    对于bucketid为0000000000000001 0000000000000002具有读写bucket的权限。

    influx auth create

    –read-bucket 0000000000000001

    –read-bucket 0000000000000002

    –write-bucket 0000000000000001

    –write-bucket 0000000000000002

    通过细力度的控制,控制权限分发,同样可以创建对于特定org的权限等等

        1. bucket

    Influxbucketlist

    查看influxdb中的bucket列表,包括bucketidbucket名称,保留数据时间计划,所属org等信息

    influx bucket create

    创建bucket

    –org 指定所属org

    –name指定名称必须

    –retention指定保留计划,如30d-保留30天,2h-保留两个小时,如果为0则为永远

    –description 可选,描述信息

    influx bucket delete

    –name 被删除的bucket名称

    –id被删除bucket的id

    Influxbucketupdate

    –id被更新的bucketid,其他参数和create参数相同

          1. 实例

    bucket create --org abc–name test-bucket --retention 10d

    influx bucket list

    influx bucket delete --name test-bucket

    influx bucket create -n “test-12h” -r 12h -o abc

    influx bucket create -n “test-1d” -r 1d -o abc。、

    influx bucket create -n “test-7d” -r 7d -o abc

    influx bucket create -n “test-30d” -r 30d -o abc

    influx bucket create -n “test-60d” -r 60d -o abc

        1. org

    Influxorgcreate

    –name指定新建的org名称

    influx org create --name test

    Influxorgdelete

    –id通过id删除对应的org

    influx org delete --id 975933ce78f0536e

    Influxorglist

    显示当前org的列表

    Influx org update

    –id 需要更新信息的org

    –name需要更新的名称

    influx org update --id fdbcfasd8b174c7c --name test3

    Influx org members list

    查看当前org的成员信息

    –name待查询的org名称

    –id待查询的orgid

    influx org members list --name abc

    Influx org members add

    为org添加对应的用户成员id,如

    influx org members add --name test3 --member 11asdf13e283d5ea09c3b

    Influx org members remove

    为对应的org移除特定用户

          1. 实例
        1. write

    write命令可以通过influx自带的lineprotocol格式写入数据,或者直接导入带注释的csv格式,通过注释内容解析成lineprotocol数据格式。

    –bucket

    指定bucket,并写入lineprotocal数据

    influx write --bucket example-bucket "m,host=host1 field1=1.2”

    –file

    通过文件直接写入lineprotocol数据,可以一次写入多个文件

    influx write --bucket example-bucket --file path/to/line-protocol.txt–file path/to/line-2.txt

    –url

    通过url直接写入lineprotocol数据,可以写入多个url并和file命令一起用

    influx write --bucket example-bucket --url https://example.com/line-protocol.txt

    –compression

    指定写入文件的加密方式,如–compression gzip,为gzip格式

    –format csv

    通过命令行写入csv格式的数据,需要带注释解析成lineprotocol

          1. lineprotocal实例****

    写入的lineprotocol数据格式如图:

    Measurement

    必选

    插入的测量表名称

    tagset

    可选

    和measurement进行逗号分隔,如果存在多个tag也用逗号分隔,和field参数使用空格分隔

    Fieldset

    必选

    存储的数据内容,多个数据使用逗号分隔

    Timestamp

    可选

    插入数据的时间戳,如果不填写则默认为当前,和field参数使用空格进行分隔

    *第一个空格后为field数据,第二个空格后为时间戳参数

    1. 有tag

    measu_test,host=host1myfield=2231324681356453098000000

    measument名称为measu_test,插入tag为host1,且field数据名称为myfield,value为223的数据

    1. 无tag

    measu_test mystr=223, myNum=223,mystr=”hello”

    在measu_test插入数据 field为myNum和mystr的数据,注意field内部分隔符为逗号,measument和field的分隔符为空格

          1. csv格式

    写入csv格式数据如图

    通过#datatype告诉inflxudb解析成lineprotocol数据,每一列数据的含义和类型,第二列指定数据表头。之后为数据行。

        1. query

    influx query ‘flux_str’

    直接通过flux语法字符串查询

    influx query -f /path/to/query.flux

    通过flux文件进行查询

    influxdb2中使用全新的flux语法进行,Flux 是第四代编程语言,专为数据脚本、ETL、监控和警报而设计。它的作用超越了一门查询语言和编程语言。它提供了一个规划器和优化器,无缝地结合了查询和编程,形成了一个统一的整体。实现这些设计目标最终将得到图灵完备的 Flux,不仅可以用于查询和处理时间序列数据,还可以用于处理一般的数据。

    同时在flux已经实现了很多内置的通用函数,方便用户的使用,如count、sort、limit,等sql中常用的方法,只是在使用中存在略微的差异。具体语法参考:https://docs.influxdata.com/flux/v0.x/get-started/syntax-basics/

    对于我们简单的使用来说,需要掌握常用的flux函数和管道运算符(|>),其结构大体如下:

    data |> someFunction() |> anotherFunction()

    以一个实例演示

    from(bucket:“test”)

    |> range(start:157234368)

    |> filter( fn: ® => r._measurement == “mem” and r.host == “host2”)

    from函数获取数据,同时我们也可以使用参数代替,如定义data=from(bucket:“test”),获取的数据通过管道符号交由下个函数进行处理(这里的逻辑类似于shell中的管道符)。

    这里from获取到的数据,经过range函数确定时间范围过滤,然后再经过filter过滤。最后得到我们实际想要插叙的数据。

        1. delete

    –org

    必选

    指定被删除的org的名称,或者设置全局变量$INFLUX_ORG

    –bucket

    必选

    指定删除数据的bucket,或者设置全局变量$INFLUX_BUCKET_NAME

    –start

    删除数据的起始时间,RFC3339Nano格式,如2009-01-02T23:00:00Z

    –stop

    删除数据的截至时间参数

    –predicate

    对待删除point数据的匹配选项,如

    host=“prod-1.4” AND region=“us-west”

    _measurement=“sensorData”

          1. 示例

    influx delete -o abc–bucket test-12h–start 2020-03-01T00:00:00Z --stop 2022-11-14T00:00:00Z --predicate ‘_measurement=“test-intt-5m”’

    influx delete -o abc–bucket test-7d --start 2020-03-01T00:00:00Z --stop 2022-11-14T00:00:00Z --predicate ‘_measurement=“test-intt-20m”’

    influx delete -o abc–bucket test-30d --start 2020-03-01T00:00:00Z --stop 2022-11-14T00:00:00Z --predicate ‘_measurement=“test-intt-1h”’

    influx delete -o abc–bucket test-60d–start 2020-03-01T00:00:00Z --stop 2022-11-14T00:00:00Z --predicate ‘_measurement=“test-intt-1d”’

    influx delete -o abc–bucket test-60d–start 2020-03-01T00:00:00Z --stop 2022-11-14T00:00:00Z --predicate ‘_measurement=“test-intt-7d”’

        1. task

    influxtasklist

    查看当前数据库任务

    influxtaskcreate-ffile_name

    通过flux文件创建任务

    influx task delete -i

    通过taskid删除任务

    influx task run list --task-id

    查看当前taskid的任务 运行历史记录

    ./influx task log list --task-id

    查看某个任务的运行日志

    ./influx task update–task-id

    通过id更新任务

    ./influx task run retry

    –task-id=08qewrdadsd57819000

    –run-id=082340qwerbe770019000

    重新运行任务,该任务至少已经被运行过一次

        1. backup

    influx backup /root/test/backup_$(date ‘+%Y-%m-%d_%H-%M’)

        1. restore

    通过指定目录的数据恢复数据库

    influx restorepath

    通过指定目录恢复全部数据

    influx restore/root/test/backup_2021-08-23_06-28

    –bucket

    恢复指定bucket数据

    influx restore backups/2020-01-20_12-00/ --bucket example-bucket

    –new-bucket

    恢复指定bucket数据到新的bucket

    influx restore /backups/2020-01-20_12-00/ --bucket example-bucket --new-bucket new-example-bucket

    –full

    全量恢复,如果不指定该参数,将会因数据库内部存在部分冲突的时候无法导入

    ./influx restore --full /root/test/backup_2021-08-23_06-28

      1. influxdbV1和influxdbV2****
    1. 查询方法变更

    原来使用类sql方式查询,现在使用内置的flux方式查询

    1. 底层数据结构变更

    原来的bucket+时间保留计划=现在的bucket

    1. task取代连续查询

    1.x版本和2.x版本最大的差异是连续查询(continuous query)已经被任务(task)所取代。influxdb中的连续查询功能是对外提供的对数据处理的功能,如为了预防我们的存储日志过大会建立起保存策略,超过设置的超时时间数据就丢失了。针对这种情况,我们可以通过连续查询功能,对用户的数据进行汇总、抽样等操作,再插入到另外的表中即可,虽然丢失了一定的精度,但是让数据占用的空间大大减小。

      1. flux语法
        1. 常用内置函数
          1. 输入
            1. from**-**输入数据

    bucket

    待查询的bucket名字

    bucketID

    bucketid

    host

    要查询的 InfluxDB 实例的 URL。

    org

    所属组织名称

    orgID

    所属组织id

    token

    需要提供查询的apitoken信息

    from(

    bucket: “example-bucket”,

    host: “https://us-east-1-1.aws.cloud2.influxdata.com”,

    org: “example-org”,

    token: token

    )

          1. 输出
          2. to**-**输出数据

    bucket

    待查询的bucket名字

    bucketID

    bucketid

    host

    要查询的 InfluxDB 实例的 URL。

    org

    所属组织名称

    orgID

    所属组织id

    token

    需要提供查询的apitoken信息

    timeColumn

    是否添加time列,默认名称”_time”

    tagColumns

    输出tag列信息。默认为字符串类型的所有列,不包括所有值列和由 fieldFn 标识的列。

    fieldFn

    从输入中中获取记录并返回数据的函数。对于输入表中的每条数据(record),fieldFn 返回一条对应数据,将该输出字段键映射到输出值。

    fieldFn: ® => ({“hum”: r.hum, “temp”: r.temp}) 如,对记录中的hum值指定名称为”hum”

    to(

    bucketID: “1234567890”,

    orgID: “0987654321”,

    timeColumn: “_time”,

    tagColumns: [“tag1”, “tag2”, “tag3”],

    fieldFn: ® => ({ [r._field]: r._value })

    )

          1. 数据转换
            1. range**-**过滤时间
            2. ******filter-**数据过滤

    filter(fn: ® => r._measurement == “traffic-sip-1d” )

            1. group**-**分组函数

    对数据进行分组或者合并

    group()

    对多个分组进行合并

    group(columns:[“column_name”])

    以某些列进行分组

    group(columns:[“dim”])

          1. ******join-**合并多个流
            1. top**-**前N项数据

    from(bucket:“test-60d”)

    |> range(start: -5m)

    |> filter(fn: ® => r._measurement == “traffic-dip-1d”)

    |> pivot(rowKey: [“_time”], columnKey: [“_field”], valueColumn: “_value”)

    |> group()

    |> map(fn:® => ({dim:r.dim,_time:r._time, up:r.up,down:r.down, total:r.total,session:r.session}))

    |> top(n:10,columns: [“up”])

          1. ******map-**数据隐射

    对每条记录进行map操作后写入新表,默认情况下不做处理的列会被删除,使用with则是在原有的基础上更新

    map(fn:® => ({dim:r.dim,_time:r._time, up:r.up,down:r.down, total:r.total,session:r.session}))

    map(fn: ® => ({ _value: r._value * r._value }))

    map(fn: ® => ({ r with newColumn: r._value * 2 }))

          1. ******cumulativeSum-**累加求和

    以每一行为基础,从第一行往后累加,最后一行即为累加后的总数

    cumulativeSum(columns: [“up”,“down”,“total”,“session” ])

          1. last**-**获取表或分组最后一行

    取表中的最后一行数据

    |> group(columns:[“dim”])

    |> cumulativeSum(columns: [“up”,“down”,“total”,“session” ])

    |> last(column:“_time”)

    |> group()

          1. count**-**对表中行进行计数

    不指定参数默认对于_value的个数进行计数

    count()

    count(column: “name”)

          1. sort**-**排序

    按照某一列进行排序

    sort(columns: [“_value”], desc: false)

          1. pivot**—**透视数据为列

    influxdb内部数据为key-value形式,在进行表的整体操作时,要以某些key为键汇聚数据,通常使用_time透视为列

    from(bucket:“test-60d”)|> range(start: -task.every)

    |> filter(fn: ® => r._measurement == “traffic-dip-1d”)

    |> pivot(rowKey: [“_time”], columnKey: [“_field”], valueColumn: “_value”)

        1. 自定义任务

    在influxdb2.x版本中,针对复杂场景下的数据处理,连续查询依然存在着一定的功能的不足,所以使用了任务机制进行了替代。

    任务机制作为连续查询的功能升级,可以非常方便的将连续查询转换为任务,具体过程可参考官方文档:Migrate continuous queries to tasks | InfluxDB OSS 2.0 Documentation

    influxdbtask是一个预定义的脚本,通过对输入数据流进行修改、分析,然后存储到新的bucket中或者执行其他操作。

    influxdb任务由以下四部分组成:

    1. taskoptions-任务选项,如定义任务执行周期,任务配置等
    2. a data source-需要被task处理的数据源
    3. data processing or transformation-需要执行的数据处理和数据转换
    4. adestionation -处理过后的数据存储位置信息

    以下为官方的示例:

    // Task options____任务配置____每小时执行

    option task ={

    name:“cqinterval15m”,

    every:1h,

    offset:0m,

    concurrency:1,

    }

    // Data source____指定数据源

    data =from(bucket:“example-bucket”)

    |>range(start:-task.every)

    |>filter(fn:® =>

    r._measurement=="mem"and

    r.host ==“myHost”

    )

    data

    // Data transformation____指定处理逻辑函数mean

    |>aggregateWindow(

    every:5m,

    fn:mean

    )

    // Data destination 数据存储

    |>to(bucket:“example-downsampled”)

        1. 查询优化
          1. 使用pushdowns函数

    pushdowns函数是flux内的定义,是对底层的数据源进行操作的函数或者函数组合,而不需要在内存中对数据进行处理。使用pushdown进行查询可以有效的提高查询效率。如果一但是非非pushdown操作,则需要将数据拉入到内存中进行操作。

    如range为pushdown函数,它在处理中是在influxdb数据库源数据进行取数据,而sort为非pushdown函数,执行时必须将数据放到内存中进行排序。所以在实际执行中我们应该先利用range函数过滤数据再执行sort。

            1. pushdown函数及组合

    pushdown函数

    count()

    filter()*

    keep()

    mean()

    drop()

    fill()

    last()

    min()

    duplicate()

    first()

    max()

    range()

    rename()

    sum()

    window()

    pushdown函数组合

    group()|>count()

    group()|>max()

    window()|>count()

    window()|>max()

    group() |> first()

    group()|>min()

    window()|> first()

    window()|> in()

    group() |> last()

    group()|>sum()

    window()|> last()

    window()|>sum()

          1. 避免filter运算

    避免使用filter在过滤时进行实时的计算,而是应该尽量采用静态的方式,如果要动态的设置过滤器,请在filter之外使用变量定义。

    不推荐的方式

    推荐的方式

          1. 谨慎使用以下函数

    以下函数属于消耗内存和cpu较多的函数。

    map

    使用set而尽可能的少的使用map

    reduce

    join

    join

    union

    联合不同的表

    pivot

    透视列

          1. 使用window类函数避免过小的窗口

    window类窗口函数对数据进行基于时间范围的分组,通常用于聚合和采样数,较短的窗口时间范围将会影响性能。总时长太长,窗口时间太少,将会产生很多的分组窗口数据,影响到查询的性能。

          1. 平衡时间范围和数据精度

    如果请求的数据范围较大,且精度较高则会查询出大量的数据造成性能的消耗,所以遇到该情况,应该利用任务对数据进行采样进行预处理

          1. 使用flux分析器测量查询性能

    使用flux内置的profiler包可以对查询性能进行评估,通过直接导入该包,即能非常方便的输出统计信息。

    query:提供有关整个 Flux 脚本执行的统计信息。

    operator:提供有关查询中每个操作的统计信息。

  • 相关阅读:
    深度神经网络为何成功?其中的过程、思想和关键主张选择
    算法训练营day45|动态规划 part07:完全背包 (LeetCode 70. 爬楼梯(进阶)、322. 零钱兑换、279.完全平方数)
    python 时间加法 输出t分钟后的时间
    Redis基础
    《最新出炉》系列入门篇-Python+Playwright自动化测试-49-Route类拦截修改请求-下篇
    基于java+SpringBoot+HTML+Mysql)疫情防控微信小程序
    基于百度地图的交通查询的毕业设计(android)
    【汇编】指令系统的寻址方式
    win7的Par虚拟机版优化(parellel desktop)
    arthas调查内存溢出 kibana宕机导致内存溢出
  • 原文地址:https://blog.csdn.net/m0_54866636/article/details/126032352