在旧的索引中更新mapping时,新增了分词器(分词器已经在模板中添加),但是在更新mapping时报错:
查看elasticsearch官网,发现不允许在已经存在的索引中动态更新分词器,只能先将索引close,更新分词器,然后再打开
Update index settings API | Elasticsearch Guide [8.3] | Elastic
You can only define new analyzers on closed indices. To add an analyzer, you must close the index, define the analyzer, and reopen the index. |
暂停数据写入,可以避免恢复阶段translog中大量数据回放,提升索引恢复速度。
关闭分片重平衡(注意:下面的设置关闭的是 rebalance,而不是 allocation)
PUT _cluster/settings { "persistent" : { "cluster.routing.rebalance.enable": "none" } } |
flush 的作用是将内存 buffer 中的数据刷写到硬盘,避免恢复阶段 translog 中大量数据回放,从而提升索引恢复速度。
(1)获取所有索引
GET /_cat/indices
(2)获取指定索引对应的setting
GET /prod_candidatev2_chunk_{i}/_settings
根据 setting 内容判断索引是否已包含分词器 back_edge_ngram_analyzer 和 word_analyzer,以此作为是否需要更新该索引的判断依据。
(3)对指定索引进行flush操作以及sync操作,加速恢复,避免从主分片全量拉取数据同步至副本。
POST /index{i}/_flush
POST /index{i}/_flush/synced
POST /index{i}/_close
PUT index{i}/_settings { "analysis": { "filter":{ "back_edge_ngram_filter": { "min_gram": "1", "side": "back", "type": "edgeNGram", "max_gram": "256" } }, "analyzer": { "back_edge_ngram_analyzer": { "filter": [ "standard", "lowercase", "back_edge_ngram_filter" ], "tokenizer": "keyword" }, "word_analyzer": { "filter": [ "standard", "lowercase" ], "tokenizer": "keyword" }, "mk_edge2": { "type": "mk_edge", "extra_prarm": "cb,3,-1" }, "mk_edge1": { "type": "mk_edge", "extra_prarm": "wb,1,-1" } } } } |
GET _cluster/health
python处理脚本:
- import time
-
- from tqdm import trange
-
- from elasticsearch import Elasticsearch
-
# Wildcard selecting the indices to migrate. Other patterns this script has
# been pointed at: "thoth_candidatev2_chunk_*", "test_candidatev2_chunk_*".
index_pattern = "prod_candidatev2_chunk_*"

# Set to True by select() when the operator answers "all"; main() then stops
# prompting before each index.
put_all_index = False

# Endpoint list handed to the Elasticsearch client.
# NOTE(review): the URL looks truncated/redacted - confirm host and
# credentials before running.
ES_HOST = ["http://elastic:"]

client = Elasticsearch(ES_HOST)
-
def main():
    """Apply the analyzer settings to every matching index that lacks them.

    Lists the indices returned by get_all_indices(), drops those that
    is_updated_index_settings() already reports as updated, then updates the
    remaining ones one at a time, waiting for the cluster to recover after
    each. Unless the operator has chosen "all", every index is preceded by a
    confirmation prompt; a negative answer stops the run.
    """
    candidates = get_all_indices()
    print("Number of indices: ", len(candidates))

    pending = []
    for name in candidates:
        if is_updated_index_settings(name):
            continue
        pending.append(name)
    print("Number of not updated indices: ", len(pending))

    for name in pending:
        # Short-circuit: once put_all_index is set, select() is not called
        # and the operator is no longer prompted.
        if not (put_all_index or select()):
            break
        print(f"Start put {name} settings")
        put_index_settings(name)
        check_cluster_health(name)
    print('Finished')
-
def select():
    """Ask the operator whether the next index should be migrated.

    Keeps prompting until one of the recognised answers is given, so a typo
    no longer falls through and returns ``None`` (which the caller would
    treat as "stop"). Answers are compared case-insensitively with
    surrounding whitespace ignored.

    Returns:
        True for "y" or "all", False for "n". Answering "all" additionally
        sets the module-level ``put_all_index`` flag to True.
    """
    global put_all_index

    while True:
        answer = input("continue(all/y/n): ").strip().lower()
        if answer == 'y':
            return True
        if answer == 'n':
            return False
        if answer == 'all':
            put_all_index = True
            return True
        print(f"Unrecognised answer {answer!r}; please enter all, y or n")
-
-
def is_updated_index_settings(index):
    """Report whether ``index`` already defines the required analysis entries.

    Fetches the index settings through the module-level ``client`` and checks
    for the ``word_analyzer`` and ``back_edge_ngram_analyzer`` analyzers and
    the ``back_edge_ngram_filter`` filter. Prints "<index> done" when all
    three are present.

    Args:
        index: Name of the index to inspect; also used as the key into the
            settings response.

    Returns:
        True if all three entries are present, False otherwise.
    """
    settings = client.indices.get_settings(index=index)
    # The "analysis" section, or its "analyzer"/"filter" parts, may be absent
    # from the response; treat a missing section as "not updated" instead of
    # raising KeyError.
    analysis = settings[index]["settings"]["index"].get("analysis", {})
    analyzers = analysis.get("analyzer", {})
    filters = analysis.get("filter", {})

    updated = (
        "word_analyzer" in analyzers
        and "back_edge_ngram_analyzer" in analyzers
        and "back_edge_ngram_filter" in filters
    )
    if updated:
        print(f"{index} done")
    return updated
