ElasticSearch之分布式特性

篇幅有限

完整内容及源码关注公众号:ReverseCode,发送

Elasticsearch是一套分布式的系统,分布式是为了应对大数据量,隐藏了复杂的分布式机制,实现了分片机制(数据插入分配不同的shard中),集群发现机制(启动多个es进程自动发现集群并加入且接受部分数据当做副本),shard负载均衡机制(增减节点时,多节点条件下es自动进行均匀分配,保持每个节点均衡读写负载请求),shard副本机制(请求路由,集群扩容,shard重分配)。master节点实现创建或删除索引,增加或删除节点。节点对等,每个节点都能接收所有的请求,自动请求路由,响应收集。

image-20210525152751946

ES就是一个分布式的文档数据存储系统。

  • 文档数据:es可以存储和操作json文档类型的数据,而且这也是es的核心数据结构。
  • 存储系统:es可以对json文档类型的数据进行存储,查询,创建,更新,删除,等等操作。其实ES满足了这些功能,就可以说已经是一个NoSQL的存储系统了。

场景:

  • 数据量较大,es的分布式本质,可以帮助你快速进行扩容,承载大量数据
  • 数据结构灵活多变,随时可能会变化,而且数据结构之间的关系,非常复杂,如果我们用传统数据库,那是不是很坑,因为要面临大量的表
  • 对数据的相关操作,较为简单,比如就是一些简单的增删改查,用我们之前讲解的那些document操作就可以搞定
  • NoSQL数据库,适用的也是类似于上面的这种场景

shard&replica机制

  1. index包含多个shard
  2. 每个shard都是一个最小工作单元,承载部分数据,lucene实例,完整的建立索引和处理请求的能力
  3. 增减节点时,shard会自动在nodes中负载均衡,6个节点,7个shard,其中一个节点2个shard,进入新节点时,自动将shard负载均匀分配到所有节点。
  4. primary shard和replica shard,每个document肯定只存在于某一个primary shard以及其对应的replica shard中,不可能存在于多个primary shard
  5. replica shard是primary shard的副本,负责容错,以及承担读请求负载
  6. primary shard的数量在创建索引的时候就固定了,replica shard的数量可以随时修改
  7. primary shard的默认数量是5,replica默认是1,默认有10个shard,5个primary shard,5个replica shard
  8. primary shard不能和自己的replica shard放在同一个节点上(否则节点宕机,primary shard和副本都丢失,起不到容错的作用),但是可以和其他primary shard的replica shard放在同一个节点上

shard&replica机制再次梳理

单node创建index

  1. 单node环境下,创建一个index,有3个primary shard,3个replica shard
  2. 集群status是yellow
  3. 这个时候,只会将3个primary shard分配到仅有的一个node上去,另外3个replica shard是无法分配的
  4. 集群可以正常工作,但是一旦出现节点宕机,数据全部丢失,而且集群不可用,无法承接任何请求
1
2
3
4
5
6
7
PUT /test_index
{
"settings" : {
"number_of_shards" : 3,
"number_of_replicas" : 1
}
}

双node分配shard

  1. replica shard分配:master(3个primary shard),slave(3个replica shard),新增1个node
  2. primary ---> replica同步
  3. 读请求:primary/replica

横向扩容

  1. primary&replica自动负载均衡,6个shard,3 primary,3 replica
  2. 每个node有更少的shard,IO/CPU/Memory资源给每个shard分配更多,每个shard性能更好
  3. 扩容的极限,6个shard(3 primary,3 replica),最多扩容到6台机器,每个shard可以占用单台服务器的所有资源,性能最好
  4. 超出扩容极限,动态修改replica数量,9个shard(3primary,6 replica),扩容到9台机器,比3台机器时,拥有3倍的读吞吐量
  5. 3台机器下,9个shard(3 primary,6 replica),资源更少,但是容错性更好,最多容纳2台机器宕机,6个shard只能容纳0台机器宕机

image-20210514171113961

扩容过程分析

容错机制

  1. 9 shard,3 node
  2. master node宕机,自动master选举,red
  3. replica容错:新master将replica提升为primary shard,yellow
  4. 重启宕机node,master copy replica到该node,使用原有的shard并同步宕机后的修改,green

image-20210514192317121

