说明:按照指定字段field_name进行去重统计,类似MySQL的:
select count(distinct field_name) from table_name;
GET index_name/_search
{
"aggs": {
"aggs_name": {
"cardinality": {
"field": "field_name"
}
}
},
"size": 0
}
说明:例如查询 author_id 为 null(字段缺失)的数据
mysql查询语句
select bvid,mid,author_id from table_name where author_id is null;
GET index_name/_search
{
"_source": [
"bvid",
"mid",
"author_id"
],
"query": {
"script": {
"script": "doc['author_id'].length==0"
}
}
}
说明:原本的字段已经有值了,但是由于没有建立索引,在查询时查询不到,为此增加该字段的索引,并更新数据,
操作步骤如下:
1、建立对应字段的索引
PUT weixin_data/data/_mapping
{
"properties": {
"provinceName": {
"type": "keyword"
}
}
}
2、更新数据
#查看任务进度
GET _tasks/HtfzkCKoQgaBgEFbR9G3nw:16712631800
#wait_for_completion=false&conflicts=proceed->任务后台运行,并会返回一个任务id
POST weixin_data20221016/_update_by_query?wait_for_completion=false&conflicts=proceed
{
"query": {
"match_all": {}
}
}
3、等待数据更新完成即可
具体需求如下:
需要看一下作品条数
作品包含关键词:患者(必含)+ 肥胖,减肥,减脂(这三个词包含任一)
时间范围:2021.12.1--2022.11.30
作者粉丝数 >1000
作品包含关键词:患者(必含)+ 肥胖,减肥,减脂(这三个词包含任一)
查找关键字的字段有:标题,描述,标签
具体代码如下:
import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
class ElasticObject:
    """Thin wrapper around an Elasticsearch client bound to a single index."""

    def __init__(self, index_name, hosts, http_auth=("user", "password")):
        """
        Create the Elasticsearch client.

        :param index_name: index name (wildcard patterns supported) that all
            queries issued through this object run against
        :param hosts: Elasticsearch host URL(s)
        :param http_auth: (user, password) tuple; parameterized so credentials
            are no longer hard-coded — the default preserves old behavior
        """
        self.es = Elasticsearch(hosts=hosts, http_auth=http_auth)
        self.index_name = index_name

    def get_data_by_search(self, query_condition):
        """
        Run a single search request.

        :param query_condition: query DSL dict (the request body)
        :return: raw response dict from Elasticsearch
        """
        return self.es.search(index=self.index_name, body=query_condition)

    def get_data_by_scan(self, body):
        """
        Scroll through all documents matching *body* via the scan helper.

        :param body: query DSL dict
        :return: generator of raw hit dicts — consume it lazily; it issues
            scroll requests (1m window, 60s timeout per request) as you iterate
        """
        return scan(
            client=self.es,
            query=body,
            scroll="1m",
            index=self.index_name,
            request_timeout=60,
        )
