这里是分页爬取数据,找出msgId重复的数据,并写入文件。这里重点参考分页(滚动查询)的方式。
此处是根据下面的代码判断数据是否读取完成的:
- # Check whether there are more results left to scroll through
- scroll_id = query.get('_scroll_id')
- if scroll_id is None or len(query['hits']['hits']) == 0:
- break
- # Use the scroll ID to run the next scroll query
- query = es.scroll(scroll_id=scroll_id, scroll='25m')
-
完整代码
- from elasticsearch import Elasticsearch
-
-
# Report file: one line is appended for every repeated msgId that is found.
fileName = "order-info-repeat" + ".txt"

# Search body: phrase match on the consumer log line, restricted to a fixed
# time window. Only the fields listed under "_source" are returned.
query_json = {
    "_source": ["message", "logger_name", "@timestamp"],
    "query": {
        "bool": {
            "filter": [
                {
                    "bool": {
                        "filter": [
                            {
                                "multi_match": {
                                    "lenient": True,
                                    "query": "rocketMQ syncPassengerOrderCnosumer topicList",
                                    "type": "phrase",
                                },
                            },
                        ],
                    },
                },
                {
                    "range": {
                        "@timestamp": {
                            "format": "strict_date_optional_time",
                            "gte": "2023-07-27T02:30:00.000Z",
                            "lte": "2023-07-27T04:00:00.000Z",
                        },
                    },
                },
            ],
            "must": [],
            "must_not": [],
            "should": [],
        },
    },
}


def extract_field(message, marker, offset, length):
    """Return the fixed-width value that follows ``marker`` in ``message``.

    Args:
        message: Log text to search.
        marker: Substring that precedes the value.
        offset: Number of characters from the start of ``marker`` to the
            first character of the value.
        length: Number of characters to take.

    Returns:
        The extracted slice, or ``None`` when ``marker`` does not occur.
    """
    position = message.find(marker)
    if position == -1:
        # str.find signals "not found" with -1; slicing from -1 + offset
        # would silently return unrelated text.
        return None
    begin = position + offset
    return message[begin:begin + length]


def find_repeats(hits, seen_msg_ids):
    """Return one report line per hit whose msgId was already seen.

    Args:
        hits: The ``hits.hits`` list of one search/scroll response; each
            entry is read as ``hit['_source']['message']``.
        seen_msg_ids: Set of msgIds met so far. It is updated in place so
            that duplicates are detected across pages.

    Returns:
        A list of ``"repeat,msgId:<id>,orderId:<id>"`` strings (without a
        trailing newline), in hit order.
    """
    report_lines = []
    for hit in hits:
        message = hit['_source']['message']
        msg_id = extract_field(message, 'msgId:', 7, 32)
        if msg_id is None:
            # Without a msgId there is nothing to compare against.
            continue
        if msg_id in seen_msg_ids:
            order_id = extract_field(message, 'order_id', 11, 19) or ""
            report_lines.append("repeat,msgId:" + msg_id + ",orderId:" + order_id)
        else:
            seen_msg_ids.add(msg_id)
    return report_lines


def main():
    """Scroll through every matching hit and report repeated msgIds.

    Each repeat is printed and appended to ``fileName``; the file is only
    touched when a page actually contains a repeat. A running total is
    printed after every page. The scroll context is released on exit.
    """
    es = Elasticsearch(hosts="http://XXXX:9200/", http_auth=('elastic', '密码'))
    seen_msg_ids = set()  # set gives O(1) membership tests
    total = 0
    repeatTotal = 0
    open_scroll_id = None  # last scroll id handed out by the server

    query = es.search(index='indexName索引名称', body=query_json, scroll='25m', size=5000, request_timeout=2000000)
    try:
        while True:
            hits = query['hits']['hits']
            scroll_id = query.get('_scroll_id')
            if scroll_id is not None:
                open_scroll_id = scroll_id

            report_lines = find_repeats(hits, seen_msg_ids)
            for line in report_lines:
                print(line)
            if report_lines:
                # One append per page instead of one open/close per repeat.
                with open(fileName, 'a+', encoding='utf-8') as report:
                    report.writelines(line + "\n" for line in report_lines)

            repeatTotal += len(report_lines)
            total += len(hits)
            print("total:" + str(total) + ",repeatTotal:" + str(repeatTotal) + ",len:" + str(len(hits)))

            # Stop when the server returns no scroll id or an empty page.
            if scroll_id is None or not hits:
                break
            # Use the scroll id to fetch the next page.
            query = es.scroll(scroll_id=scroll_id, scroll='25m')
    finally:
        # Free the server-side scroll context instead of leaving it to
        # expire after the 25m keep-alive.
        if open_scroll_id is not None:
            es.clear_scroll(scroll_id=open_scroll_id)


if __name__ == "__main__":
    main()
-