元数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
PUT /test_indx/test_type/1
{
"test_content": "test test"
}
GET /test_indx/test_type/1
{
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"test_content": "test test"
}
}
  1. _index元数据

(1)代表一个document存放在哪个index中
(2)类似的数据放在一个索引,非类似的数据放不同索引:product index(包含了所有的商品),sales index(包含了所有的商品销售数据),inventory index(包含了所有库存相关的数据)。如果你把比如product,sales,human resource(employee),全都放在一个大的index里面,比如说company index,不合适的。
(3)index中包含了很多类似的document:类似是什么意思,其实指的就是说,这些document的fields很大一部分是相同的,你说你放了3个document,每个document的fields都完全不一样,这就不是类似了,就不太适合放到一个index里面去了。
(4)索引名称必须是小写的,不能用下划线开头,不能包含逗号:product,website,blog

  1. _type元数据

(1)代表document属于index中的哪个类别(type)
(2)一个索引通常会划分为多个type,逻辑上对index中有些许不同的几类数据进行分类:因为一批相同的数据,可能有很多相同的fields,但是还是可能会有一些轻微的不同,可能会有少数fields是不一样的,举个例子,就比如说,商品,可能划分为电子商品,生鲜商品,日化商品,等等。
(3)type名称可以是大写或者小写,但是同时不能用下划线开头,不能包含逗号

  1. _id元数据

(1)代表document的唯一标识,与index和type一起,可以唯一标识和定位一个document
(2)我们可以手动指定document的id(put /index/type/id),也可以不指定,由es自动为我们创建一个id

image-20210514193728892

document_id

  1. 手动指定document id

一般来说,是从某些其他的系统中,导入一些数据到es时,会采取这种方式,就是使用系统中已有数据的唯一标识,作为es中document的id。举个例子,比如说,我们现在在开发一个电商网站,做搜索功能,或者是OA系统,做员工检索功能。这个时候,数据首先会在网站系统或者IT系统内部的数据库中,会先有一份,此时就肯定会有一个数据库的primary key(自增长,UUID,或者是业务编号)。如果将数据导入到es中,此时就比较适合采用数据在数据库中已有的primary key。

如果说,我们是在做一个系统,这个系统主要的数据存储就是es一种,也就是说,数据产生出来以后,可能就没有id,直接就放es一个存储,那么这个时候,可能就不太适合说手动指定document id的形式了,因为你也不知道id应该是什么,此时可以采取下面要讲解的让es自动生成id的方式。

1
2
3
4
PUT /test_index/test_type/2
{
"test_content": "my test"
}
  1. 自动生成document id

(1)post /index/type

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
POST /test_index/test_type
{
"test_content": "my test"
}

{
"_index": "test_index",
"_type": "test_type",
"_id": "AVp4RN0bhjxldOOnBxaE",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}

(2)自动生成的id,长度为20个字符,URL安全,base64编码,GUID,分布式系统并行生成时不可能会发生冲突

image-20210514194511913

_source

_source元数据:就是说,我们在创建一个document的时候,使用的那个放在request body中的json串,默认情况下,在get的时候,会原封不动的给我们返回回来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
put /test_index/test_type/1
{
"test_field1": "test field1",
"test_field2": "test field2"
}

get /test_index/test_type/1

{
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 2,
"found": true,
"_source": {
"test_field1": "test field1",
"test_field2": "test field2"
}
}

定制返回的结果,指定_source中,返回哪些field,GET /test_index/test_type/1?_source=test_field1,test_field2

批量查询

就是一条一条的查询,比如说要查询100条数据,那么就要发送100次网络请求,这个开销还是很大的
如果进行批量查询的话,查询100条数据,就只要发送1次网络请求,网络请求的性能开销缩减100倍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
GET /test_index/test_type/1
GET /test_index/test_type/2
GET /_mget mget批量查询
{
"docs" : [
{
"_index" : "test_index",
"_type" : "test_type",
"_id" : 1
},
{
"_index" : "test_index",
"_type" : "test_type",
"_id" : 2
}
]
}
GET /test_index/_mget 查询的document是一个index下的不同type
{
"docs" : [
{
"_type" : "test_type",
"_id" : 1
},
{
"_type" : "test_type",
"_id" : 2
}
]
}
GET /test_index/test_type/_mget 查询的数据都在同一个index下的同一个type
{
"ids": [1, 2]
}

