• Lostash同步Mysql数据到Elasticsearch(三)Elasticsearch模板与索引设置


    logstash数据同步ES相关

    同步数据时,Elasticsearch配合脚本的相关处理设置

    1.模板创建更新

    1. 在kibana中执行,或者直接给ES发送请求,你懂得,不懂得百度下ES创建template
    2. PUT /_template/test-xxx
    1. {
    2. "template": "idx_znyw_data_gkb_logstash",
    3. "order": 1,
    4. "settings": {
    5. "number_of_shards": 1
    6. },
    7. "mappings": {
    8. "dynamic": "true",
    9. "dynamic_date_formats": [
    10. "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
    11. "yyyy-MM-dd'T'HH:mm:ss.SSS+0800",
    12. "yyyy-MM-dd'T'HH:mm:ss'Z'",
    13. "yyyy-MM-dd'T'HH:mm:ss+0800",
    14. "yyyy-MM-dd'T'HH:mm:ss",
    15. "yyyy-MM-dd HH:mm:ss",
    16. "yyyy-MM-dd"
    17. ],
    18. "properties": {
    19. "@timestamp": {
    20. "type": "date"
    21. },
    22. "@version": {
    23. "type": "integer"
    24. },
    25. "_class": {
    26. "type": "text",
    27. "fields": {
    28. "keyword": {
    29. "type": "keyword",
    30. "ignore_above": 256
    31. }
    32. }
    33. },
    34. "id": {
    35. "type": "long"
    36. },
    37. "location": {
    38. "properties": {
    39. "lat": {
    40. "type": "double"
    41. },
    42. "lon": {
    43. "type": "double"
    44. }
    45. }
    46. },
    47. "latitude": {
    48. "type": "double"
    49. },
    50. "longitude": {
    51. "type": "double"
    52. },
    53. "ipAddr": {
    54. "type": "ip",
    55. "index": true,
    56. "store": false
    57. },
    58. "setupTime": {
    59. "type": "date",
    60. "index": true,
    61. "store": false,
    62. "format": "date",
    63. "ignore_malformed": false
    64. },
    65. "updateDate": {
    66. "type": "date",
    67. "index": true,
    68. "store": false,
    69. "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd'T'HH:mm:ss||yyyy-MM-dd'T'HH:mm:ss+0800||yyyy-MM-dd'T'HH:mm:ss'Z'||yyyy-MM-dd'T'HH:mm:ss.SSS+0800||yyyy-MM-dd'T'HH:mm:ss.SSS'Z'||yyyy-MM-dd||epoch_millis",
    70. "ignore_malformed": false
    71. },
    72. "delFlag": {
    73. "type": "integer"
    74. }
    75. }
    76. }
    77. }

    2.特殊版本类型logstash脚本处理

    上面红色部分的需要特殊处理,下面给出特殊操作的logstash脚本处理方法,在filter中特殊处理,这里是直接上干货,其他的概念性的问题,百度一下

    1. filter {
    2. #坐标
    3. mutate {
    4. add_field => ["[location][lon]", "%{[longitude]}"]
    5. add_field => ["[location][lat]", "%{[latitude]}"]
    6. }
    7. #时间
    8. ruby {
    9. code => "
    10. date_person = event.get('setupTime').time.localtime + 0*60*60
    11. event.set('setupTime', date_person.strftime('%Y-%m-%d'))
    12. "
    13. }
    14. ruby {
    15. code => "
    16. event.set('createDate', event.get('createDate').time.localtime + 0*60*60)"
    17. }
    18. }

    3.索引更新设置

    1. #在kibana中执行,执行前要关闭索引,但是注意,这里不能修改分片数,分片数只能在索引创建时指定,那数据量超量时,分片数不够用怎么整呢,看标题4
    2. //先把索引关掉
    3. POST /idx_xxx_sysinfo/_close
    4. POST /idx_xxx_sysinfo/_open
    5. //更新索引
    6. PUT /idx_xxx_sysinfo/_settings
    1. {
    2. "index":{
    3. "number_of_replicas": 2,
    4. "max_result_window": 65536,
    5. "max_inner_result_window": 10000,
    6. "translog.durability": "request",
    7. "translog.sync_interval": "3s",
    8. "auto_expand_replicas": false,
    9. "analysis.analyzer.default.type": "ik_max_word",
    10. "analysis.search_analyzer.default.type": "ik_smart",
    11. "shard.check_on_startup": false,
    12. "codec": "default",
    13. "store.type": "niofs"
    14. }
    15. }

    4.分片数量不够时(ES内部内部数据迁移策略)

    ES内部内部数据迁移策略

    使用场景:

    1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。

    2、当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;但是在ES中,一个字段的mapping在定义并且导入数据之后是不能再修改的,

    所以这种情况下也可以考虑尝试使用Reindex。

    1. POST _reindex?slices=5&refresh
    2. {
    3. "source": {
    4. "index": "idx_znyg_datanbqseries_new",
    5. "size": 10000 //批量执行数
    6. },
    7. "dest": {
    8. "index": "idx_znyg_datanbqseries"
    9. //version_type"version_type": "internal"或者不设置,则Elasticsearch强制性的将文档转储到目标中,覆盖具有相同类型和ID的任何内容:
    10. "version_type": "internal"
    11. }
    12. }

    slices大小设置注意事项:
    1)slices大小的设置可以手动指定,或者设置slices设置为auto,auto的含义是:针对单索引,slices大小=分片数;针对多索引,slices=分片的最小值。
    2)当slices的数量等于索引中的分片数量时,查询性能最高效。slices大小大于分片数,非但不会提升效率,反而会增加开销。
    3)如果这个slices数字很大(例如500),建议选择一个较低的数字,因为过大的slices 会影响性能。

    效果
    实践证明,比默认设置reindex速度能提升10倍+。

    创建索引

    1. PUT /idx_znyg_datanbqseries_new?pretty
    2. {
    3. "settings": {
    4. "index.number_of_shards": 12,
    5. "index.number_of_replicas": 1,
    6. "index.max_result_window": 65536,
    7. "index.max_inner_result_window": 10000,
    8. "index.translog.durability": "request",
    9. "index.translog.sync_interval": "3s",
    10. "index.auto_expand_replicas": false,
    11. "index.analysis.analyzer.default.type": "ik_smart",
    12. "index.analysis.search_analyzer.default.type": "ik_smart",
    13. "index.shard.check_on_startup": false,
    14. "index.codec": "default",
    15. "index.store.type": "niofs"
    16. },
    17. "mappings": {
    18. }
    19. }

    5.logstash大数据量脚本加速

    脚本执行SQL分析,人为加速处理设置。

    1. 如下SQL为logstash数据同步执行脚本,分析设置后,做如下处理,OFFSET -500获取上次执行完数据,拿最小ID保存至ID文件内,关闭服务重启启动,此时速度飞起,当OFFSET 数量过500万时,每次查询30S+,所以速度就下降了。
    2. SELECT * FROM (select del_flag as delFlag from znyw_data_nbq_series WHERE id>= 1347422522840252418) AS `t1` LIMIT 500 OFFSET 500

    6.logstash特性(重要)

    有时候发现数据库数量和索引数量不匹配,需要重新同步,请按照如下操作处理:
    1.停止服务
    2.删除索引更新数据文件
    3.重启启动服务查看日志
    这样操作是因为,增量id更新,没有读文件,这样操作最保险,还有就是数据不一致不要慌,查看任务执行日志,有些脚本过滤插件有问题的也不会更新。

  • 相关阅读:
    以太坊智能合约的技术与组件
    Universal Data Access Components (UniDAC) 9.3.0
    隐藏的增长动力:中国工业的售后服务
    C++ 算数运算符 学习资料
    轻松学习jQuery控制DOM
    胖小酱之贪婪的萨米提寓言故事
    基于C语言的使用checksum进行差错检测
    day31 文件上传&js验证&mime&user.ini&语言特性
    云汉芯城一站式电子制造平台启想智联顺利通过IATF16949:2016质量管理体系认证
    基于stm32单片机体重秤电子秤超重提醒
  • 原文地址:https://blog.csdn.net/vc33569/article/details/133046728