if __name__ == "__main__":
    hosts = "url"
    index_name = "index_name"
    client_bilibili = ElasticObject(index_name, hosts)
    # Requirement: works containing "患者" (mandatory) AND at least one of
    # "肥胖"/"减肥"/"减脂", searched across title, description and tag name,
    # within 2021-12-01 .. 2022-11-30 (inclusive — hence lt 2022-12-01,
    # the original `lt 2022-11-30` wrongly excluded the final day).
    dsl = {
        "_source": ["mid"],
        "query": {
            "bool": {
                "filter": {
                    "range": {
                        "created": {
                            "gte": "2021-12-01 00:00:00",
                            "lt": "2022-12-01 00:00:00"
                        }
                    }
                },
                "should": [
                    {
                        "bool": {
                            "must": [
                                {"match_phrase": {"title": "患者"}}
                            ],
                            "should": [
                                {"match_phrase": {"title": "肥胖"}},
                                {"match_phrase": {"title": "减肥"}},
                                {"match_phrase": {"title": "减脂"}}
                            ],
                            "minimum_should_match": 1
                        }
                    },
                    {
                        "bool": {
                            "must": [
                                {"match_phrase": {"description": "患者"}}
                            ],
                            "should": [
                                {"match_phrase": {"description": "肥胖"}},
                                {"match_phrase": {"description": "减肥"}},
                                {"match_phrase": {"description": "减脂"}}
                            ],
                            "minimum_should_match": 1
                        }
                    },
                    {
                        "bool": {
                            "must": [
                                {"match_phrase": {"tags.tag_name": "患者"}}
                            ],
                            "should": [
                                {"match_phrase": {"tags.tag_name": "肥胖"}},
                                {"match_phrase": {"tags.tag_name": "减肥"}},
                                {"match_phrase": {"tags.tag_name": "减脂"}}
                            ],
                            "minimum_should_match": 1
                        }
                    }
                ],
                "minimum_should_match": 1
            }
        },
        # NOTE: a plain search returns at most `size` hits; if more than 1000
        # works can match, switch to get_data_by_scan to avoid undercounting.
        "size": 1000
    }
    # Fixed: original referenced the undefined name `client_b` (NameError).
    result_b = client_bilibili.get_data_by_search(dsl)
    # One hit == one work; mids intentionally NOT deduplicated, because
    # aweme_count below counts works, not distinct authors.
    mids = []
    for hit in result_b.get('hits', {}).get('hits', []):
        mid_value = hit.get('_source', {}).get('mid')
        if mid_value is None:
            continue
        # `mid` may be stored as a scalar or as a list; the original called
        # len() on it unconditionally and crashed on scalar values.
        if isinstance(mid_value, (list, tuple)):
            for mid in mid_value:
                try:
                    mids.append(int(mid))
                except (TypeError, ValueError):
                    # skip malformed ids instead of aborting the whole run
                    continue
        else:
            mids.append(int(mid_value))
    index_name = "index_name"  # fixed: original had a trailing space
    client_b_user = ElasticObject(index_name, hosts)
    aweme_count = 0
    follower_list = []
    for mid in mids:
        dslu = {
            "_source": ["mid", "follower"],
            "query": {
                "bool": {
                    "filter": {
                        "term": {
                            "mid": mid
                        }
                    }
                }
            }
        }
        user_hits = client_b_user.get_data_by_search(dslu).get('hits', {}).get('hits', [])
        # Guard: the original indexed [0] blindly and raised IndexError
        # whenever no user document matched the mid.
        if not user_hits:
            continue
        follower = user_hits[0].get('_source', {}).get('follower')
        # Requirement says follower count > 1000 (original used >=).
        if follower is not None and follower > 1000:
            aweme_count += 1
            follower_list.append(follower)
    print(aweme_count)
    print(follower_list)
PUT xxxx_comment_data_2021_before_20230228
{
"mappings": {
"_doc": {
"dynamic": "false",
"properties": {
"bvid": {
"type": "keyword"
},
"content_length": {
"type": "integer"
},
"from": {
"type": "keyword"
},
"gmt_create": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||HH:mm:ss"
},
"gmt_modify": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||HH:mm:ss"
},
"hot_words": {
"properties": {
"keyword": {
"type": "keyword"
},
"weight": {
"type": "float"
}
}
},
"id": {
"type": "integer"
},
"oid": {
"type": "keyword"
},
"page": {
"type": "integer"
},
"snapshot_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||HH:mm:ss"
}
}
}
},
"settings": {
"refresh_interval": "120s",
"number_of_shards": "10",
"number_of_replicas": "0"
}
}
# wait_for_completion=false
# 该参数是设置任务后台运行
POST _reindex?wait_for_completion=false
{
"source": {
"index": "xxxx_comment_data_20221125",
"query": {
"bool": {
"filter": {
"range": {
"gmt_modify": {
"lt": "2021-03-01 00:00:00"
}
}
}
}
}
},
"dest": {
"index": "xxxx_comment_data_2021_before_20230228",
"op_type": "create"
},
"conflicts": "proceed",
"script": {
"source": "ctx._id=ctx._source.id"
}
}
GET _tasks/QmBdwoQGQ8SqxRbEslt5og:889328722
#返回结果
{
"completed": false,
"task": {
"node": "QmBdwoQGQ8SqxRbEslt5og",
"id": 889328722,
"type": "transport",
"action": "indices:data/write/reindex",
"status": {
"total": 4815784,
"updated": 0,
"created": 147000,
"deleted": 0,
"batches": 148,
"version_conflicts": 0,
"noops": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": -1,
"throttled_until_millis": 0
},
"description": "reindex from [xxxx_comment_data_20221125] updated with Script{type=inline, lang='painless', idOrCode='ctx._id=ctx._source.id', options={}, params={}} to [xxxx_comment_data_2021_before_20230228]",
"start_time_in_millis": 1677564982643,
"running_time_in_nanos": 263394717193,
"cancellable": true,
"headers": {}
}
}