ElasticSearch:存储数据

存储数据需要定义的是文档的结构,即数据包含哪些字段,各字段都是什么类型,哪些字段被用作全文检索,并使用哪种分析器。在实际应用中,我们希望能够批量地导入文档,而不是一条文档一条文档地去执行 POST 操作。

Mapping基本概念

Mapping 对应关系型数据库的 schema 的定义,其作用包括定义索引中字段的名字,定义字段的数据类型,例如字符串、数字、布尔类型等;字段的倒排索引相关配置,例如是否被倒排索引,使用什么分析器等。

本节内容我们重点介绍 Elasticsearch 中的字段类型:

  • 字符串类型:

    • text:定义可以被全文检索的字段,该字段内容默认会被分析器分析,并生成倒排索引,参与到检索与相关性计算中
    • keyword:一般用于定义简短或结构化的文本字段,例如标签、email 地址、手机号码等,定义为该类型的字段默认不会被分析器分析,通常用来作过滤、排序、聚合等
  • 数值类型:ES 支持多种精度的数值类型,在满足需求的情况下,优先使用范围小的字段。字段长度越短,索引和搜索的效率越高。浮点数,优先考虑使用 scaled_float

    |类型 | 取值范围 |
    |———|————-|
    |long|-2^63^ 到 2^63^-1|
    |integer|-2^31^ 到 2^31^-1|
    |short|-2^15^ 到 2^15^-1|
    |byte|-2^7^ 到 2^7^-1|
    |double|64 位的双精度 IEEE754 浮点类型|
    |float|32 位的双精度 IEEE754 浮点类型|
    |half_float|16 位的双精度 IEEE754 浮点类型|
    |scaled_float|缩放类型的浮点类型|

  • 日期类型(date): 取值为多种格式日期时间表达式的字段,通过定义为 date 类型在 ES 内部被解析为毫秒计时的长整型数

  • 布尔类型(boolean):文档中取值为 “true”、“false”、true、false 的字段

  • 二进制类型(binary):二进制接受的是 base64 编码的字符串,默认不可搜索

  • 对象类型(object):文档中属于结构化的对象数据对应的字段应该定义为对象类型

  • 数组类型:ES 中没有专门的数组类型,任何字段都可以有一个或者多个值。也就是说我们无需对一个取值为数组的字段显式地定义其为数组类型,只需要按照数组中元素的类型去定义即可,同时要确保数据中数组的元素都是同一种类型

  • 嵌套类型(nested):它是对象类型的一个特例,如果一个字段的取值是对象的数组,在 ES 内部回对其进行扁平化处理,将一个对象转为字段名和值构成的简单列表,这样会导致对象的属性关系丢失,此时应该使用嵌套类型来独立保存数组中的每个对象

还有范围类型(integer_range)等一些特殊的数据类型,由于我们使用不多,所以不做详细的介绍了。专有的特殊类型包括:地理类型(geo_poing & geo_shape / percolator),IP类型(ip),统计词频类型(token_count),向量类型(dense_vector / sparse_vector)等。

值得注意的是,对于一个字段的类型定义,只能在创建 index 时一次性完成,此后不能更改。新增字段并指定新字段的类型是可以的。

Dynamic Mapping

Dynamic Mapping 的机制,使得我们无需手动定义 Mapping,ES 会自动根据文档信息,推算出字段的类型。我们前一节在创建 shopping 索引的时候,并没有显式地定义过映射,但是 ES 还是自动地识别字段生成了一些定义。我们在 Kibana 中发送请求 GET /shopping/_mapping 可以得到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"shopping" : {
"mappings" : {
"properties" : {
"category" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256 // 用作 keyword 时,限制最大字符长度,超过则不被索引
}
}
},
...

"price" : {
"type" : "float"
},
...
}
}
}
}

我们导入的文档数据里的字段都被放到了 Mapping 的 properties 里了,而且通过 type 指定了它们的数据类型。有的时候 ES 对字段的类型可能会推算不准,尤其是一些复杂类型的字段,当类型设置不对时,会导致一些功能无法正常运行。所以,ES 允许用户定义 Dynamic Templates 通过模板匹配来设置正确的字段定义。

Mapping常用参数

