ElasticSearch in Python
先安装 elasticsearch_dsl
pip3 install elasticsearch_dsl
1 基本查询
连接ES服务器
from elasticsearch_dsl import connections
# 单连接
connections.configure(
default={"hosts": "http://localhost:9200"},
)
# 多集群连接
connections.configure(
default={"hosts": "http://localhost"},
dev={
"hosts": ["http://example.com:9200"],
"sniff_on_start": True,
}
)
使用 Search 在 Python 中查询 ES 数据,这个查询 API 是链式操作,可以允许你链接多个操作。在 Search() 方法里,我们通过 using 指定连接,但是没有指定 index,那么就默认搜索的是该连接的全部 index,我们可以通过 index() 函数指定 index,也可以向 Search() 中添加 index 参数来指定 index
# 基础链式查询
response = Search(using="default").query("match", address="603 Cooper Street").execute()
print(response)
# 指定 index 为 bank
s = Search(using="default", index="bank").query("match", address="603 Cooper Street")
response = s.execute()
s = Search(using="default").index("bank").query("match", address="603 Cooper Street")
response = s.execute()
print(response)
# 要查看执行的语句转换成的 es 的语句,可以使用 to_dict() 方法
# {'query': {'match': {'address': '603 Cooper Street'}}}
s = Search(using="default", index="bank").query("match", address="603 Cooper Street")
print(s.to_dict())
# 对于 response,可以使用 Python 中的列表的形式来访问它,也可以对这个对象使用迭代方法
# 它迭代的其实是 response.hits 属性,所以 response[0] 和 response.hits[0] 是等效操作
s = Search(using="default").index("bank").query("match", address="603 Cooper Street")
response = s.execute()
print(response[0])
print(response.hits[0])
# 获取符合查询条件的总数
count = s.count()
count = response.hits.total.value
print(count)
# 对于单条数据
hit = response.hits[0]
# 一次性获取返回的单条数据的各个字段及相应的值,可以使用 to_dict() 方法
print(hit.to_dict())
# 获取单个值,比如 name,可以像属性一样直接获取
print(hit.firstname, hit.lastname)
# 还可获取数据的 meta 信息,其中包含了这条数据所在的 index,id,匹配分值 score 等
print(hit.meta.to_dict())
query() 中接受两个参数,第一个是字段查询的方式,比如这里是 match,也可以是 term。第二个则是查询的字段与值,比如这里是查询的 address字段为 "603 Cooper Street" 的数据。
除了最简单的 query() 方法,还有很多其它查询方式
# 这两个 query() 通过链式操作连在一起转换成 es 语句就是使用 must 将多条件连接在一起
s = Search(using="default").index("bank")
s = s.query("match", firstname="Jimenez").query("match", address="603 Cooper Street")
response = s.execute()
# Q() 查询,这里和 Django 里的 Q() 方法查询一样,也是用于条件的联合,与或非条件等
from elasticsearch_dsl import Q as ES_Q
q1 = ES_Q("match", age=25)
q2 = ES_Q("match", city="Moscow")
s = Search(using="default").index("bank")
# 与 &
response = s.query(q1 & q2).execute()
# 或 |
response = s.query(q1 | q2).execute()
# 非 ~ 。取反直接在条件前加一个 ~
response = s.query(~q1).execute()
# 想取反还可以直接使用 exclude() 函数,这个和 Django 里的操作也是一样的
response = s.exclude(q1).execute()
# 搜索多字段 multi_match
q = ES_Q("multi_match", query="Jimenez Moscow", fields=["firstname", "address"])
# text.keyword 是将 text 字段作为一个整体进行查询
q = ES_Q({"term": {"address.keyword": "Moscow"}})
q = ES_Q("term", address__keyword="Moscow")
# filter() 过滤条件查询
s = s.filter(q)
# range 实现大小于的操作
q = ES_Q({"range": {"age": {"gte": 38}}})
s = s.query(q)
# 排序
response = s.query(q).sort('age', '-balance').execute()
# 分页,可以直接使用 Python 里的切片操作
s = s.query(q).sort('age', '-balance')
response = s[:10].execute()
# source() 指定返回字段
s = s.source(["firstname", "address"])
# source() 方法还可以接受 includes 和 excludes 参数来指定返回的字段或者不返回的字段
s = s.source(
includes=["address", 'balance'],
excludes=["firstname", 'age']
)
# extra() 函数接受一些查询的额外属性,如 size 参数决定返回条数,from 参数可以决定从第几条数据开始返回
# sort 参数决定排序方式,以及 _source 参数决定返回的字段
s = s.extra(
sort="age",
_source=["firstname", "lastname"],
**{
"from": 1,
"size": 2
}
)
# 直接运行 kibana 里执行的命令,使用 from_dict() 函数
s = s.from_dict({
"query": {
"bool": {
"filter": {
"range": {
"age": {
"gte": 21,
"lte": 22
}
}
}
}
}
})
response = s.execute()
2 数据更新
先要获取数据库连接,然后通过 conn 连接可以直接对数据进行更新,可用的方法有 update(),update_by_query() 以及一个批量的 bulk() 方法。
# 根据配置别名获取连接
from elasticsearch_dsl import connections
connections.configure(
default={"hosts": "http://localhost:9200"},
)
conn = connections.get_connection(alias='default')
# 直接使用 elasticsearch.Elasticsearch 模块来建立一个连接
from elasticsearch import Elasticsearch
conn = Elasticsearch(hosts="http://localhost:9200")
# update() 函数一般只用于指定 id 的更新
conn = connections.get_connection(alias='default')
res = conn.update(
index="bank",
id='hRWMkYwBEYBhnKm0pl6b',
body={
"doc": {
"age": 22,
"balance": 88888,
"city": "China",
}
}
)
print(res)
# update_by_query() 可以更新任意符合条件的数据,如将city=Moscow的所有数据的名字改为Michael Corleone,年龄改为28
field_list = ["firstname", "lastname", "age"]
source_list = [f"ctx._source.{key}=params.{key}" for key in field_list]
params = {
"firstname": "Michael",
"lastname": "Corleone",
"age": "28",
}
res = conn.update_by_query(
index="bank",
body={
"query": {
"term": {"city": "Moscow"}
},
"script": {
"source": ";".join(source_list),
"params": params
}
}
)
print(res)
# 如果想批量更新数据,这批数据各个字段的值都不一致,自定义的程度很大,无法使用 update_by_query() 函数
# 则可使用 helpers.bulk() 批量更新方法,_op_type:如果是更新操作,其值则是 update
# _index:表示需要更新的数据所在的索引,_id:表示这条需要更新的数据的 id
# doc:是一个 dict 数据,其下包含了需要更新的字段及其对应的值
from elasticsearch import helpers
action_1 = {
"_op_type": "update",
"_index": "bank",
"_id": "exWMkYwBEYBhnKm0pl2Z",
"doc": {"age": 18, "firstname": "令狐冲", "address": "中国华山"},
}
action_2 = {
"_op_type": "update",
"_index": "bank",
"_id": "nRWMkYwBEYBhnKm0pl2Z",
"doc": {"age": 18, "firstname": "杨过", "address": "中国终南山"},
}
action_3 = {
"_op_type": "update",
"_index": "bank",
"_id": "SxWMkYwBEYBhnKm0pl2Z",
"doc": {"age": 18, "firstname": "张无忌", "address": "中国武当山"},
}
action_list = [action_1, action_2, action_3]
res = helpers.bulk(conn, actions=action_list)
print(res)
# UpdateByQuery() 和 Search() 差不多,都是通过 using 和 index 参数来获取 es 连接和索引
# 更新数据的具体语法和 update_by_query 差不多,都是通过 script 的方式来操作
from elasticsearch_dsl import UpdateByQuery
ubq = UpdateByQuery(using="default", index="bank")
q1 = ES_Q("term", city="Moscow")
ubq = ubq.query(q1)
ubq = ubq.script(
source="ctx._source.firstname=params.firstname;ctx._source.address=params.address",
params={
"firstname": "郭靖",
"address": "襄阳城"
}
)
res = ubq.execute()
print(res)
3 创建与删除
还是需要使用连接 conn 进行操作
# 索引的创建和删除
index_name = "kungfu_master"
conn.indices.create(index=index_name)
exist_index = conn.indices.exists(index=index_name)
print(exist_index)
# 删除索引
conn.indices.delete(index_name)
# 创建单条数据
res = conn.index(
index=index_name,
body={
"name": "火云邪神",
"good_at": "昆仑派蛤蟆功"
}
)
print(res)
# 批量创建数据,用到在批量更新时候的使用过的 elasticsearch.helpers 函数。创建数据_op_type 的值为 index
from elasticsearch import helpers
action_1 = {
"_op_type": "index",
"_index": index_name,
"doc": {"name": "那个谁", "good_at": "如来神掌"},
}
action_2 = {
"_op_type": "index",
"_index": index_name,
"doc": {"name": "酱爆", "good_at": "理发"},
}
action_list = [action_1, action_2]
res = helpers.bulk(conn, actions=action_list)
print(res)
# 删除通过 Search() 方法加入条件后执行 delete() 函数
s = Search(using="default").index(index_name).query("match", name="火云邪神")
res = s.delete()
print(res)
# 使用 delete_by_query() 函数
conn = connections.get_connection("default")
q1 = ES_Q("multi_match", query="酱爆")
res = conn.delete_by_query(
index=index_name,
body={
"query": q1
}
)
print(res)