可以说mget是很重要的,一般来说,在进行查询的时候,如果一次性要查询多条数据的话,那么一定要用batch批量操作的api,尽可能减少网络开销次数,可能可以将性能提升数倍,甚至数十倍。

bulk批量增删改

bulk api对json的语法,有严格的要求,每个json串不能换行,只能放一行,同时一个json串和一个json串之间,必须有一个换行

  1. delete:删除一个文档,只要1个json串就可以了
  2. create:PUT /index/type/id/_create,强制创建
  3. index:普通的put操作,可以是创建文档,也可以是全量替换文档
  4. update:执行的partial update操作
1
2
3
4
5
6
7
8
POST /_bulk
{ "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }}
{ "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }}
{ "test_field": "test12" }
{ "index": { "_index": "test_index", "_type": "test_type", "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

bulk操作中,任意一个操作失败,是不会影响其他的操作的,但是在返回结果里,会告诉你异常日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
POST /test_index/_bulk
{ "delete": { "_type": "test_type", "_id": "3" }}
{ "create": { "_type": "test_type", "_id": "12" }}
{ "test_field": "test12" }
{ "index": { "_type": "test_type" }}
{ "test_field": "auto-generate id test" }
{ "index": { "_type": "test_type", "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

POST /test_index/test_type/_bulk
{ "delete": { "_id": "3" }}
{ "create": { "_id": "12" }}
{ "test_field": "test12" }
{ "index": { }}
{ "test_field": "auto-generate id test" }
{ "index": { "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

bulk request会加载到内存里,如果太大的话,性能反而会下降,因此需要反复尝试一个最佳的bulk size。一般从1000~5000条数据开始,尝试逐渐增加。另外,如果看大小的话,最好是在5~15MB之间。

bulk api奇特的json格式

1
2
3
4
{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n
  1. 不用将其转换为json对象,不会出现内存中的相同数据的拷贝,直接按照换行符切割json
  2. 对每两个一组的json,读取meta,进行document路由
  3. 直接将对应的json发送到node上去
  4. 最大的优势在于,不需要将json数组解析为一个JSONArray对象,形成一份大数据的拷贝,浪费内存空间,尽可能地保证性能

如果采用比较良好的json数组格式,允许任意的换行,整个可读性非常棒,读起来很爽,es拿到那种标准格式的json串以后,要按照下述流程去进行处理

  • 将json数组解析为JSONArray对象,这个时候,整个数据,就会在内存中出现一份一模一样的拷贝,一份数据是json文本,一份数据是JSONArray对象
  • 解析json数组里的每个json,对每个请求中的document进行路由
  • 为路由到同一个shard上的多个请求,创建一个请求数组
  • 将这个请求数组序列化
  • 将序列化后的请求数组发送到对应的节点上去

耗费更多内存,更多的jvm gc开销,我们之前提到过bulk size最佳大小的那个问题,一般建议说在几千条那样,然后大小在10MB左右,所以说,可怕的事情来了。假设说现在100个bulk请求发送到了一个节点上去,然后每个请求是10MB,100个请求,就是1000MB = 1GB,然后每个请求的json都copy一份为jsonarray对象,此时内存中的占用就会翻倍,就会占用2GB的内存,甚至还不止。因为弄成jsonarray之后,还可能会多搞一些其他的数据结构,2GB+的内存占用。占用更多的内存可能就会积压其他请求的内存使用量,比如说最重要的搜索请求,分析请求,等等,此时就可能会导致其他请求的性能急速下降。占用内存更多,就会导致java虚拟机的垃圾回收次数更多,跟频繁,每次要回收的垃圾对象更多,耗费的时间更多,导致es的java虚拟机停止工作线程的时间更多

更新

全量替换

  1. 语法与创建文档是一样的,如果document id不存在,那么就是创建;如果document id已经存在,那么就是全量替换操作,替换document的json串内容,更新_version的值

image-20210514195424111

  1. document是不可变的,如果要修改document的内容,第一种方式就是全量替换,直接对document重新建立索引,替换里面所有的内容
  2. es会将老的document标记为deleted,然后新增我们给定的一个document,当我们创建越来越多的document的时候,es会在适当的时机在后台自动删除标记为deleted的document

局部更新

PUT /index/type/id,创建文档&替换文档,就是一样的语法

  1. 应用程序先发起一个get请求,获取到document,展示到前台界面,供用户查看和修改
  2. 用户在前台界面修改数据,发送到后台
  3. 后台代码,会将用户修改的数据在内存中进行执行,然后封装好修改后的全量数据
  4. 然后发送PUT请求,到es中,进行全量替换
  5. es将老的document标记为deleted,然后重新创建一个新的document
1
2
3
4
5
6
post /index/type/id/_update 
{
"doc": {
"要修改的少数几个field即可,不需要全量的数据"
}
}

原理

image-20210517194617604

实战partial update

1
2
3
4
5
6
7
8
9
10
11
12
PUT /test_index/test_type/10
{
"test_field1": "test1",
"test_field2": "test2"
}

POST /test_index/test_type/10/_update
{
"doc": {
"test_field2": "updated test2"
}
}

groovy实现partial update

es,其实是有个内置的脚本支持的,可以基于groovy脚本实现各种各样的复杂操作

准备数据

1
2
3
4
5
PUT /test_index/test_type/11
{
"num": 0,
"tags": []
}

更新数据

1
2
3
4
POST /test_index/test_type/11/_update
{
"script" : "ctx._source.num+=1"
}

外部脚本

vim elasticsearch-5.2.0\config\scripts\test-add-tags.groovy
ctx._source.tags+=new_tag 为tags字段添加tag1,test-add-tags.groovy

1
2
3
4
5
6
7
8
9
10
POST /test_index/test_type/11/_update
{
"script": {
"lang": "groovy",
"file": "test-add-tags",
"params": {
"new_tag": "tag1"
}
}
}

ctx.op = ctx._source.num == count ? 'delete' : 'none' 删除文档test-delete-document.groovy

1
2
3
4
5
6
7
8
9
10
POST /test_index/test_type/11/_update
{
"script": {
"lang": "groovy",
"file": "test-delete-document",
"params": {
"count": 1
}
}
}

upsert操作

1
2
3
4
5
6
POST /test_index/test_type/11/_update
{
"doc": {
"num": 1
}
}

原始_update会直接报错404.

如果指定的document不存在,就执行upsert中的初始化操作;如果指定的document存在,就执行doc或者script指定的partial update操作

1
2
3
4
5
6
7
8
POST /test_index/test_type/11/_update
{
"script" : "ctx._source.num+=1",
"upsert": {
"num": 0,
"tags": []
}
}

强制创建

PUT /index/type/id?op_type=create,PUT /index/type/id/_create

删除

DELETE /index/type/id

不会理解物理删除,只会将其标记为deleted,当数据越来越多的时候,在后台自动删除

并发冲突

image-20210514195747886

悲观锁与乐观锁方案

image-20210514201338263

第一次创建一个document的时候,它的_version内部版本号就是1;以后,每次对这个document执行修改或者删除操作,都会对这个_version版本号自动加1;哪怕是删除,也会对这条数据的版本号加1。

在删除一个document之后,可以从一个侧面证明,它不是立即物理删除掉的,因为它的一些版本号等信息还是保留着的。先删除一条document,再重新创建这条document,其实会在delete version基础之上,再把version号加1

image-20210514202012003

_version乐观锁实战

  1. 构造一条数据
    1
    2
    3
    4
    PUT /test_index/test_type/7
    {
    "test_field": "test test"
    }
  2. 模拟两个客户端,都获取到了同一条数据
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    GET test_index/test_type/7
    {
    "_index": "test_index",
    "_type": "test_type",
    "_id": "7",
    "_version": 1,
    "found": true,
    "_source": {
    "test_field": "test test"
    }
    }
  3. 其中一个客户端,先更新了一下这个数据,同时带上数据的版本号,确保说,es中的数据的版本号,跟客户端中的数据的版本号是相同的,才能修改
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    PUT /test_index/test_type/7?version=1 
    {
    "test_field": "test client 1"
    }
    {
    "_index": "test_index",
    "_type": "test_type",
    "_id": "7",
    "_version": 2,
    "result": "updated",
    "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
    },
    "created": false
    }
  4. 另外一个客户端,尝试基于version=1的数据去进行修改,同样带上version版本号,进行乐观锁的并发控制
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    PUT /test_index/test_type/7?version=1 
    {
    "test_field": "test client 2"
    }
    {
    "error": {
    "root_cause": [
    {
    "type": "version_conflict_engine_exception",
    "reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
    "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
    "shard": "3",
    "index": "test_index"
    }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
    "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
    "shard": "3",
    "index": "test_index"
    },
    "status": 409
    }
  5. 在乐观锁成功阻止并发问题之后,尝试正确的完成更新
1
2
3
4
5
6
7
8
9
10
11
GET /test_index/test_type/7
{
"_index": "test_index",
"_type": "test_type",
"_id": "7",
"_version": 2,
"found": true,
"_source": {
"test_field": "test client 1"
}
}
  1. 基于最新的数据和版本号,去进行修改,修改后,带上最新的版本号,可能这个步骤会需要反复执行好几次,才能成功,特别是在多线程并发更新同一条数据很频繁的情况下
    1
    2
    3
    4
    PUT /test_index/test_type/7?version=2 
    {
    "test_field": "test client 2"
    }

external version乐观锁实战

es提供了一个feature,就是说,你可以不用它提供的内部_version版本号来进行并发控制,可以基于你自己维护的一个版本号来进行并发控制。比如,加入你的数据在mysql里也有一份,然后你的应用系统本身就维护了一个版本号,无论是什么自己生成的,程序控制的。这个时候,你进行乐观锁并发控制的时候,可能并不是想要用es内部的_version来进行控制,而是用你自己维护的那个version来进行控制。

_version=1,?version=1,才能更新成功
_version=1,?version>1&version_type=external,才能成功,比如说?version=2&version_type=external

version_type=external,唯一的区别在于,_version,只有当你提供的version与es中的_version一模一样的时候,才可以进行修改,只要不一样,就报错;当version_type=external的时候,只有当你提供的version比es中的_version大的时候,才能完成修改。

  1. 先构造一条数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
PUT /test_index/test_type/8
{
"test_field": "test"
}

{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
  1. 模拟两个客户端同时查询到这条数据
1
2
3
4
5
6
7
8
9
10
11
12
GET /test_index/test_type/8

{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 1,
"found": true,
"_source": {
"test_field": "test"
}
}
  1. 第一个客户端先进行修改,此时客户端程序是在自己的数据库中获取到了这条数据的最新版本号,比如说是2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
PUT /test_index/test_type/8?version=2&version_type=external
{
"test_field": "test client 1"
}

{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
  1. 模拟第二个客户端,同时拿到了自己数据库中维护的那个版本号,也是2,同时基于version=2发起了修改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
PUT /test_index/test_type/8?version=2&version_type=external
{
"test_field": "test client 2"
}

{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "1",
"index": "test_index"
}
],
"type": "version_conflict_engine_exception",
"reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "1",
"index": "test_index"
},
"status": 409
}
  1. 在并发控制成功后,重新基于最新的版本号发起更新
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    GET /test_index/test_type/8

    {
    "_index": "test_index",
    "_type": "test_type",
    "_id": "8",
    "_version": 2,
    "found": true,
    "_source": {
    "test_field": "test client 1"
    }
    }

    PUT /test_index/test_type/8?version=3&version_type=external
    {
    "test_field": "test client 2"
    }

    {
    "_index": "test_index",
    "_type": "test_type",
    "_id": "8",
    "_version": 3,
    "result": "updated",
    "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
    },
    "created": false
    }

partial update内置乐观锁并发控制

image-20210525150841434

post /index/type/id/_update?retry_on_conflict=5&version=6

document原理

路由

image-20210525153053021

document路由到shard上的路由算法:shard = hash(routing) % number_of_primary_shards

举个例子,一个index有3个primary shard,P0,P1,P2,每次增删改查一个document的时候,都会带过来一个routing number,默认就是这个document的_id(可能是手动指定,也可能是自动生成),routing = _id,假设_id=1,会将这个routing值,传入一个hash函数中,产出一个routing值的hash值,hash(routing) = 21,然后将hash函数产出的值对这个index的primary shard的数量求余数,21 % 3 = 0,就决定了,这个document就放在P0上。

决定一个document在哪个shard上,最重要的一个值就是routing值,默认是_id,也可以手动指定,相同的routing值,每次过来,从hash函数中,产出的hash值一定是相同的。无论hash值是几,无论是什么数字,对number_of_primary_shards求余数,结果一定是在0~number_of_primary_shards-1之间这个范围内的。0,1,2。

默认的routing就是_id,也可以在发送请求的时候,手动指定一个routing value,比如说put /index/type/id?routing=user_id。手动指定routing value是很有用的,可以保证说,某一类document一定被路由到一个shard上去,那么在后续进行应用级别的负载均衡,以及提升批量读取的性能。

增删改

  • 客户端选择一个node发送请求过去,这个node就是coordinating node(协调节点)
  • coordinating node,对document进行路由,将请求转发给对应的node(有primary shard)
  • 实际的node上的primary shard处理请求,然后将数据同步到replica node
  • coordinating node,如果发现primary node和所有replica node都搞定之后,就返回响应结果给客户端

image-20210525154434211

  1. 客户端发送请求到任意一个node,成为coordinate node
  2. coordinate node对document进行路由,将请求转发到对应的node,此时会使用round-robin随机轮询算法,在primary shard以及其所有replica中随机选择一个,让读请求负载均衡
  3. 接收请求的node返回document给coordinate node
  4. coordinate node返回document给客户端
  5. 特殊情况:document如果还在建立索引过程中,可能只有primary shard有,任何一个replica shard都没有,此时可能会导致无法读取到document,但是document完成索引建立之后,primary shard和replica shard就都有了

image-20210525165721839

quorum

我们在发送任何一个增删改操作的时候,比如说put /index/type/id,都可以带上一个consistency参数,指明我们想要的写一致性
put /index/type/id?consistency=quorum

one:要求我们这个写操作,只要有一个primary shard是active活跃可用的,就可以执行
all:要求我们这个写操作,必须所有的primary shard和replica shard都是活跃的,才可以执行这个写操作
quorum:默认的值,要求所有的shard中,必须是大部分的shard都是活跃的,可用的,才可以执行这个写操作

image-20210525164457823

quorum机制,写之前必须确保大多数shard都可用,int( (primary + number_of_replicas) / 2 ) + 1,当number_of_replicas>1时才生效

quroum = int( (primary + number_of_replicas) / 2 ) + 1
举个例子,3个primary shard,number_of_replicas=1,总共有3 + 3 * 1 = 6个shard
quorum = int( (3 + 1) / 2 ) + 1 = 3
所以,要求6个shard中至少有3个shard是active状态的,才可以执行这个写操作

如果节点数少于quorum数量,可能导致quorum不齐全,进而导致无法执行任何写操作。3个primary shard,replica=1,要求至少3个shard是active,3个shard按照之前学习的shard&replica机制,必须在不同的节点上,如果说只有2台机器的话,是不是有可能出现说,3个shard都没法分配齐全,此时就可能会出现写操作无法执行的情况。

1个primary shard,replica=3,quorum=((1 + 3) / 2) + 1 = 3,要求1个primary shard + 3个replica shard = 4个shard,其中必须有3个shard是要处于active状态的。如果这个时候只有2台机器的话,es提供了一种特殊的处理场景,就是说当number_of_replicas>1时才生效,因为假如说,你就一个primary shard,replica=1,此时就2个shard,(1 + 1 / 2) + 1 = 2,要求必须有2个shard是活跃的,但是可能就1个node,此时就1个shard是活跃的,如果你不特殊处理的话,导致我们的单节点集群就无法工作

quorum不齐全时,wait,默认1分钟,timeout,100,30s,等待期间,期望活跃的shard数量可以增加,最后实在不行,就会timeout,我们其实可以在写操作的时候,加一个timeout参数,比如说put /index/type/id?timeout=30,这个就是说自己去设定quorum不齐全的时候,es的timeout时长,可以缩短,也可以增长

image-20210525165432451

文章作者: J
文章链接: http://onejane.github.io/2021/05/13/ElasticSearch之分布式特性/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 万物皆可逆向
支付宝打赏
微信打赏