• Elasticsearch:使用 Streamlit、语义搜索和命名实体提取开发 Elastic Search 应用程序


    作者:Camille Corti-Georgiou

    介绍

    一切都是一个搜索问题。 我在 Elastic 工作的第一周就听到有人说过这句话,从那时起,这句话就永久地印在了我的脑海中。 这篇博客的目的并不是我出色的同事对我所做的相关陈述进行分析,但我首先想花点时间剖析一下这个陈述。

    自成立以来,Elasticsearch 一直处于技术前沿 - 打破软件领域的模式,为世界各地家喻户晓的公司的技术支柱提供动力。 我们倾向于将 Elastic 的产品分为几个 “OTB” 解决方案 - 安全性、可观察性等,但剥离这些,我们正在解决的问题基本上是面向搜索的。 它是关于能够询问你的数据问题并返回有意义且相关的结果,无论横幅如何,正是这一点使 Elastic 成为一项如此强大的技术。

    作为对搜索的颂歌和 Elastic 的一些功能的展示,本博客将带你完成搜索应用程序的端到端开发,使用机器学习模型进行命名实体提取 (NER) 和语义搜索; 将 Elastic 和非 Elastic 组件结合起来,并通过简单的 UI 对它们进行分层,以展示搜索的强大功能。

    该指南专为与 Elastic Cloud 一起使用而设计,但是,相同的方法也可以应用于本地托管的实例,只需更改身份验证方法和其他特定于云的概念。 完整的 git 存储库位于:Kibana Search Project

    涵盖的主题

    • Logstash
    • 摄取管道
    • ELSER
    • 自定义机器学习模型
    • Streamlit

    注意:为了演示方便,在今天的展示中,我将使用本地资管理的 Elasticsearch 来进行演示。在下面的展示中,我们将使用 Elastic Stack 8.12 来进行展示。

    第 1 步:从 Kaggle 下载 BBC 新闻数据集

    我们将使用的数据集是 BBC 新闻数据集,可从 BBC 新闻数据集以 CSV 形式获取。 这是一个自动更新的数据集,收集来自 BBC 新闻的 RSS 源。 该数据集包括已发表文章的标题、描述、日期、url 和各种其他属性。 我们将使用 Logstash 来提取数据,但是,其他方法(即 Python 客户端或标准上传)同样有效(将数据添加到 Elasticsearch | Elasticsearch 服务文档 | Elastic)。

    原始数据模式有点问题,因此,如果使用 Logstash,需要对文件结构进行细微调整。 解压缩下载的文件并运行脚本,修改输入/输出以反映本地计算机上保存的文件的位置和名称。 该脚本将对列重新排序,使文章的 “Publish Date” 排在第一位,以方便对日期字段的解释。

    convert.py

    1. import csv
    2. input_csv = 'path/bbc_news.csv'
    3. output_csv = 'path/new-bbc_news.csv'
    4. # Read in the old bbc_news CSV file
    5. with open(input_csv, 'r') as infile:
    6. reader = csv.DictReader(infile)
    7. data = [row for row in reader]
    8. # Write the file in the desired format
    9. with open(output_csv, 'w', newline='') as outfile:
    10. fieldnames = ['pubDate', 'title', 'guid', 'link', 'description']
    11. writer = csv.DictWriter(outfile, fieldnames=fieldnames)
    12. writer.writeheader()
    13. # Write data in new format
    14. for row in data:
    15. writer.writerow({
    16. 'pubDate': row['pubDate'],
    17. 'title': row['title'],
    18. 'guid': row['guid'],
    19. 'link': row['link'],
    20. 'description': row['description']
    21. })
    22. print(f'Success. Output saved to {output_csv}')
    1. $ pwd
    2. /Users/liuxg/data/bbs
    3. $ ls
    4. bbc_news.csv bbs.zip convert.py
    5. $ python convert.py
    6. Success. Output saved to ./new-bbc_news.csv
    7. $ cat new-bbc_news.csv
    8. pubDate,title,guid,link,description
    9. "Mon, 07 Mar 2022 08:01:56 GMT",Ukraine: Angry Zelensky vows to punish Russian atrocities,https://www.bbc.co.uk/news/world-europe-60638042,https://www.bbc.co.uk/news/world-europe-60638042?at_medium=RSS&at_campaign=KARANGA,The Ukrainian president says the country will not forgive or forget those who murder its civilians.

    步骤 2:使用 Docker 上传并启动自定义 NER ML 模型

    接下来,我们将为 NER 任务导入自定义 ML 模型,详细文档可以在此处找到:如何部署命名实体识别 | Elastic Stack 中的机器学习 [8.11]。 本教程使用 docker 上传自定义模型,但是,有关其他安装方法,请参阅:自定义机器学习模型和地图简介 | Elastic 博客。你也可以参考文章 “Elasticsearch:如何部署 NLP:命名实体识别 (NER) 示例”。

    虽然我们可以使用许多模型来进行 NER,但我们将使用来自 distilbert-base-uncased · Hugging Face 的 “distilbert-base-uncased”。 该模型针对小写文本进行了优化,事实证明有利于从非结构化数据中精确提取实体,在我们的例子中 - 在命名实体识别的帮助下,我们可以从新闻文章中提取人物、地点、组织等,以供下游使用。

    要为此任务创建一次性 API 密钥,我们可以调用 _security 端点,指定我们的密钥要求 API 密钥生成。 确保复制请求生成的编码值,因为以后无法检索该值。 我们创建的 API 密钥将仅用于此上传,因此我们可以分配有限的权限和到期日期:

    1. POST /_security/api_key
    2. {
    3. "name": "ml-upload-key",
    4. "expiration": "2d",
    5. "role_descriptors": {
    6. "ml_admin": {
    7. "cluster": ["manage_ml", "monitor"],
    8. "index": [
    9. {
    10. "names": ["*"],
    11. "privileges": ["write", "read", "view_index_metadata"]
    12. }
    13. ]
    14. }
    15. }
    16. }

    要将模型导入集群,请确保 Docker Desktop 已启动并正在运行,并在终端中运行以下命令; 设置 “CLOUD_ID” 和 “API_KEY” 的值以反映与你的云集群关联的值。

    1. docker run -it --rm docker.elastic.co/eland/eland:latest \
    2. eland_import_hub_model \
    3. --cloud-id $CLOUD_ID \
    4. --es-api-key $API_KEY \
    5. --hub-model-id "elastic/distilbert-base-uncased-finetuned-conll03-english" \
    6. --task-type ner \
    7. --start

    如果遇到错误,请确保您的 Cloud ID 和身份验证凭据正确,并且 Docker 按预期运行。

    针对我们的自签名的 Elasticsearch 集群,我们可以使用如下的命令来进行:

    1. docker run -it --rm docker.elastic.co/eland/eland:latest \
    2. eland_import_hub_model \
    3. --url https://192.168.0.3:9200/ \
    4. --es-api-key RG9WU0NZNEJ1ODV6ZzUtNllLa3E6UmxQR1lSaVJTeE96TzdPZ05EdzN5dw== \
    5. --hub-model-id "elastic/distilbert-base-uncased-finetuned-conll03-english" \
    6. --task-type ner \
    7. --insecure \
    8. --start

    我们可以在 Kibana 中进行查看:

    从上面,我们可以看出来上传的模型已经成功地被部署了。

    第 3 步:下载 ELSER

    在此步骤中,我们将把 Elastic 的 “域外模型” ELSER 下载到堆栈中。 导航到  Machine Learning -> Model Management -> Trained Models 并选择 elser_model_2 上的下载。 有关在非云环境中安装 ELSER 的更多信息,请访问:ELSER – Elastic Learned Sparse EncodeR | Elastic Stack 中的机器学习 [8.12]

    针对本地部署的 Elasticsearch,你可以参考文章 “Elasticsearch:部署 ELSER - Elastic Learned Sparse EncoderR”。你也可以参考文章 “Elastic Search:构建语义搜索体验”。

    最终,我们可以看到如下的画面:

    第 4 步:在 Elastic 中添加映射和管道

    Elastic 中的映射定义了数据的 schema。 我们需要为 BBC 新闻索引添加正式映射,以确保数据按预期键入,并且当我们将数据发送到集群时,Elastic 能够理解其结构。 作为此映射的一部分,我们排除了 ELSER 模型生成的标记以防止映射爆炸,并定义了 NER 模型生成的许多标签。 导航到开发工具并创建映射:

    1. PUT bbc-news-elser
    2. {
    3. "mappings": {
    4. "_source": {
    5. "excludes": [
    6. "ml-elser-title.tokens",
    7. "ml-elser-description.tokens"
    8. ]
    9. },
    10. "properties": {
    11. "@timestamp": {
    12. "type": "date"
    13. },
    14. "@version": {
    15. "type": "text",
    16. "fields": {
    17. "keyword": {
    18. "type": "keyword",
    19. "ignore_above": 256
    20. }
    21. }
    22. },
    23. "description": {
    24. "type": "text",
    25. "fields": {
    26. "keyword": {
    27. "type": "keyword",
    28. "ignore_above": 256
    29. }
    30. }
    31. },
    32. "event": {
    33. "properties": {
    34. "original": {
    35. "type": "text",
    36. "fields": {
    37. "keyword": {
    38. "type": "keyword",
    39. "ignore_above": 256
    40. }
    41. }
    42. }
    43. }
    44. },
    45. "ml": {
    46. "properties": {
    47. "ner": {
    48. "properties": {
    49. "entities": {
    50. "properties": {
    51. "class_name": {
    52. "type": "text",
    53. "fields": {
    54. "keyword": {
    55. "type": "keyword",
    56. "ignore_above": 256
    57. }
    58. }
    59. },
    60. "class_probability": {
    61. "type": "float"
    62. },
    63. "end_pos": {
    64. "type": "long"
    65. },
    66. "entity": {
    67. "type": "text",
    68. "fields": {
    69. "keyword": {
    70. "type": "keyword",
    71. "ignore_above": 256
    72. }
    73. }
    74. },
    75. "start_pos": {
    76. "type": "long"
    77. }
    78. }
    79. },
    80. "model_id": {
    81. "type": "text",
    82. "fields": {
    83. "keyword": {
    84. "type": "keyword",
    85. "ignore_above": 256
    86. }
    87. }
    88. },
    89. "predicted_value": {
    90. "type": "text",
    91. "fields": {
    92. "keyword": {
    93. "type": "keyword",
    94. "ignore_above": 256
    95. }
    96. }
    97. }
    98. }
    99. }
    100. }
    101. },
    102. "ml-elser-description": {
    103. "properties": {
    104. "model_id": {
    105. "type": "text",
    106. "fields": {
    107. "keyword": {
    108. "type": "keyword",
    109. "ignore_above": 256
    110. }
    111. }
    112. },
    113. "tokens": {
    114. "type": "rank_features"
    115. }
    116. }
    117. },
    118. "ml-elser-title": {
    119. "properties": {
    120. "model_id": {
    121. "type": "text",
    122. "fields": {
    123. "keyword": {
    124. "type": "keyword",
    125. "ignore_above": 256
    126. }
    127. }
    128. },
    129. "tokens": {
    130. "type": "rank_features"
    131. }
    132. }
    133. },
    134. "pubDate": {
    135. "type": "date",
    136. "format": "EEE, dd MMM yyyy HH:mm:ss 'GMT'",
    137. "ignore_malformed": true
    138. },
    139. "tags": {
    140. "properties": {
    141. "LOC": {
    142. "type": "text",
    143. "fields": {
    144. "keyword": {
    145. "type": "keyword",
    146. "ignore_above": 256
    147. }
    148. }
    149. },
    150. "MISC": {
    151. "type": "text",
    152. "fields": {
    153. "keyword": {
    154. "type": "keyword",
    155. "ignore_above": 256
    156. }
    157. }
    158. },
    159. "ORG": {
    160. "type": "text",
    161. "fields": {
    162. "keyword": {
    163. "type": "keyword",
    164. "ignore_above": 256
    165. }
    166. }
    167. },
    168. "PER": {
    169. "type": "text",
    170. "fields": {
    171. "keyword": {
    172. "type": "keyword",
    173. "ignore_above": 256
    174. }
    175. }
    176. }
    177. }
    178. },
    179. "title": {
    180. "type": "text",
    181. "fields": {
    182. "keyword": {
    183. "type": "keyword",
    184. "ignore_above": 256
    185. }
    186. }
    187. },
    188. "url": {
    189. "type": "text",
    190. "fields": {
    191. "keyword": {
    192. "type": "keyword",
    193. "ignore_above": 256
    194. }
    195. }
    196. }
    197. }
    198. }
    199. }

    Pipelines 在索引之前定义了一系列数据处理步骤。 我们的摄取管道包括字段删除、ELSER 和自定义 NER 模型的模型推理,以及将 NER 模型运行的输出值添加到标签字段的脚本。

    1. PUT _ingest/pipeline/news-pipeline
    2. {
    3. "processors": [
    4. {
    5. "remove": {
    6. "field": [
    7. "host",
    8. "message",
    9. "log",
    10. "@version"
    11. ],
    12. "ignore_missing": true
    13. }
    14. },
    15. {
    16. "inference": {
    17. "model_id": "elastic__distilbert-base-uncased-finetuned-conll03-english",
    18. "target_field": "ml.ner",
    19. "field_map": {
    20. "title": "text_field"
    21. }
    22. }
    23. },
    24. {
    25. "script": {
    26. "lang": "painless",
    27. "if": "return ctx['ml']['ner'].containsKey('entities')",
    28. "source": "Map tags = new HashMap(); for (item in ctx['ml']['ner']['entities']) { if (!tags.containsKey(item.class_name)) tags[item.class_name] = new HashSet(); tags[item.class_name].add(item.entity);} ctx['tags'] = tags;"
    29. }
    30. },
    31. {
    32. "inference": {
    33. "model_id": ".elser_model_2",
    34. "target_field": "ml-elser-title",
    35. "field_map": {
    36. "title": "text_field"
    37. },
    38. "inference_config": {
    39. "text_expansion": {
    40. "results_field": "tokens"
    41. }
    42. }
    43. }
    44. },
    45. {
    46. "inference": {
    47. "model_id": ".elser_model_2",
    48. "target_field": "ml-elser-description",
    49. "field_map": {
    50. "description": "text_field"
    51. },
    52. "inference_config": {
    53. "text_expansion": {
    54. "results_field": "tokens"
    55. }
    56. }
    57. }
    58. }
    59. ]
    60. }

    第 5 步:使用 Logstash 提取数据

    我们现在需要配置 Logstash 将数据发送到 Elastic。 下载 Logstash(如果尚未下载),然后按照此处记录的步骤进行安装:Logstash 入门

    1. $ pwd
    2. /Users/liuxg/elastic
    3. $ ls
    4. elasticsearch-8.12.0 kibana-8.12.0
    5. elasticsearch-8.12.0-darwin-aarch64.tar.gz kibana-8.12.0-darwin-aarch64.tar.gz
    6. enterprise-search-8.12.1 logstash-8.12.0-darwin-aarch64.tar.gz
    7. enterprise-search-8.12.1.tar.gz metricbeat-8.12.0-darwin-aarch64.tar.gz
    8. filebeat-8.12.0-darwin-aarch64.tar.gz
    9. $ tar xzf logstash-8.12.0-darwin-aarch64.tar.gz
    10. $ cd logstash-8.12.0
    11. $ touch logstash.conf
    12. $ ls logstash.conf
    13. logstash.conf

    我们将编辑文件 logstash.conf 作为 Logstash 的配置文件。

    我们的配置文件包含三个元素:输入块、过滤器块和输出块。 让我们花一点时间来浏览一下每个内容。

    Input:我们的输入将 Logstash 配置为从位于指定路径的 CSV 文件中读取数据。 它从文件的开头开始读取,禁用sincedb 功能,并假设文件是纯文本形式。

    1. input {
    2. file {
    3. path => "/path_to_file/new-bbc_news.csv"
    4. start_position => "beginning"
    5. sincedb_path => "/dev/null"
    6. codec => "plain"
    7. }
    8. }

    filter:此部分对传入数据应用过滤器。 它使用 CSV 过滤器来解析 CSV 数据,指定逗号作为分隔符并定义列名称。 为了解决 BBC 新闻数据集中存在重复条目的问题,我们应用指纹过滤器根据 “title” 和 “link” 字段的串联来计算唯一指纹,并将其存储在 [@metadata][fingerprint] 中。 mutate 过滤器将 “link” 字段重命名为 “url” 并删除 “guid” 字段。

    1. filter {
    2. csv {
    3. separator => ","
    4. columns => ["pubDate", "title", "guid", "link", "description"]
    5. skip_header => true
    6. quote_char => '"'
    7. }
    8. fingerprint {
    9. source => ["title", "link"]
    10. target => "[@metadata][fingerprint]"
    11. }
    12. mutate { rename => { "link" => "url" } }
    13. }

    ouput:最后一部分配置处理后数据的输出目的地。 它将数据发送到由 Cloud ID 和凭证指定的 Elasticsearch Cloud 实例。 数据存储在 “bbc-news-elser” 索引中(在第 2 节中映射),并且应用了名为 “news-pipeline” 的摄取管道。 document_id 设置为我们的指纹过滤器生成的唯一指纹。 此外,使用 rubydebug 编解码器将数据的副本打印到控制台以进行调试。

    1. output {
    2. elasticsearch {
    3. cloud_id => "${CLOUD_ID}"
    4. api_key => ${API_KEY}"
    5. index => "bbc-news-elser"
    6. pipeline => "news-pipeline"
    7. document_id => "%{[@metadata][fingerprint]}"
    8. }
    9. stdout { codec => rubydebug }
    10. }

    请记住将 CLOUD_ID 和 API_KEY 设置为环境变量 - 或存储在密钥存储中,Logstash Keystore Guide - 并确保 CSV 文件的路径准确。 注意 - 你需要为 Logstash 创建一个具有相关权限的新 API 密钥。 你可以使用 “-f” 标志直接从命令行运行 Logstash 来指定配置位置,也可以使用管道文件指向配置。 如果选择管道方法,请将以下行添加到 pipelines.yml 文件中:

    1. - pipeline.id: bbc-news
    2. path.config: "path-to-config"

    针对我们的情况,我们使用本地部署的 Elasticsearch。我们可以详细参考文章 “Logstash:如何连接到带有 HTTPS 访问的集群”。

    1. $ ./bin/elasticsearch-keystore list
    2. keystore.seed
    3. xpack.security.http.ssl.keystore.secure_password
    4. xpack.security.transport.ssl.keystore.secure_password
    5. xpack.security.transport.ssl.truststore.secure_password
    6. $ ./bin/elasticsearch-keystore show xpack.security.http.ssl.keystore.secure_password
    7. Yx33RxJsQmakbbZR4bjlew
    8. $ cd config/certs/
    9. $ ls
    10. http.p12 http_ca.crt transport.p12
    11. $ keytool -import -file http_ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12
    12. Certificate was added to keystore
    13. $ ls
    14. http.p12 http_ca.crt transport.p12 truststore.p12
    15. $ keytool -keystore truststore.p12 -list
    16. Enter keystore password:
    17. Keystore type: PKCS12
    18. Keystore provider: SUN
    19. Your keystore contains 1 entry
    20. mykey, Mar 4, 2024, trustedCertEntry,
    21. Certificate fingerprint (SHA-256): BC:E6:6E:D5:50:97:F2:55:FC:8E:44:20:BD:AD:AF:C8:D6:09:CC:80:27:03:8C:2D:D0:9D:80:56:68:F3:45:9E

    我们为 logstash 的摄取获得一个 api-key:

    我们完整的 logstash.conf 文件为:

    logstash.conf

    1. input {
    2. file {
    3. path => "/Users/liuxg/data/bbs/new-bbc_news.csv"
    4. start_position => "beginning"
    5. sincedb_path => "/dev/null"
    6. codec => "plain"
    7. }
    8. }
    9. filter {
    10. csv {
    11. separator => ","
    12. columns => ["pubDate", "title", "guid", "link", "description"]
    13. skip_header => true
    14. quote_char => '"'
    15. }
    16. fingerprint {
    17. source => ["title", "link"]
    18. target => "[@metadata][fingerprint]"
    19. }
    20. mutate { rename => { "link" => "url" } }
    21. }
    22. output {
    23. elasticsearch {
    24. hosts => ["https://192.168.0.3:9200"]
    25. index => "bbc-news-elser"
    26. api_key => "G4WdCY4Bu85zg5-6pKne:RIj_XbEbREuDySzRxYbkQA"
    27. ssl_verification_mode => "full"
    28. ssl_truststore_path => "/Users/liuxg/elastic/elasticsearch-8.12.0/config/certs/truststore.p12"
    29. ssl_truststore_password => "password"
    30. pipeline => "news-pipeline"
    31. document_id => "%{[@metadata][fingerprint]}"
    32. }
    33. stdout { codec => rubydebug }
    34. }

    我在 Logstash 的安装目录中,使用如下的命令:

    ./bin/logstash -f logstash.conf

    第 6 步:验证数据摄取

    如果一切顺利,我们现在应该能够在集群中探索 BBC 新闻数据。

    使用 pubDate 字段作为 “Timestamp field” 在 Discover 或 Stack Management 中创建数据视图。

    我们可以通过如下的命令来查看是否已经完全写入:

    如果完全写入,Logstash 的 termninal 将不再滚动。我们可以在 Kibana 中进行查看:

    为了更好地了解 NER 模型的内部情况,我们可以在开发工具中查询数据,定制响应以返回感兴趣的字段:

    1. GET bbc-news-elser/_search?size=1
    2. {
    3. "_source": ["ml.ner", "title"],
    4. "fields": [
    5. "ml.ner", "title"
    6. ]
    7. }

    分解这个片段,我们可以看到原始的 “title” 值,以及 NER 模型产生的结果。 “predicted_value:” 字段显示带有注释的识别实体的文本。 在本案中,“putin” 和 “tucker carlson” 已被识别为人员 (PER),而 “fox” 被识别为一个组织。 “entities” 对象包含一个对象,每个对象代表在原始 “title” 字段中识别的命名实体,并包括:

    • “entity” - 文本中识别的命名实体的字符串。
    • “class_naAme” - 分配给实体的分类,即 PER、LOC、ORG。
    • “class_probability” - 表示模型对实体分类的置信度的十进制值。 上述响应中两个实体的值都接近 1,表明置信度较高。
    • “start_pos” 和 “end_pos” - 预测值文本中实体的开始和结束位置(零索引),这对于需要突出显示或进一步处理文本中特定实体的应用程序非常有用。

    第 7 步:部署搜索 UI

    在最后一步中,我们引入了 Streamlit 应用程序,该应用程序利用 BBC 新闻数据集进行语义和标准文本搜索。

    首先,按照此处所述的步骤安装 Streamlit:Streamlit Git Repo,或使用位于 git 存储库中的 requirements.text 文件。 安装后,创建一个名为 elasticapp.py 的文件并添加 Python 代码块。 如上,当我们需要对云集群进行身份验证时,需要在运行之前设置 “CLOUD_ID”、“API_KEY” 变量(或者,可以使用用户和密码来验证对集群的访问)。这可以通过以下方式实现 创建 dotenv 文件,或者通过导出变量。对于后一种方法,请运行以下命令:

    1. export CLOUD_ID={{cloud_id}}
    2. export API_KEY={{api_key}}

    我们正在实现的用户界面有助于输入语义和标准查询、选择 Elasticsearch 索引以及随后启动对我们的文章数据集的搜索。 Elasticsearch 连接是使用从环境变量加载的云凭据建立的。 后端逻辑包括根据用户查询获取数据以及使用搜索结果更新 Streamlit 应用程序显示的功能。

    elasticapp.py

    1. import streamlit as st
    2. from elasticsearch import Elasticsearch
    3. import os
    4. from datetime import datetime
    5. cloud_id = os.getenv("CLOUD_ID")
    6. api_key = os.getenv("API_KEY")
    7. es = Elasticsearch(
    8. cloud_id=cloud_id,
    9. api_key=api_key
    10. )
    11. def main():
    12. st.title("Elasticsearch News App")
    13. selected_index = st.sidebar.selectbox("Elasticsearch Index", ["bbc-news-elser"], key="selected_index")
    14. if 'selected_tags' not in st.session_state:
    15. st.session_state['selected_tags'] = {"LOC": set(), "PER": set(), "MISC": set()}
    16. if 'search_results' not in st.session_state:
    17. st.session_state['search_results'] = fetch_recent_data(selected_index, size=20)
    18. semantic_query = st.text_input("Semantic Query:", key="semantic_query")
    19. regular_query = st.text_input("Standard Query:", key="regular_query")
    20. min_date, max_date = get_date_range(selected_index)
    21. start_date = st.date_input("Start Date", min_date, key="start_date")
    22. end_date = st.date_input("End Date", max_date, key="end_date")
    23. if st.button("Search"):
    24. st.session_state['search_results'] = fetch_data(selected_index, semantic_query, regular_query, start_date, end_date)
    25. st.session_state['selected_tags'] = {tag_type: set() for tag_type in ["LOC", "PER", "MISC"]} # Reset filters on new search
    26. for tag_type in ["LOC", "PER", "MISC"]:
    27. current_tags = get_unique_tags(tag_type, st.session_state['search_results'])
    28. st.session_state['selected_tags'][tag_type] = st.sidebar.multiselect(f"Filter by {tag_type}", current_tags, key=f"filter_{tag_type}")
    29. filtered_results = filter_results_by_tags(st.session_state['search_results'], st.session_state['selected_tags'])
    30. update_results(filtered_results)
    31. def fetch_recent_data(index_name, size=100):
    32. try:
    33. query_body = {
    34. "size": size,
    35. "sort": [
    36. {"pubDate": {"order": "desc"}}, # Primary sort by date
    37. ]
    38. }
    39. response = es.search(index=index_name, body=query_body)
    40. return [hit['_source'] for hit in response['hits']['hits']]
    41. except Exception as e:
    42. st.error(f"Error fetching recent data from Elasticsearch: {e}")
    43. return []
    44. # Helper function to calculate the earliest and latest dates in the index
    45. def get_date_range(index_name):
    46. max_date_aggregation = {
    47. "max_date": {
    48. "max": {
    49. "field": "pubDate"
    50. }
    51. }
    52. }
    53. min_date_aggregation = {
    54. "min_date": {
    55. "min": {
    56. "field": "pubDate"
    57. }
    58. }
    59. }
    60. max_date_result = es.search(index=index_name, body={"aggs": max_date_aggregation})
    61. min_date_result = es.search(index=index_name, body={"aggs": min_date_aggregation})
    62. max_date_bucket = max_date_result['aggregations']['max_date']
    63. min_date_bucket = min_date_result['aggregations']['min_date']
    64. max_date = max_date_bucket['value_as_string']
    65. min_date = min_date_bucket['value_as_string']
    66. if max_date:
    67. max_date = datetime.strptime(max_date, "%a, %d %b %Y %H:%M:%S GMT")
    68. else:
    69. max_date = datetime.today().date()
    70. if min_date:
    71. min_date = datetime.strptime(min_date, "%a, %d %b %Y %H:%M:%S GMT")
    72. else:
    73. min_date = datetime.today().date()
    74. return min_date, max_date
    75. # Updates results based on search
    76. def update_results(results):
    77. try:
    78. for result_item in results:
    79. # Display document titles as links
    80. title_with_link = f"[{result_item['title']}]({result_item['url']})"
    81. st.markdown(f"### {title_with_link}")
    82. st.write(result_item['description'])
    83. # Display timestamp with results
    84. timestamp = result_item.get('pubDate', '')
    85. if timestamp:
    86. st.write(f"Published: {timestamp}")
    87. # Adds tags for entities
    88. tags = result_item.get('tags', {})
    89. if tags:
    90. for tag_type, tag_values in tags.items():
    91. for tag_value in tag_values:
    92. # Define colors for extracted entity tags
    93. tag_color = {
    94. "LOC": "#3498db",
    95. "PER": "#2ecc71",
    96. "MISC": "#e74c3c"
    97. }.get(tag_type, "#555555")
    98. st.markdown(
    99. f"{tag_type}: {tag_value}",
    100. unsafe_allow_html=True)
    101. st.write("---")
    102. except Exception as e:
    103. st.error(f"Error performing search in Elasticsearch: {e}")
    104. # Fetch data from ES based on index + queries. Specify size - can be modified.
    105. def fetch_data(index_name, semantic_query, regular_query, start_date=None, end_date=None, size=100):
    106. try:
    107. query_body = {
    108. "size": size,
    109. "query": {
    110. "bool": {
    111. "should": []
    112. }
    113. }
    114. }
    115. # Add semantic query if provided by the user
    116. if semantic_query:
    117. query_body["query"]["bool"]["should"].append(
    118. {"bool": {
    119. "should": {
    120. "text_expansion": {
    121. "ml-elser-title.tokens": {
    122. "model_text": semantic_query,
    123. "model_id": ".elser_model_2",
    124. "boost": 9
    125. }
    126. },
    127. "text_expansion": {
    128. "ml-elser-description.tokens": {
    129. "model_text": semantic_query,
    130. "model_id": ".elser_model_2",
    131. "boost": 9
    132. }
    133. }
    134. }
    135. }}
    136. )
    137. # Add regular query if provided by the user
    138. if regular_query:
    139. query_body["query"]["bool"]["should"].append({
    140. "query_string": {
    141. "query": regular_query,
    142. "boost": 8
    143. }
    144. })
    145. # Add date range if provided
    146. if start_date or end_date:
    147. date_range_query = {
    148. "range": {
    149. "pubDate": {}
    150. }
    151. }
    152. if start_date:
    153. date_range_query["range"]["pubDate"]["gte"] = start_date.strftime("%a, %d %b %Y %H:%M:%S GMT")
    154. if end_date:
    155. date_range_query["range"]["pubDate"]["lte"] = end_date.strftime("%a, %d %b %Y %H:%M:%S GMT")
    156. query_body["query"]["bool"]["must"] = date_range_query
    157. result = es.search(
    158. index=index_name,
    159. body=query_body
    160. )
    161. hits = result['hits']['hits']
    162. data = [{'_id': hit['_id'], 'title': hit['_source'].get('title', ''),
    163. 'description': hit['_source'].get('description', ''),
    164. 'tags': hit['_source'].get('tags', {}), 'pubDate': hit['_source'].get('pubDate', ''),
    165. 'url': hit['_source'].get('url', '')} for hit in hits]
    166. return data
    167. except Exception as e:
    168. st.error(f"Error fetching data from Elasticsearch: {e}")
    169. return []
    170. # Function to get unique tags of a specific type
    171. def get_unique_tags(tag_type, results):
    172. unique_tags = set()
    173. for result_item in results:
    174. tags = result_item.get('tags', {}).get(tag_type, [])
    175. unique_tags.update(tags)
    176. return sorted(unique_tags)
    177. # Function to filter results based on selected tags
    178. def filter_results_by_tags(results, selected_tags):
    179. filtered_results = []
    180. for result_item in results:
    181. tags = result_item.get('tags', {})
    182. add_result = True
    183. for tag_type, selected_values in selected_tags.items():
    184. if selected_values:
    185. result_values = tags.get(tag_type, [])
    186. if not any(value in selected_values for value in result_values):
    187. add_result = False
    188. break
    189. if add_result:
    190. filtered_results.append(result_item)
    191. return filtered_results
    192. if __name__ == "__main__":
    193. main()

    针对我们的本地部署来说,我们需要做如下的修改。我们可以参照之前的文章 “Elasticsearch:与多个 PDF 聊天 | LangChain Python 应用教程(免费 LLMs 和嵌入)”。在运行之前,我们先配置如下的环境变量:

    1. export ES_SERVER="localhost"
    2. export ES_USER="elastic"
    3. export ES_PASSWORD="q2rqAIphl-fx9ndQ36CO"
    4. export ES_FINGERPRINT="bce66ed55097f255fc8e4420bdadafc8d609cc8027038c2dd09d805668f3459e"
    1. $ export ES_SERVER="localhost"
    2. $ export ES_USER="elastic"
    3. $ export ES_PASSWORD="q2rqAIphl-fx9ndQ36CO"
    4. $ export ES_FINGERPRINT="bce66ed55097f255fc8e4420bdadafc8d609cc8027038c2dd09d805668f3459e"

    elasticapp.py

    1. import streamlit as st
    2. from elasticsearch import Elasticsearch
    3. import os
    4. from datetime import datetime
    5. endpoint = os.getenv("ES_SERVER")
    6. username = os.getenv("ES_USER")
    7. password = os.getenv("ES_PASSWORD")
    8. fingerprint = os.getenv("ES_FINGERPRINT")
    9. url = f"https://{endpoint}:9200"
    10. es = Elasticsearch( url ,
    11. basic_auth = (username, password),
    12. ssl_assert_fingerprint = fingerprint,
    13. http_compress = True )
    14. # print(es.info())
    15. def main():
    16. st.title("Elasticsearch News App")
    17. selected_index = st.sidebar.selectbox("Elasticsearch Index", ["bbc-news-elser"], key="selected_index")
    18. if 'selected_tags' not in st.session_state:
    19. st.session_state['selected_tags'] = {"LOC": set(), "PER": set(), "MISC": set()}
    20. if 'search_results' not in st.session_state:
    21. st.session_state['search_results'] = fetch_recent_data(selected_index, size=20)
    22. semantic_query = st.text_input("Semantic Query:", key="semantic_query")
    23. regular_query = st.text_input("Standard Query:", key="regular_query")
    24. min_date, max_date = get_date_range(selected_index)
    25. start_date = st.date_input("Start Date", min_date, key="start_date")
    26. end_date = st.date_input("End Date", max_date, key="end_date")
    27. if st.button("Search"):
    28. st.session_state['search_results'] = fetch_data(selected_index, semantic_query, regular_query, start_date, end_date)
    29. st.session_state['selected_tags'] = {tag_type: set() for tag_type in ["LOC", "PER", "MISC"]} # Reset filters on new search
    30. for tag_type in ["LOC", "PER", "MISC"]:
    31. current_tags = get_unique_tags(tag_type, st.session_state['search_results'])
    32. st.session_state['selected_tags'][tag_type] = st.sidebar.multiselect(f"Filter by {tag_type}", current_tags, key=f"filter_{tag_type}")
    33. filtered_results = filter_results_by_tags(st.session_state['search_results'], st.session_state['selected_tags'])
    34. update_results(filtered_results)
    35. def fetch_recent_data(index_name, size=100):
    36. try:
    37. query_body = {
    38. "size": size,
    39. "sort": [
    40. {"pubDate": {"order": "desc"}}, # Primary sort by date
    41. ]
    42. }
    43. response = es.search(index=index_name, body=query_body)
    44. return [hit['_source'] for hit in response['hits']['hits']]
    45. except Exception as e:
    46. st.error(f"Error fetching recent data from Elasticsearch: {e}")
    47. return []
    48. # Helper function to calculate the earliest and latest dates in the index
    49. def get_date_range(index_name):
    50. max_date_aggregation = {
    51. "max_date": {
    52. "max": {
    53. "field": "pubDate"
    54. }
    55. }
    56. }
    57. min_date_aggregation = {
    58. "min_date": {
    59. "min": {
    60. "field": "pubDate"
    61. }
    62. }
    63. }
    64. max_date_result = es.search(index=index_name, body={"aggs": max_date_aggregation})
    65. min_date_result = es.search(index=index_name, body={"aggs": min_date_aggregation})
    66. max_date_bucket = max_date_result['aggregations']['max_date']
    67. min_date_bucket = min_date_result['aggregations']['min_date']
    68. max_date = max_date_bucket['value_as_string']
    69. min_date = min_date_bucket['value_as_string']
    70. if max_date:
    71. max_date = datetime.strptime(max_date, "%a, %d %b %Y %H:%M:%S GMT")
    72. else:
    73. max_date = datetime.today().date()
    74. if min_date:
    75. min_date = datetime.strptime(min_date, "%a, %d %b %Y %H:%M:%S GMT")
    76. else:
    77. min_date = datetime.today().date()
    78. return min_date, max_date
    79. # Updates results based on search
    80. def update_results(results):
    81. try:
    82. for result_item in results:
    83. # Display document titles as links
    84. title_with_link = f"[{result_item['title']}]({result_item['url']})"
    85. st.markdown(f"### {title_with_link}")
    86. st.write(result_item['description'])
    87. # Display timestamp with results
    88. timestamp = result_item.get('pubDate', '')
    89. if timestamp:
    90. st.write(f"Published: {timestamp}")
    91. # Adds tags for entities
    92. tags = result_item.get('tags', {})
    93. if tags:
    94. for tag_type, tag_values in tags.items():
    95. for tag_value in tag_values:
    96. # Define colors for extracted entity tags
    97. tag_color = {
    98. "LOC": "#3498db",
    99. "PER": "#2ecc71",
    100. "MISC": "#e74c3c"
    101. }.get(tag_type, "#555555")
    102. st.markdown(
    103. f"{tag_type}: {tag_value}",
    104. unsafe_allow_html=True)
    105. st.write("---")
    106. except Exception as e:
    107. st.error(f"Error performing search in Elasticsearch: {e}")
    108. # Fetch data from ES based on index + queries. Specify size - can be modified.
    109. def fetch_data(index_name, semantic_query, regular_query, start_date=None, end_date=None, size=100):
    110. try:
    111. query_body = {
    112. "size": size,
    113. "query": {
    114. "bool": {
    115. "should": []
    116. }
    117. }
    118. }
    119. # Add semantic query if provided by the user
    120. if semantic_query:
    121. query_body["query"]["bool"]["should"].append(
    122. {"bool": {
    123. "should": {
    124. "text_expansion": {
    125. "ml-elser-title.tokens": {
    126. "model_text": semantic_query,
    127. "model_id": ".elser_model_2",
    128. "boost": 9
    129. }
    130. },
    131. "text_expansion": {
    132. "ml-elser-description.tokens": {
    133. "model_text": semantic_query,
    134. "model_id": ".elser_model_2",
    135. "boost": 9
    136. }
    137. }
    138. }
    139. }}
    140. )
    141. # Add regular query if provided by the user
    142. if regular_query:
    143. query_body["query"]["bool"]["should"].append({
    144. "query_string": {
    145. "query": regular_query,
    146. "boost": 8
    147. }
    148. })
    149. # Add date range if provided
    150. if start_date or end_date:
    151. date_range_query = {
    152. "range": {
    153. "pubDate": {}
    154. }
    155. }
    156. if start_date:
    157. date_range_query["range"]["pubDate"]["gte"] = start_date.strftime("%a, %d %b %Y %H:%M:%S GMT")
    158. if end_date:
    159. date_range_query["range"]["pubDate"]["lte"] = end_date.strftime("%a, %d %b %Y %H:%M:%S GMT")
    160. query_body["query"]["bool"]["must"] = date_range_query
    161. result = es.search(
    162. index=index_name,
    163. body=query_body
    164. )
    165. hits = result['hits']['hits']
    166. data = [{'_id': hit['_id'], 'title': hit['_source'].get('title', ''),
    167. 'description': hit['_source'].get('description', ''),
    168. 'tags': hit['_source'].get('tags', {}), 'pubDate': hit['_source'].get('pubDate', ''),
    169. 'url': hit['_source'].get('url', '')} for hit in hits]
    170. return data
    171. except Exception as e:
    172. st.error(f"Error fetching data from Elasticsearch: {e}")
    173. return []
    174. # Function to get unique tags of a specific type
    175. def get_unique_tags(tag_type, results):
    176. unique_tags = set()
    177. for result_item in results:
    178. tags = result_item.get('tags', {}).get(tag_type, [])
    179. unique_tags.update(tags)
    180. return sorted(unique_tags)
    181. # Function to filter results based on selected tags
    182. def filter_results_by_tags(results, selected_tags):
    183. filtered_results = []
    184. for result_item in results:
    185. tags = result_item.get('tags', {})
    186. add_result = True
    187. for tag_type, selected_values in selected_tags.items():
    188. if selected_values:
    189. result_values = tags.get(tag_type, [])
    190. if not any(value in selected_values for value in result_values):
    191. add_result = False
    192. break
    193. if add_result:
    194. filtered_results.append(result_item)
    195. return filtered_results
    196. if __name__ == "__main__":
    197. main()

    我们使用如下的命令来进行运行:

    streamlit run elasticapp.py

    在上面,我们可以通过地名,人名或 MISC 来进行筛选。

    我们可以针对一些查询来进行语义搜索,比如如:

    原文:Developing an Elastic Search App with Streamlit, Semantic Search, and Named Entity Extraction — Elastic Search Labs

  • 相关阅读:
    【PID优化】基于人工蜂群算法PID控制器优化设计含Matlab源码
    【操作系统】进程:生成者-消费者问题
    基于springboot+vue的房屋出租租赁系统 elementui
    swin Transformer
    SpringCloud进阶-消费者模块
    前端培训丁鹿学堂:js异步及解决方案简析
    JavaSE笔记——抽象类和接口
    给你 2 万条数据,怎么快速导入到 MySQL?写得太好了...
    ChatGPT AIGC 快速合并Excel工作薄 Vlookup+INDIRECT
    云呐-人工智能设备运维是干什么的?自动化设备运维
  • 原文地址:https://blog.csdn.net/UbuntuTouch/article/details/136459971