2021年7月8日星期四

python elasticsearch_dsl模块

python elasticsearch_dsl模块

在整理elasticsearch_dsl模块过程中,着实让我头大。
个人感觉就是资料太少而且很乱,不成体系,接口很多,没有规范。
此文凑合着看,以后有时间后加以修改。
还有一些部分官方文档都没有说清楚,以后弄懂了再补充。


简介

elasticsearch-dsl是基于elasticsearch-py封装实现的,提供了更简便的操作elasticsearch的方法。


安装

pip install elasticsearch_dsl


连接elasticsearch

使用connections.create_connection()进行连接,根据源码:

# alias是设置的别名conn = self._conns[alias] = Elasticsearch(**kwargs)return conn

我们知道,create_connection的参数应该与elasticsearch.Elasticsearch是相一致的。详见此文档:Python Elasticsearch Client。

下面做一下简单的连接工作:

from elasticsearch_dsl import connections, Searches = connections.create_connection(hosts=["localhost:9200"], timeout=20)

除此之外,还可以通过alias给连接设置别名,后续可以通过别名来引用该连接,默认别名为default

from elasticsearch_dsl import connectionsconnections.create_connection(alias='my_new_connection', hosts=["localhost:9200"], timeout=60)

使用别名后这样用:

from elasticsearch_dsl import connections, Search, Qes = connections.create_connection(alias="eaaa", hosts=["localhost:9200"])e = Search(using="eaaa").index("account").query()for i in e: print(i.to_dict())


elasticsearch_dsl.Search

search对象代表整个搜索请求,包括:queries、filters、aggregations、sort、pagination、additional parameters、associated client。
API被设置为可链接的即和用.连续操作。search对象是不可变的,除了聚合,对对象的所有更改都将导致创建包含该更改的浅表副本。

当初始化Search对象时,你可以传递elasticsearch客户端作为using的参数。比如:

from elasticsearch_dsl import connections, Searches = connections.create_connection(hosts=["localhost:9200"], timeout=20)s = Search(using=es)

当然,不传也没关系,后面可以在指定。
一般使用:

from elasticsearch_dsl import connections, Searches = connections.create_connection(hosts=["localhost:9200"], timeout=20)s = Search(using=es).query("match", firstname="Hattie")# 这样写也行# s = Search(using=es).query({"match": {"firstname": "Hattie"}})for i in s: print(i.to_dict())	"""结果:{'account_number': 6, 'balance': 5686, 'firstname': 'Hattie', 'lastname': 'Bond', 'age': 36, 'gender': 'M', 'address': '671 Bristol Street', 'employer': 'Netagy', 'email': 'hattiebond@netagy.com', 'city': 'Dante', 'state': 'TN'}"""

执行execute方法将请求发送给elasticsearch:
response = s.execute()

上面的例子没有调用execute()是因为我们调用了循环,在search对象是一个可迭代对象,它的源码:

def __iter__(self):	"""	Iterate over the hits.	"""	return iter(self.execute())

所以我们不需要执行execute()方法,
迭代后可以通过to_dict()方法将Search对象序列化为一个dict对象,这样可以方便调试。

  • query方法
    查询,参数可以是Q对象,也可以是query模块中的一些类,还可以是自已写上如何查询。
  • filter方法
    如果你想要在过滤上下文中添加查询,可以使用filter()函数来使之变的简单。s = Search()s = s.filter('terms', tags=['search', 'python'])

    在背后,这会产生一个bool查询,并将指定的条件查询放入其filter分支,等价与下面的操作:
    s = Search()s = s.query('bool', filter=[Q('terms', tags=['search', 'python'])])

    s = Search()s = s.filter('terms', tags=['search', 'python'])# 过滤,在此为范围过滤,range是方法,timestamp是所要查询的field的名字,gte意为大于等于,lt意为小于,根据需要设定即可(似乎过滤只能接受数字形式的内容,如果是文本就会返回空)# 关于term和match的区别,term是精确匹配,match会模糊化,会进行分词,返回匹配度分数,(term查询字符串之接受小写字母,如果有大写会返回空即没有命中,match则是不区分大小写都可以进行查询,返回结果也一样)# 例1:范围查询s = s.filter("range", timestamp={"gte": 0, "lt": time.time()}).query("match", country="in")# 例2:普通过滤res_3 = s.filter("terms", balance_num=["39225", "5686"]).execute()

  • index方法
    指定索引
  • usring方法
    指定哪个elasticsearch

注意:
当调用.query()方法多次时,内部会使用&操作符:

s = s.query().query()print(s.to_dict())# {"query": {"bool": {...}}}


elasticsearch_dsl.query

该库为所有的Elasticsearch查询类型都提供了类。以关键字参数传递所有的参数,最终会把参数序列化后传递给Elasticsearch,这意味着在原始查询和它对应的dsl之间有这一个清理的一对一的映射。

