共计 10764 个字符,预计需要花费 27 分钟才能阅读完成。
场景
目前的例行发布做了发布前自动屏蔽触发器,发布后恢复的操作,由于有时候发布过程中某一步失败了,人工介入处理之后整个发布流程没有继续走完就会导致触发器未恢复。
代码
代码分两块:一个是 main.py,另一个是 GitHub 上的开源脚本 zhtrigfinder.py(仓库名为 q1x/zabbix-gnomes,感谢原作者)。该仓库年久失修,为了防止失联,本人 fork 了一份,并在下文把源码直接贴了出来。请把两个脚本放到同一级目录。
整个项目依赖 python2,主要是这个七八年前的上古开源脚本 zhtrigfinder.py 也依赖 python2。
脚本依赖安装如下:
python2 -m pip install pyzabbix==1.0.0 requests==2.26.0
这两个脚本共用一个叫 zbx_prod.conf 的配置文件,格式如下:
[Zabbix API]
username=johndoe
password=verysecretpassword
api=https://zabbix.mycompany.com/path/to/zabbix/frontend/
no_verify=true
所以主代码 main.py 的运行方式如下:
python main.py -c zbx_prod.conf
另外,主代码中你只需要修改企业微信的群组 chat_id 和机器人公网请求地址,再把 operation_dict 这个字典中的 zabbix agent 的 ip 和触发器关键字改成自己的就行。
main.py 代码如下:
# -*- encoding: utf-8 -*-
import subprocess
import argparse
import ConfigParser
import os
import os.path
import sys
import distutils.util
import requests
import json
from pyzabbix import ZabbixAPI
import logging
import time
def logger_getter():
    """Return the root logger, attaching a dated file handler on first use.

    When the root logger has no handlers yet, its level is set to INFO and a
    ``FileHandler`` is added that writes to
    ``/data/logs/zabbix_trigger_disabled_notify/debug.log.<YYYY-MM-DD>``
    (date taken from the local clock when the handler is created).  When the
    root logger already has at least one handler it is returned untouched, so
    calling this function repeatedly never duplicates log output.

    Returns:
        logging.Logger: the root logger.

    Raises:
        OSError, IOError: if the log directory or log file cannot be created.
    """
    logger = logging.getLogger()
    if logger.handlers:
        return logger

    log_dir = '/data/logs/zabbix_trigger_disabled_notify'
    # FileHandler does not create missing parent directories, so make sure
    # the directory exists before opening the log file.
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)

    today = time.strftime("%Y-%m-%d", time.localtime())
    formatter = logging.Formatter(
        "%(asctime)s ||| %(levelname)s ||| %(lineno)d ||| %(funcName)s ||| %(message)s",
        datefmt='%Y-%m-%d %H:%M:%S')
    file_handler = logging.FileHandler(log_dir + '/debug.log.' + today)
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)

    logger.setLevel(logging.INFO)
    logger.addHandler(file_handler)
    return logger
def check_kw(keywords, host_ip):
    """Return the ids of the triggers on ``host_ip`` whose description matches ``keywords``.

    Runs ``zhtrigfinder.py`` (expected in the current working directory, with
    ``zbx_prod.conf`` next to it) with ``-s <keywords>`` to filter by trigger
    description and ``-n`` to print numeric trigger ids; ``host_ip`` is passed
    as the positional host name.

    Args:
        keywords: search string matched against trigger descriptions.
        host_ip: Zabbix host name (an IP in this project) to inspect.

    Returns:
        The helper's stripped standard output (one trigger id per line), or
        ``''`` when the helper exits with a non-zero status or cannot be
        started at all.
    """
    # Pass an argument list instead of a shell string: every value arrives as
    # its own argv entry (the original format string had no spaces between
    # the options, so the shell glued them into a single argument) and
    # keywords/IPs containing quotes or shell metacharacters are harmless.
    cmd = [
        "python", "zhtrigfinder.py",
        "-c", "zbx_prod.conf",
        "-s", keywords,
        "-n", host_ip,
    ]
    try:
        trigger_id_list = subprocess.check_output(cmd)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: helper reported "no host / no trigger found".
        # OSError: the interpreter could not be launched; treated the same
        # way the shell's "command not found" exit status was before.
        return ''
    return trigger_id_list.strip()
