共计 3595 个字符,预计需要花费 9 分钟才能阅读完成。
环境信息
- zabbix 4.2.4
- Python 3.6.8
说明
- 本脚本在生产环境久经考验
- 部分特殊场景下,已经解除的告警可能仍会被推送,原因未知
- 自行替换脚本中的企业微信推送 url,群 id,zabbix api 地址,zabbix 用户名和密码
源码
#!/usr/bin/env python
# encoding=utf-8
import datetime
import json
import time
import requests
def send_msg_to_wework(chat_id, content, timeout=10):
    """Post a markdown message to the WeWork (enterprise WeChat) webhook.

    Args:
        chat_id: Value sent as the ``chatid`` field of the payload.
        content: Markdown text sent as ``markdown.content``.
        timeout: Seconds to wait for the webhook before giving up. Without
            a timeout ``requests`` may block forever on an unresponsive
            server.

    Returns:
        None. The webhook response is not inspected.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
    """
    payload = {
        "chatid": chat_id,
        "msgtype": "markdown",
        "markdown": {"content": content},
    }
    headers = {"Content-Type": "application/json"}
    # The ``key=xxxx`` part is a placeholder that must be replaced with the
    # real webhook key before use.
    requests.post('http://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxx',
                  data=json.dumps(payload), headers=headers, timeout=timeout)
def getToken(url, post_headers, url_user, url_password, timeout=10):
    """Log in to the Zabbix JSON-RPC API and return the auth token.

    Args:
        url: Zabbix ``api_jsonrpc.php`` endpoint.
        post_headers: HTTP headers sent with the request.
        url_user: Zabbix user name.
        url_password: Zabbix password.
        timeout: Seconds to wait for the server before giving up, so a
            stalled server cannot hang the script indefinitely.

    Returns:
        The ``"result"`` field of the decoded response (the auth token),
        or ``None`` when the response carries no ``"result"`` key.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
    """
    post_data = {
        "jsonrpc": "2.0",
        "method": "user.login",
        "params": {
            "user": url_user,
            "password": url_password,
        },
        "id": 1,
    }
    ret = requests.post(url, data=json.dumps(post_data),
                        headers=post_headers, timeout=timeout)
    return json.loads(ret.text).get("result")