type 参数(字段的类型)仅仅只是 Mapping 中需要设置的一个参数,还有很多常用的参数我们需要了解,并且在实践中做相关个性化的配置。

  • dynamic:与 properties 是同级的,默认是 true,即如果数据中有新字段会自动推算其类型,也可以设置为 false,表示忽略新字段;当设置为 strict 时则会在发现新字段时抛出异常
  • analyzer:定义文本字段的分析器,默认对索引和查询对生效,分析器是一种用于分析数据或者按照用户想要的方式处理数据的工具,我们会在《ElasticSearch:分析数据》一章中详细展开
  • search_analyzer:查询时的分析器,如果定义了查询则使用这个分析器
  • index:是否使用分析器构建倒排索引,默认为 true;设置为 false 只表示不使用分析器构建倒排,但仍然可以通过完全匹配搜索到(正排索引)
  • enabled:ES 默认会索引所有的字段,但是有的字段可能只需要存储,不需要索引(不索引能减少 CPU 使用、不影响相关性评分),此时可以设置该参数为 false
  • fields:可以让同一字段有多种不同的索引方式
  • normalizer:用于解析前(索引或者查询)的标准化配置,如大小写归一化
  • boost:设置字段的权重,一般不推荐配置,常用的方法是在查询时设置,可以动态变化
  • coerce 用来清除脏数据,默认为 true,如当数值类型的字段以字符串形式表达时会自动清洗,如果设为 false 则会报错
  • copy_to:可以将多个字段的值,复制到同一个字段中
  • ignore_above:用于指定分词和索引的字符串最大长度,超过最大长度的话,该字段将不会被索引,这个字段只适用于 keyword 类型
  • index_options:控制索引时哪些信息被存储到倒排索引中,有四种取值:docs 表示只存储文档编号(默认);freqs 表示在 docs 基础上,存储词项频率;positions 是在 freqs 基础上,存储词项偏移位置;offsets 是在 positions 基础上,存储词项开始和结束的字符位置
  • position_increment_gap:被解析的 text 字段会将 term 的位置考虑进去,目的是为了支持近似查询和短语查询,当我们去索引一个含有多个值的 text 字段时,会在各个值之间添加一个假想的空间,将值隔开,这样就可以有效避免一些无意义的短语匹配,间隙大小通过该参数来控制,默认是 100
  • similarity:指定文档的评分模型,默认有三种:BM25 为默认的评分模型;classic 为 TF/IDF 评分;boolean 为布尔模型评分
  • store:控制数据存储方式,设置为 true 的字段会单独存储。如果查询是从一个很多字段的文档中检索几个小的字段,就可以将小的字段设置为 store 来单独存储,查询时不再通过 _source 获得冗余的信息,而只从 stored_fileds 中取
  • ignore_malformed:忽略掉不正常的字段类型,默认为 false,即文档中有类型不匹配的字段就抛出异常,如果设为 true 则忽略掉出错的字段,正常处理其他字段

自定义Mapping

根据自己的数据与业务需求我们可以定义自己的 Mapping,并在创建索引时进行声明,我们这里给出一套常用的基本框架:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
PUT my_index
{
"mappings" : {
// 关闭动态mapping
"dynamic": false,
// 并不需要返回图书内容,所以关闭_source,在stored_fields中指定要返回的字段
"_source": {"enabled": false},
"properties" : {
// 作者名字不进行分析器处理,只能完全匹配
"author" : {"type" : "keyword", "store": true},
// URL不需要倒排索引
"cover_url" : {"type" : "keyword", "index": false, "store": true},
// 图书简介与内容需要构建倒排,支持全文检索
"description" : {"type" : "text", "analyzer": "ik_smart", "store": true},
"content" : {"type" : "text", "analyzer": "ik_smart", "store": true},
// 出版日期是date类型
"public_date" : {"type" : "date", "store": true},
// 图书标题同时支持全文检索(倒排式)与完全匹配(正排式)的检索
"title" : {
"type" : "text",
"analyzer": "ik_smart",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 100
}
},
"store": true
}
}
}
}