def send_msg_2_wework(trigger_notify_list):
    """Post the trigger inspection result to a WeCom (企业微信) group chat.

    Args:
        trigger_notify_list: list of ``[description, ip]`` pairs describing
            triggers that are still disabled.  An empty list produces the
            "everything is normal" message; otherwise every pair is listed in
            a warning message.

    Returns:
        None.  A non-200 HTTP answer is reported through ``logging.warning``
        instead of being silently ignored.

    Raises:
        requests.exceptions.RequestException: if the request cannot be sent
            or does not complete within the timeout.
    """
    if trigger_notify_list:
        details = '\n'.join(
            ['IP: ' + t[1] + ' 触发器名称:' + t[0] for t in trigger_notify_list])
        content = """# **zabbix 触发器巡检通知 **
<font color='warning'>** 巡检状态:**</font> 异常
<font color='warning'>** 巡检详情:**</font> 以下触发器未恢复,请检查核实!{0}
<@xxxx> <@xxxx>
""".format(details)
    else:
        content = """# **zabbix 触发器巡检通知 **
<font color='info'>** 巡检状态:**</font> 正常
<font color='info'>** 巡检详情:**</font> 容器触发器状态一切正常
<@xxxx> <@xxx>
"""
    # prod
    chat_id = 'xxxxx'
    payload = json.dumps({
        "chatid": chat_id,
        "msgtype": "markdown",
        "markdown": {"content": content},
    })
    headers = {"Content-Type": "application/json"}
    # requests rejects URLs without a scheme (MissingSchema), so the
    # placeholder endpoint carries an explicit https://.  The timeout keeps a
    # stalled connection from hanging the inspection job forever.
    response = requests.post('https://weixin.qq.com/cgi-bin/xxxxxx',
                             data=payload, headers=headers, timeout=10)
    if response.status_code != 200:
        logging.warning("wework notification failed: HTTP %s %s",
                        response.status_code, response.text)
def trigger_id_to_txt(zapi, t_id):
    """Return the expanded description of trigger ``t_id`` as UTF-8 bytes.

    Args:
        zapi: object exposing ``trigger.get(...)`` (a logged-in ZabbixAPI
            session in this script).
        t_id: id of the trigger to look up.

    Returns:
        The trigger description encoded as UTF-8.  If the API returns no
        trigger for ``t_id``, ``str(t_id)`` is returned so the caller still
        has something to display instead of crashing with ``IndexError``.
    """
    check = zapi.trigger.get(filter={'triggerid': t_id},
                             output=['description', 'status'],
                             expandDescription=1)
    if not check:
        return str(t_id)
    return check[0]['description'].encode('utf-8')
def get_trigger_list(operation_dict):
    """Collect the triggers that match the configured keywords and are not enabled.

    Uses the module-level ``zapi`` session and the sibling ``check_kw`` helper.

    Args:
        operation_dict: maps a host IP (or several IPs separated by commas) to
            the list of keywords whose triggers must be checked on that host,
            e.g. ``{'10.99.224.4': ['tof']}``.

    Returns:
        A list of ``[description, ip]`` pairs, one per trigger whose status is
        anything other than ``'0'``.  ``description`` is the expanded trigger
        description encoded as UTF-8.
    """
    trigger_desc_list = []
    for hosts, keywords_list in operation_dict.items():
        for ip in hosts.split(','):
            ip = ip.strip()
            if not ip:
                continue
            # An IP may have several keywords that match the same trigger;
            # remember what was already checked so it is reported only once.
            seen_ids = set()
            for kw in keywords_list:
                # check_kw returns one trigger id per line, or '' when nothing
                # matched / the lookup failed.
                for id_ in check_kw(kw, ip).splitlines():
                    id_ = id_.strip()
                    # Skip blank or non-numeric lines rather than querying the
                    # API with a bogus id and crashing on the empty answer.
                    if not id_.isdigit() or id_ in seen_ids:
                        continue
                    seen_ids.add(id_)
                    check = zapi.trigger.get(filter={'triggerid': id_},
                                             output=['description', 'status'],
                                             expandDescription=1)
                    if not check:
                        continue
                    # Every status other than '0' is reported as not restored.
                    if check[0]['status'] != '0':
                        # The description is already part of this answer, so no
                        # second API round-trip per trigger is needed.
                        trigger_desc_list.append(
                            [check[0]['description'].encode('utf-8'), ip])
    return trigger_desc_list
