背景

最近在全面接手估计有1k+的生产日志的运维工作,每天面临着大量的投诉,大部分的诉求都是日志突然没了,为了对生产故障进行排查要紧急查看日志,由于应用系统数量和种类繁多,以及所有日志系统的网络架构的复杂性导致出现这种问题有时并不能快速解决,与其每天被动的的接受开发的投诉,还不如主动的对这1K+应用产生的Elasticsearch索引进行运维治理。

其实应用突然没日志无外乎下面几种:

  • k8s pod里面的log挂载点发生了变化
  • log4j里面的日志路径发生了变化
  • 应用上云后log4j里面的路径没有进行同步变更
  • 还有的windows机器上的filebeat偶尔会挂掉
  • k8s集群新增了节点,pod重启后漂移到了新增的未部署filebeat的节点上了
  • 还有就是本站的这篇博客提到的类似error也会导致采集不到日志

经过最近几天的主动治理,发现当我主动找到开发反馈业务系统半个月或一个多月没日志的时候开发都很配合工作,部门的SLA水平因此在渐渐提高中,可见主动的对Elasticsearch索引的运维治理还是很有必要的。

下面的脚本运行了之后会列出脚本运行时刻往前30天(可以自定义,下面有提到)没有采集到一条日志的应用,然后你就可以抢在开发的前面,主动的去看是上面提到的“应用突然没日志无外乎下面几种”中的哪一种情况导致的了。

运行环境

  • python 3.6.8 (用到了asyncio的一个在3.6存在的方法,没在3.7测试)
  • Elasticsearch 6.x or 7.x 均可

依赖部署

python3 -m pip install 'elasticsearch[async]==7.11.0'
python3 -m pip install requests

使用说明

usage: es.py [-h] -v VERSION

A tool to do some governance on es index via es rest api.

optional arguments:
  -h, --help            show this help message and exit
  -v VERSION, --version VERSION
                        the es backend version 6.x or 7.x

使用示例

python3 es.py -v 6

脚本对elasticsearch 6.x 和7.x 返回的json字符串的解析均做了定向的处理,可以完美兼容两个大版本。由于脚本采用了异步模式,上千个索引的检查只要几十秒就可以搞定。

脚本源码

import re
import sys
import asyncio
import requests
import argparse
from elasticsearch import AsyncElasticsearch

class ElasticsearchApi(object):
	# 根据生产实际情况修改下面的ip、用户名密码之类的
    ip=''
    port=9200
    user = ''
    passwd = ''
    es= AsyncElasticsearch(
        [ip],
        http_auth=(user, passwd),
        scheme="http",
        port=port)
    # 这里的gte后面的30d是不固定可以结合实际业务情况进行修改,这里的30d代表检查当前的时间点往前推30天的日志条数
    body={
    "query": {
        "bool": {
          "must": [],
          "filter": [
            {
              "match_all": {}},
            {
              "range": {
                "@timestamp": {
                  "gte": "now-30d/d",
                  "lte": "now/d"}}}],
          "should": [],
          "must_not": []}}}
    
    def __init__(self, version):
        self.version = version

    def get_index_list(self):
        index_list_url = 'http://' + self.ip + ':' + str(self.port)  + "/_cat/indices"
        response_data = requests.get(index_list_url, auth=(self.user, self.passwd)).text
        if self.version == 7:
            index_list = [data.split(' ')[3] for data in response_data.strip().split('\n')]
        elif self.version == 6:
            index_list = [data.split(' ')[2] for data in response_data.strip().split('\n')]
        else:
            print('Not supported version: ' + str(self.version))
            sys.exit(1)

        index_list_cleaned = []
        for index in index_list:
            # 这里的xx-索引前缀也要根据生产的实际值进行修改,这里是为了过滤掉一些业务无关的索引
            if re.match('^xx-', index) is not None:
                index_list_cleaned.append(index)
        return index_list_cleaned

    async def get_index_hit_number(self,index_name):
        resp = await self.es.search(
            index=index_name+'-*',
            body=self.body,
            size=20)
        if self.version == 7:
            hits_number = resp['hits']['total']['value']
        elif self.version == 6:
            hits_number = resp['hits']['total']
        else:
            print('Not supported version: ' + str(self.version))
            sys.exit(1)

        if hits_number == 0:
            print('Few days to Now No Log Exception: ' + index_name + '-*')
    
    async def close_session(self):
        await self.es.close()
    
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='A tool to do some governance on es index via es rest api.')

    parser.add_argument(
        '-v', '--version',
        required=True,
        type=int,
        help='the es backend version 6.x or 7.x')
    args = parser.parse_args()
    es_api = ElasticsearchApi(args.version)
    index_list = es_api.get_index_list()

    async def main(index_list):
        index_list_unique = list(set(['-'.join(i.split('-')[:-1]) for i in index_list]))
        with open('white_list.txt') as white:
            remove_list=white.read().strip().split('\n')
            for removing in remove_list:
                if removing in index_list_unique:
                    index_list_unique.remove(removing)
        await asyncio.gather(*[es_api.get_index_hit_number(index) for index in index_list_unique])
        await es_api.close_session()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(index_list))

这里需要重点关注的是请求的body json串中的gte的值,比如说你有一个系统只会在每个月的月底才会有人访问产生日志,那值就可以设为30,以此类推。

另外末尾的

loop = asyncio.get_event_loop()
loop.run_until_complete(main(index_list))

如果要在python3.7运行,需要换成_asyncio_.run(),印象中没问题,请各位自行测试一下。

有啥疑问欢迎评论留言哈,每一条评论我都会收到邮件通知然后及时回复大家的。