同步数据时,Elasticsearch配合脚本的相关处理设置
- 在kibana中执行,或者直接给ES发送请求,你懂得,不懂得百度下ES创建template
- PUT /_template/test-xxx
- {
- "template": "idx_znyw_data_gkb_logstash",
- "order": 1,
- "settings": {
- "number_of_shards": 1
- },
- "mappings": {
- "dynamic": "true",
- "dynamic_date_formats": [
- "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
- "yyyy-MM-dd'T'HH:mm:ss.SSS+0800",
- "yyyy-MM-dd'T'HH:mm:ss'Z'",
- "yyyy-MM-dd'T'HH:mm:ss+0800",
- "yyyy-MM-dd'T'HH:mm:ss",
- "yyyy-MM-dd HH:mm:ss",
- "yyyy-MM-dd"
- ],
- "properties": {
- "@timestamp": {
- "type": "date"
- },
- "@version": {
- "type": "integer"
- },
- "_class": {
- "type": "text",
- "fields": {
- "keyword": {
- "type": "keyword",
- "ignore_above": 256
- }
- }
- },
- "id": {
- "type": "long"
- },
- "location": {
- "properties": {
- "lat": {
- "type": "double"
- },
- "lon": {
- "type": "double"
- }
- }
- },
- "latitude": {
- "type": "double"
- },
- "longitude": {
- "type": "double"
- },
- "ipAddr": {
- "type": "ip",
- "index": true,
- "store": false
- },
- "setupTime": {
- "type": "date",
- "index": true,
- "store": false,
- "format": "date",
- "ignore_malformed": false
- },
-
- "updateDate": {
- "type": "date",
- "index": true,
- "store": false,
- "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd'T'HH:mm:ss||yyyy-MM-dd'T'HH:mm:ss+0800||yyyy-MM-dd'T'HH:mm:ss'Z'||yyyy-MM-dd'T'HH:mm:ss.SSS+0800||yyyy-MM-dd'T'HH:mm:ss.SSS'Z'||yyyy-MM-dd||epoch_millis",
- "ignore_malformed": false
- },
- "delFlag": {
- "type": "integer"
- }
- }
- }
- }
上面红色部分的需要特殊处理,下面给出特殊操作的logstash脚本处理方法,在filter中特殊处理,这里是直接上干货,其他的概念性的问题,百度一下。
- filter {
-
- #坐标
- mutate {
- add_field => ["[location][lon]", "%{[longitude]}"]
- add_field => ["[location][lat]", "%{[latitude]}"]
-
- }
-
-
- #时间
- ruby {
- code => "
- date_person = event.get('setupTime').time.localtime + 0*60*60
- event.set('setupTime', date_person.strftime('%Y-%m-%d'))
- "
- }
- ruby {
- code => "
- event.set('createDate', event.get('createDate').time.localtime + 0*60*60)"
- }
-
- }
- #在kibana中执行,执行前要关闭索引,但是注意,这里不能修改分片数,分片数只能在索引创建时指定,那数据量超量时,分片数不够用怎么整呢,看标题4
- //先把索引关掉
- POST /idx_xxx_sysinfo/_close
- POST /idx_xxx_sysinfo/_open
- //更新索引
- PUT /idx_xxx_sysinfo/_settings
- {
-
- "index":{
- "number_of_replicas": 2,
- "max_result_window": 65536,
- "max_inner_result_window": 10000,
- "translog.durability": "request",
- "translog.sync_interval": "3s",
- "auto_expand_replicas": false,
- "analysis.analyzer.default.type": "ik_max_word",
- "analysis.search_analyzer.default.type": "ik_smart",
- "shard.check_on_startup": false,
- "codec": "default",
- "store.type": "niofs"
- }
- }
4.分片数量不够时(ES内部内部数据迁移策略)
ES内部内部数据迁移策略
使用场景:
1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。
2、当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;但是在ES中,一个字段的mapping在定义并且导入数据之后是不能再修改的,
所以这种情况下也可以考虑尝试使用Reindex。
- POST _reindex?slices=5&refresh
- {
- "source": {
- "index": "idx_znyg_datanbqseries_new",
- "size": 10000 //批量执行数
- },
- "dest": {
- "index": "idx_znyg_datanbqseries",
- //version_type"version_type": "internal"或者不设置,则Elasticsearch强制性的将文档转储到目标中,覆盖具有相同类型和ID的任何内容:
- "version_type": "internal"
- }
- }
slices大小设置注意事项:
1)slices大小的设置可以手动指定,或者设置slices设置为auto,auto的含义是:针对单索引,slices大小=分片数;针对多索引,slices=分片的最小值。
2)当slices的数量等于索引中的分片数量时,查询性能最高效。slices大小大于分片数,非但不会提升效率,反而会增加开销。
3)如果这个slices数字很大(例如500),建议选择一个较低的数字,因为过大的slices 会影响性能。
效果
实践证明,比默认设置reindex速度能提升10倍+。
创建索引
-
- PUT /idx_znyg_datanbqseries_new?pretty
- {
- "settings": {
- "index.number_of_shards": 12,
- "index.number_of_replicas": 1,
- "index.max_result_window": 65536,
- "index.max_inner_result_window": 10000,
- "index.translog.durability": "request",
- "index.translog.sync_interval": "3s",
- "index.auto_expand_replicas": false,
- "index.analysis.analyzer.default.type": "ik_smart",
- "index.analysis.search_analyzer.default.type": "ik_smart",
- "index.shard.check_on_startup": false,
- "index.codec": "default",
- "index.store.type": "niofs"
- },
- "mappings": {
- }
-
- }
脚本执行SQL分析,人为加速处理设置。
- 如下SQL为logstash数据同步执行脚本,分析设置后,做如下处理,OFFSET -500获取上次执行完数据,拿最小ID保存至ID文件内,关闭服务重启启动,此时速度飞起,当OFFSET 数量过500万时,每次查询30S+,所以速度就下降了。
- SELECT * FROM (select del_flag as delFlag from znyw_data_nbq_series WHERE id>= 1347422522840252418) AS `t1` LIMIT 500 OFFSET 500
有时候发现数据库数量和索引数量不匹配,需要重新同步,请按照如下操作处理:
1.停止服务
2.删除索引更新数据文件
3.重启启动服务查看日志
这样操作是因为,增量id更新,没有读文件,这样操作最保险,还有就是数据不一致不要慌,查看任务执行日志,有些脚本过滤插件有问题的也不会更新。