生产实践:
用于屏蔽指定节点下的机器报警,并添加屏蔽时间和备注
脚本内容:
python时间库time、datetime使用,用于屏蔽指定节点下的机器报警,并添加屏蔽时间和备注,支持从文本读取机器列表。脚本内容如下:
#!/Users/anzhihe/.pyenv/versions/3.7.4/bin/python3 # -*- coding: utf-8 -*- # @Time : 2019-10-29 # @Author : Anzhihe # @Site : https://chegva.com # @File : pb.py import requests import datetime import time import argparse import json import hashlib import sys # 获取认证token def get_auth_header_value(caller, skey): now = int(time.time()) now = str(now - now % 300) return "Cert caller={},token={}".format(caller, token(caller, skey, now)) def token(caller_name, skey, now): return hashlib.md5("{}.{}.{}".format(now, caller_name, skey).encode('utf-8')).hexdigest() # 获取当前时间及时间戳 def time_now(): timeNow = datetime.datetime.now() begin_ts = int(time.mktime(timeNow.timetuple())) return begin_ts def map_time(stime): """日期map映射""" times = { '1h': silence_time({'hours' : 1}), '2h': silence_time({'hours' : 2}), '6h': silence_time({'hours' : 6}), '12h': silence_time({'hours' : 12}), '1d': silence_time({'days' : 1}), '2d': silence_time({'days' : 2}), '7d': silence_time({'days' : 7}) } return times.get(stime, None) def silence_time(sil_time): after_ts = (datetime.datetime.now() + datetime.timedelta(**sil_time)) end_ts = int(time.mktime(after_ts.timetuple())) return end_ts # 将unix时间戳转换为当前时间格式 #strTime = datetime.datetime.fromtimestamp(timeStamp) #print(strTime) # 读取机器列表文件 def load_hostfile(hostfile): host_list = [] with open(hostfile,'r') as f: for host in f.readlines(): host = host.strip('\n').strip() if host: host_list.append(host) hosts = ','.join(str(h) for h in host_list) return hosts # 报警屏蔽执行函数 def slience_host(hostname, begin_ts, end_ts, tree, note, token): url = "http://monitor.chegva.com/auth/v1/silence/add" if len(tree) == 0: tree = "alarm.chegva.com" if len(note) == 0: note = "报障屏蔽" #print(hostname,begin_ts,end_ts,tree,note,token) payload = { "ns": tree, "type": "host", "begin_ts": begin_ts, "end_ts": end_ts, "note": note, "hosts": hostname } headers = { 'Accept': "application/json, text/javascript, */*; q=0.01", 'Referer': "http://monitor.chegva.com/", 'Origin': "http://monitor.chegva.com", 'X-Requested-With': "XMLHttpRequest", 'User-Agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36", 'Content-Type': "application/x-www-form-urlencoded; charset=UTF-8", 'cache-control': "no-cache", 'Authorization': token, } response = requests.request("POST", url, data=json.dumps(payload), headers=headers) print(response.text) # 入口函数 def main(argv): parser = argparse.ArgumentParser(description="example:pb [chegva01.bj,chegva02.bj | -f tmp] 7d '报修屏蔽' 'alarm.chegva.com'") parser.add_argument('-f', dest='file', default='tmp', help='the name of hostfile') parser.add_argument('host', type=str, help='输入屏蔽主机名') parser.add_argument('stime', type=str, help='输入屏蔽时间') parser.add_argument('note', type=str, help='输入屏蔽原因(默认报修屏蔽)', default='报修屏蔽') parser.add_argument('tree', type=str, nargs='?', help='屏蔽机器所在节点(默认alarm.chegva.com)', default='alarm.chegva.com') args = parser.parse_args() token = get_auth_header_value("sys.op-anzhihe", "yourtoken") if argv[1] == '-f': hosts = load_hostfile(args.file) stime = args.host begin_ts = time_now() end_ts = map_time(stime) note, tree = args.stime, args.note slience_host(hosts, begin_ts, end_ts, tree, note, token) else: stime = args.stime begin_ts = time_now() end_ts = map_time(stime) slience_host(args.host, begin_ts, end_ts, args.tree, args.note, token) if __name__ == '__main__': main(sys.argv)
◎查看效果
~ pb -h usage: pb [-h] [-f FILE] host stime note [tree] example:pb [chegva01.bj,chegva02.bj | -f tmp] 7d '报修屏蔽' 'alarm.chegva.com' positional arguments: host 输入屏蔽主机名 stime 输入屏蔽时间 note 输入屏蔽原因(默认报修屏蔽) tree 屏蔽机器所在节点(默认alarm.didi.com) optional arguments: -h, --help show this help message and exit -f FILE the name of hostfile # 直接屏蔽报警机器 pb chegva01.bj,chegva02.bj 2h '机器挂载屏蔽两小时' 'test.alarm.chegva.com' # 读取文件列表屏蔽 pb -f tmp 7d '下线屏蔽' '' pb -f tmp 7d '' ''
参考: