ElasticSearch
Elasticsearch (简称ES)是一个分布式、RESTful 风格的搜索和数据分析引擎。
概述
全文搜索引擎
全文搜索引擎指的是目前广泛应用的主流搜索引擎。它的工作原理是计算机索引程序通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式。这个过程类似于通过字典中的检索字表查字的过程。
ElasticSearch vs Solr :
Elasticsearch和Solr,这两款都是基于Lucene搭建的,Lucene是提供全文搜索功能类库
- Solr 利用 Zookeeper 进行分布式管理,而 Elasticsearch 自身带有分布式协调管理功能。
- Solr 支持更多格式的数据,比如JSON、XML、CSV,而 Elasticsearch 仅支持json文件格式。
- Solr在查询死数据时,速度相对ES更快一些。但是数据如果是实时改变的,Solr的查询速度会降低很多,ES的查询的效率基本没有变化。

正向索引
搜索时,遍历文档,判读文档中是否包含指定搜索内容
doc1->word1,word2,word3...
doc2->word2,word3,word4...
反向索引(倒排索引)
倒排索引步骤:
- 数据根据词条进行分词,同时记录文档索引位置
- 将词条相同的数据化进行合并
- 对词条进行排序

搜索过程:
先将搜索词语进行分词,分词后再倒排索引列表查询文档位置(docId)。根据docId查询文档数据。
| 搜索:中国人 | 分词库 | 分词匹配 | 数据 |
|---|---|---|---|
| 得到的结果: 我是中国人<br/> 我是大好人 | 我 1 2 3<br/>是 1 2 3<br/>中国人 1 <br/>中国 1 <br/>人 1 2 <br/>大 2 <br/>好人 2<br/>好 2 <br/>谁 3 <br/>要 4 <br/>去 4 <br/>哪 4 | 中国人 1 <br/>中国 1 <br/>人 1 2 <br/> | 1、我是中国人 <br/>2、我是大好人<br/> 3、我是谁<br/> 4、要去哪 |
ElasticSearch结构


