Elasticsearch学习笔记

在容器云的日志收集和分析中,没有采用CNCF的fluentd方案,而是直接采用了elastic-stack的一整套解决方案。

通过服务编排的时候写日志收集的CRD到K8S,在主机上通过daemonset的方式来运行fileBeat收集容器日志;fileBeat的output是kafka,通过消息中间件来做汇聚和压力削峰,接下来再是logstach和elasticsearch。这只是日志收集的流程,其实里面还涉及具体如何做系统的负载均衡,日志流的告警等,这里就不一一分析。

本文重点聊下elasticsearch相关的一些知识点。

ES安装

部署

1
docker run --name=elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms256m -Xmx256m" -d elasticsearch:7.3.0

插件命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
root@2c03a6312e66:/usr/share/elasticsearch# elasticsearch-plugin --help
A tool for managing installed elasticsearch plugins
Commands
--------
list - Lists installed elasticsearch plugins
install - Install a plugin
remove - removes a plugin from Elasticsearch

Non-option arguments:
command

Option Description
------ -----------
-h, --help show help
-s, --silent show minimal output
-v, --verbose show verbose output

# 安装插件
elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.3.0/elasticsearch-analysis-ik-7.3.0.zip

Kibana安装

1
docker run -d --link elasticsearch:elasticsearch --name kibana -p 5601:5601 kibana:7.3.0

ES核心知识

主要涉及对ES整体框架认识理解的几个方面。

集群管理

节点角色

  • Master节点

    Master节点控制Elasticsearch集群,并负责在集群范围内创建/删除索引,跟踪哪些节点是集群的一部分,并将分片分配给这些节点。主节点一次处理一个集群状态,并将状态广播到所有其他节点,这些节点需要响应并确认主节点的信息。

    在elasticsearch.yml中,将nodes.master属性设置为true(默认),可以将节点配置为有资格成为主节点的节点。对于大型生产集群,建议拥有一个专用主​​节点来控制集群,并且不服务任何用户请求。

  • Data节点

    数据节点用来保存数据和倒排索引。默认情况下,每个节点都配置为一个data节点,并且在elasticsearch.yml中将属性node.data设置为true。如果您想要一个专用的master节点,那么将node.data属性更改为false。

  • Client节点

    如果将node.master和node.data设置为false,则将节点配置为客户端节点,并充当负载平衡器,将传入的请求路由到集群中的不同节点。若你连接的是作为客户端的节点,该节点称为协调节点(coordinating node)。协调节点将客户机请求路由到集群中对应分片所在的节点。对于读取请求,协调节点每次选择不同的分片来提供请求以平衡负载。

跨集群搜索

相当于在多个ES集群之外,配置一个ES作为代理,这个代理节点不会加入到存储数据的ES集群中;它只是作为查询代理节点。

配置的方式可以是直接修改yaml文件,或者通过HTTP的方式下发配置,指定remote ES cluster。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
PUT _cluster/settings
{
"persistent": {
"cluster": {
"remote": {
"cluster_one": {
"seeds": [
"127.0.0.1:9300"
]
},
"cluster_two": {
"seeds": [
"127.0.0.1:9301"
]
},
"cluster_three": {
"seeds": [
"127.0.0.1:9302"
]
}
}
}
}
}

具体参考这里

故障检测

zen discovery模块

  • Ping:执行该过程的节点,用来发现对方。
  • Unicast:该模块包含一个主机名列表,用于控制要ping哪个节点。

故障检测的过程是这样的: 主节点ping所有的节点来检查这些节点是否存活,而所有的节点回ping主节点来汇报他们是存活的。

discovery.zen.minimum_master_nodes

推荐的值为: (num_master_node / 2) + 1

假设我们有一个集群,有3个master节点,当网络发生故障的时候,有可能其中一个节点不能和其他节点进行通信了。这个时候,当discovery.zen.minimum_master_nodes设置为1,就会分成两个小的独立集群,当网络好的时候,就会出现数据错误或者丢失数据的情况。当discovery.zen.minimum_master_nodes设置为2的时候,一个网络中有两个主资格节点,可以继续工作,另一部分,由于只有一个master节点,则不会形成一个独立的集群,这个时候当网络恢复的时候,节点又会从新加入集群。

