• Elasticsearch: rolling-close indices with a script to update index settings


    1. Problem

         While updating the mapping of an existing index to add a new analyzer (the analyzer had already been added to the index template), the mapping update request was rejected with an error:

    Checking the Elasticsearch documentation shows that analyzers cannot be added dynamically to an existing open index; the index must first be closed, the analyzer defined, and the index reopened:

    Update index settings API | Elasticsearch Guide [8.3] | Elastic

    You can only define new analyzers on closed indices.
    
    To add an analyzer, you must close the index, define the analyzer, and reopen the index.

    2. Solution steps (verified)

    2.1 Pause data writes & disable shard rebalancing

        Pausing data writes avoids replaying a large translog backlog during the recovery phase, which speeds up index recovery.

    Disable shard rebalancing, so shards are not shuffled around while indices are closed and reopened:

     PUT _cluster/settings
      { "persistent" : { "cluster.routing.rebalance.enable": "none" } }

    2.2 Flush all indices whose analyzers need updating

        This flushes the in-memory buffer to disk so the recovery phase does not have to replay a large amount of translog data, which speeds up index recovery.

       (1) List all indices

              GET /_cat/indices

     (2) Get the settings of a given index

             GET /prod_candidatev2_chunk_{i}/_settings

            Check whether the settings already contain the analyzers back_edge_ngram_analyzer and word_analyzer; their presence is the criterion for deciding whether an index still needs updating.

      (3) Flush (and synced-flush) each target index to speed up recovery and avoid pulling full segment data from the primary shards to the replicas. Note that synced flush was deprecated in Elasticsearch 7.6 and removed in 8.0; on newer clusters a plain flush is sufficient.

         POST /index{i}/_flush

         POST /index{i}/_flush/synced
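    The same flush calls from Python; a minimal sketch ("index_0" is a placeholder name, and flush_synced exists only in the 7.x client, hence the guard):

        client.indices.flush(index="index_0")  # persist the in-memory buffer, trim the translog
        try:
            client.indices.flush_synced(index="index_0")  # synced flush; endpoint removed in 8.0
        except Exception:
            pass  # on 8.x a plain flush already covers this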

    2.3 Close the index

      POST /index{i}/_close

    2.4 Update the index analyzers

    PUT index{i}/_settings
    {
      "analysis": {
        "filter": {
          "back_edge_ngram_filter": {
            "min_gram": "1",
            "side": "back",
            "type": "edgeNGram",
            "max_gram": "256"
          }
        },
        "analyzer": {
          "back_edge_ngram_analyzer": {
            "filter": ["standard", "lowercase", "back_edge_ngram_filter"],
            "tokenizer": "keyword"
          },
          "word_analyzer": {
            "filter": ["standard", "lowercase"],
            "tokenizer": "keyword"
          },
          "mk_edge2": {
            "type": "mk_edge",
            "extra_prarm": "cb,3,-1"
          },
          "mk_edge1": {
            "type": "mk_edge",
            "extra_prarm": "wb,1,-1"
          }
        }
      }
    }
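    The same close, update, reopen cycle can be driven from Python; a minimal sketch assuming the `client` and settings `body` defined in the full script in 2.6 (which adds verification and retries), with "index_0" as a placeholder index name:

        client.indices.close(index="index_0")                    # analyzers can only change while closed
        client.indices.put_settings(index="index_0", body=body)  # apply the analysis settings above
        client.indices.open(index="index_0")                     # reopen; shards start recovering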

    2.5 Poll and wait for the cluster to return to green

    GET  _cluster/health
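    The health API can also block server-side until the target status is reached, which avoids a tight polling loop; a small sketch using the same client (wait_for_status and timeout are standard parameters of the cluster health API). The script below polls manually instead, so it can render a progress bar:

        # Wait up to 60s for green; the response reports timed_out=True otherwise.
        health = client.cluster.health(wait_for_status="green", timeout="60s")
        print(health["status"], health["active_shards_percent_as_number"])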

    2.6 Repeat steps 2.2 through 2.5 until all indices have been updated and recovered. Finally, re-enable shard rebalancing by setting cluster.routing.rebalance.enable back to "all".

    The Python script:

     

    import time

    from tqdm import trange
    from elasticsearch import Elasticsearch

    # Connection string is redacted in the original post.
    ES_HOST = ["http://elastic:"]
    client = Elasticsearch(ES_HOST)

    # index_pattern = "thoth_candidatev2_chunk_*"
    index_pattern = "prod_candidatev2_chunk_*"
    # index_pattern = "test_candidatev2_chunk_*"

    put_all_index = False


    def main():
        all_indices = get_all_indices()
        print("Number of indices: ", len(all_indices))
        all_indices = [index for index in all_indices if not is_updated_index_settings(index)]
        print("Number of not updated indices: ", len(all_indices))
        for index in all_indices:
            if put_all_index or select():
                print(f"Start put {index} settings")
                put_index_settings(index)
                check_cluster_health(index)
            else:
                break
        print('Finished')


    def select():
        """Ask the operator whether to continue; 'all' stops asking."""
        global put_all_index
        text = input("continue(all/y/n): ")
        if text == 'y':
            return True
        elif text == 'n':
            return False
        elif text == 'all':
            put_all_index = True
            return True
        return False  # treat any other input as 'n'


    def is_updated_index_settings(index):
        """Return True if the index already has the new analyzers and filter."""
        settings = client.indices.get_settings(index=index)
        analysis = settings[index]["settings"]["index"].get("analysis", {})
        if ("word_analyzer" in analysis.get("analyzer", {})
                and "back_edge_ngram_analyzer" in analysis.get("analyzer", {})
                and "back_edge_ngram_filter" in analysis.get("filter", {})):
            print(f"{index} done")
            return True
        return False


    def put_index_settings(index):
        # Flush before closing so recovery does not replay the translog.
        if client.cat.indices(index=index, params={"h": "status"}).strip() != 'close':
            print(f"flush {index}")
            client.indices.flush(index=index)
            print(f"flush {index} done")
            close_index(index)
        body = '{"analysis":{"filter":{"back_edge_ngram_filter":{"min_gram":"1","side":"back","type":"edgeNGram","max_gram":"256"}},"analyzer":{"back_edge_ngram_analyzer":{"filter":["standard","lowercase","back_edge_ngram_filter"],"tokenizer":"keyword"},"word_analyzer":{"filter":["standard","lowercase"],"tokenizer":"keyword"},"mk_edge2":{"type":"mk_edge","extra_prarm":"cb,3,-1"},"mk_edge1":{"type":"mk_edge","extra_prarm":"wb,1,-1"}}}}'
        client.indices.put_settings(index=index, body=body)
        if not is_updated_index_settings(index):
            print(f"put index error: {index}")
            put_index_settings(index)  # retry until the settings stick
        open_index(index)


    def close_index(index):
        print(f"{index} status: ", client.cat.indices(index=index, params={"h": "status"}).strip())
        client.indices.close(index=index)
        print(f"{index} status: ", client.cat.indices(index=index, params={"h": "status"}).strip())


    def open_index(index):
        print(f"{index} status: ", client.cat.indices(index=index, params={"h": "status"}).strip())
        client.indices.open(index=index)
        print(f"{index} status: ", client.cat.indices(index=index, params={"h": "status"}).strip())


    def check_cluster_health(index):
        """Block until the cluster is green again, showing recovery progress."""
        t = trange(100, desc="recover: ", leave=True)
        last_progress = 0
        while client.cluster.health()["status"] != "green":
            t.set_description(client.cluster.health()["status"])
            current_progress = client.cluster.health()["active_shards_percent_as_number"]
            t.update(current_progress - last_progress)
            last_progress = current_progress
            recovery_status = client.cat.recovery(
                index=index,
                params={"h": "index,shard,translog_ops_percent,files_percent,stage", "v": "true"})
            # Print only the header row plus shards that are not yet done.
            output = []
            for idx, item in enumerate(recovery_status.split('\n')):
                if idx == 0 or not item.endswith('done'):
                    output.append(item)
            if len(output) > 1:
                print('\n'.join(output))
            time.sleep(2)
        t.set_description(client.cluster.health()["status"])
        t.update(100 - last_progress)
        t.close()


    def get_all_indices():
        return client.indices.get_alias(index=index_pattern)


    def test_put_index():
        index = "test_candidatev2_chunk_{chunk}"
        body = ''
        for chunk in range(0, 10):
            client.indices.create(index=index.format(chunk=chunk), body=body)


    if __name__ == "__main__":
        # test_put_index()
        main()

  • Original article: https://blog.csdn.net/leixingbang1989/article/details/126499251