if __name__ == '__main__':
    # Script entry point: read the Zabbix API credentials (config file first,
    # command-line arguments override), log in, look up the triggers listed
    # in operation_dict and send the inspection result to WeCom.

    # define config helper function
    def ConfigSectionMap(section):
        """Return the options of ``section`` from ``Config`` as a dict.

        An option whose value cannot be read is reported on stdout and stored
        as ``None``.  ``Config.options`` raises if the section does not exist.
        """
        dict1 = {}
        for option in Config.options(section):
            try:
                dict1[option] = Config.get(section, option)
            except ConfigParser.Error:
                print("exception on %s!" % option)
                dict1[option] = None
        return dict1

    # set default vars
    # expanduser still works when $HOME is not set in the environment
    defconf = os.path.join(os.path.expanduser("~"), ".zbx.conf")
    username = ""
    password = ""
    api = ""
    noverify = False

    # Define commandline arguments
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description='Checks that the Zabbix triggers disabled during a release have been re-enabled and reports the result to a WeCom group chat.',
        epilog="""
This program can use .ini style configuration files to retrieve the needed API connection information.
To use this type of storage, create a conf file (the default is $HOME/.zbx.conf) that contains at least the [Zabbix API] section and any of the other parameters:
[Zabbix API]
username=johndoe
password=verysecretpassword
api=https://zabbix.mycompany.com/path/to/zabbix/frontend/
no_verify=true
""")
    parser.add_argument('-u', '--username', help='User for the Zabbix api')
    parser.add_argument('-p', '--password', help='Password for the Zabbix api user')
    parser.add_argument('-a', '--api', help='Zabbix API URL')
    parser.add_argument('--no-verify', help='Disables certificate validation when using a secure connection', action='store_true')
    parser.add_argument('-c', '--config', help='Config file location (defaults to $HOME/.zbx.conf)')
    args = parser.parse_args()

    # load config module
    Config = ConfigParser.ConfigParser()

    # use the file given with -c if any, otherwise the default config file
    conf_path = args.config if args.config else defconf
    if os.path.isfile(conf_path) and os.access(conf_path, os.R_OK):
        Config.read(conf_path)
    elif args.config:
        # An explicitly requested file that cannot be read deserves a hint;
        # the run continues because -u/-p/-a may still supply everything.
        sys.stderr.write("Warning: config file %s is missing or unreadable\n" % args.config)

    # try to load available settings from config file
    try:
        api_conf = ConfigSectionMap("Zabbix API")
    except ConfigParser.NoSectionError:
        api_conf = {}
    username = api_conf.get('username') or username
    password = api_conf.get('password') or password
    api = api_conf.get('api') or api
    if api_conf.get('no_verify') is not None:
        try:
            noverify = bool(distutils.util.strtobool(api_conf['no_verify']))
        except ValueError:
            sys.stderr.write("Warning: ignoring invalid no_verify value %r\n" % api_conf['no_verify'])

    # override settings if they are provided as arguments
    if args.username:
        username = args.username
    if args.password:
        password = args.password
    if args.api:
        api = args.api
    if args.no_verify:
        noverify = args.no_verify

    # test for needed params
    if not username:
        sys.exit("Error: API User not set")
    if not password:
        sys.exit("Error: API Password not set")
    if not api:
        sys.exit("Error: API URL is not set")

    # Setup Zabbix API connection
    zapi = ZabbixAPI(api)
    if noverify is True:
        zapi.session.verify = False

    # Login to the Zabbix API
    zapi.login(username, password)

    # prod
    # One entry per host (or comma-separated hosts) listing ALL of its
    # keywords.  Repeating the same key in a dict literal silently keeps only
    # the last value, which dropped the first keyword.
    operation_dict = {'127.0.0.1': ['网站', '接口']}
    docker_trigger_list = get_trigger_list(operation_dict)
    send_msg_2_wework(docker_trigger_list)
开源脚本 zhtrigfinder.py 如下,再次感谢:
#!/usr/bin/env python
#
# import needed modules.
# pyzabbix is needed, see https://github.com/lukecyca/pyzabbix
#
import argparse
import ConfigParser
import os
import os.path
import sys
import distutils.util
from pyzabbix import ZabbixAPI
# define config helper function
def ConfigSectionMap(section):
    """Return every option of ``section`` from the module-level ``Config`` as a dict.

    Args:
        section: name of the section to read, e.g. ``"Zabbix API"``.

    Returns:
        dict mapping option name to its value.  An option whose value cannot
        be read is reported on stdout and stored as ``None``.

    Raises:
        Whatever ``Config.options`` raises when ``section`` does not exist
        (``ConfigParser.NoSectionError`` for a standard parser).
    """
    dict1 = {}
    for option in Config.options(section):
        try:
            dict1[option] = Config.get(section, option)
        except ConfigParser.Error:
            # Only parser errors are expected here; a bare ``except`` would
            # also have swallowed KeyboardInterrupt and SystemExit.
            print("exception on %s!" % option)
            dict1[option] = None
    return dict1
# set default vars
# expanduser still works when $HOME is not set in the environment
defconf = os.path.join(os.path.expanduser("~"), ".zbx.conf")
username = ""
password = ""
api = ""
noverify = False

