• Python 分页爬取 ES 日志，获取数据


    这里通过 scroll 分页爬取 ES 日志数据，找出 msgId 重复的记录并写入文件。重点参考其中分页（scroll）的实现方式。

    下面的代码用于判断数据是否已全部读取完成（返回结果中没有 scroll_id，或本页没有命中数据时，跳出循环）：

    1. # (Excerpt from the end of the `while True:` loop body in the full script) Check whether there are more results to scroll through
    2. scroll_id = query.get('_scroll_id')
    3. if scroll_id is None or len(query['hits']['hits']) == 0:
    4. break  # leaves the while loop; NOTE(review): the scroll context is never released with clear_scroll
    5. # Use the scroll ID to run the next scroll query (fetch the next page)
    6. query = es.scroll(scroll_id=scroll_id, scroll='25m')

    完整代码 

    """Scroll through an Elasticsearch index and report log messages whose msgId repeats.

    Every hit matching ``query_json`` is parsed for an msgId and an order id at fixed
    offsets after their markers. An msgId already seen earlier in the scan is printed
    and appended to ``fileName`` as ``repeat,msgId:<msgId>,orderId:<orderId>``.
    Run as a script; importing the module only defines the helpers below.
    """

    ES_HOST = "http://XXXX:9200/"
    ES_AUTH = ('elastic', '密码')  # placeholder credentials; avoid hard-coding real ones
    INDEX_NAME = 'indexName索引名称'  # placeholder: replace with the real index name
    SCROLL_KEEPALIVE = '25m'  # how long ES keeps the scroll context alive between pages
    PAGE_SIZE = 5000
    REQUEST_TIMEOUT = 2000000  # seconds (~23 days), i.e. effectively no client-side timeout
    fileName = "order-info-repeat" + ".txt"

    # Each field is `length` characters starting `skip` characters after the start of its
    # marker. 'order_id' is 8 chars, so skip=11 also jumps 3 separator characters
    # (presumably '":"' in a JSON-like message -- confirm against real logs);
    # 'msgId:' is 6 chars, so skip=7 jumps one separator character.
    ORDER_ID_MARKER, ORDER_ID_SKIP, ORDER_ID_LEN = 'order_id', 11, 19
    MSG_ID_MARKER, MSG_ID_SKIP, MSG_ID_LEN = 'msgId:', 7, 32

    query_json = {
        "_source": ["message", "logger_name", "@timestamp"],
        "query": {
            "bool": {
                "filter": [
                    {
                        "bool": {
                            "filter": [
                                {
                                    "multi_match": {
                                        "lenient": True,
                                        # NOTE(review): 'Cnosumer' looks like a typo, but the phrase
                                        # must match the logged text verbatim -- confirm before changing.
                                        "query": "rocketMQ syncPassengerOrderCnosumer topicList",
                                        "type": "phrase",
                                    }
                                }
                            ]
                        }
                    },
                    {
                        "range": {
                            "@timestamp": {
                                "format": "strict_date_optional_time",
                                "gte": "2023-07-27T02:30:00.000Z",
                                "lte": "2023-07-27T04:00:00.000Z",
                            }
                        }
                    },
                ],
                "must": [],
                "must_not": [],
                "should": [],
            }
        },
    }


    def extract_field(message, marker, skip, length):
        """Return ``length`` characters starting ``skip`` characters after ``marker``.

        Returns None when ``marker`` does not occur in ``message``. ``str.find``
        returns -1 in that case, and blindly adding ``skip`` would slice from an
        arbitrary position near the start of the message.
        """
        start = message.find(marker)
        if start == -1:
            return None
        start += skip
        return message[start:start + length]


    def find_repeats(hits, seen):
        """Return ``(msg_id, order_id)`` pairs for hits whose msgId is already in ``seen``.

        ``hits`` is a list of ES hit dicts with ``hit['_source']['message']``.
        ``seen`` is a set updated in place, so duplicates are detected across pages.
        Hits without an msgId marker are skipped. A missing order id is reported
        as an empty string.
        """
        repeats = []
        for hit in hits:
            message = hit['_source']['message']
            msg_id = extract_field(message, MSG_ID_MARKER, MSG_ID_SKIP, MSG_ID_LEN)
            if msg_id is None:
                continue  # nothing to deduplicate on
            if msg_id in seen:
                order_id = extract_field(message, ORDER_ID_MARKER, ORDER_ID_SKIP, ORDER_ID_LEN)
                repeats.append((msg_id, order_id or ''))
            else:
                seen.add(msg_id)
        return repeats


    def main():
        """Run the scroll query, print/append every repeated msgId, and return them."""
        # Imported here so the parsing helpers above stay importable without the client.
        from elasticsearch import Elasticsearch

        # NOTE(review): http_auth / body= / request_timeout= are 7.x-style arguments;
        # newer clients prefer basic_auth, keyword query args and .options(). Confirm
        # against the installed client version.
        es = Elasticsearch(hosts=ES_HOST, http_auth=ES_AUTH)
        seen = set()  # set, not list: O(1) membership test per hit
        repeated_ids = []
        total = 0

        query = es.search(index=INDEX_NAME, body=query_json, scroll=SCROLL_KEEPALIVE,
                          size=PAGE_SIZE, request_timeout=REQUEST_TIMEOUT)
        scroll_id = query.get('_scroll_id')
        try:
            # Open the output once for the whole run instead of once per repeated record.
            with open(fileName, 'a', encoding='utf-8') as out:
                while True:
                    hits = query['hits']['hits']
                    for msg_id, order_id in find_repeats(hits, seen):
                        line = "repeat,msgId:" + msg_id + ",orderId:" + order_id
                        print(line)
                        out.write(line + "\n")
                        repeated_ids.append(msg_id)
                    total += len(hits)
                    print("total:" + str(total) + ",repeatTotal:" + str(len(repeated_ids))
                          + ",len:" + str(len(hits)))
                    # Stop when ES returns no scroll ID or an empty page.
                    if scroll_id is None or not hits:
                        break
                    # Fetch the next page using the scroll context.
                    query = es.scroll(scroll_id=scroll_id, scroll=SCROLL_KEEPALIVE)
                    scroll_id = query.get('_scroll_id')
        finally:
            # Release the server-side scroll context now instead of waiting for the
            # keep-alive to expire; best effort, so a failure here must not mask
            # an error raised inside the loop.
            if scroll_id is not None:
                try:
                    es.clear_scroll(scroll_id=scroll_id)
                except Exception as err:
                    print("clear_scroll failed: " + str(err))
        return repeated_ids


    if __name__ == "__main__":
        main()

  • 相关阅读:
    IDEA 又双叒叕 更新 大版本了 , IntelliJ IDEA 2022.3 正式发布,详情 请参考博文
    Mysql之视图、索引【第五篇】
    内网隧道代理技术(二十八)之 DNS 隧道反弹Shell
    【Node.js从基础到高级运用】六、创建第一个 Node.js 应用
    前端小游戏——植物大战僵尸
    如何设计存储架构
    目标检测论文解读复现之七:基于SE-YOLOv5s的绝缘子检测
    可信执行环境简介:ARM 的 TrustZone
    查看apk签名
    bellman_ford AcWing 853. 有边数限制的最短路
  • 原文地址:https://blog.csdn.net/bxp1321/article/details/133325972