shard (分片)

每个索引包含多个shard,默认是5个,Shard是一个最小的Lucene索引单元。

  1. 分片,ES是分布式搜索引擎,每个索引有一个或多个分片,索引的数据被分配到各个分片上,相当于一桶水用了N个杯子装。
  2. 分片有助于横向扩展,N个分片会被尽可能平均地(rebalance)分配在不同的节点上(例如你有2个节点,4个主分片,那么每个节点会分到2个分片,后来你增加了2个节点,那么你这4个节点上都会有1个分片,这个过程叫relocation,ES感知后自动完成)。
  3. 分片是独立的,对于一个Search Request的行为,每个分片都会执行这个Request。
  4. 每个分片都是一个Lucene Index,所以一个分片只能存放 Integer.MAX_VALUE - 128 = 2,147,483,519 个docs。

当写入一个document的时候,ES通过对docid进行hash来确定将其放在哪个shard上面,然后在shard上面进行索引存储(可以通过指定routing字段来固定hash值,从而将document固定到某个分片上)。

replicas (副本)

replicas就是备份,ES采用的是Push Replication模式,当你往master主分片上面索引一个文档,该分片会复制该文档到分片副本中,这些分片也会索引这个文档。默认情况下一个索引创建5个分片一个备份,即 5primary + 5replica = 10个分片

  1. 主分片和备分片不会出现在同一个节点上(防止单点故障)。
  2. 如果只有一个节点,那么5个replica都无法分配(unassigned),此时cluster status会变成Yellow。
  3. primary分片丢失,replica分片就会被顶上去成为新的主分片,同时根据这个新的主分片创建新的replica,集群数据安然无恙。
  4. 索引写请求只能发生在主分片上,replica不能执行索引写请求(可以执行读请求)。
  5. 对于一个索引,除非重建索引(reindex,需先close索引)否则不能调整分片的数目(主分片数, number_of_shards),但可以随时调整replica数(number_of_replicas)。

写数据

主要涉及ES为何能够做到实时查询的相关原理。

flush/refresh

决定一个document在哪个shard上,最重要的一个值就是routing值,默认是_id,也可以手动指定。routing也可以在发送请求的时候,手动指定一个routing value,比如说:put /index/type/id?routing=user_id

当向elasticsearch发送创建document索引请求的时候,document数据会先进入到index buffer之后,与此同时会将操作记录在translog之中,当发生 refresh 时(数据从index buffer中进入filesystem cache的过程,时间很短,默认是1s一次,写入cache的数据其实是按照多个index segment文件的形势组织的)translog中的操作记录并不会被清除,而是当数据从filesystem cache中被写入磁盘之后才会将translog中清空。而从filesystem cache写入磁盘的过程就是 flush,该过程会写一个commit point,指明这次flush包含了哪些index segment。

translog

  • translog的功能

    保证在filesystem cache中的数据不会因为elasticsearch重启或是发生意外故障的时候丢失。
    当系统重启时会从translog中恢复之前记录的操作。
    当对elasticsearch进行CRUD操作的时候,会先到translog之中进行查找,因为tranlog之中保存的是最新的数据。
    translog的清除时间到期时进行flush操作之后(将数据从filesystem cache刷入disk之中)。
    flush操作的时候,会先保存buffer的数据到filesystem cache,然后将cache的数据refresh到disk,再清除translog的数据。

  • flush操作的时间点

    es的各个shard会每个30分钟进行一次flush操作。
    当translog的数据达到某个上限的时候会进行一次flush操作。

    [Note]:但是由于translog本身cache的缘故,如果此时关机,可能导致5秒钟的数据丢失。

  • translog和flush的一些配置项

    index.translog.flush_threshold_ops:当发生多少次操作时进行一次flush。默认是 unlimited。
    index.translog.flush_threshold_size:当translog的大小达到此值时会进行一次flush操作。默认是512mb。
    index.translog.flush_threshold_period:在指定的时间间隔内如果没有进行flush操作,会进行一次强制flush操作。默认是30m。
    index.translog.interval:多少时间间隔内会检查一次translog,来进行一次flush操作。es会随机的在这个值到这个值的2倍大小之间进行一次操作,默认是5s。

删除、更新数据

