从kafka-lag引出的es问题

早上突然收到一个kafka lag的报警,一个平时5k lag的consumer突然lag到了14k多。

这也高的太多了,而且1分钟多点就下来了,这个第一反应是有实例重启了,可查看监控没有那个时间点的重启的时间啊。

那就看服务的日志吧,还真在那个时间点有3个error日志。看日志就是队列堵了。有些事件被丢弃了。

这时候再看下es的监控,发现负载有明显升高在那个时间点上。整个es集群所有机器的CPU空闲率从80%降到了30%左右,系统负载也大幅升高。

然后又看了下es的query time分布,发现那个时间点有大量的慢查询了。大量请求超过了预设的绿线值。

这个时候去查了下es的日志,发现有用户的查询对一个大索引引发了很多DFS_QUERY_THEN_FETCH类型的查询,这种查询由于要求精确性,查询次数和效率都比较差。从而导致整个队列堵了。

而针对es的search thread pool我们并没有进行优化,都是默认值。es官方文档里有写计算公式。

1
2
search
For count/search operations. Thread pool type is fixed with a size of int((# of available_processors * 3) / 2) + 1, queue_size of 1000.

那我们这种16核的服务器就是25个线程了。我们可以查询当前的使用情况。

1
2
3
4
5
6
7
8
9
10
curl 'http://192.168.106.45:9200/_cat/thread_pool?v'
host ip bulk.active bulk.queue bulk.rejected index.active index.queue index.rejected search.active search.queue search.rejected
192.168.106.44 192.168.106.44 0 0 0 0 0 2011 0 0 5640536
192.168.106.41 192.168.106.41 0 0 0 0 0 838 0 0 5661985
192.168.106.42 192.168.106.42 0 0 0 0 0 2444 0 0 5326245
192.168.106.11 192.168.106.11 0 0 0 0 0 0 0 0 0
192.168.106.43 192.168.106.43 0 0 0 0 0 1281 0 0 5374522
192.168.106.12 192.168.106.12 0 0 0 0 0 0 0 0 0
192.168.106.45 192.168.106.45 0 0 0 4 0 1784 0 0 5787051
192.168.106.13 192.168.106.13 0 0 0 0 0 0 0 0 0

看这数据是一直累加的结果,可真不少啊。看来这个需要在线程池做更多的优化了。

于是就看了现在的配置,发现需要修改的地方还不少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
curl http://192.168.106.45:9200/_nodes/thread_pool/ | jq .

"Ixh3hhf4Qv-lLpBna0pBNw": {
"name": "192.168.106.45",
"transport_address": "192.168.106.45:9300",
"host": "192.168.106.45",
"ip": "192.168.106.45",
"version": "X.X.X",
"build": "YYYYY",
"http_address": "192.168.106.45/192.168.106.45:9200",
"attributes": {
"master": "false"
},
"thread_pool": {
"force_merge": {
"type": "fixed",
"min": 1,
"max": 1,
"queue_size": -1
},
"percolate": {
"type": "fixed",
"min": 16,
"max": 16,
"queue_size": 1000
},
"fetch_shard_started": {
"type": "scaling",
"min": 1,
"max": 32,
"keep_alive": "5m",
"queue_size": -1
},
"listener": {
"type": "fixed",
"min": 8,
"max": 8,
"queue_size": -1
},
"index": {
"type": "fixed",
"min": 16,
"max": 16,
"queue_size": 200
},
"refresh": {
"type": "scaling",
"min": 1,
"max": 8,
"keep_alive": "5m",
"queue_size": -1
},
"suggest": {
"type": "fixed",
"min": 16,
"max": 16,
"queue_size": 1000
},
"generic": {
"type": "cached",
"keep_alive": "30s",
"queue_size": -1
},
"warmer": {
"type": "scaling",
"min": 1,
"max": 5,
"keep_alive": "5m",
"queue_size": -1
},
"search": {
"type": "fixed",
"min": 25,
"max": 25,
"queue_size": 1000
},
"flush": {
"type": "scaling",
"min": 1,
"max": 5,
"keep_alive": "5m",
"queue_size": -1
},
"fetch_shard_store": {
"type": "scaling",
"min": 1,
"max": 32,
"keep_alive": "5m",
"queue_size": -1
},
"management": {
"type": "scaling",
"min": 1,
"max": 5,
"keep_alive": "5m",
"queue_size": -1
},
"get": {
"type": "fixed",
"min": 16,
"max": 16,
"queue_size": 1000
},
"bulk": {
"type": "fixed",
"min": 16,
"max": 16,
"queue_size": 50
},
"snapshot": {
"type": "scaling",
"min": 1,
"max": 5,
"keep_alive": "5m",
"queue_size": -1
}
}
}

由于这个集群用的版本还比较老还可以使用curl的方式直接改

1
2
3
4
5
curl -XPUT  _cluster/settings -d '{
"persistent" : {
"threadpool.search.queue_size" : <new_size>
}
}'

全部搞定。 后面要的就是推动研发修改那块部分使用的查询方法,以及如何进行限流。还有需要总结下处理流程和报警项,这样后面才可以自动化的进行处理。

本文参考:
https://www.elastic.co/guide/en/elasticsearch/reference/2.1/modules-threadpool.html
https://kionf.com/2019/01/22/errornote-elk/