索引(Index)
一个索引就是一个拥有几分相似特征的文档的集合。 比如说,你可以有一个客户数据的索引,另一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来标识(必须全部是小写字母),并且当我们要对这个索引中的文档进行索引、搜索、更新和删除的时候,都要使用到这个名字。在一个集群中,可以定义任意多的索引。
类型(Type)
一个类型是你的索引的一个逻辑上的分类/分区,其语义完全由你来定。通常,会为具有一组共同字段的文档定义一个类型。不同的版本,类型发生了不同的变化
| 版本 | Type |
|---|---|
| 5.x | 支持多种type |
| 6.x | 只能有一种type |
| 7.x | 默认不再支持自定义索引类型(默认类型为:_doc),即去除了索引 |
文档(Document)
一个文档是一个可被索引的基础信息单元,也就是一条数据
比如:你可以拥有某一个客户的文档,某一个产品的一个文档,当然,也可以拥有某个订单的一个文档。文档以JSON(Javascript Object Notation)格式来表示,而JSON是一个到处存在的互联网数据交互格式。
在一个index/type里面,你可以存储任意多的文档。
字段(Field)
相当于是数据表的字段,对文档数据根据不同属性进行的分类标识。
映射(Mapping)
mapping是处理数据的方式和规则方面做一些限制,如:某个字段的数据类型、默认值、分析器、是否被索引等等。这些都是映射里面可以设置的,其它就是处理ES里面数据的一些使用规则设置也叫做映射,按着最优规则处理数据对性能提高很大,因此才需要建立映射,并且需要思考如何建立映射才能对性能更好。
ElasticSearch基本操作
参考文档: https://www.elastic.co/guide/en/elasticsearch/reference/7.8/index.html
分词器
官方提供的分词器有这么几种: Standard、Letter、Lowercase、Whitespace、UAX URL Email、Classic、Thai等。不同语言最适合的分词器不同, 中文分词器可以使用第三方的比如IK分词器:ik_max_word,ik_max_word
#每一个字均单独分出来:我 是 中 国 人
POST _analyze
{
"analyzer": "standard",
"text": "我是中国人"
}
#分出所有可能的词:我 是 中国人 中国 国人
POST _analyze
{
"analyzer": "ik_max_word",
"text": "我是中国人"
}
#分出最有可能的词:我 是 中国人
POST _analyze
{
"analyzer": "ik_smart",
"text": "我是中国人"
}索引操作
- 创建索引:PUT /{索引名称}:PUT /my_index
- 查看所有索引: GET /_cat/indices?v 携带参数v将会显示表头
- 查看单个索引: GET /{索引名称}
- 删除单个索引: DELETE /{索引名称}
文档操作
创建文档:类型使用_doc
PUT /{索引名称}/{类型}/{id} { jsonbody } 例: PUT /my_index/_doc/1 { "title": "小米手机", "category": "小米", "images": "http://www.gulixueyuan.com/xm.jpg", "price": 3999 }查看单个文档: GET /{索引名称}/{类型}/{id}
查看所有文档:
GET /索引名称/_search { "query": { "match_all": {} } }全局修改文档: PUT必须自定义id,无法自动生成id,POST可以自动生成id。第一次是添加,后续是更新。该更新方式为全部更新,直接将修改后的数据覆盖原数据。
PUT /{索引名称}/{类型}/{id} { jsonbody } POST /{索引名称}/{类型}/{id} { jsonbody } 例: 添加数据 PUT /user/_doc/1 { "username":"张三", "age":18 } 修改数据,修改完后仅剩username:lisi PUT /user/_doc/1 { "username":"lisi" }局部修改文档: 该更新方式为局部更新,仅更新或添加指定字段,不会修改未涉及到的原字段。
POST /{索引名称}/_update/{66} { "doc": { "属性": "值" } }删除文档: DELETE /{索引名称}/{类型}/{id}
批量操作: actionName可以有CREATE、DELETE等。
{"actionName":{"_index":"indexName", "_type":"typeName","_id":"id"}} {"field1":"value1", "field2":"value2"} 批量创建: POST _bulk {"create":{"_index":"my_index","_id":2}} {"id":2,"title":"华为手机","category":"华为","images":"http://www.gulixueyuan.com/xm.jpg","price":5500} {"create":{"_index":"my_index","_id":3}} {"id":3,"title":"VIVO手机","category":"vivo","images":"http://www.gulixueyuan.com/xm.jpg","price":3600} 批量删除: POST _bulk {"delete":{"_index":"my_index","_id":2}} {"delete":{"_index":"my_index","_id":3}}
映射
查看映射: GET /{索引名称}/_mapping
动态映射
Elasticsearch中不需要定义Mapping映射(即关系型数据库的表、字段等),在文档写入 Elasticsearch时,会根据文档字段自动识别类型,这种机制称之为<font color='red'>动态映射</font>。
映射规则对应:
| 数据 | 对应的类型 |
|---|---|
| null | 字段不添加 |
| true|flase | boolean |
| 字符串 | text |
| 数值 | long |
| 小数 | float |
| 日期 | date |
静态映射:
静态映射是在Elasticsearch中也可以事先定义好映射,包含文档的各字段类型、分词器等,这种方式称之为<font color="red">静态映射</font>。
type分类如下:
字符串:string,string类型包含 text 和 keyword。
text:该类型被用来索引长文本,在创建索引前会将这些文本进行分词,转化为词的组合,建立索引;允许es来检索这些词,text类型不能用来排序和聚合。
keyword:该类型不能分词,可以被用来检索过滤、排序和聚合,keyword类型不可用text进行分词模糊检索。
数值型:long、integer、short、byte、double、float
日期型:date
布尔型:boolean
#删除原创建的索引
DELETE /my_index
#创建索引,并同时指定映射关系和分词器等。
PUT /my_index
{
"mappings": {
"properties": {
"title": {
"type": "text", //字段类型。
"index": true, //为true会做成索引可以进行搜索,为false不会建立索引,无法被搜索,默条认true
"store": true, //决定了字段的原始值是否会被存储在索引中。默认为false,字段在_source字段中一起存储。如果设置为true,可以在搜索结果中直接返回这个字段的原始值,而不需要从_source字段中提取。
"analyzer": "ik_max_word", //插入数据时使用的分词器
"search_analyzer": "ik_max_word" //查询数据时使用的分词器
},
"category": {
"type": "keyword",
"index": true,
"store": true
},
"images": {
"type": "keyword",
"index": true,
"store": true
},
"price": {
"type": "integer",
"index": true,
"store": true
}
}
}
}
结果:
{
"acknowledged" : true,
"shards_acknowledged" : true,
"index" : "my_index"
}DSL高级查询
Query DSL概述: Domain Specific Language(领域专用语言),Elasticsearch提供了基于JSON的DSL来定义查询。
准备数据
POST _bulk
{"create":{"_index":"my_index","_id":1}}
{"id":1,"title":"华为笔记本电脑","category":"华为","images":"http://www.gulixueyuan.com/xm.jpg","price":5388}
{"create":{"_index":"my_index","_id":2}}
{"id":2,"title":"华为手机","category":"华为","images":"http://www.gulixueyuan.com/xm.jpg","price":5500}
{"create":{"_index":"my_index","_id":3}}
{"id":3,"title":"VIVO手机","category":"vivo","images":"http://www.gulixueyuan.com/xm.jpg","price":3600}
查询所有文档:
POST /索引库/_search
{
"query": {
"match_all": {}
}
}匹配查询(match):
match仅支持单条件,会将匹配字段进行分词
POST /my_index/_search
{
"query": {
"match": {
"title": "华为智能手机" //title的类型为text,将分词后查询
}
}
}补充条件删除:
POST /my_index/_delete_by_query
{
"query": {
"match": {
"title": "vivo"
}
}
}多字段匹配(multi_match)
POST /my_index/_search
{
"query": {
"multi_match": {
"query": "华为智能手机",
"fields": ["title","category"]
}
}
}前缀匹配(prefix)
POST /my_index/_search
{
"query": {
"prefix": {
"title": {
"value": "vivo智能"
}
}
}
}关键字精确查询(term)
term:关键字不会进行分词,但是存入的数据是分词后存入的,所以可能搜索不到。数字等需要精确查询的也使用term
POST /my_index/_search
{
"query": {
"term": {
"title": {
"value": "华为手机"
}
}
}
}多关键字精确匹配(terms)
或的关系,只要满足一个条件就会查询出来,同样不会分词。
POST /my_index/_search
{
"query": {
"terms": {
"title": [
"华为手机",
"华为"
]
}
}
}范围查询(range)
范围查询使用range。
- gte: 大于等于(greater than or equal)
- lte: 小于等于(less than or equal)
- gt: 大于(greater than)
- lt: 小于(less than)
POST /my_index/_search
{
"query": {
"range": {
"price": {
"gte": 3000,
"lte": 5000
}
}
}
}返回指定字段(_source)
POST /my_index/_search
{
"query": {
"terms": {
"title": [
"华为手机",
"华为"
]
}
},
"_source": ["title","category"]
}嵌套查询(nested)
//存在以下数据,需要查询{"name" : "Alice","age" : "18"}
{
"group" : "fans",
"users" : [
{
"name" : "John",
"age" : "23"
},
{
"name" : "Alice",
"age" : "18"
}
]
}
//key 以 "nested" 开头
//path 就是嵌套对象数组的字段名
//score_mode (可选的)匹配子对象的分数相关性分数。avg (默认,使用所有匹配子对象的平均相关性分数)
//ignore_unmapped (可选的)是否忽略 path 未映射,不返回任何文档而不是错误。默认为 false,如果 path 不对就报错
GET /my_index/_search?pretty
{
"query": {
"bool": {
"must": [
{
"nested": {
"path": "users",
"query": {
"bool": {
"must": [
{
"match": {
"users.name": "Alice"
}
},
{
"match": {
"users.age": 18
}
}
]
}
}
}
}
]
}
}
}
组合查询(bool)
bool 各条件之间有and,or或not的关系
- must: 各个条件都必须满足,所有条件是and的关系
- should: 各个条件有一个满足即可,即各条件是or的关系
- must_not: 不满足所有条件,即各条件是not的关系
- filter: 与must效果等同,但是它不计算得分,效率更高点。
must
各个条件都必须满足,所有条件是and的关系
POST /my_index/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"title": "华为"
}
},
{
"range": {
"price": {
"gte": 3000,
"lte": 5000
}
}
}
]
}
}
}should
各个条件有一个满足即可,即各条件是or的关系
POST /my_index/_search
{
"query": {
"bool": {
"should": [
{
"match": {
"title": "华为"
}
},
{
"range": {
"price": {
"gte": 3000,
"lte": 5000
}
}
}
]
}
}
}如果should和must同时存在,他们之间是and关系:
POST /my_index/_search
{
"query": {
"bool": {
"should": [
{
"match": {
"title": "华为"
}
},
{
"range": {
"price": {
"gte": 3000,
"lte": 5000
}
}
}
],
"must": [
{
"match": {
"title": "华为"
}
},
{
"range": {
"price": {
"gte": 3000,
"lte": 5000
}
}
}
]
}
}
}must_not
不满足所有条件,即各条件是not的关系
POST /my_index/_search
{
"query": {
"bool": {
"must_not": [
{
"match": {
"title": "华为"
}
},
{
"range": {
"price": {
"gte": 3000,
"lte": 5000
}
}
}
]
}
}
}filter
与must效果等同,但是它不计算得分,效率更高点。得分默认为0。
POST /my_index/_search
{
"query": {
"bool": {
"filter": [
{
"match": {
"title": "华为"
}
}
]
}
}
}聚合查询(aggs)
聚合允许使用者对es文档进行统计分析,类似与关系型数据库中的group by,当然还有很多其他的聚合,例如取最大值、平均值等等。
max
POST /my_index/_search
{
"query": {
"match_all": {}
},
"size": 0, // size为0表示不返回任何匹配的文档,只返回聚合结果
"aggs": {
"max_price": {// 自己为数据定义的名字
"max": {// max函数,对查询的结果进行聚合操作
"field": "price" //要求的字段
}
}
}
}min
POST /my_index/_search
{
"query": {
"match_all": {}
},
"size": 0,
"aggs": {
"min_price": {
"min": {
"field": "price"
}
}
}
}avg
POST /my_index/_search
{
"query": {
"match_all": {}
},
"size": 0,
"aggs": {
"avg_price": {
"avg": {
"field": "price"
}
}
}
}sum
POST /my_index/_search
{
"query": {
"match_all": {}
},
"size": 0,
"aggs": {
"sum_price": {
"sum": {
"field": "price"
}
}
}
}stats
stats是一种聚合类型,它用于计算指定字段的以下五个统计信息:
- count:匹配的文档总数。
- min:字段的最小值。
- max:字段的最大值。
- avg:字段的平均值。
- sum:字段值的总和。
POST /my_index/_search
{
"query": {
"match_all": {}
},
"size": 0,
"aggs": {
"stats_price": {
"stats": {
"field": "price"
}
}
}
}terms
桶聚和相当于sql中的group by语句
// 这个查询请求的作用是在 my_index 索引中查找所有文档,
// 不返回具体文档内容,而是按照 category 字段对文档进行分组统计,
// 统计每个分类下的文档数量,并返回出现频率最高的前 10 个分类的统计结果。
POST /my_index/_search
{
"query": {
"match_all": {}
},
"size": 0,
"aggs": {
"groupby_category": { // 聚合结果的自定义的名称
"terms": {
"field": "category", //对category字段进行统计
"size": 10 // 返回出现频率最高的前 10 个分类的统计结果
}
}
}
}还可以对桶继续下钻:
// 按 category 字段分组统计文档数量的基础上,
// 进一步对每个分类分组内的 price 字段计算平均值。
POST /my_index/_search
{
"query": {
"match_all": {}
},
"size": 0,
"aggs": {
"groupby_category": {
"terms": {
"field": "category",
"size": 10
},
"aggs": {
"avg_price": {
"avg": {
"field": "price"
}
}
}
}
}
}
结果:
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"groupby_category" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "华为",
"doc_count" : 2,
"avg_price" : {
"value" : 5444.0
}
},
{
"key" : "vivo",
"doc_count" : 1,
"avg_price" : {
"alue" : 3600.0
}
}
]
}
}
}通配符匹配(wildcard)
?:匹配任意字符
*:匹配0个或多个字符
GET /index3/_search
{
"query": {
"wildcard": {
"author": {
"value": "张*"
}
}
}
}排序(sort)
指定多个排序关键词,先按第一个,相同后再按其他
POST /my_index/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"title": "华为"
}
}
]
}
},
"sort": [
{
"price": {
"order": "asc"
}
},
{
"_score": {//按照相似度评分进行排名
"order": "desc"
}
}
]
}分页查询(from、size、scoll)
from、size分页
分页的两个关键属性:from、size。
- from: 当前页的起始索引,默认从0开始。 from = (pageNum - 1) * size
- size: 每页显示多少条
POST /my_index/_search
{
"query": {
"match_all": {}
},
"from": 0,
"size": 2
}scoll分页
第一次使用分页查询,查询出所有结果
POST /my_index/_search?scroll=1m
{
"query": {
"match_all": {}
},
"size": 1
}
结果:
{
"_scroll_id" : "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFGRKV2JWWHdCeUZ2WWVjeDY1V3NlAAAAAAAAGskWTERWbzhrWFZTdFd3WnVoOV9EaGV0dw==",
…………………………
}接着就用第一次返回的_scroll_id接着查,每次查询的条数由第一次size决定,查询结果从开始展示的结果继续向下
GET /_search/scroll?scroll=1m
{ "scroll_id":"FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFHNKV2VWWHdCeUZ2WWVjeDZYbXNGAAAAAAAAGwUWTERWbzhrWFZTdFd3WnVoOV9EaGV0dw=="
}
结果:
{
"_scroll_id" : "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFHNKV2VWWHdCeUZ2WWVjeDZYbXNGAAAAAAAAGwUWTERWbzhrWFZTdFd3WnVoOV9EaGV0dw==",
……………………
}高亮查询(highlight)
在进行关键字搜索时,搜索出的内容中的关键字会显示不同的颜色,称之为高亮。
POST /my_index/_search
{
"query": {
"match": {
"title": "华为"
}
},
"highlight": {
"pre_tags": "<b style='color:red'>",
"post_tags": "</b>",
"fields": {
"title": {}
}
}
}
结果:
{
…………………………
"hits" : [
{
"_index" : "my_index",
"_type" : "_doc",
"_id" : "2",
"_score" : 0.8025915,
"_source" : {
"id" : 2,
"title" : "华为手机",
"category" : "华为",
"images" : "http://www.gulixueyuan.com/xm.jpg",
"price" : 5500
},
"highlight" : {
"title" : [
"<b style='color:red'>华为</b>手机"
]
}
},
]
}
}近似查询(fuzzy)
返回包含与搜索字词相似的字词的文档。编辑距离是将一个术语转换为另一个术语所需的一个字符更改的次数。这些更改可以包括:
- 更改字符(box → fox)
- 删除字符(black → lack)
- 插入字符(sic → sick)
- 转置两个相邻字符(act → cat)
为了找到相似的术语,fuzzy查询会在指定的编辑距离内创建一组搜索词的所有可能的变体或扩展。然后查询返回每个扩展的完全匹配。通过fuzziness修改编辑距离。一般使用默认值AUTO,即一个字节,根据术语的长度生成编辑距离。
PUT /test/_doc/1
{
"title":"hello world"
}
GET /test/_search
{
"query": {
"fuzzy": {
"title": {
"value": "word",
"fuzziness":3
}
}
}
}elasticsearch-rest-high-level-client
ElasticSearch7.x版本使用该依赖,在更早期的版中使用的是RestClient
导入依赖
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.8.1</version>
</dependency>
<!--spring-boot-starter-data-elasticsearch中依赖了elasticsearch-rest-high-level-client,为了避免处理版本冲突问题,可以直接导入它-->
<!--注意,SpringBoot2.7版本后被废弃,移出了-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>构建client
RestHighLevelClient client = new RestHighLevelClient(RestClient
.builder(new HttpHost("192.168.85.157", 9200, "http")));
//或者配置完后,直接注入
spring:
elasticsearch:
# 指定ElasticSearch连接信息
uris: http://192.168.17.101:9200
@Resource
private RestHighLevelClient restHighLevelClient;索引操作
判断索引是否存在
@Test
public void testExist() throws IOException {
//1. 准备request对象
String index = "person";
GetIndexRequest request = new GetIndexRequest(index);
//2. 通过client去操作
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
//3. 输出
System.out.println(exists);
}创建索引
//方式一
@Test
public void createIndex(){
CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);
try {
createIndexRequest.mapping("{\n" +
" \"properties\": {\n" +
" \"name\": {\n" +
" \"type\": \"keyword\",\n" +
" \"index\": true,\n" +
" \"store\": true\n" +
" },\n" +
" \"age\": {\n" +
" \"type\": \"integer\",\n" +
" \"index\": true,\n" +
" \"store\": true\n" +
" },\n" +
" \"remark\": {\n" +
" \"type\": \"text\",\n" +
" \"index\": true,\n" +
" \"store\": true,\n" +
" \"analyzer\": \"ik_max_word\",\n" +
" \"search_analyzer\": \"ik_max_word\"\n" +
" }\n" +
" }\n" +
" }", XContentType.JSON);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
System.out.println(createIndexResponse.isAcknowledged());
} catch (IOException e) {
e.printStackTrace();
}
}
//方式二
void createIndex() throws IOException {
// 1、设置setting对象
Settings settings = Settings.builder()
.put("number_of_shards", 5)
.put("number_of_replicas", 1)
.build();
// 2、设置mappings对象
XContentBuilder mappings = JsonXContent.contentBuilder()
.startObject()
.startObject("properties")
// name filed
.startObject("name")
.field("type", "text")
.endObject()
// age filed
.startObject("age")
.field("type", "integer")
.endObject()
// birthday filed
.startObject("birthday")
.field("type", "date")
.field("format", "yyyy-MM-dd")
.endObject()
.endObject()
.endObject();
// 3、创建索引的请求对象 CreateIndexRequest
CreateIndexRequest request = new CreateIndexRequest("person").settings(settings).mapping(mappings);
// 4、使用client客户端发送创建索引的请求
// 通过client 对象 把上面准备的 request 对象 发到es执行
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
// 5、打印返回值对象
System.out.println(response);
}查看索引
//查看索引
@Test
public void getIndex(){
GetIndexRequest request = new GetIndexRequest(INDEX);
try {
GetIndexResponse getIndexResponse = client.indices().get(request, RequestOptions.DEFAULT);
System.out.println(getIndexResponse.getMappings());
System.out.println(getIndexResponse.getSettings());
} catch (IOException e) {
e.printStackTrace();
}
}删除索引
//删除索引
@Test
public void deleteIndex(){
DeleteIndexRequest request = new DeleteIndexRequest(INDEX);
try {
AcknowledgedResponse acknowledgedResponse = client.indices().delete(request, RequestOptions.DEFAULT);
System.out.println(acknowledgedResponse.isAcknowledged());
} catch (IOException e) {
e.printStackTrace();
}
}文档操作
创建文档
//创建文档
@Test
public void createDocument(){
IndexRequest request = new IndexRequest(INDEX);
//设置文档ID
request.id("1");
Student student = new Student();
student.setAge(18);
student.setName("robin");
student.setRemark("good man");
//也可以直接传入map作为参数
request.source(JSONObject.toJSONString(student), XContentType.JSON);
try {
IndexResponse index = client.index(request, RequestOptions.DEFAULT);
System.out.println(index.getResult());
} catch (IOException e) {
e.printStackTrace();
}
}修改文档
//修改文档
@Test
public void updateDocuemnt(){
UpdateRequest request = new UpdateRequest(INDEX,"1");
try {
Student student = new Student();
student.setRemark("very good man");
//也可以直接传入map作为参数
request.doc(JSONObject.toJSONString(student), XContentType.JSON);
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
System.out.println(response.getResult());
} catch (IOException e) {
e.printStackTrace();
}
}根据ID查询
//根据ID查询
@Test
public void getDocument(){
GetRequest request = new GetRequest(INDEX,"1");
try {
GetResponse response = client.get(request, RequestOptions.DEFAULT);
System.out.println(response.getSourceAsString());
} catch (IOException e) {
e.printStackTrace();
}
}批量操作
//批量操作
@Test
public void bulkDocument(){
BulkRequest request = new BulkRequest();
Student student = new Student();
for(int i=0;i<10;i++){
student.setAge(18 + i);
student.setName("robin" + i);
student.setRemark("good man " + i);
request.add(new IndexRequest(INDEX).id(String.valueOf(10 + i)).source(JSONObject.toJSONString(student), XContentType.JSON));
}
try {
BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
for(BulkItemResponse itemResponse : response.getItems()){
System.out.println(itemResponse.isFailed());
}
} catch (IOException e) {
e.printStackTrace();
}
}删除文档
//删除文档
@Test
public void deleteDocument(){
DeleteRequest request = new DeleteRequest(INDEX,"11");
try {
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
System.out.println(response.getResult());
} catch (IOException e) {
e.printStackTrace();
}
}DSL查询
返回参数
{
"took": 16, // Elasticsearch执行搜索请求所花费的时间,单位是毫秒
"timed_out": false, // 如果请求在规定的时间内没有完成,这个值会被设置为true
"_shards": { // 关于分片的信息
"total": 1, // 请求涉及到的分片总数
"successful": 1, // 成功执行请求的分片数
"skipped": 0, // 在执行搜索请求时跳过的分片数
"failed": 0 // 执行请求失败的分片数
},
"hits": { // 搜索结果的信息
"total": { // 匹配搜索条件的文档总数
"value": 1, // 匹配的文档数
"relation": "eq" // 表示value的精确性,如果是eq,则value是精确的,如果是gte,则value是一个下界(实际的匹配数可能更多)
},
"max_score": 1.2039728, // 所有匹配的文档中,最高的相关性得分
"hits": [{ // 匹配的文档列表
"_index": "person", // 文档所在的索引名
"_type": "_doc", // 文档的类型
"_id": "13", // 文档的ID
"_score": 1.2039728, // 文档的相关性得分,这个分数是根据搜索条件计算出来的
"_source": { // 文档的原始数据,它的内容取决于你在索引文档时存储的数据
"id": 13,
"name": "张龙",
"age": 20,
"birth": 1721721069523
}
}]
}
}基础查询、分页、排序
private static final String MY_INDEX = "my_index";
@Test
void testMatchAll() throws IOException {
//创建查询请求的对象
SearchRequest searchRequest = new SearchRequest("index3");
//查询条件构造器
SearchSourceBuilder builder = new SearchSourceBuilder();
//1、查询所有条件
builder.query(QueryBuilders.matchAllQuery());
/*2、使用match匹配查询*/
//builder.query(QueryBuilders.matchQuery("title","华为智能手机"));
/*3、使用term精确查询*/
//builder.query(QueryBuilders.termQuery("author","张三"));
/*4、使用terms多条件精确查询*/
builder.query(QueryBuilders.termsQuery("author","张三","李四"));
/*5、分页查询*/
// 当前页其实索引(第一条数据的顺序号), from
builder.from(0);
// 每页显示多少条 size
builder.size(2);
/*6、排序*/
builder.sort("age", SortOrder.ASC);
searchRequest.source(builder);
//执行查询
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
//获取查询结果
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
Map<String, Object> map = hit.getSourceAsMap();
System.out.println(map);
}
}高亮、模糊查询
/**
* 高亮查询
**/
@Test
public void highlightSearch(){
SearchRequest request = new SearchRequest(MY_INDEX);
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchQuery("title","华为智能手机"));
/*1、高亮查询*/
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("title");
highlightBuilder.preTags("<b style='color:red'>");
highlightBuilder.postTags("</b>");
builder.highlighter(highlightBuilder);
/*2、模糊查询*/
builder.query(QueryBuilders.fuzzyQuery("name","wangwu").fuzziness(Fuzziness.ONE));
request.source(builder);
try {
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
for(SearchHit hit : response.getHits().getHits()){
System.out.println(hit.getSourceAsMap().get("title") + ":" +hit.getHighlightFields().get("title").fragments()[0].string());
}
} catch (IOException e) {
e.printStackTrace();
}
}聚合(分组)查询
/*最大值查询*/
public class QueryDoc {
public static final ElasticsearchTask SEARCH_WITH_MAX = client -> {
// 高亮查询
SearchRequest request = new SearchRequest().indices("user");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(AggregationBuilders.max("maxAge").field("age"));
//设置请求体
request.source(sourceBuilder);
//3.客户端发送请求,获取响应对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4.打印响应结果
SearchHits hits = response.getHits();
System.out.println(response);
};
public static void main(String[] args) {
ConnectElasticsearch.connect(SEARCH_WITH_MAX);
}
}
/**
*聚合查询
**/
public void aggsSearch(){
// 创建一个搜索请求对象,并指定索引名
SearchRequest request = new SearchRequest(MY_INDEX);
// 创建一个搜索源构建器
SearchSourceBuilder builder = new SearchSourceBuilder();
// 设置查询条件为匹配所有文档
builder.query(QueryBuilders.matchAllQuery());
// 创建一个聚合构建器,groupby_category是分组名,用于构建按"category"字段进行分组的聚合查询
AggregationBuilder aggregationBuilder = AggregationBuilders
.terms("groupby_category").field("category");
// 在分组聚合查询中添加一个子聚合查询,用于计算每个分组中"price"字段的平均值
aggregationBuilder.subAggregation(AggregationBuilders.avg("avg_price").field("price"));
// 将聚合查询添加到搜索源构建器中
builder.aggregation(aggregationBuilder);
// 将搜索源构建器添加到搜索请求对象中
request.source(builder);
try {
// 执行搜索请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 获取聚合查询结果
Aggregations aggregations = response.getAggregations();
// 从聚合查询结果中获取名为"groupby_category"的分组聚合查询结果
Terms terms = aggregations.get("groupby_category");
// 遍历每个分组
terms.getBuckets().forEach(bucket -> {
// 从每个分组中获取名为"avg_price"的平均值聚合查询结果
Avg avg = bucket.getAggregations().get("avg_price");
// 打印分组的键(即"category"字段的值)、文档数量和"price"字段的平均值
System.out.println(bucket.getKeyAsString() + ":" + bucket.getDocCount() + "," + avg.getValue());
});
} catch (IOException e) {
// 打印异常堆栈信息
e.printStackTrace();
}
}批量添加、删除
/**
* 批量添加
* @throws IOException
*/
@Test
void testBulkCreateDoc() throws IOException {
// 1、准备数据
String index = "person";
Person person1 = new Person(11, "王朝", 18, new Date());
Person person2 = new Person(12, "马汉", 19, new Date());
Person person3 = new Person(13, "张龙", 20, new Date());
Person person4 = new Person(14, "赵虎", 21, new Date());
// 转JSON
ObjectMapper mapper = new ObjectMapper();
String json1 = mapper.writeValueAsString(person1);
String json2 = mapper.writeValueAsString(person2);
String json3 = mapper.writeValueAsString(person3);
String json4 = mapper.writeValueAsString(person4);
// 2、构造一个执行批量操作的对象,将准备好的数据封装进去
BulkRequest request = new BulkRequest();
request.add(new IndexRequest(index).id(person1.getId().toString()).source(json1,XContentType.JSON));
request.add(new IndexRequest(index).id(person2.getId().toString()).source(json2,XContentType.JSON));
request.add(new IndexRequest(index).id(person3.getId().toString()).source(json3,XContentType.JSON));
request.add(new IndexRequest(index).id(person4.getId().toString()).source(json4,XContentType.JSON));
// 3、发送批量添加的请求
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
// 4、查看结果
System.out.println(bulkResponse.status());
}
// 批量删除
@Test
void testBulkDeleteDoc() throws IOException {
// 1、准备数据
String index= "person";
// 2、创建批量请求
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest(index,"11"));
request.add(new DeleteRequest(index,"12"));
request.add(new DeleteRequest(index,"13"));
request.add(new DeleteRequest(index,"14"));
// 3、发送请求
BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
// 4、查看结果
System.out.println(response.status());
}组合查询、范围查询
public class QueryDoc {
public static final ElasticsearchTask SEARCH_BY_RANGE = client -> {
// 创建搜索请求对象
SearchRequest request = new SearchRequest();
request.indices("user");
// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
/*1、范围查询*/
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("age");
// 大于等于(其他和ElasticSearch的语法一样)
//rangeQuery.gte("30");
// 小于等于
rangeQuery.lte("40");
/*2、组合查询*/
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 必须包含
boolQueryBuilder.must(QueryBuilders.matchQuery("age", "30"));
// 一定不含
boolQueryBuilder.mustNot(QueryBuilders.matchQuery("name", "zhangsan"));
// 可能包含
boolQueryBuilder.should(QueryBuilders.matchQuery("sex", "男"));
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.query(rangeQuery);
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);;
};
}Elasticsearch Java API Client
ElasticSearch8.x版本中使用该依赖替代了原先的高级客户端
基础配置
导入依赖
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.3.3</version>
</dependency>
<!--或者使用高版本的spring-boot-starter-data-elasticsearch,其中也封装了ElasticSearchClient-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>yaml配置文件
spring:
elasticsearch:
uris: http://localhost:9200索引操作
索引相关请求使用:elasticsearchClient.indices().xxx(s -> s.index("索引名")),其中 xxx 代表增删查
创建索引
void createIndex() throws IOException {
CreateIndexResponse response = elasticsearchClient.indices().create(index -> index.index("test"));
//响应结果
{
"index": "test",//创建索引名称
"shards_acknowledged": true,//是否已成功处理了所有的分片副本
"acknowledged": true//是否成功创建
}
}查看索引
void getIndex() throws IOException
{
// 查看指定索引
GetIndexResponse getIndexResponse = elasticsearchClient.indices().get(s -> s.index("products"));
Map<String, IndexState> result = getIndexResponse.result();
result.forEach((k, v) -> log.info("key = {},value = {}",k ,v));
// 查看全部索引
IndicesResponse indicesResponse = elasticsearchClient.cat().indices();
// 返回对象具体查看 co.elastic.clients.elasticsearch.cat.indices.IndicesRecord
indicesResponse.valueBody().forEach(
info -> log.info("health:{}\n status:{} \n uuid:{} \n ",info.health(),info.status(),info.uuid())
);
}删除索引
@Test
void deleteIndex() throws IOException {
DeleteIndexResponse deleteIndexResponse = elasticsearchClient.indices().delete(s -> s.index("products"));
}文档操作
添加文档
@Test
void addOneDocument () throws IOException
{
// 方法1、using the fluent DSL
User user = new User("1","王五",28,"男");
IndexResponse indexResponse = elasticsearchClient.index(s ->
// 索引
s.index("users")
// ID
.id(user.getId())
// 文档
.document(user)
);
log.info("result:{}",indexResponse.result().jsonValue());
// 方法2、You can also assign objects created with the DSL to variables. Java API Client classes have a static of() method for this, that creates an object with the DSL syntax.
IndexRequest<User> request = IndexRequest.of(i -> i
.index("users")
.id(user.getId())
.document(user));
IndexResponse response = elasticsearchClient.index(request);
log.info("Indexed with version " + response.version());
// 方法3、Using classic builders
IndexRequest.Builder<User> indexReqBuilder = new IndexRequest.Builder<>();
indexReqBuilder.index("users");
indexReqBuilder.id(user.getId());
indexReqBuilder.document(user);
IndexResponse responseTwo = elasticsearchClient.index(indexReqBuilder.build());
log.info("Indexed with version " + responseTwo.version());
}通过ID查询文档
void getDocument () throws IOException
{
// co.elastic.clients.elasticsearch.core.get.GetResult<TDocument>
GetResponse<User> getResponse = elasticsearchClient.get(s -> s.index("users").id("e051445c-ae8c-47ef-ab18-97b34025d49a"),User.class);
// 如果查找到数据
if (getResponse.found())
{
//得到对象
User user = getResponse.source();
assert user != null;
log.info("user name={}",user.getName());
}
// Reading raw JSON
// if (getResponse.found())
// {
// ObjectNode json = getResponse.source();
// String name = json.get("name").asText();
// log.info("Product name " + name);
// }
// 判断文档是否存在
BooleanResponse booleanResponse = elasticsearchClient.exists(s -> s.index("users").id("e051445c-ae8c-47ef-ab18-97b34025d49a"));
log.info("判断Document是否存在:{}",booleanResponse.value());
}通过ID修改文档
void updateDocument () throws IOException
{
// 构建需要修改的内容,这里使用了Map
Map<String, Object> map = new HashMap<>();
map.put("name", "liuyife");
// 构建修改文档的请求
UpdateResponse<Test> response = elasticsearchClient.update(e -> e
.index("users")
.id("33")
//可以传入map,也可以传入更新后的对象
.doc(map),
Test.class
);
// 打印请求结果
log.info(String.valueOf(response.result()));
}通过ID删除文档
void deleteDocument () throws IOException
{
DeleteResponse deleteResponse = elasticsearchClient.delete(s -> s.index("users").id("e051445c-ae8c-47ef-ab18-97b34025d49a"));
log.info("删除文档操作结果:{}",deleteResponse.result());
}批量添加
void batchAddDocument () throws IOException
{
// 方法1、use BulkOperation
List<User> users = new ArrayList<>();
users.add(new User("1","赵四",20,"男"));
users.add(new User("2","阿旺",25,"男"));
users.add(new User("3","刘菲",22,"女"));
users.add(new User("4","冬梅",20,"女"));
List<BulkOperation> bulkOperations = new ArrayList<>();
users.forEach(u ->
bulkOperations.add(BulkOperation.of(b ->
b.index(
c ->
c.id(u.getId()).document(u)
)))
);
BulkResponse bulkResponse = elasticsearchClient.bulk(s -> s.index("users").operations(bulkOperations));
bulkResponse.items().forEach(i ->
log.info("i = {}" , i.result()));
log.error("bulkResponse.errors() = {}" , bulkResponse.errors());
// 方法2、use BulkRequest
BulkRequest.Builder br = new BulkRequest.Builder();
for (User user : users) {
br.operations(op -> op
.index(idx -> idx
.index("users")
.id(user.getId())
.document(user)));
}
BulkResponse result = elasticsearchClient.bulk(br.build());
// Log errors, if any
if (result.errors()) {
log.error("Bulk had errors");
for (BulkResponseItem item: result.items()) {
if (item.error() != null) {
log.error(item.error().reason());
}
}
}
}批量删除
void batchDeleteDocument () throws IOException
{
// 方法1、use BulkOperation
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
List<BulkOperation> bulkOperations = new ArrayList<>();
list.forEach(a ->
bulkOperations.add(BulkOperation.of(b ->
b.delete(c -> c.id(a))
))
);
BulkResponse bulkResponse = elasticsearchClient.bulk(a -> a.index("users").operations(bulkOperations));
bulkResponse.items().forEach(a ->
log.info("result = {}" , a.result()));
log.error("bulkResponse.errors() = {}" , bulkResponse.errors());
// 方法2、use BulkRequest
BulkRequest.Builder br = new BulkRequest.Builder();
for (String s : list) {
br.operations(op -> op
.delete(c -> c.id(s)));
}
BulkResponse bulkResponseTwo = elasticsearchClient.bulk(br.build());
bulkResponseTwo.items().forEach(a ->
log.info("result = {}" , a.result()));
log.error("bulkResponse.errors() = {}" , bulkResponseTwo.errors());
}查询结果结构
{
// took字段表示Elasticsearch执行请求所花费的毫秒数
"took": 73,
// timed_out字段表示请求是否超时
"timed_out": false,
// _shards字段包含了关于分片执行的信息
"_shards": {
// total字段表示搜索执行的分片总数
"total": 1.0,
// successful字段表示搜索成功执行的分片数
"successful": 1.0,
// failed字段表示搜索执行失败的分片数
"failed": 0.0,
// skipped字段表示由于预过滤而跳过的分片数
"skipped": 0.0
},
// hits字段包含了实际的搜索命中和其他相关信息
"hits": {
// total字段表示匹配查询的命中总数
"total": {
// value字段表示实际的命中总数
"value": 1,
//eq:表示total字段的值是准确的,即搜索结果的总数就是total字段的值。
//gte:表示total字段的值是搜索结果总数的下界,即实际的搜索结果总数可能大于或等于total字段的值
"relation": "eq"
},
// hits字段是一个数组,包含了搜索命中
"hits": [{
// _index字段表示文档所在的索引名称
"_index": "users",
// _id字段表示返回文档的唯一标识符
"_id": "66",
// _score字段表示命中的相关性得分
"_score": 1.540445,
// _source字段表示被索引的源文档。在这个例子中,它似乎是一个序列化的Java对象,类型为com.fjut.elasticsearch.DemoApplicationTests$User
"_source": "com.fjut.elasticsearch.DemoApplicationTests$User@71d55b7e"
}],
// max_score字段表示所有命中的最高得分
"max_score": 1.540445
}
}高级查询
搜索查询
void searchOne() throws IOException {
String searchText = "liuyihu";
SearchResponse<User> response = elasticsearchClient.search(s -> s
// 我们要搜索的索引的名称
.index("users")
// 搜索请求的查询部分(搜索请求也可以有其他组件,如聚合)
.query(q -> q
// 在众多可用的查询变体中选择一个。我们在这里选择匹配查询(全文搜索)
.match(t -> t
// name配置匹配查询:我们在字段中搜索一个词
.field("name")
.query(searchText)
)
),
// 匹配文档的目标类
User.class
);
List<Hit<User>> hits = response.hits().hits();
for (Hit<User> hit: hits) {
User user = hit.source();
assert user != null;
log.info("Found userId " + user.getId() + ", name " + user.getName());
}
}组合查询
void searchTwo() throws IOException {
String searchText = "liuyihu";
int maxAge = 30;
// byName、byMaxAge:分别为各个条件创建查询
Query byName = MatchQuery.of(m -> m
.field("name")
.query(searchText)
)
//MatchQuery是一个查询变体,我们必须将其转换为 Query 联合类型
._toQuery();
Query byMaxAge = RangeQuery.of(m -> m
.field("age")
// Elasticsearch 范围查询接受大范围的值类型。我们在这里创建最高价格的 JSON 表示。
.gte(JsonData.of(maxAge))
)._toQuery();
SearchResponse<User> response = elasticsearchClient.search(s -> s
.index("users")
.query(q -> q
.bool(b -> b
// 搜索查询是结合了文本搜索和最高价格查询的布尔查询
.must(byName)
// .should(byMaxAge)
.must(byMaxAge)
)
),
User.class
);
List<Hit<User>> hits = response.hits().hits();
for (Hit<User> hit: hits) {
User user = hit.source();
assert user != null;
log.info("Found userId " + user.getId() + ", name " + user.getName());
}
}模板化搜索
以ElasticSearch中的DSL语句作为模板,替换其中的变量即可在不同的环境下搜索
void templatedSearch() throws IOException {
// 事先创建搜索模板
elasticsearchClient.putScript(r -> r
// 要创建的模板脚本的标识符
.id("query-script")
.script(s -> s
.lang("mustache")
.source("{\"query\":{\"match\":{\"{{field}}\":\"{{value}}\"}}}")
));
// 开始使用模板搜索
String field = "name";
String value = "liuyifei";
SearchTemplateResponse<User> response = elasticsearchClient.searchTemplate(r -> r
.index("users")
// 要使用的模板脚本的标识符
.id("query-script")
// 模板参数值
.params("field", JsonData.of(field))
.params("value", JsonData.of(value)),
User.class
);
List<Hit<User>> hits = response.hits().hits();
for (Hit<User> hit: hits) {
User user = hit.source();
assert user != null;
log.info("Found userId " + user.getId() + ", name " + user.getName());
}
}分页,排序
void paginationSearch() throws IOException
{
int maxAge = 20;
Query byMaxAge = RangeQuery.of(m -> m
.field("age")
.gte(JsonData.of(maxAge))
)._toQuery();
SearchResponse<User> response = elasticsearchClient.search(s -> s
.index("users")
.query(q -> q
.bool(b -> b
.must(byMaxAge)
)
)
//分页查询,从第0页开始查询4个document
.from(0)
.size(4)
//按age降序排序
.sort(f -> f.field(o -> o.field("age")
.order(SortOrder.Desc))),
User.class
);
List<Hit<User>> hits = response.hits().hits();
for (Hit<User> hit: hits) {
User user = hit.source();
assert user != null;
log.info("Found userId " + user.getId() + ", name " + user.getName());
}
}模糊、过滤、高亮查询
void fuzzyQuerySearch() throws IOException
{
SearchResponse<User> response = elasticsearchClient.search(s -> s
.index("users")
.query(q -> q
// 模糊查询
.fuzzy(f -> f
// 需要判断的字段名称
.field("name")
// 需要模糊查询的关键词
// 目前文档中没有liuyi这个用户名
.value("liuyi")
// fuzziness代表可以与关键词有误差的字数,可选值为0、1、2这三项
.fuzziness("2")
)
)
//高亮查询
.highlight(h -> h
.fields("name", f -> f
.preTags("<font color='red'>")
.postTags("</font>")))
//过滤字段
.source(source -> source
.filter(f -> f
.includes("name","id")
.excludes(""))),
User.class
);
List<Hit<User>> hits = response.hits().hits();
List<User> userList = new ArrayList<>(hits.size());
for (Hit<User> hit: hits) {
User user = hit.source();
userList.add(user);
}
log.info("过滤字段后:{}",JSONUtil.toJsonStr(userList));
}嵌套查询
// 创建nestedQuery 对象.
NestedQuery nestedQuery = NestedQuery.of(f -> f.path("attributeValueIndexList")
.query(q -> q.bool(
m -> m.must(s -> s.match(
a -> a.field("attributeValueIndexList.attributeId").query(split[0])
))
.must(s -> s.match(
a -> a.field("attributeValueIndexList.valueId").query(split[1])
))
))
);
boolQuery.filter(f -> f.nested(nestedQuery));聚合查询
查询结果结构
{
"took": 131,
"timed_out": false,
"_shards": {
"failed": 0.0,
"successful": 1.0,
"total": 1.0,
"skipped": 0.0
},
"hits": {
"total": {
"relation": "eq",
"value": 6
},
"hits": [],
"max_score": null
},
{
//aggregations"字段包含所有的聚合结果
"aggregations": {
//lterms#groupName"是一个特定的聚合的名称,这里是一个术语聚合
"lterms#groupName": {
// buckets"是一个包含所有聚合桶的数组。每个桶代表一个特定的术语(在这种情况下是一个特定的组名)
"buckets": [
{
//doc_count"是每个桶中的文档数量,也就是每个组名出现的次数
"doc_count": 2,
//key"是每个桶的键,也就是每个组名
"key": 20
},
{
"doc_count": 1,
"key": 22
},
{
"doc_count": 1,
"key": 25
},
{
"doc_count": 1,
"key": 30
},
{
"doc_count": 1,
"key": 32
}
],
//doc_count_error_upper_bound"是文档计数的可能错误的上限。在这种情况下,它是0,这意味着计数是准确的
"doc_count_error_upper_bound": 0,
//sum_other_doc_count"是所有不在顶部桶中的文档的数量。在这种情况下,它是0,这意味着所有的文档都在顶部的桶中
"sum_other_doc_count": 0
}
}
}
}最大值
void getMaxAgeUserTest() throws IOException {
SearchResponse<Void> response = elasticsearchClient.search(b -> b
.index("users")
.size(0)
.aggregations("maxAge", a -> a
.max(MaxAggregation.of(s -> s
.field("age"))
)
),
Void.class
);
MaxAggregate maxAge = response.aggregations()
.get("maxAge")
.max();
log.info("maxAge.value:{}",maxAge.value());
}分组查询
//LongTermsAggregate类型,调用lterns
void groupingTest() throws IOException {
SearchResponse<Void> response = elasticsearchClient.search(b -> b
.index("users")
.size(0)
.aggregations("groupName", a -> a
.terms(TermsAggregation.of(s -> s
.field("age")))
),
Void.class
);
LongTermsAggregate longTermsAggregate = response.aggregations()
.get("groupName")
.lterms();
log.info("multiTermsAggregate:{}",longTermsAggregate.buckets());
}
//StringTermsAggregate,调用sterms
void groupBySexTest() throws IOException {
SearchResponse<Void> response = elasticsearchClient.search(b -> b
.index("users")
.size(0)
.aggregations("groupSex", a -> a
.terms(TermsAggregation.of(s -> s
// ⚠️特别注意这一块,我们加上了.keyword,因为字符串sex类型是text类型。当使用到 term 查询的时候,由于是精准匹配,所以查询的关键字在es上的类型,必须是keyword而不能是text
.field("sex.keyword")))
),
Void.class
);
StringTermsAggregate stringTermsAggregate = response.aggregations()
.get("groupSex")
.sterms();
log.info("stringTermsAggregate:{}",stringTermsAggregate.buckets());
}实际使用
排行榜查询
查询三级分类中专辑数量最多的前十个,并且对挑出的三级分类中,每一个返回热度最高的前六条
//查询的时候,需要将三级分类Id的List集合变为一个List<FieldValue>
List<FieldValue> category3IdFieldValueList = category3IdList.stream().map(id -> FieldValue.of(id)).collect(Collectors.toList());
// es查询数据:
SearchRequest.Builder searchRequest = new SearchRequest.Builder();
//精确检索字段category3Id在category3IdFieldValueList集合中的文档
searchRequest.index("albuminfo").query(q->q.terms(
f->f.field("category3Id").terms(s->s.value(category3IdFieldValueList))));
//聚合,挑选出十个专辑数量最多的category3Id,然后每一个category3Id挑选出热度最高的6个文档
searchRequest.aggregations("groupByCategory3IdAgg",
//按照category3Id分组,返回数量最多的前十个组
f->f.terms(
t->t.field("category3Id").size(10))
//d对每一个组再进行聚合,对每一个组中的文档按照hotScore排序,返回前6个文档
.aggregations("topTenHotScoreAgg",s->s.topHits(
o->o.size(6).sort(a->a.field(
h->h.field("hotScore").order(SortOrder.Desc)
))
))
);
SearchResponse<AlbumInfoIndex> searchResponse = elasticsearchClient.search(searchRequest.build(), AlbumInfoIndex.class);
// 后续获取结果集.
Aggregate groupByCategory3IdAgg = searchResponse.aggregations().get("groupByCategory3IdAgg");
// 从聚合中获取数据
groupByCategory3IdAgg.lterms().buckets().array().forEach(item->{
// 获取三级分类Id
long category3Id = item.key();
// 获取top_hits
Aggregate topTenHotScoreAgg = item.aggregations().get("topTenHotScoreAgg");
List<AlbumInfoIndex> albumInfoIndexList = topTenHotScoreAgg.topHits().hits().hits().stream().map(hit -> {
// 获取到专辑的Json 字符串
String albumInfoIndexJson = hit.source().toString();
// 将上述字符串转换为AlbumInfoIndex 对象
AlbumInfoIndex albumInfoIndex = JSON.parseObject(albumInfoIndexJson, AlbumInfoIndex.class);
// 返回数据
return albumInfoIndex;
}).collect(Collectors.toList());多条件检索
private SearchRequest buildQueryDsl(AlbumIndexQuery albumIndexQuery) {
// 检索入口: 关键词
String keyword = albumIndexQuery.getKeyword();
SearchRequest.Builder requestBuilder = new SearchRequest.Builder();
// {query - bool }
BoolQuery.Builder boolQuery = new BoolQuery.Builder();
// {query - bool - should - match}
// MatchQuery.Builder matchQuery = new MatchQuery.Builder();
// matchQuery.field("albumTitle").query(keyword);
// boolQuery.should(f-> f.match(matchQuery.build()));
if (!StringUtils.isEmpty(keyword)) {
// {query - bool - should - match}
boolQuery.should(f -> f.match(s -> s.field("albumTitle").query(keyword)));
boolQuery.should(f -> f.match(s -> s.field("albumIntro").query(keyword)));
// 高亮
requestBuilder.highlight(h->h.fields("albumTitle",
// f->f.preTags("<font color:red>").postTags("</font>")
f->f.preTags("<span style=color:red>").postTags("</span>")
));
}
// 入口:分类Id 复制小括号,写死右箭头,落地大括号
// 一级分类Id
Long category1Id = albumIndexQuery.getCategory1Id();
if (!StringUtils.isEmpty(category1Id)) {
boolQuery.filter(f -> f.term(s -> s.field("category1Id").value(category1Id)));
}
// 二级分类Id
Long category2Id = albumIndexQuery.getCategory2Id();
if (!StringUtils.isEmpty(category2Id)) {
boolQuery.filter(f -> f.term(s -> s.field("category2Id").value(category2Id)));
}
// 三级分类Id
Long category3Id = albumIndexQuery.getCategory3Id();
if (!StringUtils.isEmpty(category3Id)) {
boolQuery.filter(f -> f.term(s -> s.field("category3Id").value(category3Id)));
}
// 根据属性Id 检索 前端传递数据的时候 属性Id:属性值Id 属性Id:属性值Id
List<String> attributeList = albumIndexQuery.getAttributeList();
// 判断集合不为空
if (!CollectionUtils.isEmpty(attributeList)) {
// 循环遍历.
for (String attribute : attributeList) {
// 需要使用 : 分割
String[] split = attribute.split(":");
// 判断
if (null != split && split.length == 2) {
// 创建nestedQuery 对象.
NestedQuery nestedQuery = NestedQuery.of(f -> f.path("attributeValueIndexList")
.query(q -> q.bool(
m -> m.must(s -> s.match(
a -> a.field("attributeValueIndexList.attributeId").query(split[0])
))
.must(s -> s.match(
a -> a.field("attributeValueIndexList.valueId").query(split[1])
))
))
);
boolQuery.filter(f -> f.nested(nestedQuery));
}
}
}
// 排序 分页 高亮 排序(综合排序[1:desc] 播放量[2:desc] 发布时间[3:desc];asc:升序 desc:降序)
String order = albumIndexQuery.getOrder();
// 定义一个排序字段
String orderField = "";
// 定义一个排序规则
String sort = "";
// 判断
if (!StringUtils.isEmpty(order)) {
// 分割数据
String[] split = order.split(":");
// 判断这个数组
if (null != split && split.length == 2) {
switch (split[0]) {
case "1":
orderField = "hotScore";
break;
case "2":
orderField = "playStatNum";
break;
case "3":
orderField = "createTime";
break;
}
sort = split[1];
}
// 判断 desc SortOrder.Desc asc SortOrder.Asc
String finalSort = sort;
String finalOrderField = orderField;
requestBuilder.sort(f->f.field(o->o.field(finalOrderField).order("asc".equals(finalSort)?SortOrder.Asc:SortOrder.Desc)));
}else {
// 默认排序规则
requestBuilder.sort(f->f.field(o->o.field("hotScore").order(SortOrder.Desc)));
}
// 字段选择
requestBuilder.source(s->s.filter(f->f.excludes("attributeValueIndexList")));
// 分页: (pageNo-1)*pageSize()
Integer from = (albumIndexQuery.getPageNo() - 1)*albumIndexQuery.getPageSize();
requestBuilder.from(from);
requestBuilder.size(albumIndexQuery.getPageSize());
// {query }
// GET /albuminfo/_search
requestBuilder.index("albuminfo").query(f -> f.bool(boolQuery.build()));
// 创建对象
SearchRequest searchRequest = requestBuilder.build();
System.out.println("dsl:\t"+searchRequest.toString());
// 返回
return searchRequest;
}自动补全
//在上架的时候添加题词数据
SuggestIndex suggestIndex = new SuggestIndex();
suggestIndex.setId(UUID.randomUUID().toString().replaceAll("-",""));
suggestIndex.setTitle(albumInfoIndex.getAlbumTitle());
suggestIndex.setKeyword(new Completion(new String[]{albumInfoIndex.getAlbumTitle()}));
suggestIndex.setKeywordPinyin(new Completion(new String[]
{PinYinUtils.toHanyuPinyin(albumInfoIndex.getAlbumTitle())}));
suggestIndex.setKeywordSequence(new Completion(new String[]
{PinYinUtils.getFirstLetter(albumInfoIndex.getAlbumTitle())}));
this.suggestIndexRepository.save(suggestIndex);
//根据搜索词进行补全
public List<String> getCompleteSuggest(String keyword) {
// Java 动态生成dsl 语句.
SearchRequest.Builder searchRequest = new SearchRequest.Builder();
// 设置搜索的索引库为"suggestinfo"
searchRequest.index("suggestinfo").suggest(
// 添加一个名为"suggestionKeyword"的建议器
s->s.suggesters("suggestionKeyword",
// 设置前缀为用户输入的关键字
f->f.prefix(keyword).completion(
// 设置补全字段为"keyword"
c->c.field("keyword")
// 设置跳过重复的建议
.skipDuplicates(true)
// 设置返回的建议数量为10
.size(10)
// 设置启用模糊匹配,自动调整模糊度
.fuzzy(z->z.fuzziness("auto"))
)
)
// 添加一个名为"suggestionkeywordPinyin"的建议器,用于处理拼音输入
.suggesters("suggestionkeywordPinyin",
f->f.prefix(keyword).completion(
c->c.field("keywordPinyin").skipDuplicates(true).size(10)
.fuzzy(z->z.fuzziness("auto"))
)
)
// 添加一个名为"suggestionkeywordSequence"的建议器,用于处理首字母输入
.suggesters("suggestionkeywordSequence",
f->f.prefix(keyword).completion(
c->c.field("keywordSequence").skipDuplicates(true).size(10)
.fuzzy(z->z.fuzziness("auto"))
)
)
);
// 获取查询结果
SearchResponse<SuggestIndex> searchResponse = null;
try {
searchResponse = elasticsearchClient.search(searchRequest.build(), SuggestIndex.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
// 获取到结果集,数据转换
HashSet<String> titleSet = new HashSet<>();
titleSet.addAll(this.parseResultData(searchResponse,"suggestionKeyword"));
titleSet.addAll(this.parseResultData(searchResponse,"suggestionkeywordPinyin"));
titleSet.addAll(this.parseResultData(searchResponse,"suggestionkeywordSequence"));
// 判断:
if (titleSet.size()<10){
// 使用查询数据的方式来填充集合数据,让这个提示信息够10条数据.
SearchResponse<SuggestIndex> response = null;
try {
response = elasticsearchClient.search(s -> s.index("suggestinfo")
.query(f -> f.match(m -> m.field("title").query(keyword)))
, SuggestIndex.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
// 从查询结果集中获取数据
for (Hit<SuggestIndex> hit : response.hits().hits()) {
// 获取数据结果
SuggestIndex suggestIndex = hit.source();
// 获取titile
titleSet.add(suggestIndex.getTitle());
// 判断当前这个结合的长度.
if (titleSet.size()==10){
break;
}
}
}
// 返回数据
return new ArrayList<>(titleSet);
}
private List<String> parseResultData(SearchResponse<SuggestIndex> searchResponse, String suggestionName) {
// 声明一个来存储title
List<String> list = new ArrayList<>();
// 从options中获取数据
List<Suggestion<SuggestIndex>> suggestionList = searchResponse.suggest().get(suggestionName);
suggestionList.forEach(suggestIndexSuggestion -> {
// 循环遍历获取数据
for (CompletionSuggestOption<SuggestIndex> option : suggestIndexSuggestion.completion().options()) {
// 获取到数据
SuggestIndex suggestIndex = option.source();
// 获取titile
String title = suggestIndex.getTitle();
// 添加titile 到集合
list.add(title);
}
});
// 获取到title数据
return list;
}Spring Date ElasticSearch
文档映射文件
@Data//lombok的映射文件,自动生成get,set方法
@Document(indexName = "goods")
public class Goods implements Serializable {
@Field(type = FieldType.Keyword)
private String id;
@Field(type = FieldType.Text)
private String goodsName;
@Field(type = FieldType.Integer)
private Integer store;
@Field(type = FieldType.Double)
private double price;
}dao层数据
继承ElasticsearchRepository<Goods, String>
//第一个泛型是索引类型,第二个泛型是ID字段类型
@Repository
public interface GoodsDao extends ElasticsearchRepository<Goods, String> {
}接口测试
@Autowired
private GoodsDao goodsDao;
/**
* 添加文档
* */
@Test
public void saveTest(){
Goods goods = new Goods();
goods.setId("1");
goods.setGoodsName("华为手机");
goods.setStore(100);
goods.setPrice(5000);
goodsDao.save(goods);
System.out.println("添加成功...");
}
/**
* 根据ID查询文档
* */
@Test
public void findById(){
Goods goods = goodsDao.findById("1").get();
System.out.println(goods);
}elasticsearch集群
核心概念
集群Cluster
一个Elasticsearch集群有一个唯一的名字标识,默认是elasticsearch。因为节点只能通过指定某个集群的名字,来加入这个集群。所以一个集群内所有节点的集群名字需要相同
节点Node
集群中包含很多节点,一个节点就是一个服务器。每个节点默认有各自命名(随机的漫威漫画角色)用于管理,可以通过配置集群名称的方式来加入一个指定的集群。未指定加入集群的默认情况下,每个节点都会被安排加入到一个叫做“elasticsearch”的集群中。
分片(Shards)
将索引划分成多份进行储存,每一份就称之为分片。每个分片本身也是一个功能完善并且独立的“索引”,可以被放置到集群中的任何节点上。
副本(Replicas)
创建分片(主分片)的一份或多份拷贝,这些拷贝叫做复制分片(副本)。可以动态地改变复制的数量,但是创建后索引不能改变主分片的数量
分配(Allocation)
将分片分配给某个节点的过程,包括分配主分片或者副本。如果是副本,还包含从主分片复制数据的过程。这个过程是由master节点完成的。
节点类型
es集群中的节点类型分为:Master、DataNode。
master:
Elasticsearch启动时,会选举出来一个Master节点。
1、当某个节点启动后,使用Zen Discovery机制找到集群中的其他节点,并建立连接。
discovery.seed_hosts: ["host1", "host2", "host3"]
2、并从候选主节点中选举出一个主节点。
cluster.initial_master_nodes: ["node-1", "node-2","node-3"]
Master节点主要负责:
管理索引(创建索引、删除索引)、分配分片
维护元数据
管理集群节点状态
不负责数据写入和查询,比较轻量级。一个ElasticSearch集群中,只有一个Master节点。在生产环境中,内存可以相对
小一点,但要确保机器稳定。
DataNode:
在Elasticsearch集群中,会有N个DataNode节点。DataNode节点主要负责:
- 数据写入、数据检索,大部分Elasticsearch的压力都在DataNode节点上
在生产环境中,内存最好配置大一些。
系统架构

一个运行中的 Elasticsearch 实例称为一个节点,而集群是由一个或者多个拥有相同 cluster.name 配置的节点组成, 它们共同承担数据和负载的压力。当有节点加入集群中或者从集群中移除节点时,集群将会重新平均分布所有的数据。
当一个节点被选举成为主节点时, 它将负责管理集群范围内的所有变更,例如增加、删除索引,或者增加、删除节点等。 而主节点并不需要涉及到文档级别的变更和搜索等操作。
作为用户,我们可以将请求发送到集群中的任何节点 ,包括主节点。 每个节点都知道任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点。 无论我们将请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将最终结果返回給客户端。
elasticsearch分片
我们可以在建立索引的时候创建分片信息:
#number_of_shards:主分片数量(7.x版本之后如果不指定数量默认为1),number_of_replicas:每个主分片对应的副本数量
PUT /users
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 2
}
}
head中数据查看说明:
★ 代表当前节点为master节点
● 表示DataNode节点
粗线框格子为主分片,细线框为副本分片,主分片与副本分片不能同时在一台机器上。
刚才的例子中,我们创建了3个主分片,然后又为每个主分片配置了两个副本分片,加起来一共9个分片。分片序号分别为0、1、2代表不同的数据段存储。其中0号分片的主分片在node-3机器上,node-1和node-2是它的备份分片。
<font color='red'>注意:主分片数量一旦指定后就不允许更改,因为文档储存的分片位置是由文档Hash取模主分片数量获得,如果改变,数据映射就全乱了。</font>
- ElasticSearch使用哈希算法将文档分配到特定的主分片。文档的ID会通过哈希函数计算出一个哈希值,然后根据主分片的数量取模,确定该文档属于哪个主分片。
- ElasticSearch维护一个分片路由表,记录每个分片所在的节点信息。查询时,ElasticSearch根据路由表将请求路由到对应的节点。
虽然主分片数量不可用更改,但是副本数量可以修改:
#修改副本数
PUT users/_settings
{
"number_of_replicas": 0
}分片控制
写流程
新建和删除请求都是写操作, 必须在主分片上面完成之后才能被复制到相关的副本分片

第一步: 客户端选择DataNode节点发送请求,如上图架构,假设发送到node-2节点上。此时被选择的node-2节点也称为coordinating node(协调节点)
第二步: 协调节点根据路由规则计算分片索引位置。并将请求发送到分片索引对应的主分片机器上(这里假设分片计算后的值为0,那么请求会命中到node-3节点上)。
- 计算分片索引位置: shard = hash(routing) % number_of_primary_shards,routing可以自己设定,一般默认为文档的ID。
第三步: 当主分片文档写入完成后,同时将数据推送到与之对应的副本分片进行写入操作
第四步: 当分片完成了写入后再由协调节点将操作结果返回给客户端
读流程

- 第一步:客户端选择DataNode节点发送请求,如上图架构,假设发送到node-2节点上。此时被选择的node-2节点也称为coordinating node(协调节点)
- 第二步: 协调节点将从客户端获取到的请求数据转发到其它节点
- 第三步: 其它节点将查询结果文档ID、节点、分片信息返回给协调节点
- 第四步: 协调节点通过文档ID、节点信息等发送get请求给其它节点进行数据获取,最后进行汇总排序将数据返回给客户端
分片原理
倒排索引
Elasticsearch 使用一种称为倒排索引的结构,它适用于快速的全文搜索。
正向索引,就是搜索引擎会将待搜索的文件都对应一个文件ID,搜索时将这个ID和搜索关键字进行对应,形成K-V对,然后对关键字进行统计计数

但是互联网上收录在搜索引擎中的文档的数目是个天文数字,这样的索引结构根本无法满足实时返回排名结果的要求。所以,搜索引擎会将正向索引重新构建为倒排索引,即把文件ID对应到关键词的映射转换为关键词到文件ID的映射,每个关键词都对应着一系列的文件,这些文件中都出现这个关键词。

动态更新索引
早期的全文检索会为整个文档集合建立一个很大的倒排索引并将其写入到磁盘。 一旦新的索引就绪,旧的就会被其替换,这样最近的变化便可以被检索到。如果需要让一个新的文档可被搜索,你需要重建整个索引。这要么对一个索引所能包含的数据量造成了很大的限制,要么对索引可被更新的频率造成了很大的限制。
通过增加新的补充索引来反映新近的修改,而不是直接重写整个倒排索引。每一个倒排索引都会被轮流查询到,从最早的开始查询完后再对结果进行合并。
- 按段搜索:每一段本身都是一个倒排索引,包含了一部分索引的数据
- 提交点:一个列出了所有已知段的文件
保存
- 新文档被收集到内存索引缓存

- 经过一段时间后, 缓存被提交
(1) 缓存中的倒排索引作为一个新的段,追加写入磁盘。
(2) 将新段的名称写入提交点文件
(3) 磁盘同步完成后,新的段被开启,让它包含的文档可见以被搜索
(4) 内存缓存被清空,等待接收新的文档

查询
当一个查询被触发,所有已知的段按顺序被查询。词项统计会对所有段的结果进行聚合,以保证每个词和每个文档的关联都被准确计算。 这种方式可以用相对较低的成本将新文档添加到索引。
删除
段是不可改变的,所以既不能把文档从旧的段中移除,也不能修改旧的段来进行反映文档的更新。 取而代之的是,每个提交点会包含一个 .del 文件,文件中会列出这些被删除文档的段信息。
当一个文档被 “删除” 时,它实际上只是在 .del 文件中被标记删除。一个被标记删除的文档仍然可以被查询匹配到,但它会在最终结果被返回前从结果集中移除。
更新
文档更新也是类似的操作方式:当一个文档被更新时,旧版本文档被标记删除,文档的新版本被索引到一个新的段中。 可能两个版本的文档都会被一个查询匹配到,但被删除的那个旧版本文档在结果集返回前就已经被移除。
近实时搜索

- 分段数据先写入到内存缓存中,同时文档操作也会记录translog日志
- 内存的数据对查询不可见,默认间隔1s将内存中数据写入到文件系统缓存中,这里面的数据对查询可见。
- 文件系统缓存数据间隔30分钟再将数据刷入磁盘中。
- 如果文件系统缓存数据在没有刷新到硬盘时宕机了,可以从translog中恢复数据到磁盘,数据恢复完成后translog数据也会清理。
段合并
由于自动刷新流程每秒会创建一个新的段 ,这样会导致短时间内的段数量暴增。而段数目太多会带来较大的麻烦。 每一个段都会消耗文件句柄、内存和cpu运行周期。更重要的是,每个搜索请求都必须轮流检查每个段;所以段越多,搜索也就越慢。
Elasticsearch通过在后台进行段合并来解决这个问题。小的段被合并到大的段,然后这些大的段再被合并到更大的段。
段合并的时候会将那些旧的已删除文档从文件系统中清除。被删除的文档(或被更新文档的旧版本)不会被拷贝到新的大段中。
当索引的时候,刷新(refresh)操作会创建新的段并将段打开以供搜索使用。
合并进程选择一小部分大小相似的段,并且在后台将它们合并到更大的段中。这并不会中断索引和搜索。

- 一旦合并结束,老的段被删除
新的段被刷新(flush)到了磁盘。写入一个包含新段且排除旧的和较小的段的新提交点。
新的段被打开用来搜索。
老的段被删除。

合并大的段需要消耗大量的I/O和CPU资源,如果任其发展会影响搜索性能。Elasticsearch在默认情况下会对合并流程进行资源限制,所以搜索仍然有足够的资源很好地执行。