如何对1K+应用产生的Elasticsearch索引进行运维治理

84次阅读
没有评论

共计 3491 个字符,预计需要花费 9 分钟才能阅读完成。

背景

最近在全面接手估计有 1k+ 的生产日志的运维工作,每天面临着大量的投诉,大部分的诉求都是日志突然没了,为了对生产故障进行排查要紧急查看日志,由于应用系统数量和种类繁多,以及所有日志系统的网络架构的复杂性导致出现这种问题有时并不能快速解决,与其每天被动的的接受开发的投诉,还不如主动的对这 1K+ 应用产生的 Elasticsearch 索引进行运维治理。

其实应用突然没日志无外乎下面几种:

  • k8s pod 里面的 log 挂载点发生了变化
  • log4j 里面的日志路径发生了变化
  • 应用上云后 log4j 里面的路径没有进行同步变更
  • 还有的 windows 机器上的 filebeat 偶尔会挂掉
  • k8s 集群新增了节点,pod 重启后漂移到了新增的未部署 filebeat 的节点上了
  • 还有就是本站的 这篇博客 提到的类似 error 也会导致采集不到日志

经过最近几天的主动治理,发现当我主动找到开发反馈业务系统半个月或一个多月没日志的时候开发都很配合工作,部门的 SLA 水平因此在渐渐提高中,可见主动的对 Elasticsearch 索引的运维治理还是很有必要的。

下面的脚本运行了之后会列出脚本运行时刻往前 30 天 (可以自定义,下面有提到) 没有采集到一条日志的应用,然后你就可以抢在开发的前面,主动的去看是上面提到的 “ 应用突然没日志无外乎下面几种 ” 中的哪一种情况导致的了。

运行环境

  • python 3.6.8(用到了 asyncio 的一个在 3.6 存在的方法,没在 3.7 测试)
  • Elasticsearch 6.x or 7.x 均可

依赖部署

python3 -m pip install 'elasticsearch[async]==7.11.0'
python3 -m pip install requests

使用说明

usage: es.py [-h] -v VERSION

A tool to do some governance on es index via es rest api.

optional arguments:
  -h, --help            show this help message and exit
  -v VERSION, --version VERSION
                        the es backend version 6.x or 7.x

使用示例

python3 es.py -v 6

脚本对 elasticsearch 6.x 和 7.x 返回的 json 字符串的解析均做了定向的处理,可以完美兼容两个大版本。由于脚本采用了异步模式,上千个索引的检查只要几十秒就可以搞定。

脚本源码

import re
import sys
import asyncio
import requests
import argparse
from elasticsearch import AsyncElasticsearch

class ElasticsearchApi(object):
	# 根据生产实际情况修改下面的 ip、用户名密码之类的
    ip=''
    port=9200
    user = ''passwd =''
    es= AsyncElasticsearch([ip],
        http_auth=(user, passwd),
        scheme="http",
        port=port)
    # 这里的 gte 后面的 30d 是不固定可以结合实际业务情况进行修改,这里的 30d 代表检查当前的时间点往前推 30 天的日志条数
    body={
    "query": {
        "bool": {"must": [],
          "filter": [
            {"match_all": {}},
            {
              "range": {
                "@timestamp": {
                  "gte": "now-30d/d",
                  "lte": "now/d"}}}],
          "should": [],
          "must_not": []}}}
    
    def __init__(self, version):
        self.version = version

    def get_index_list(self):
        index_list_url = 'http://' + self.ip + ':' + str(self.port)  + "/_cat/indices"
        response_data = requests.get(index_list_url, auth=(self.user, self.passwd)).text
        if self.version == 7:
            index_list = [data.split('')[3] for data in response_data.strip().split('\n')]
        elif self.version == 6:
            index_list = [data.split('')[2] for data in response_data.strip().split('\n')]
        else:
            print('Not supported version:' + str(self.version))
            sys.exit(1)

        index_list_cleaned = []
        for index in index_list:
            # 这里的 xx- 索引前缀也要根据生产的实际值进行修改,这里是为了过滤掉一些业务无关的索引
            if re.match('^xx-', index) is not None:
                index_list_cleaned.append(index)
        return index_list_cleaned

    async def get_index_hit_number(self,index_name):
        resp = await self.es.search(
            index=index_name+'-*',
            body=self.body,
            size=20)
        if self.version == 7:
            hits_number = resp['hits']['total']['value']
        elif self.version == 6:
            hits_number = resp['hits']['total']
        else:
            print('Not supported version:' + str(self.version))
            sys.exit(1)

        if hits_number == 0:
            print('Few days to Now No Log Exception:' + index_name + '-*')
    
    async def close_session(self):
        await self.es.close()
    
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='A tool to do some governance on es index via es rest api.')

    parser.add_argument(
        '-v', '--version',
        required=True,
        type=int,
        help='the es backend version 6.x or 7.x')
    args = parser.parse_args()
    es_api = ElasticsearchApi(args.version)
    index_list = es_api.get_index_list()

    async def main(index_list):
        index_list_unique = list(set(['-'.join(i.split('-')[:-1]) for i in index_list]))
        with open('white_list.txt') as white:
            remove_list=white.read().strip().split('\n')
            for removing in remove_list:
                if removing in index_list_unique:
                    index_list_unique.remove(removing)
        await asyncio.gather(*[es_api.get_index_hit_number(index) for index in index_list_unique])
        await es_api.close_session()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(index_list))

这里需要重点关注的是请求的 body json 串中的 gte 的值,比如说你有一个系统只会在每个月的月底才会有人访问产生日志,那值就可以设为 30,以此类推。

另外末尾的

loop = asyncio.get_event_loop()
loop.run_until_complete(main(index_list))

如果要在 python3.7 运行,需要换成asyncio.run(),印象中没问题,请各位自行测试一下。

有啥疑问欢迎评论留言哈,每一条评论我都会收到邮件通知然后及时回复大家的。

正文完
 
sharp097
版权声明:本站原创文章,由 sharp097 2021-03-03发表,共计3491字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)