def timestamp_to_string(timestamp):
    """Format an epoch timestamp as local time, e.g. ``2016-05-05 20:28:54``."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
def get_start_end_timestamp(hours=12):
    """Return ``(start, end)`` epoch seconds for the window ending now.

    The window is computed directly on epoch seconds, so it is always
    exactly ``hours`` long. Formatting naive local datetimes and parsing
    them back would make the window an hour short or long across a
    daylight-saving change.

    Args:
        hours: Length of the look-back window in hours (default 12).

    Returns:
        A tuple ``(start, end)`` of ints, where ``end`` is the current time
        truncated to whole seconds and ``start`` is ``hours`` earlier.
    """
    end = int(time.time())
    start = end - int(hours * 3600)
    return start, end
def get_problem(url, post_headers, auth, time_start, time_end, timeout=10):
    """Query Zabbix ``problem.get`` for problems created in a time window.

    Args:
        url: Zabbix ``api_jsonrpc.php`` endpoint.
        post_headers: HTTP headers sent with the request.
        auth: Auth token obtained from ``user.login``.
        time_start: Window start, epoch seconds (sent as ``time_from``).
        time_end: Window end, epoch seconds (sent as ``time_till``).
        timeout: Seconds to wait for the server before giving up, so a
            stalled server cannot hang the script indefinitely.

    Returns:
        The decoded JSON-RPC response as a dict.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
    """
    post_data = {
        "jsonrpc": "2.0",
        "method": "problem.get",
        "params": {
            "output": "extend",
            "time_from": str(time_start),
            "time_till": str(time_end),
            # problem.get spells this option ``selectAcknowledges``; the
            # snake_case ``select_acknowledges`` belongs to event.get.
            "selectAcknowledges": "extend",
            "selectTags": "extend",
            "selectSuppressionData": "extend",
            "sortfield": ["eventid"],
            # 5: level 1, 4: level 2, 3: level 3, 2: level 4
            "severities": [5, 4, 3, 2],
            "sortorder": "DESC",
        },
        "auth": auth,
        "id": 1,
    }
    ret = requests.post(url, data=json.dumps(post_data),
                        headers=post_headers, timeout=timeout)
    return json.loads(ret.text)
def get_trigger_id_list(url, auth, post_headers, event_id, timeout=10):
    """Query Zabbix ``event.get`` for the given event ids.

    Despite the name, this returns the whole decoded ``event.get``
    response rather than a list of trigger ids; the caller extracts the
    ids from the events in the ``"result"`` entry.

    Args:
        url: Zabbix ``api_jsonrpc.php`` endpoint.
        auth: Auth token obtained from ``user.login``.
        post_headers: HTTP headers sent with the request.
        event_id: Event id or list of event ids (sent as ``eventids``).
        timeout: Seconds to wait for the server before giving up, so a
            stalled server cannot hang the script indefinitely.

    Returns:
        The decoded JSON-RPC response as a dict.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
    """
    post_data = {
        "jsonrpc": "2.0",
        "method": "event.get",
        "params": {
            "output": "extend",
            "eventids": event_id,
            "sortfield": ["eventid"],
            "sortorder": "DESC",
        },
        "auth": auth,
        "id": 1,
    }
    ret = requests.post(url, data=json.dumps(post_data),
                        headers=post_headers, timeout=timeout)
    return json.loads(ret.text)
def get_trigger_item(url, auth, post_headers, trigger_id, timeout=10):
    """Query Zabbix ``trigger.get`` for the given trigger ids.

    Args:
        url: Zabbix ``api_jsonrpc.php`` endpoint.
        auth: Auth token obtained from ``user.login``.
        post_headers: HTTP headers sent with the request.
        trigger_id: Trigger id or list of trigger ids (sent as
            ``triggerids``).
        timeout: Seconds to wait for the server before giving up, so a
            stalled server cannot hang the script indefinitely.

    Returns:
        The decoded JSON-RPC response as a dict. Hosts are requested via
        ``selectHosts`` so each returned trigger can carry a ``hosts`` list.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
    """
    # NOTE(review): no filter on trigger value or monitoring status is sent
    # here, so triggers that are already OK or disabled are presumably
    # returned as well - confirm whether this explains stale alerts.
    post_data = {
        "jsonrpc": "2.0",
        "method": "trigger.get",
        "params": {
            "output": "extend",
            "triggerids": trigger_id,
            "expandDescription": "1",
            "selectHosts": "extend",
        },
        "auth": auth,
        "id": 1,
    }
    ret = requests.post(url, data=json.dumps(post_data),
                        headers=post_headers, timeout=timeout)
    return json.loads(ret.text)
def _api_result(response, method):
    """Return ``response['result']`` or raise a readable error if it is missing."""
    if 'result' not in response:
        raise RuntimeError('Zabbix API call %s failed: %s'
                           % (method, response.get('error')))
    return response['result']


def _build_alert_lines(triggers):
    """Format one numbered markdown line (host name + trigger name) per trigger."""
    lines = []
    for index, trigger in enumerate(triggers, start=1):
        # Guard against a trigger that comes back without any host attached.
        hosts = trigger.get('hosts') or [{}]
        host_name = hosts[0].get('name', 'N/A')
        lines.append(str(index) + '.' + "<font color='warning'> 主机名称:</font>" + host_name +
                     "<font color='warning'> 触发器名称:</font>" + trigger['description'])
    return lines


def main():
    """Collect the problems of the last 12 hours and push a summary to WeWork."""
    # Number of entries actually sent; the title is built from the same
    # value so the text and the behaviour cannot drift apart.
    max_items = 20

    # Zabbix API endpoint and credentials - placeholders, replace before use.
    url = "http://127.0.0.1/zabbix/api_jsonrpc.php"
    post_headers = {'Content-Type': 'application/json'}
    url_user = "admin"
    url_passwd = "admin12345"

    auth = getToken(url, post_headers, url_user, url_passwd)  # fetch the auth token
    if auth is None:
        raise RuntimeError('Zabbix login failed: no auth token returned')

    time_start, time_end = get_start_end_timestamp()
    problem_data = get_problem(url, post_headers, auth, time_start, time_end)
    event_list = [str(res['eventid'])
                  for res in _api_result(problem_data, 'problem.get')]

    # Map problem events to their triggers (an event's ``objectid`` is used
    # as the trigger id), then fetch trigger names together with host data.
    events = get_trigger_id_list(url=url, auth=auth, post_headers=post_headers,
                                 event_id=event_list)
    trigger_ids = [event['objectid']
                   for event in _api_result(events, 'event.get')]
    trigger_items = get_trigger_item(url=url, auth=auth, post_headers=post_headers,
                                     trigger_id=trigger_ids)

    title = ("# <font color='warning'>** 生产 -ZABBIX- 告警信息定时统计 "
             "(最近 12 小时内仍未解除的告警, 最多显示 {limit} 条) **</font><@all>\n"
             ).format(limit=max_items)
    lines = _build_alert_lines(_api_result(trigger_items, 'trigger.get'))
    # "xxxxx" is a placeholder for the target group chat id.
    send_msg_to_wework("xxxxx", title + '\n\n'.join(lines[:max_items]))


if __name__ == '__main__':
    main()
正文完
发表至: Zabbix运维开发
2021-11-29