# Define commandline arguments
parser = argparse.ArgumentParser(
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description='Tries to find triggers configured for the specified Zabbix host.',
    epilog="""
This program can use .ini style configuration files to retrieve the needed API connection information.
To use this type of storage, create a conf file (the default is $HOME/.zbx.conf) that contains at least the [Zabbix API] section and any of the other parameters:
[Zabbix API]
username=johndoe
password=verysecretpassword
api=https://zabbix.mycompany.com/path/to/zabbix/frontend/
no_verify=true
""")
# -n/-e select the output format, -s/-A select the trigger filter; the
# options within each group exclude one another.
group = parser.add_mutually_exclusive_group(required=False)
group2 = parser.add_mutually_exclusive_group(required=False)
parser.add_argument('hostname', help='Hostname to find the configured triggers on')
parser.add_argument('-u', '--username', help='User for the Zabbix api')
parser.add_argument('-p', '--password', help='Password for the Zabbix api user')
parser.add_argument('-a', '--api', help='Zabbix API URL')
parser.add_argument('--no-verify', help='Disables certificate validation when using a secure connection', action='store_true')
parser.add_argument('-c', '--config', help='Config file location (defaults to $HOME/.zbx.conf)')
group.add_argument('-n', '--numeric', help='Return numeric triggerids instead of descriptions', action='store_true')
group.add_argument('-e', '--extended', help='Returns trigger id, value, status, state, severity, description and expression separated by ":". See https://www.zabbix.com/documentation/2.2/manual/api/reference/trigger/object for more information.', action='store_true')
group2.add_argument('-s', '--search', help='Show only triggers with a description containing this search string')
group2.add_argument('-A', '--active', help='Show only active triggers', action='store_true')
args = parser.parse_args()

# load config module
Config = ConfigParser.ConfigParser()

# use the file given with -c if any, otherwise the default config file
conf_path = args.config if args.config else defconf
if os.path.isfile(conf_path) and os.access(conf_path, os.R_OK):
    Config.read(conf_path)
elif args.config:
    # An explicitly requested file that cannot be read deserves a hint; the
    # run continues because -u/-p/-a may still supply everything.
    sys.stderr.write("Warning: config file %s is missing or unreadable\n" % args.config)

# try to load available settings from config file
try:
    api_conf = ConfigSectionMap("Zabbix API")
except ConfigParser.NoSectionError:
    api_conf = {}
username = api_conf.get('username') or username
password = api_conf.get('password') or password
api = api_conf.get('api') or api
if api_conf.get('no_verify') is not None:
    try:
        noverify = bool(distutils.util.strtobool(api_conf['no_verify']))
    except ValueError:
        sys.stderr.write("Warning: ignoring invalid no_verify value %r\n" % api_conf['no_verify'])

# override settings if they are provided as arguments
if args.username:
    username = args.username
if args.password:
    password = args.password
if args.api:
    api = args.api
if args.no_verify:
    noverify = args.no_verify

# test for needed params
if not username:
    sys.exit("Error: API User not set")
if not password:
    sys.exit("Error: API Password not set")
if not api:
    sys.exit("Error: API URL is not set")

# Setup Zabbix API connection
zapi = ZabbixAPI(api)
if noverify is True:
    zapi.session.verify = False

# Login to the Zabbix API
zapi.login(username, password)

##################################
# Start actual API logic
##################################

# set the hostname we are looking for
host_name = args.hostname
hosts = zapi.host.get(output="extend", filter={"host": host_name})
if not hosts:
    sys.exit("Error: Could not find host " + host_name)

# Find triggers
if args.search:
    triggers = zapi.trigger.get(filter={'host': host_name}, output='extend', search={'description': args.search}, expandExpression=1, expandDescription=1)
elif args.active:
    triggers = zapi.trigger.get(filter={'host': host_name, 'value': 1}, output='extend', monitored=1, active=1, expandExpression=1, expandDescription=1)
else:
    triggers = zapi.trigger.get(filter={'host': host_name}, output='extend', expandExpression=1, expandDescription=1)
if not triggers:
    # A non-zero exit lets callers tell "nothing found" from an id list.
    sys.exit("Error: No matching triggers found on " + host_name)

if args.extended:
    # print id, value, status, state, severity, description and expression
    extended_fields = ("triggerid", "value", "status", "state", "priority", "description", "expression")
    for trigger in triggers:
        print(":".join(format(trigger[field]) for field in extended_fields))
elif args.numeric:
    # print ids
    for trigger in triggers:
        print(format(trigger["triggerid"]))
else:
    # print descriptions
    for trigger in triggers:
        print(format(trigger["description"]))
# And we're done...
正文完
发表至: Zabbix运维开发
2022-06-14