如果是删除操作,commit 的时候会生成一个 .del 文件,里面将某个 doc 标识为 deleted 状态,那么搜索的时候根据 .del 文件就知道这个 doc 是否被删除了。如果是更新操作,就是将原来的 doc 标识为 deleted 状态,然后新写入一条数据。

buffer 每 refresh 一次,就会产生一个 index segment file(在OS cache中),所以默认情况下是 1 秒钟一个 segment file,这样下来 segment file 会越来越多。

ES会定期执行 merge(也就是translog flush时候的commit point),每次 merge 的时候,会将多个 segment file 合并成一个。同时这里会将标识为 deleted 的 doc 给物理删除掉,然后将新的 segment file 写入磁盘。这里会写一个 commit point,标识所有新的 segment file,然后打开 segment file 供搜索使用,同时删除旧的 segment file。(该流程类似于etcd的compact和Hbase的region merge)

reindex

可以实现将远端数据index到本地,也可以将本地的index做修改字段等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200", // 远程es的ip和port列表
"socket_timeout": "1m",
"connect_timeout": "10s" // 超时时间设置
},
"index": "my_index_name", // 源索引名称
"query": { // 满足条件的数据
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest_index_name" // 目标索引名称
},
"script": { // 修改信息
"source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
"lang": "painless"
}

}

查询

当进行查询时:

  1. 如果提供了查询的DocID,ES通过hash就知道Doc存在哪个shard上面,然后再基于随机轮询算法,在primary shard以及其所有replica中随机选择一个,让读请求负载均衡。
  2. 如果不提供DocID(比如搜索), 请求先发到协调节点,协调节点会在该Index(indics)的shard所在的所有primary或replicas上执行搜索;每个shard将搜索结果(其实就是一些doc id)返回给协调节点。协调节点会对结果进行合并、排序、分页等操作,生成结果。然后根据结果中的doc id去各个node上拉取文档数据,然后返回给客户端。

查询方式

各类查询

  • match all

    1
    2
    3
    4
    5
    6
    GET /_search
    {
    "query": {
    "match_all": {}
    }
    }
  • match

    1
    2
    3
    4
    5
    6
    7
    8
    GET /_search
    {
    "query": {
    "match": {
    "title": "my elasticsearch article"
    }
    }
    }
  • multi match

    1
    2
    3
    4
    5
    6
    7
    8
    9
    GET /test_index/test_type/_search
    {
    "query": {
    "multi_match": {
    "query": "test",
    "fields": ["test_field", "test_field1"]
    }
    }
    }
  • range query

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    GET /company/employee/_search 
    {
    "query": {
    "range": {
    "age": {
    "gte": 30
    }
    }
    }
    }
  • term query

    1
    2
    3
    4
    5
    6
    7
    8
    GET /test_index/test_type/_search 
    {
    "query": {
    "term": {
    "test_field": "test hello"
    }
    }
    }
  • terms query

    1
    2
    3
    4
    5
    6
    7
    8
    GET /_search
    {
    "query": {
    "terms": {
    "tag": [ "search", "full_text", "nosql" ]
    }
    }
    }
  • 条件组合query

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    GET /website/article/_search
    {
    "query": {
    "bool": {
    "must": [
    {
    "match": {
    "title": "elasticsearch"
    }
    }
    ],
    "should": [
    {
    "match": {
    "content": "elasticsearch"
    }
    }
    ],
    "must_not": [
    {
    "match": {
    "author_id": 111
    }
    }
    ]
    }
    }
    }
  • scroll query

    1. 使用scoll滚动搜索,可以先搜索一批数据,然后下次再搜索一批数据,以此类推,直到搜索出全部的数据来
    2. scroll搜索会在第一次搜索的时候,保存一个当时的视图快照,之后只会基于该旧的视图快照提供数据搜索,如果这个期间数据变更,是不会让用户看到的
    3. 采用基于_doc进行排序的方式,性能较高
    4. 每次发送scroll请求,我们还需要指定一个scoll参数,指定一个时间窗口,每次搜索请求只要在这个时间窗口内能完成就可以了
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    GET /test_index/test_type/_search?scroll=1m
    {
    "query": {
    "match_all": {}
    },
    "sort": [ "_doc" ],
    "size": 3
    }

    # 当scroll返回ID之后,第二次使用该ID查询;

    GET /_search/scroll
    {
    "scroll": "1m",
    "scroll_id" : "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAACxeFjRvbnNUWVZaVGpHdklqOV9zcFd6MncAAAAAAAAsYBY0b25zVFlWWlRqR3ZJajlfc3BXejJ3AAAAAAAALF8WNG9uc1RZVlpUakd2SWo5X3NwV3oydwAAAAAAACxhFjRvbnNUWVZaVGpHdklqOV9zcFd6MncAAAAAAAAsYhY0b25zVFlWWlRqR3ZJajlfc3BXejJ3"
    }
  • 排序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    GET /company/employee/_search 
    {
    "query": {
    "constant_score": {
    "filter": {
    "range": {
    "age": {
    "gte": 30
    }
    }
    }
    }
    },
    "sort": [
    {
    "join_date": {
    "order": "asc"
    }
    }
    ]
    }

