Back to Blogs
elasticsearch
python
tutorial

ElasticSearch in Python

Soloman
2021-08-07

ElasticSearch in Python

先安装 elasticsearch_dsl

pip3 install elasticsearch_dsl

1 基本查询

连接ES服务器

from elasticsearch_dsl import connections

# 单连接
connections.configure(
    default={"hosts": "http://localhost:9200"},
)

# 多集群连接
connections.configure(
    default={"hosts": "http://localhost"},
    dev={
        "hosts": ["http://example.com:9200"],
        "sniff_on_start": True,
    }
)

使用 Search 在 Python 中查询 ES 数据,这个查询 API 是链式操作,可以允许你链接多个操作。在 Search() 方法里,我们通过 using 指定连接,但是没有指定 index,那么就默认搜索的是该连接的全部 index,我们可以通过 index() 函数指定 index,也可以向 Search() 中添加 index 参数来指定 index

# 基础链式查询
response = Search(using="default").query("match", address="603 Cooper Street").execute()
print(response)

# 指定 index 为 bank
s = Search(using="default", index="bank").query("match", address="603 Cooper Street")
response = s.execute()
s = Search(using="default").index("bank").query("match", address="603 Cooper Street")
response = s.execute()
print(response)

# 要查看执行的语句转换成的 es 的语句,可以使用 to_dict() 方法
# {'query': {'match': {'address': '603 Cooper Street'}}}
s = Search(using="default", index="bank").query("match", address="603 Cooper Street")
print(s.to_dict())

# 对于 response,可以使用 Python 中的列表的形式来访问它,也可以对这个对象使用迭代方法
# 它迭代的其实是 response.hits 属性,所以 response[0] 和 response.hits[0] 是等效操作
s = Search(using="default").index("bank").query("match", address="603 Cooper Street")
response = s.execute()
print(response[0])
print(response.hits[0])
# 获取符合查询条件的总数
count = s.count()
count = response.hits.total.value
print(count)

# 对于单条数据
hit = response.hits[0]
# 一次性获取返回的单条数据的各个字段及相应的值,可以使用 to_dict() 方法
print(hit.to_dict())
# 获取单个值,比如 name,可以像属性一样直接获取
print(hit.firstname, hit.lastname)
# 还可获取数据的 meta 信息,其中包含了这条数据所在的 index,id,匹配分值 score 等
print(hit.meta.to_dict())

query() 中接受两个参数,第一个是字段查询的方式,比如这里是 match,也可以是 term。第二个则是查询的字段与值,比如这里是查询的 address字段为 "603 Cooper Street" 的数据。

除了最简单的 query() 方法,还有很多其它查询方式

# 这两个 query() 通过链式操作连在一起转换成 es 语句就是使用 must 将多条件连接在一起
s = Search(using="default").index("bank")
s = s.query("match", firstname="Jimenez").query("match", address="603 Cooper Street")
response = s.execute()

# Q() 查询,这里和 Django 里的 Q() 方法查询一样,也是用于条件的联合,与或非条件等
from elasticsearch_dsl import Q as ES_Q
q1 = ES_Q("match", age=25)
q2 = ES_Q("match", city="Moscow")
s = Search(using="default").index("bank")
# 与 &
response = s.query(q1 & q2).execute()
# 或 |
response = s.query(q1 | q2).execute()
# 非 ~ 。取反直接在条件前加一个 ~
response = s.query(~q1).execute()
# 想取反还可以直接使用 exclude() 函数,这个和 Django 里的操作也是一样的
response = s.exclude(q1).execute()

# 搜索多字段 multi_match
q = ES_Q("multi_match", query="Jimenez Moscow", fields=["firstname", "address"])

# text.keyword 是将 text 字段作为一个整体进行查询
q = ES_Q({"term": {"address.keyword": "Moscow"}})
q = ES_Q("term", address__keyword="Moscow")

# filter() 过滤条件查询
s = s.filter(q)

# range 实现大小于的操作
q = ES_Q({"range": {"age": {"gte": 38}}})
s = s.query(q)

# 排序
response = s.query(q).sort('age', '-balance').execute()

# 分页,可以直接使用 Python 里的切片操作
s = s.query(q).sort('age', '-balance')
response = s[:10].execute()

# source() 指定返回字段
s = s.source(["firstname", "address"])
# source() 方法还可以接受 includes 和 excludes 参数来指定返回的字段或者不返回的字段
s = s.source(
    includes=["address", 'balance'],
    excludes=["firstname", 'age']
)

# extra() 函数接受一些查询的额外属性,如 size 参数决定返回条数,from 参数可以决定从第几条数据开始返回
# sort 参数决定排序方式,以及 _source 参数决定返回的字段
s = s.extra(
    sort="age",
    _source=["firstname", "lastname"],
    **{
        "from": 1,
        "size": 2
    }
)