from elasticsearch_dsl import connections, Searchfrom elasticsearch_dsl.query import MultiMatch, Match, MatchAll, Filtered, Term, Terms, Prefixes = connections.create_connection(hosts=["localhost:9200"], timeout=20)# 相对与{"multi_match": {"query": "ha", "fields": ["firstname", "lastname"]}}m1 = MultiMatch(query='Ha', fields=['firstname', 'lastname'])# 相当于{"match": {"firstname": {"query": "Hughes"}}}m2 = Match(firstname={"query": "Hughes"})s = s.query(m2)for i in s: print(i.to_dict())

这个库里面还有很多类,对应着dsl的查询方式,可以自己点击去看看。


elasticsearch_dsl.Q

使用快捷方式Q通过命名参数或者原始dict类型数据来构建一个查询实例。

from elasticsearch_dsl import connections, Search, Qes = connections.create_connection(hosts=["localhost:9200"], timeout=20)# q = Q("match", city="Summerfield")q = Q("multi_match", query='Summerfield', fields=['city', 'firstname'])s = Search(using=es).index("account")s = s.query(q)for i in s: print(i.to_dict())

可以看到,Q的格式一般是Q("查询类型", 字段="xxx")Q("查询类型", query="xxx", fields=['字段1', '字段2'])

查询对象可以通过逻辑运算符组合起来:

Q("match", title='python') | Q("match", title='django')# {"bool": {"should": [...]}}Q("match", title='python') & Q("match", title='django')# {"bool": {"must": [...]}}~Q("match", title="python")# {"bool": {"must_not": [...]}}

比如:

from elasticsearch_dsl import Q, Search# q = Q("multi_match", query="123.244.101.255", fields=["clientip", 'timestamp'])q = Q("match", clientip="123.244.101.255") | Q("match", clientip="42.248.168.61")s = Search().query(q)for item in s: print(item.clientip)


嵌套类型

有时候你想要引用一个在其他字段中的字段,例如多字段(title.keyword)或者在一个json文档中的address.city。为了方便,Q允许你使用双下划线'__'代替关键词参数中的'.'

s = Search()s = s.filter('term', category__keyword='Python')s = s.query('match', address__city='prague')


查询

简单的例子:

from elasticsearch_dsl import Searchfrom elasticsearch import Elasticsearches_object = Elasticsearch(['url:9200'], sniffer_timeout=60, timeout=30)# 获取es中所有的索引# 返回类型为字典,只返回索引名es_object.cat.indices(format='json', h='index')# 查询多个索引es_s_object = Search(using=es_object, index=['log-2018-07-31', 'log-2018-07-30', 'log-2018-07-29'])# 查询一个索引es_s_object = Search(using=es_object, index='log-2018-07-31')# 条件查询1es_r_object = es_s_object.filter("range", timestamp={"gte": 1532879975, "lt": 1532880095})# 条件查询2es_r_object = es_r_object.filter("term", title='12345')# 结果转换为字典es_r_dict = es_r_object.execute().to_dict()


数据准备

  1. 下载示例数据
    点击这里查看官方的示例数据,下载" accounts.zip"。

  2. 创建索引

    from elasticsearch import Elasticsearches = Elasticsearch(host="localhost", port=9200)body = {	"mappings": {		"properties": {			"account_number": {				"type": "integer"			},			"balance": {				"type": "integer"			},			"firstname": {				"type": "text"			},			"lastname": {				"type": "text"			},			"age": {				"type": "integer"			},			"gender": {				"type": "keyword"			},			"address": {				"type": "text"			},			"employer": {				"type": "text"			},			"email": {				"type": "text"			},			"city": {				"type": "text"			},			"state": {"type": "text"}		}	}}# 创建 indexes.indices.create(index="account", body=body)

  3. 把数据写入文档中

accounts.zip解压目录下,执行命令:

curl -H 'Content-Type: application/json' -XPOST 'localhost:9200/account/_doc/_bulk?pretty' --data-binary @accounts.json

Windows平台:注意是否有curl命令,git bash自带,PowerShell好像也有


开始查询

我随便出的题,尽量让一些常用的查询方式用到,主要是用于加深印象的。

  1. 余额在15000到20000的顾客
  2. 余额在20000到30000的男性顾客
  3. state为WY,28岁或38岁的女性顾客
  4. 地址中有Street,年龄不在20-30的女顾客
  5. 按照年龄聚合,并且计算每个年龄的平均余额
  6. 按照年龄聚合,求20到30岁不同性别的平均余额

如何使用聚合,可以在文章的"其它方法"中的"聚合"中查看

# 1. 余额在15000到2000的顾客from elasticsearch_dsl import connections, Search, Qes = connections.create_connection(hosts=["localhost:9200"])s = Search(using=es, index="account")q = Q("range", balance={"gte": 15000, "lte": 20000})s1 = s.query(q)print("共查到%d条数据" % s1.count())for i in s1[90:]: print(i.to_dict())