filter/query区别

  • filter,仅仅只是按照搜索条件过滤出需要的数据而已,不计算任何相关度分数,对相关度没有任何影响
  • query,会去计算每个document相对于搜索条件的相关度,并按照相关度进行排序

    一般来说,如果是在进行搜索,需要将最匹配搜索条件的数据先返回,那么用query;
    如果只是要根据一些条件筛选出一部分数据,不关注其排序,那么用filter。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    GET /company/employee/_search
    {
    "query": {
    "bool": {
    "must": [
    {
    "match": {
    "join_date": "2019-01-01"
    }
    }
    ],
    "filter": {
    "range": {
    "age": {
    "gte": 30
    }
    }
    }
    }
    }
    }

multi-index/multi-type搜索

/_search:所有索引,所有type下的所有数据都搜索出来
/index1/_search:指定一个index,搜索其下所有type的数据
/index1,index2/_search:同时搜索两个index下的数据
/1,2/_search:按照通配符去匹配多个索引
/index1/type1/_search:搜索一个index下指定的type的数据
/index1/type1,type2/_search:可以搜索一个index下多个type的数据
/index1,index2/type1,type2/_search:搜索多个index下的多个type的数据
/_all/type1,type2/_search:_all,可以代表搜索所有index下的指定type的数据

mapping

mapping,就是index的type的元数据,每个type都有一个自己的mapping,决定了数据类型,建立倒排索引的行为,还有进行搜索的行为。

ES默认是动态创建索引和索引类型的mapping的。这就相当于无需定义Schema,无需指定各个字段的索引规则就可以索引文件,很方便。但有时方便就代表着不灵活。比如,ES默认一个字段是要做分词的,但我们有时要搜索匹配整个字段却不行。如有统计工作要记录每个城市出现的次数。对于NAME字段,若记录“new york”文本,ES可能会把它拆分成“new”和“york”这两个词,分别计算这个两个单词的次数,而不是我们期望的“new york”。

这时,就需要我们在创建索引时定义mapping。此外,es支持多字段结构,例如:我们希望两个字段中有相同的值,一个用于搜索,一个用户排序;或者一个用于分词器分析,一个用于空白字符。例如:编写mapping文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{	
"index_type":{
"properties":{
"ID":{
"type":"string",
"index":"not_analyzed"
},
"NAME":{
"type":"string",
"fields":{
"NAME":{
"type":"string"
},
"raw":{
"type":"string",
"index":"not_analyzed"
}
}
}
}
}
}

就是说我们对于index_type这个索引类型,定义了它的mapping。重点是将NAME这个字段映射为两个,一个是需要做索引分析的NAME,另一个是不分析的raw,即不会拆分new york这种词组。这样我们在做搜索的时候,就可以对NAME.raw这个字段做term aggregation,获得所有城市出现的次数了。

核心的数据类型

  • string
  • byte,short,integer,long
  • float,double
  • boolean
  • date

dynamic mapping

  • true or false –> boolean
  • 123 –> long
  • 123.45 –> double
  • 2017-01-01 –> date
  • “hello world” –> string/text

查看mapping

1
GET /index/_mapping/type