# 直接运行 kibana 里执行的命令,使用 from_dict() 函数
s = s.from_dict({
  "query": {
    "bool": {
      "filter": {
        "range": {
          "age": {
            "gte": 21,
            "lte": 22
          }
        }
      }
    }
  }
})
response = s.execute()

2 数据更新

先要获取数据库连接,然后通过 conn 连接可以直接对数据进行更新,可用的方法有 update(),update_by_query() 以及一个批量的 bulk() 方法。

# 根据配置别名获取连接
from elasticsearch_dsl import connections
connections.configure(
    default={"hosts": "http://localhost:9200"},
)
conn = connections.get_connection(alias='default')

# 直接使用 elasticsearch.Elasticsearch 模块来建立一个连接
from elasticsearch import Elasticsearch
conn = Elasticsearch(hosts="http://localhost:9200")


# update() 函数一般只用于指定 id 的更新
conn = connections.get_connection(alias='default')
res = conn.update(
    index="bank",
    id='hRWMkYwBEYBhnKm0pl6b',
    body={
        "doc": {
            "age": 22,
            "balance": 88888,
            "city": "China",
        }
    }
)
print(res)

# update_by_query() 可以更新任意符合条件的数据,如将city=Moscow的所有数据的名字改为Michael Corleone,年龄改为28
field_list = ["firstname", "lastname", "age"]
source_list = [f"ctx._source.{key}=params.{key}" for key in field_list]
params = {
    "firstname": "Michael",
    "lastname": "Corleone",
    "age": "28",
}
res = conn.update_by_query(
    index="bank",
    body={
        "query": {
            "term": {"city": "Moscow"}
        },
        "script": {
            "source": ";".join(source_list),
            "params": params
        }
    }
)
print(res)

# 如果想批量更新数据,这批数据各个字段的值都不一致,自定义的程度很大,无法使用 update_by_query() 函数
# 则可使用 helpers.bulk() 批量更新方法,_op_type:如果是更新操作,其值则是 update
# _index:表示需要更新的数据所在的索引,_id:表示这条需要更新的数据的 id
# doc:是一个 dict 数据,其下包含了需要更新的字段及其对应的值
from elasticsearch import helpers

action_1 = {
    "_op_type": "update",
    "_index": "bank",
    "_id": "exWMkYwBEYBhnKm0pl2Z",
    "doc": {"age": 18, "firstname": "令狐冲", "address": "中国华山"},
}
action_2 = {
    "_op_type": "update",
    "_index": "bank",
    "_id": "nRWMkYwBEYBhnKm0pl2Z",
    "doc": {"age": 18, "firstname": "杨过", "address": "中国终南山"},
}
action_3 = {
    "_op_type": "update",
    "_index": "bank",
    "_id": "SxWMkYwBEYBhnKm0pl2Z",
    "doc": {"age": 18, "firstname": "张无忌", "address": "中国武当山"},
}
action_list = [action_1, action_2, action_3]
res = helpers.bulk(conn, actions=action_list)
print(res)

# UpdateByQuery() 和 Search() 差不多,都是通过 using 和 index 参数来获取 es 连接和索引
# 更新数据的具体语法和 update_by_query 差不多,都是通过 script 的方式来操作

from elasticsearch_dsl import UpdateByQuery
ubq = UpdateByQuery(using="default", index="bank")
q1 = ES_Q("term", city="Moscow")
ubq = ubq.query(q1)
ubq = ubq.script(
    source="ctx._source.firstname=params.firstname;ctx._source.address=params.address",
    params={
        "firstname": "郭靖",
        "address": "襄阳城"
    }
)
res = ubq.execute()
print(res)

3 创建与删除

还是需要使用连接 conn 进行操作

# 索引的创建和删除
index_name = "kungfu_master"
conn.indices.create(index=index_name)
exist_index = conn.indices.exists(index=index_name)
print(exist_index)
# 删除索引
conn.indices.delete(index_name)


# 创建单条数据
res = conn.index(
    index=index_name,
    body={
        "name": "火云邪神",
        "good_at": "昆仑派蛤蟆功"
    }
)
print(res)

# 批量创建数据,用到在批量更新时候的使用过的 elasticsearch.helpers 函数。创建数据_op_type 的值为 index
from elasticsearch import helpers

action_1 = {
    "_op_type": "index",
    "_index": index_name,
    "doc": {"name": "那个谁", "good_at": "如来神掌"},
}
action_2 = {
    "_op_type": "index",
    "_index": index_name,
    "doc": {"name": "酱爆", "good_at": "理发"},
}
action_list = [action_1, action_2]
res = helpers.bulk(conn, actions=action_list)
print(res)

# 删除通过 Search() 方法加入条件后执行 delete() 函数
s = Search(using="default").index(index_name).query("match", name="火云邪神")
res = s.delete()
print(res)

# 使用 delete_by_query() 函数
conn = connections.get_connection("default")
q1 = ES_Q("multi_match", query="酱爆")
res = conn.delete_by_query(
    index=index_name,
    body={
        "query": q1
    }
)
print(res)

4 参考网站