-
def put_index_settings(index, max_retries=3):
    """Close ``index``, add the analysis settings, and reopen it.

    If the index is not already closed it is flushed first. The settings are
    then applied on the closed index and verified with
    is_updated_index_settings(); a failed verification is retried up to
    ``max_retries`` times (previously this recursed without limit). The index
    is reopened even when applying the settings raises, so a failure does not
    leave it closed.

    Args:
        index: Name of the index to update.
        max_retries: Maximum number of put/verify attempts (at least one
            attempt is always made).

    Raises:
        RuntimeError: If the settings are still missing after all attempts.
    """
    status = client.cat.indices(index=index, params={"h": "status"}).strip()
    if status != 'close':
        print(f"flush {index}")
        client.indices.flush(index=index)
        print(f"flush {index} done")

    close_index(index)

    body = '{"analysis":{"filter":{"back_edge_ngram_filter":{"min_gram":"1","side":"back","type":"edgeNGram","max_gram":"256"}},"analyzer":{"back_edge_ngram_analyzer":{"filter":["standard","lowercase","back_edge_ngram_filter"],"tokenizer":"keyword"},"word_analyzer":{"filter":["standard","lowercase"],"tokenizer":"keyword"},"mk_edge2":{"type":"mk_edge","extra_prarm":"cb,3,-1"},"mk_edge1":{"type":"mk_edge","extra_prarm":"wb,1,-1"}}}}'
    try:
        for _ in range(max(1, max_retries)):
            client.indices.put_settings(index=index, body=body)
            if is_updated_index_settings(index):
                break
            print(f"put index error: {index}")
        else:
            raise RuntimeError(
                f"analysis settings still missing on {index} after retries"
            )
    finally:
        # Always reopen: a closed index cannot serve reads or writes.
        open_index(index)
-
def close_index(index):
    """Close ``index``, printing its cat status before and after the call."""
    status_columns = {"h": "status"}

    before = client.cat.indices(index=index, params=status_columns).strip()
    print(f"{index} status: ", before)

    client.indices.close(index)

    after = client.cat.indices(index=index, params=status_columns).strip()
    print(f"{index} status: ", after)
-
def open_index(index):
    """Open ``index``, printing its cat status before and after the call."""
    status_columns = {"h": "status"}

    before = client.cat.indices(index=index, params=status_columns).strip()
    print(f"{index} status: ", before)

    client.indices.open(index)

    after = client.cat.indices(index=index, params=status_columns).strip()
    print(f"{index} status: ", after)
-
-
def check_cluster_health(index):
    """Block until the cluster reports "green", showing recovery progress.

    Polls cluster health every two seconds. Each poll updates a tqdm bar
    from ``active_shards_percent_as_number`` and prints the cat-recovery rows
    of ``index`` whose last column is not ``done``.

    NOTE(review): this waits on the status of the whole cluster, not just
    ``index``; it will not return while the cluster stays non-green for any
    reason.

    Args:
        index: Name of the index whose shard recovery rows are printed.
    """
    progress_bar = trange(100, desc="recover: ", leave=True)
    last_progress = 0
    try:
        while True:
            # One health request per poll so status and percentage come from
            # the same snapshot.
            health = client.cluster.health()
            progress_bar.set_description(health["status"])
            if health["status"] == "green":
                break

            current_progress = health["active_shards_percent_as_number"]
            progress_bar.update(current_progress - last_progress)
            last_progress = current_progress

            recovery_table = client.cat.recovery(index=index, params={"h": "index,shard,translog_ops_percent,files_percent,stage", "v": "true"})
            header, *rows = recovery_table.splitlines() or [""]
            # Drop blank rows (the response ends with a newline) and finished
            # shards; rstrip guards against trailing column padding.
            unfinished = [
                row for row in rows
                if row.strip() and not row.rstrip().endswith('done')
            ]
            if unfinished:
                print('\n'.join([header, *unfinished]))
            time.sleep(2)

        progress_bar.update(100 - last_progress)
    finally:
        progress_bar.close()
-
def get_all_indices():
    """Return the get_alias response for ``index_pattern``.

    main() takes ``len()`` of the result and iterates it to obtain index
    names.
    """
    matching = client.indices.get_alias(index_pattern)
    return matching
-
def test_put_index():
    """Create ten indices named test_candidatev2_chunk_0 .. _9 with an empty body."""
    name_template = "test_candidatev2_chunk_{chunk}"
    empty_body = ''
    for chunk_number in range(10):
        target = name_template.format(chunk=chunk_number)
        client.indices.create(index=target, body=empty_body)
-
-
# Run the migration only when this file is executed as a script.
if __name__ == "__main__":
    # test_put_index()
    main()