总结

  • 往es里面直接插入数据,es会自动建立索引,同时建立type以及对应的mapping
  • mapping中就自动定义了每个field的数据类型
  • 不同的数据类型(比如说text和date),可能有的是exact value,有的是full text
  • exact value,在建立倒排索引的时候,分词的时候,是将整个值一起作为一个关键词建立到倒排索引中的;full text,会经历各种各样的处理,分词,normaliztion(时态转换,同义词转换,大小写转换),才会建立到倒排索引中
  • 同时呢,exact value和full text类型的field就决定了,在一个搜索过来的时候,对exact value field或者是full text field进行搜索的行为也是不一样的,会跟建立倒排索引的行为保持一致;比如说exact value搜索的时候,就是直接按照整个值进行匹配,full text query string,也会进行分词和normalization再去倒排索引中去搜索
  • 可以用es的dynamic mapping,让其自动建立mapping,包括自动设置数据类型;也可以提前手动创建index和type的mapping,自己对各个field进行设置,包括数据类型,包括索引行为,包括分词器,等等

phrase

首先说,wildcard/regexp/prefix这三个的性能都不好!wildcard和regexp查询的工作方式和prefix查询完全一样。它们也需要遍历倒排索引中的词条列表来找到所有的匹配词条,然后逐个词条地收集对应的文档ID。它们和prefix查询的唯一区别在于它们能够支持更加复杂的模式。

prefix,wildcard以及regexp查询基于词条进行操作。如果你在一个analyzed字段上使用了它们,它们会检查字段中的每个词条,而不是整个字段。比如,假设我们的title字段中含有”Quick brown fox”,它会产生词条quick,brown和fox。这个查询能够匹配:

1
{ "regexp": { "title": "br.*" }}

而不会匹配:

1
{ "regexp": { "title": "quick br*" }}

如果需要匹配字符串”quick brown fox”,可以采用phrase。phrase在ES中叫邻近匹配, slop值来用指定查询项之间可以分隔多远的距离。

1
2
3
4
5
6
7
8
9
10
11
12
POST /bookdb_index/book/_search
{
"query": {
"multi_match" : {
"query": "quick brown fox",
"fields": ["title", "summary"],
"type": "phrase",
"slop": 1
}
},
"_source": [ "title", "summary", "publish_date" ]
}

前面的查询的数据必须满足下面3个条件:

  1. quick, brown, fox 三个词必须同时出现
  2. brown 的位置必须比quick大1
  3. fox 的位置必须比quick大2

有关ES查询相关的方法,可以参考这篇文档, 总结的比较好!

bulk api

bulk的json数据结构如下:

1
2
3
4
{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n

bulk api的实例如下:

1
2
3
4
5
6
7
8
POST /_bulk
{ "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }}
{ "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }}
{ "test_field": "test12" }
{ "index": { "_index": "test_index", "_type": "test_type", "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

一个bulk api中可以带多种操作,如果采用数组的方式,需要java做json数组到数据结构的解析,会消耗掉大量的内存。使用上面的结构后,有以下好处。

  1. 不用将其转换为json对象,不会出现内存中的相同数据的拷贝,直接按照换行符切割json
  2. 对每两个一组的json,读取meta,进行document路由
  3. 直接将对应的json发送到node上去

Logstash

pipeline

  • 可以运行多个pipeline,这个需要指定一个配置文件pipelines.yml,在这个文件中指定每一个pipeline所使用的具体的input,filter,ouput的配置文件;
  • 一般情况下,一个logstash只运行一个pipeline,可以指定pipeline的worker数为cpu cores数,但是这样容易出现对多个input的相互阻塞的问题;
  • 多个pipeline的分别拥有隔离的Persistent queues 和 dead letter queues,但是需要小心设置其workers,因为会出现资源竞争的问题;

高可用场景

  • 直接使用beats对接到logstash,这样logstash只需要扩展节点,因为beats可以做客户端负载均衡;
  • 使用kafka,然后将多个logstash节点配置为consumer去消费同一个topic的group,这样kafka可做负载均衡;
  • 对于udp、tcp之类的力量,需要在logstash nodes前添加负载均衡设备

Plugins

Logstash有按照条件发送邮件的output plugins,按照这个的逻辑,可以实现一个告警的output plugins,在被匹配的规则满足条件后,调用webhook或者是直接发送邮件,从而实现日志告警功能。

0%