• 使用Elasticsearch Python SDK 查询EasySearch


    随着数据分析需求的不断增长,高效查询和分析大数据集变得越来越重要。Easysearch 作为一种强大的国产化搜索和分析引擎,同时作为 Elasticsearch 国产替代方案,支持原生 DSL 查询语法和 SQL 查询,确保原业务代码无需调整即可无缝迁移。Easysearch 兼容 ES 7.x 现有的 SDK 和索引存储格式,支持冷热架构和索引生命周期管理,为用户提供了全面的数据处理解决方案。本文将详细介绍如何使用 ES 7.x Python SDK 与 Easysearch 进行交互,包括安装、连接、数据操作和查询等方面。

    1. 安装 Elasticsearch Python 客户端

    要使用Elasticsearch Python客户端,首先需要通过pip进行安装。打开终端或命令提示符,并运行以下命令:

    pip install elasticsearch==7.13.1
    

    如果使用默认版本安装,会安装8.x的依赖,可能会报错 elasticsearch.UnsupportedProductError: The client noticed that the server is not Elasticsearch and we do not support this unknown product.

    由于Elasticsearch 7.10.2以后变更了许可模式,引入了 Server Side Public License (SSPL) 和 Elastic License,很多基于Elasticsearch 7.10.2分支出来的搜索引擎需要使用7.x版本的SDK和agent,比如Beats全家桶。

    在这里插入图片描述
    这是一个获取集群信息的demo,使用es.cluster.health() 调用 Elasticsearch 集群的健康检查API,返回集群的健康状态。

    由于使用了自签名证书,所以在初始化时加上 verify_certs=False 参数,同时使用 warnings.filterwarnings("ignore") 设置 Python 的警告系统,忽略所有发出的警告。这在生产代码中通常不推荐,因为它会隐藏潜在的问题,但在开发或测试环境中,如果警告信息太多干扰调试,可能会暂时使用。

    import urllib3
    import elasticsearch
    from elasticsearch import Elasticsearch
    import warnings
    from pprint import pprint
    
    # 禁用所有警告
    warnings.filterwarnings("ignore")
    
    print(elasticsearch.VERSION)
    # 禁用警告
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    url = "https://ip:9200/"
    user_passwd = ('user', 'passwd')
    
    # 建立连接
    es = Elasticsearch(
        [url],
        http_auth=user_passwd,
        verify_certs=False,
    )
    
    # 检查集群健康状态
    health = es.cluster.health()
    pprint(health)
    

    2. 准备示例数据

    在进行查询之前,我们需要在 Easysearch 中创建一些示例数据。

    # 定义示例数据
    sample_data = [
        {"index": {"_index": "my_index"}},
        {"field": "value1", "another_field": 10},
        {"index": {"_index": "my_index"}},
        {"field": "value2", "another_field": 20},
        {"index": {"_index": "my_index"}},
        {"field": "value3", "another_field": 30},
        {"index": {"_index": "my_index"}},
        {"field": "bulk_value1", "another_field": 100},
        {"index": {"_index": "my_index"}},
        {"field": "bulk_value2", "another_field": 200},
        {"index": {"_index": "my_index"}},
        {"field": "bulk_value3", "another_field": 300}
    ]
    
    # 批量插入示例数据
    response = es.bulk(body=sample_data)
    print(response)
    

    3. 使用 REST API 进行查询

    REST API 是与 Easysearch 进行通信的常用方式。通过 REST API,开发者可以发送 HTTP 请求来执行各种操作,包括索引文档、搜索数据等。以下示例展示了如何在 Python 中执行 REST 查询。

    由于是REST API,我们可以先使用Postman进行测试。

    在这里插入图片描述

    我们可以看到HTTP端点可以正常返回,然后就可以使用编程方式进行访问了:

    import requests
    from requests.auth import HTTPBasicAuth
    from pprint import pprint
    
    url = "https://ip:9200/"
    user_passwd = ('user', 'passwd')
    
    # 构建查询参数
    query = {
        "query": {
            "match": {
                "field": "value1"
            }
        }
    }
    
    dsl = f"{url}/my_index/_search"
    
    response = requests.get(dsl, json=query, auth=HTTPBasicAuth(*user_passwd), verify=False)
    pprint(response.json())
    
    # 处理查询结果
    if response.status_code == 200:
        results = response.json()
        for hit in results['hits']['hits']:
            print(hit)
    else:
        print(f"Error: {response.status_code}")
    

    4. 通过 DSL 对索引数据进行增删改查

    DSL(Domain-Specific Language)是 Easysearch 的原生查询语言,允许用户构建复杂的查询。以下是一些示例:

    # 构建 DSL 查询
    dsl_query = {
        "query": {
            "match": {
                "field": "value1"
            }
        }
    }
    
    # 执行 DSL 查询
    response = es.search(index="my_index", body=dsl_query)
    
    results = response.get("hits")
    # 处理查询结果
    if results:
        for hit in results['hits']:
            print(hit)
    else:
        print(f"Error: {response.status_code}")
    

    插入数据

    如果不指定document ID,那么随机生成一个ID并写入。

    doc = {"field": "value4", "another_field": 9999}
    response = es.index(index="my_index", body=doc)
    print(response)
    

    更新数据

    指定ID为1来手动更新索引:

    doc = {"field": "value4", "another_field": 9999}
    response = es.index(index="my_index", body=doc, id=1)
    print(response)
    

    更新单条数据

    # 更新单条数据
    update_body = {"doc": {"another_field": 50}}
    response = es.update(index="my_index", id="1", body=update_body)
    pprint(response)
    

    删除数据

    # 删除单条数据
    response = es.delete(index="my_index", id="1")
    pprint(response)
    

    5. 索引数据 SQL 查询

    创建客户端实例后,我们可以使用 sql 方法执行 SQL 查询。以下示例展示了如何执行一个简单的 SELECT 查询。

    # 执行 SQL 查询
    query_sql = {
        "query": "SELECT * FROM my_index"
    }
    
    res = es.sql.query(body=query_sql)
    pprint(res)
    

    6. 索引数据批量操作

    Bulk API 允许用户一次性对多个文档进行创建、更新或删除操作,极大提高了操作效率。以下是一些示例:

    批量插入数据

    # 定义批量插入数据
    bulk_data = [
        {"index": {"_index": "my_index"}},
        {"field": "bulk_value1", "another_field": 100},
        {"index": {"_index": "my_index"}},
        {"field": "bulk_value2", "another_field": 200},
        {"index": {"_index": "my_index"}},
        {"field": "bulk_value3", "another_field": 300}
    ]
    
    # 执行批量插入操作
    response = es.bulk(body=bulk_data)
    pprint(response)
    

    批量更新数据

    # 定义批量更新数据
    bulk_update_data = [
        {"update": {"_id": "1", "_index": "my_index"}},
        {"doc": {"another_field": 110}},
        {"update": {"_id": "2", "_index": "my_index"}},
        {"doc": {"another_field": 220}}
    ]
    
    # 执行批量更新操作
    response = es.bulk(body=bulk_update_data)
    pprint(response)
    

    批量删除数据

    # 定义批量删除数据
    bulk_delete_data = [
        {"delete": {"_id": "1", "_index": "my_index"}},
        {"delete": {"_id": "2", "_index": "my_index"}}
    ]
    
    # 执行批量删除操作
    response = es.bulk(body=bulk_delete_data)
    print(response)
    

    7. 索引级别的操作

    接下来,介绍索引创建、删除和检查索引是否存在操作。以下是一些示例:

    创建索引

    # 创建索引
    index_body = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        },
        "mappings": {
            "properties": {
                "field": {"type": "text"},
                "another_field": {"type": "integer"}
            }
        }
    }
    response = es.indices.create(index="new_index", body=index_body)
    pprint(response)
    

    删除索引

    # 删除索引
    response = es.indices.delete(index="new_index")
    pprint(response)
    

    检查索引是否存在

    # 检查索引是否存在
    response = es.indices.exists(index="new_index")
    pprint(response)
    

    8. 总结

    Easysearch 虽然没有专门的 Python SDK,但完全兼容 ES 7.x 的 Python SDK 客户端,这为开发者提供了极大的便利。通过使用 ES 7.x Python SDK,开发者可以轻松地使用 DSL 和 SQL 语法对 Easysearch 进行查询和数据操作。Easysearch 主要优势包括:

    1. 兼容性强:无需修改现有代码,即可从 ES 迁移到 Easysearch。
    2. 功能全面:支持 DSL 查询、SQL 查询、批量操作等高级功能。
    3. 易于使用:提供简洁明了的 API,降低学习成本。
    4. 高效性能:批量操作 API 大幅提高数据处理效率。

    Easysearch 结合 ES 7.x Python SDK 的强大功能,为开发者提供了一个高效、灵活的大数据处理平台。无论是执行简单的 SQL 查询,还是构建复杂的 DSL 查询,都能满足各种数据分析需求。如果您正在寻找一个强大的搜索和分析解决方案,Easysearch 绝对值得一试。它不仅能帮助您更高效地处理和分析大数据集,还能为数据驱动的决策提供有力支持。

  • 相关阅读:
    电脑上玩GBA游戏(GBA模拟器)
    Java基于微信小程序的一起考研学习平台
    视野修炼-技术周刊第57期
    【数据挖掘】6. 核函数
    Android广播BroadcastReceiver介绍
    哈希表、unordered_map和unordered_set模拟
    什么品牌的台灯适合学生用?视力专家推荐的护眼台灯
    一文吃透JavaScript程序控制流与函数
    三、RTMP协议 视频Chunk和音频Chunk到底长啥样?
    金字塔原理
  • 原文地址:https://blog.csdn.net/weixin_38781498/article/details/140337093