# 2. 余额在20000到30000的男性顾客from elasticsearch_dsl import connections, Search, Qes = connections.create_connection(hosts=["localhost:9200"])s = Search(using=es, index="account")# 确定多少钱q1 = Q("range", balance={"gte": 20000, "lte": 30000})# 确定为男性q2 = Q("term", gender="M")# andq = q1 & q2s1 = s.query(q)print("共查到%d条数据" % s1.count())for i in s1: print(i.to_dict())

# 3. state为WY,28岁或38岁的女性顾客from elasticsearch_dsl import connections, Search, Qes = connections.create_connection(hosts=["localhost:9200"])s = Search(using=es, index="account")# state为WYq1 = Q("match", state="WY")# 28或38的女性q2 = Q("bool", must=[Q("terms", age=[28, 38]), Q("term", gender="F")])# andq = q1 & q2s1 = s.query(q)print("共查到%d条数据" % s1.count())for i in s1: print(i.to_dict())# ############ 第二种写法 #############################from elasticsearch_dsl import connections, Search, Qes = connections.create_connection(hosts=["localhost:9200"])s = Search(using=es, index="account")# state为WYq1 = Q("match", state="WY")q2 = Q("term", age=28) | Q("term", age=38)# 多次query就是& ==> and 操作s1 = s.query(q1).query(q2)print("共查到%d条数据" % s1.count())for i in s1: print(i.to_dict())

# 4. 地址中有Street,年龄不在20-30的女顾客from elasticsearch_dsl import connections, Search, Qes = connections.create_connection(hosts=["localhost:9200"])s = Search(using=es, index="account")# 地址中有Street且为女性q1 = Q("match", address="Street") & Q("match", gender="F")# 年龄在20-30q2 = Q("range", age={"gte": 20, "lte": 30})# 使用filter过滤# query和filter的前后关系都行s1 = s.query(q1).filter(q2)print("共查到%d条数据" % s1.count())# scan()列出全部数据for i in s1.scan(): print(i.to_dict())

# 5. 按照年龄聚合,并且计算每个年龄的平均余额from elasticsearch_dsl import connections, Search, A, Qes = connections.create_connection(hosts=["localhost:9200"])s = Search(using=es, index="account")# 先用年龄聚合,然后拿到返平均数# size指定最大返回多少条数据,默认10条# 实质上account的数据中,age分组没有100个这么多a = A("terms", field="age", size=100).metric("age_per_balance", "avg", field="balance")s.aggs.bucket("res", a)# 执行并拿到返回值response = s.execute()# res是bucket指定的名字# response.aggregations.to_dict是一个# {'doc_count_error_upper_bound': 0, 'sum_other_doc_count': 463, 'buckets': [{'...# 的数据,和用restful查询的一样for i in response.aggregations.res: print(i.to_dict())

# 6. 按照年龄聚合,求20到30岁不同性别的平均余额from elasticsearch_dsl import connections, Search, A, Qes = connections.create_connection(hosts=["localhost:9200"])s = Search(using=es, index="account")# 这次就用这种方法# range 要注意指定ranges参数和from toa1 = A("range", field="age", ranges={"from": 20, "to": 21})a2 = A("terms", field="gender")a3 = A("avg", field="balance")s.aggs.bucket("res", a1).bucket("gender_group", a2).metric("balance_avg", a3)# 执行并拿到返回值response = s.execute()# res是bucket指定的名字for i in response.aggregations.res: print(i.to_dict())s = s.query("match", age=41)"""{'key': '20.0-21.0', 'from': 20.0, 'to': 21.0, 'doc_count': 44, 'gender_group': {'doc_count_error_upper_bound': 0, 'sum_other_doc_count': 0, 'buckets': [{'key': 'M', 'doc_count': 27, 'balance_avg': {'value': 29047.444444444445}}, {'key': 'F', 'doc_count': 17, 'balance_avg': {'value': 25666.647058823528}}]}}"""

写一下总结
由于account的数据是随机生成的,很多都没有共同的数据,本来想用一下multi_match,结果发现没有条件。
列子中,用到了match, term, filter, range, bool,虽然有很多查询方式没用上,但是应该都差不多这样用。
在使用过程中,我发现使用Q这个类构造查询方式实质上就和直接使用elasticsearchd-dsl一样(指的是格式上)。
假如是数组,如:bool的must、terms,那么就要字段=[ ]
假如是字典,如:range,那么就要字段={xxx: yyy, .... }
假如是单值,如:term、match,那么就要字段=值
假如查的是多个字段,如:multi_mathc,那么就要加上query="要查的值", fields=["字段1", "字段2", ...]
然后各个条件的逻辑关系,可以通过多次query和filter或直接用Q("bool", must=[Q...], should=[Q...])再加上& | ~表示

假如忘记了,该查询方式是否有数组、字典或直接就等于一个值的话
可以看一看这篇文章中对应查询方式的记录:DSL高级检索(Query)


其它方法

没有评论:

发表评论