定义字段的数据类型,指定分析器,设置检索模式,以及存储方式是最常用的基本配置,可以参考上面的示例针对自己的业务数据进行 Mapping 的自定义配置。对于支持倒排索引的字段,最基本的分析器功能就是分词,可以直接将 analyzer 参数设置为 ES 内置的分词器 ik_smart。

使用Python API

结合前面一章《ElasticSearch:基础知识》的介绍,我们可以搭建 ES 环境,并根据自己的数据与业务特点设计 Mapping,我们给出了一套使用 Python API 进行 ES 操作的 client 框架,包括创建索引、导入数据以及查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch import helpers
from tqdm import tqdm

class ElasticSearchClient(object):
def __init__(self, host, port, user=None, pwd=None, bulk_volume=2000):
self.es_servers = [{
"host": host,
"port": port
}]
self.es_auth = (user, pwd)
self.bulk_volume = bulk_volume

try:
if user == None:
# 无用户名密码状态
self.es_instance = Elasticsearch(hosts=self.es_servers)
else:
# 用户名密码状态
self.es_instance = Elasticsearch(hosts=self.es_servers, http_auth=self.es_auth)
except:
print('连接失败')


def create_index(self, index_name):
'''
TODO: 进行创建一个索引
:param index_name: 索引名
'''
self.es_instance.indices.create(index=index_name)


def create_index_with_mapping(self, index_name, body):
'''
TODO: 按照指定的body(包含settings和mappings)创建一个index
:param index_name: 索引名
:param body: 索引的配置,包含(`settings` 和 `mappings`)
'''
self.es_instance.indices.create(index=index_name, body=body)


def delete_index(self, index_name):
'''
TODO: 删除一个索引
:param index_name: 索引名
'''
self.es_instance.indices.delete(index=index_name)


def add_document(self, index_name, doc_obj):
"""
TODO: 单条插入ES
:param index_name: 索引名
:param doc_obj: 一条document
"""
self.es_instance.index(index=index_name, document=doc_obj, doc_type='_doc')


def add_document_in_bulk(self, index_name, doc_obj_list):
"""
TODO: 批量插入ES
:param index_name: 索引名
:param doc_obj_list 多条document组成的列表
"""
load_data = []
i = 1
for row_obj in tqdm(doc_obj_list):
action = {
"_index": index_name,
"_type": '_doc',
"_source": row_obj
}
load_data.append(action)
i += 1

# 批量处理
if len(load_data) == self.bulk_volume:
print('插入', i / self.bulk_volume, '批bulk')
success, failed = helpers.bulk(self.es_instance, load_data, index=index_name, raise_on_error=True)
print(success, failed)
load_data.clear()

# 处理剩下不足 bulk_volume 的
if len(load_data) > 0:
success, failed = bulk(self.es_instance, load_data, index=index_name, raise_on_error=True)
del load_data[0:len(load_data)]
print(success, failed)


def update_document_by_id(self, index_name, _id, doc_obj):
"""
TODO: 根据给定的_id,更新ES文档
:param index_name: 索引名
:param _id: ES中document的id
:param doc_obj: 一条document
"""
self.es_instance.update(index=index_name, body={"doc": doc_obj}, id=_id, doc_type='_doc')


def delete_document_by_id(self, index_name, _id):
"""
TODO: 根据给定的id,删除文档
:param index_name: 索引名
:param _id: ES中document的id
"""
self.es_instance.delete(index=index_name, id=_id, doc_type='_doc')


def search_document_by_query(self, index_name, query):
'''
TODO: 根据查询的query语句,来搜索查询内容
:param index_name: 索引名
:param query: 结构化的检索表达式
'''
search_result = self.es_instance.search(index=index_name, body=query, doc_type='_doc')
return search_result


def clear_documents(self, index_name):
'''
TODO: 清空指定索引里的所有文档
:param index_name: 索引名
'''
delete_by_all = {"query": {"match_all": {}}}
result = self.es_instance.delete_by_query(index=index_name, body=delete_by_all, doc_type='_doc')
print(result)

这部分代码之所以如此简洁,是因为个性化的数据定义与查询都被封装成了 JSON(Python中的字典对象)以参数的形式传入,所以我们后面的章节都将围绕如何组合我们想要的 JSON 结构进行详细的讲解。