在进行influxdb2的真正学习之前,我们先完成必须的下载安装,和基础的添加、查询、删除数据。
wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.8.x86_64.rpm
sudo yum localinstall influxdb2-2.0.8.x86_64.rpm
安装好influxdb2后,直接启用运行influxd命令启动。
然后使用influxsetup命令初始化influxdb数据库,如图完成数据库配置
配置清单
Username
root
Password
2342@@#@#
Organizationname
abc
Bucketname
test
如上就完成了数据库的初始化,我们可以使用auth命令查看配置信息是否已经生效,查看token信息,如果不存在任何auth信息,使用influx命令是无法连接到influxd,完成数据的操作。通过token信息我们也可以直接在程序中利用该token信息直接对influxdb进行操作
对influxdb操作,最基本的是通过内置的influx命令完成,或者在程序中使用httpapi完成操作,这里以infux为例,简单介绍下读数据、写数据、和删除数据的基础操作,形成基本认识。
Influxquery‘query_command’
查看数据,使用内置的flux语法进行查询
Influxwrite
写数据,可以通过命令行的形式或者导入文件的方式进行数据的写入
Influxdelete
删除数据,最常用的是删除指定时间范围之内的数据
–bucket参数指定待删除数据归属的bucket
–start指定删除数据的起始时间
–stop指定删除数据的结束时间
–predicate对被删除的数据进行过滤
在经过这一节,我们必须理解几个相对抽象的概念,才能更好的应用操作,这里仅给出便于理解的概念
bucket
桶,influxdb中存储时间序列数据的指定位置,每个bucket都有一个数据存储周期配置(可配置为无限),超过该保留时间的数据自动被清除,每个bucket必须属于一个organizations。
Bucket类比于关系数据库中的database
organization
组织,org是一组用户、任务、bucket、dashboard的抽象容器概念
measurement
Measurement类似于关系数据库中表的概念
Point
Point类似于关系数据库中的一行数据 ,一个point由tag 、timestamp、field组合的数据
通过write命令写入如下数据
influx write -b test’
mem,host=host1 used_percent=44.23 1577836800000000000
mem,host=host2 used_percent=72.01 1557836800000000000
mem,host=host1 used_percent=42.61 1537836810000000000
mem,host=host2 used_percent=82.98 1477844810000000000
mem,host=host1 used_percent=46.40 157783880000000000
mem,host=host2 used_percent=43.77 157678367820000000000
‘
写入的bucket为test中的measument为mem,其中tag为host,field数据为used_percent。
类比到mysql,即往数据库test中的表men中写入内存占用数据6条,且通过host列代表不同的主机。
influx query ’
from(bucket:“test”)
|> range(start:0) ’
from函数指定查询的bucket
range指定查询的时间范围,这里根据插入的时间戳得到对应的秒数,stop不填写为到当前时间点
influx query ’
from(bucket:“test”)
|> range(start:0)
|> filter( fn: ® => r.host == “host1”) ’
通过filter进行过滤host1的内存占用情况,fn为自定义的函数fn:®中的r为传入给fn的参数,代表一行数据,=>之后为定义的fn函数内容,和如下的查询等价:
influx query ’
fn****=********® => r.host == “host1”****
from(bucket:“test”)
|> range(start:0)
|> filter(fn) ’
需要注意filter参数只能接受fn的参数为r的函数,如果将r修改成其他形参则会报错。
以上为使用filter函数,指定measurement为mem且host为host2
influx delete
-o abc
–bucket test
–start 1970-01-01T00:00:00Z
–stop $(date +“%Y-%m-%dT%H:%M:%SZ”)
–predicate ‘_measurement=“mem”’
Influxdb2不需要特定的配置参数即可以启动,可以完全做到开箱可用,非常方便,但是在实际项目中,我们还是需要对inflxdb2进行一些项目定制化的配置,以适配具体的业务。
Influxdb2中的启动参数,可以有三种方式配置,三种方式从上到下依次优先级降低:
在系统中配置环境变量INFLUXD_CONFIG_PATH,可以将我们的个人配置文件放到指定目录,并且配置文件支持yaml、json、toml三种格式,在配置文件中我们可以指定请求的长度、加解密信息,以及其他数据存储路径等内容。
export INFLUXD_CONFIG_PATH=/path/to/custom/config/directory
Influxdb2中的配置数据相关的存储文件在不同的系统下可能具有一定的差异,在linux下默认配置和数据保存在~/.influxdbv2/目录下
~/.influxdbv2/engine/
Influxdb时间序列化数据存储的位置,
data目录存储时间结构合并树文件(time-Structured Merge
Tree ,TSM)
Wal(Write Ahead Log)存储预写日志的存储目录
通过export INFLUXD_ENGINE_PATH=~/.influxdbv2/engine迁移该目录
或者在json配置文件中添加:
“engine-path”: “/users/user/.influxdbv2/engine”
~/.influxdbv2/influxd.bolt
非时间序列数据的基于文件的键值存储,例如 InfluxDB user、oards、task等
在json配置文件中指定
“bolt-path”: “/users/user/.influxdbv2/influxd.bolt”
~/.influxdbv2/configs
授权信息存储等
Bucket用于存储数据的结构,它必须属于某个org,且Bucket必须配置保留周期(可以设置为无限期)。Bucket类似于sql中的database概念
InfluxDB基础数据结构,用于描述存储在相关字段中的数据。类似于sql中的表的概念。
作为key的一部分
作为value
Serialkey是能够通过bucket、measurement、tagset和fieldkey表示出的唯一对象,我忍叫做序列键,能够唯一
一个point代表一条数据,类似于sql中的一条数据,具有以下特点:
如果两个point的serial和时间戳一致,在influxdb中存储时会认为其key相同,将两个point存储在一起,且所有信息都会指向后添加的point
Influxdb设计之初就是为了应对日志、监控、告警的业务场景,所以引擎的实现以此为出发点,主要有三个原则
,influxdb引入了以下的概念来达到以上目的
Write Ahead Log ,预写日志。在influxdb引擎重启的时候进行加载构建高速缓存信息,wal确保数据在系统发生故障时能够正确的持久化存储。其存储目录为~/.influxdbv2/engine/wal,每个wal文件以命名格式为*.wal结尾。
需要注意wal文件可以看作是运行程序的内存数据的对应,需要根据情况触发从内存将数据保存到wal文件中,或者通过定时刷新写入到wal文件中。一旦influxdb引擎故障则cache中未写入到数据将丢失。恢复过程中直接读取wal文件数据进行恢复。
该高速缓存是当前存储的WAL文件中的Points在内存中的一个数据副本。该缓存信息主要包括如下内容:
对存储引擎进行查询时,其结果来自高速缓存和TSM文件这两部分。当执行查询时其实时在缓存的数据副本上执行,这种查询方式不会影响到同一时间的写入操作。而删除操作只需要发送给cache被清除指定的键或者指定的时间范围即可,同时该操作会在wal文件中记录删除操作。
TSM Tree ,Time-Structured Merge Tree,时间结构合并树。是 InfluxDB 根据实际需求在 LSM Tree 的基础上稍作修改优化而来。
为了有效地压缩和存储数据,存储引擎按序列键(serialkey)对字段值进行分组,然后按时间对这些字段值进行排序。一个序列key,由measurement、tagset和值,还有field key组成。
influxdb存储引擎会按照内存的使用情况将cache中的数据写入到tsm文件中,写入到tsm成功后会将对应的wal文件进行清除。
TSM文件在~/.influxdbv2/engine/data/之下的目录,按照tsm所属的bucket、tag等信息保存在不同的目录。
在数据读取过程中,越来越多的数据会导致influxdb查询的性能降低,这里influxdb引入了TSI,时间序列指数的概念,解决key的查找性能问题。
TSI存储measurement、tag、field的序列索引,这可以使得数据库可以快速的查询到哪些measurement、tag、filed存在,查询给定一个measurement、tag、filed的key信息,存在哪些serisalkey。
https://docs.influxdata.com/influxdb/v2.0/reference/internals/storage-engine/
在使用influx执行命令之前,必须经过auth认证,一种是手动配置另外就是在安装好inflxdb之后使用influxsetup命令完成配置,如果想重新配置,请把默认配置目录/root/.influxdbv2/目录删除,再重新运行influxd,再次使用influxsetup完成配置即可。最简单的配置清单(influx setup既可完成):
以下为常用命令清单:
Influxsetup
初始化influxdb配置,删除/root/.influxdbv2/配置可以再次进行配置
Influx auth
进行用户认证信息的管理,该信息存储默认存储在/root/.influxdbv2/configs文件中
Influxbucket
对用户的bucket进行增删查改
Influxconfig
对/root/.influxdbv2/configs进行管理,也可以通过$INFLUX_CONFIGS_PATH命令重新指定目录
Influxdelete
在influxdb中删除一条数据
Influx org
对Organization进行增删查改
Influxquery
通过flux语法对数据进行查询,写入等操作
Influxtask
对influx任务进行管理,增删查改
Influxwrite
写数据到influxdb中
influxbackup
备份数据到指定目录
influx restore
从指定目录恢复数据
influx auth list
查看当前inflxudb程序的权限信息列表,包括用户id用户名 用户token信息、用户的权限列表。该信息默认保存在/root/.influxdbv2/configs
influx auth create
为当前用户创建一个权限配置,生成唯一的token信息
Influxauthdelete
通过id删除对应的权限配置
Influx auth active
使对应的权限是否生效
这样创建的token对所属的bucket具有全部的读写权限,通过influxsetup创建的用户默认具有全部权限
influx auth create
–read-buckets
–read-checks
–read-dashboards
–read-dbrps
–read-notificationEndpoints
–read-notificationRules
–read-orgs
–read-tasks
–read-telegrafs
–read-user
–write-buckets
–write-checks
–write-dashboards
–write-dbrps
–write-notificationEndpoints
–write-notificationRules
–write-orgs
–write-tasks
–write-telegrafs
–write-user
对于bucketid为0000000000000001 0000000000000002具有读写bucket的权限。
influx auth create
–read-bucket 0000000000000001
–read-bucket 0000000000000002
–write-bucket 0000000000000001
–write-bucket 0000000000000002
通过细力度的控制,控制权限分发,同样可以创建对于特定org的权限等等
Influxbucketlist
查看influxdb中的bucket列表,包括bucketidbucket名称,保留数据时间计划,所属org等信息
influx bucket create
创建bucket
–org 指定所属org
–name指定名称必须
–retention指定保留计划,如30d-保留30天,2h-保留两个小时,如果为0则为永远
–description 可选,描述信息
influx bucket delete
–name 被删除的bucket名称
–id被删除bucket的id
Influxbucketupdate
–id被更新的bucketid,其他参数和create参数相同
bucket create --org abc–name test-bucket --retention 10d
influx bucket list
influx bucket delete --name test-bucket
influx bucket create -n “test-12h” -r 12h -o abc
influx bucket create -n “test-1d” -r 1d -o abc。、
influx bucket create -n “test-7d” -r 7d -o abc
influx bucket create -n “test-30d” -r 30d -o abc
influx bucket create -n “test-60d” -r 60d -o abc
Influxorgcreate
–name指定新建的org名称
influx org create --name test
Influxorgdelete
–id通过id删除对应的org
influx org delete --id 975933ce78f0536e
Influxorglist
显示当前org的列表
Influx org update
–id 需要更新信息的org
–name需要更新的名称
influx org update --id fdbcfasd8b174c7c --name test3
Influx org members list
查看当前org的成员信息
–name待查询的org名称
–id待查询的orgid
influx org members list --name abc
Influx org members add
为org添加对应的用户成员id,如
influx org members add --name test3 --member 11asdf13e283d5ea09c3b
Influx org members remove
为对应的org移除特定用户
write命令可以通过influx自带的lineprotocol格式写入数据,或者直接导入带注释的csv格式,通过注释内容解析成lineprotocol数据格式。
–bucket
指定bucket,并写入lineprotocal数据
influx write --bucket example-bucket "m,host=host1 field1=1.2”
–file
通过文件直接写入lineprotocol数据,可以一次写入多个文件
influx write --bucket example-bucket --file path/to/line-protocol.txt–file path/to/line-2.txt
–url
通过url直接写入lineprotocol数据,可以写入多个url并和file命令一起用
influx write --bucket example-bucket --url https://example.com/line-protocol.txt
–compression
指定写入文件的加密方式,如–compression gzip,为gzip格式
–format csv
通过命令行写入csv格式的数据,需要带注释解析成lineprotocol
写入的lineprotocol数据格式如图:
Measurement
必选
插入的测量表名称
tagset
可选
和measurement进行逗号分隔,如果存在多个tag也用逗号分隔,和field参数使用空格分隔
Fieldset
必选
存储的数据内容,多个数据使用逗号分隔
Timestamp
可选
插入数据的时间戳,如果不填写则默认为当前,和field参数使用空格进行分隔
*第一个空格后为field数据,第二个空格后为时间戳参数
measu_test,host=host1myfield=2231324681356453098000000
measument名称为measu_test,插入tag为host1,且field数据名称为myfield,value为223的数据
measu_test mystr=223, myNum=223,mystr=”hello”
在measu_test插入数据 field为myNum和mystr的数据,注意field内部分隔符为逗号,measument和field的分隔符为空格
写入csv格式数据如图
通过#datatype告诉inflxudb解析成lineprotocol数据,每一列数据的含义和类型,第二列指定数据表头。之后为数据行。
influx query ‘flux_str’
直接通过flux语法字符串查询
influx query -f /path/to/query.flux
通过flux文件进行查询
influxdb2中使用全新的flux语法进行,Flux 是第四代编程语言,专为数据脚本、ETL、监控和警报而设计。它的作用超越了一门查询语言和编程语言。它提供了一个规划器和优化器,无缝地结合了查询和编程,形成了一个统一的整体。实现这些设计目标最终将得到图灵完备的 Flux,不仅可以用于查询和处理时间序列数据,还可以用于处理一般的数据。
同时在flux已经实现了很多内置的通用函数,方便用户的使用,如count、sort、limit,等sql中常用的方法,只是在使用中存在略微的差异。具体语法参考:https://docs.influxdata.com/flux/v0.x/get-started/syntax-basics/。
对于我们简单的使用来说,需要掌握常用的flux函数和管道运算符(|>),其结构大体如下:
data |> someFunction() |> anotherFunction()
以一个实例演示
或
from(bucket:“test”)
|> range(start:157234368)
|> filter( fn: ® => r._measurement == “mem” and r.host == “host2”)
from函数获取数据,同时我们也可以使用参数代替,如定义data=from(bucket:“test”),获取的数据通过管道符号交由下个函数进行处理(这里的逻辑类似于shell中的管道符)。
这里from获取到的数据,经过range函数确定时间范围过滤,然后再经过filter过滤。最后得到我们实际想要插叙的数据。
–org
必选
指定被删除的org的名称,或者设置全局变量$INFLUX_ORG
–bucket
必选
指定删除数据的bucket,或者设置全局变量$INFLUX_BUCKET_NAME
–start
删除数据的起始时间,RFC3339Nano格式,如2009-01-02T23:00:00Z
–stop
删除数据的截至时间参数
–predicate
对待删除point数据的匹配选项,如
host=“prod-1.4” AND region=“us-west”
_measurement=“sensorData”
influx delete -o abc–bucket test-12h–start 2020-03-01T00:00:00Z --stop 2022-11-14T00:00:00Z --predicate ‘_measurement=“test-intt-5m”’
influx delete -o abc–bucket test-7d --start 2020-03-01T00:00:00Z --stop 2022-11-14T00:00:00Z --predicate ‘_measurement=“test-intt-20m”’
influx delete -o abc–bucket test-30d --start 2020-03-01T00:00:00Z --stop 2022-11-14T00:00:00Z --predicate ‘_measurement=“test-intt-1h”’
influx delete -o abc–bucket test-60d–start 2020-03-01T00:00:00Z --stop 2022-11-14T00:00:00Z --predicate ‘_measurement=“test-intt-1d”’
influx delete -o abc–bucket test-60d–start 2020-03-01T00:00:00Z --stop 2022-11-14T00:00:00Z --predicate ‘_measurement=“test-intt-7d”’
influxtasklist
查看当前数据库任务
influxtaskcreate-ffile_name
通过flux文件创建任务
influx task delete -i
通过taskid删除任务
influx task run list --task-id
查看当前taskid的任务 运行历史记录
./influx task log list --task-id
查看某个任务的运行日志
./influx task update–task-id
通过id更新任务
./influx task run retry
–task-id=08qewrdadsd57819000
–run-id=082340qwerbe770019000
重新运行任务,该任务至少已经被运行过一次
influx backup /root/test/backup_$(date ‘+%Y-%m-%d_%H-%M’)
通过指定目录的数据恢复数据库
influx restorepath
通过指定目录恢复全部数据
influx restore/root/test/backup_2021-08-23_06-28
–bucket
恢复指定bucket数据
influx restore backups/2020-01-20_12-00/ --bucket example-bucket
–new-bucket
恢复指定bucket数据到新的bucket
influx restore /backups/2020-01-20_12-00/ --bucket example-bucket --new-bucket new-example-bucket
–full
全量恢复,如果不指定该参数,将会因数据库内部存在部分冲突的时候无法导入
./influx restore --full /root/test/backup_2021-08-23_06-28
原来使用类sql方式查询,现在使用内置的flux方式查询
原来的bucket+时间保留计划=现在的bucket
1.x版本和2.x版本最大的差异是连续查询(continuous query)已经被任务(task)所取代。influxdb中的连续查询功能是对外提供的对数据处理的功能,如为了预防我们的存储日志过大会建立起保存策略,超过设置的超时时间数据就丢失了。针对这种情况,我们可以通过连续查询功能,对用户的数据进行汇总、抽样等操作,再插入到另外的表中即可,虽然丢失了一定的精度,但是让数据占用的空间大大减小。
bucket
待查询的bucket名字
bucketID
bucketid
host
要查询的 InfluxDB 实例的 URL。
org
所属组织名称
orgID
所属组织id
token
需要提供查询的apitoken信息
from(
bucket: “example-bucket”,
host: “https://us-east-1-1.aws.cloud2.influxdata.com”,
org: “example-org”,
token: token
)
bucket
待查询的bucket名字
bucketID
bucketid
host
要查询的 InfluxDB 实例的 URL。
org
所属组织名称
orgID
所属组织id
token
需要提供查询的apitoken信息
timeColumn
是否添加time列,默认名称”_time”
tagColumns
输出tag列信息。默认为字符串类型的所有列,不包括所有值列和由 fieldFn 标识的列。
fieldFn
从输入中中获取记录并返回数据的函数。对于输入表中的每条数据(record),fieldFn 返回一条对应数据,将该输出字段键映射到输出值。
fieldFn: ® => ({“hum”: r.hum, “temp”: r.temp}) 如,对记录中的hum值指定名称为”hum”
to(
bucketID: “1234567890”,
orgID: “0987654321”,
timeColumn: “_time”,
tagColumns: [“tag1”, “tag2”, “tag3”],
fieldFn: ® => ({ [r._field]: r._value })
)
filter(fn: ® => r._measurement == “traffic-sip-1d” )
对数据进行分组或者合并
group()
对多个分组进行合并
group(columns:[“column_name”])
以某些列进行分组
group(columns:[“dim”])
from(bucket:“test-60d”)
|> range(start: -5m)
|> filter(fn: ® => r._measurement == “traffic-dip-1d”)
|> pivot(rowKey: [“_time”], columnKey: [“_field”], valueColumn: “_value”)
|> group()
|> map(fn:® => ({dim:r.dim,_time:r._time, up:r.up,down:r.down, total:r.total,session:r.session}))
|> top(n:10,columns: [“up”])
对每条记录进行map操作后写入新表,默认情况下不做处理的列会被删除,使用with则是在原有的基础上更新
map(fn:® => ({dim:r.dim,_time:r._time, up:r.up,down:r.down, total:r.total,session:r.session}))
map(fn: ® => ({ _value: r._value * r._value }))
map(fn: ® => ({ r with newColumn: r._value * 2 }))
以每一行为基础,从第一行往后累加,最后一行即为累加后的总数
cumulativeSum(columns: [“up”,“down”,“total”,“session” ])
取表中的最后一行数据
|> group(columns:[“dim”])
|> cumulativeSum(columns: [“up”,“down”,“total”,“session” ])
|> last(column:“_time”)
|> group()
不指定参数默认对于_value的个数进行计数
count()
count(column: “name”)
按照某一列进行排序
sort(columns: [“_value”], desc: false)
influxdb内部数据为key-value形式,在进行表的整体操作时,要以某些key为键汇聚数据,通常使用_time透视为列
from(bucket:“test-60d”)|> range(start: -task.every)
|> filter(fn: ® => r._measurement == “traffic-dip-1d”)
|> pivot(rowKey: [“_time”], columnKey: [“_field”], valueColumn: “_value”)
在influxdb2.x版本中,针对复杂场景下的数据处理,连续查询依然存在着一定的功能的不足,所以使用了任务机制进行了替代。
任务机制作为连续查询的功能升级,可以非常方便的将连续查询转换为任务,具体过程可参考官方文档:Migrate continuous queries to tasks | InfluxDB OSS 2.0 Documentation
influxdbtask是一个预定义的脚本,通过对输入数据流进行修改、分析,然后存储到新的bucket中或者执行其他操作。
influxdb任务由以下四部分组成:
以下为官方的示例:
// Task options____任务配置____每小时执行
option task ={
name:“cqinterval15m”,
every:1h,
offset:0m,
concurrency:1,
}
// Data source____指定数据源
data =from(bucket:“example-bucket”)
|>range(start:-task.every)
|>filter(fn:® =>
r._measurement=="mem"and
r.host ==“myHost”
)
data
// Data transformation____指定处理逻辑函数mean
|>aggregateWindow(
every:5m,
fn:mean
)
// Data destination 数据存储
|>to(bucket:“example-downsampled”)
pushdowns函数是flux内的定义,是对底层的数据源进行操作的函数或者函数组合,而不需要在内存中对数据进行处理。使用pushdown进行查询可以有效的提高查询效率。如果一但是非非pushdown操作,则需要将数据拉入到内存中进行操作。
如range为pushdown函数,它在处理中是在influxdb数据库源数据进行取数据,而sort为非pushdown函数,执行时必须将数据放到内存中进行排序。所以在实际执行中我们应该先利用range函数过滤数据再执行sort。
pushdown函数
count()
filter()*
keep()
mean()
drop()
fill()
last()
min()
duplicate()
first()
max()
range()
rename()
sum()
window()
pushdown函数组合
group()|>count()
group()|>max()
window()|>count()
window()|>max()
group() |> first()
group()|>min()
window()|> first()
window()|> in()
group() |> last()
group()|>sum()
window()|> last()
window()|>sum()
避免使用filter在过滤时进行实时的计算,而是应该尽量采用静态的方式,如果要动态的设置过滤器,请在filter之外使用变量定义。
不推荐的方式
推荐的方式
以下函数属于消耗内存和cpu较多的函数。
map
使用set而尽可能的少的使用map
reduce
join
join
union
联合不同的表
pivot
透视列
window类窗口函数对数据进行基于时间范围的分组,通常用于聚合和采样数,较短的窗口时间范围将会影响性能。总时长太长,窗口时间太少,将会产生很多的分组窗口数据,影响到查询的性能。
如果请求的数据范围较大,且精度较高则会查询出大量的数据造成性能的消耗,所以遇到该情况,应该利用任务对数据进行采样进行预处理
使用flux内置的profiler包可以对查询性能进行评估,通过直接导入该包,即能非常方便的输出统计信息。
query:提供有关整个 Flux 脚本执行的统计信息。
operator:提供有关查询中每个操作的统计信息。