生产实践:
上云前调用接口查看老平台服务的配置信息及执行相关操作
学习技巧:
Python多线程、prettytable、argparse、pymysql等库使用
脚本内容:
上云前调用接口查看老平台待迁移服务的配置信息,执行相关操作,如发布封禁操作等,将一些上云中还未完善的人肉操作脚本化。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @Name: svi.py @Version: v1.0 @Function: Call Service Platform Interfaces @Author: anzhihe@chegva.com @Create Date: 20210725 """ import requests import json import sys,os,time import prettytable import argparse import platform import socket as sk import threading import queue import pymysql class ServiceInterface: """调用Service平台接口""" def __init__(self, service_name = None): """长时间不用需要更新cookie""" self.headers = { 'accept': 'application/json, text/javascript, */*; q=0.01', 'x-requested-with': 'XMLHttpRequest', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36', 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7', 'content-type': 'application/x-www-form-urlencoded', 'cookie': 'hahahahaha' } self.curfew_headers = { 'accept': 'application/json, text/javascript, */*; q=0.01', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36', 'cookie': 'hehehehehhehe', 'content-type': 'application/json' } self.openmq_headers = { 'authority': 'infra-api.chegva.com', 'cache-control': 'no-cache', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.164 Safari/537.36', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'cookie': 'xixixixixi' } self.payload = {} self.service_name = service_name def get_service_ip(self): """获取服务节点下第一个挂载实例的ip""" url = "https://service.chegva.com/getsrvdata/?servicename={}&online=online".format(self.service_name) response = requests.request("GET", url, headers=self.headers, data=self.payload) get_svc_data = json.loads(response.text) if get_svc_data["errno"] == 1: print("\033[;31m%s 服务不存在!\033[0m" % self.service_name) elif not get_svc_data['data']: print("\033[;31m%s 服务没有实例!\033[0m" % self.service_name) else: return get_svc_data['data'][0]['ip'] def get_service_info(self): """获取服务元数据信息""" for sn in self.service_name.split(','): url = "https://service.chegva.com/getsrvdata/?servicename={}&online=online".format(sn) response = requests.request("GET", url, headers=self.headers, data=self.payload) get_svc_data = json.loads(response.text) if get_svc_data["errno"] == 1: print("%s查询报错:%s" % (sn,get_svc_data['errmsg'])) exit() instance_list = [] if get_svc_data['data_over']['cpu'] or get_svc_data['data_over']['mem']: cpu_mem_str = get_svc_data['data_over']['cpu'] + 'c ' + get_svc_data['data_over']['mem'] else: cpu_mem_str = 'None' owner_str = get_svc_data['data_over']['owner'] owner_list = [id for id in owner_str.split(',')] owner_split = [','.join(owner_list[i:i+4]) for i in range(0, len(owner_list), 4)] owner = '\n'.join('%s' %owner for owner in owner_split) start_command = get_svc_data['data_over']['start_command'] for instance in get_svc_data['data']: instance_str = instance['name'] + " " + instance['ip'] + " " + instance['host'] instance_list.append(instance_str) dept = get_svc_data['data_over']['dept'] url_dept = "https://service.chegva.com/getdeptdata/?deptname={}&online=Online".format(dept) response_dept = requests.request("GET", url_dept, headers=self.headers, data=self.payload) get_dept_data = json.loads(response_dept.text) httpport_idc = [str(instance['httpport']) + " " + instance['idc'] for instance in get_dept_data['data'] if instance['name'] == sn] deploy_list = self.get_deploy_info_by_sn(sn) table_header = ['service_name', 'instance & host', 'cpu & mem', 'httpport & idc', 'owner', 'deployer'] service_table = make_table(table_header, align= 'l') service_table.add_row([sn, '\n'.join(instance_list), cpu_mem_str, httpport_idc, owner, '\n'.join(deploy_list)]) print(service_table) print('start_command: %s\n' % start_command) time.sleep(1) def get_deploy_info_by_sn(self, service_name): """查询服务部署信息""" db = pymysql.connect( host="opdb-online-reader", db="deployxxx", user="deployxxx", passwd="deploy@123456", port=3306, charset="utf8") cursor = db.cursor() sql = "select * from xxx_deploy where service='%s' and env='online' " \ "order by start desc limit 3;" % service_name try: cursor.execute(sql) deploy_info = cursor.fetchall() if len(deploy_info) == 0: deploy_list = ['None'] else: deploy_list = [] for deploy_under_module in deploy_info: deployer = deploy_under_module[3] + " " + str(deploy_under_module[8]) deploy_list.append(deployer) return deploy_list except Exception as e: print(f'fetch error {e}') finally: cursor.close() db.close() def curfew_strategy(self): """添加并执行服务发布封禁策略""" url = "https://service.chegva.com/api/strategy" for sn in self.service_name.split(','): payload = { "token": "xxxxoooo", "env": "online", "dept": "*", "service": sn } response = requests.request("POST", url, headers=self.curfew_headers, data=json.dumps(payload)) resp_json = json.loads(response.text) if resp_json["errno"] == 0: print("%s添加封禁策略成功, 策略id为%d。" % (sn, resp_json["id"])) curfew_id = resp_json["id"] # 执行添加地封禁策略 self.curfew_service(curfew_id) time.sleep(2) else: print(resp_json) def curfew_service(self, curfew_id: int): """根据传入的封禁id执行封禁策略封禁服务上线""" url = "https://service.chegva.com/api/curfew" payload = { "id": curfew_id, "token": "xxxxoooo", "action": "run" } response = requests.request("POST", url, headers=self.curfew_headers, data=json.dumps(payload)) resp_json = json.loads(response.text) if resp_json["errno"] == 0: print("策略%d执行封禁成功。\n查看操作页面地址:https://service.chegva.com/curfew_strategy/\n" % curfew_id) else: print(resp_json) def rollback_curfew(self, curfew_id: int): """根据传入的封禁id执行回滚封禁策略""" url = "https://service.chegva.com/api/curfew" payload = { "id": curfew_id, "token": "xxxxoooo", "action": "rollback" } response = requests.request("POST", url, headers=self.curfew_headers, data=json.dumps(payload)) resp_json = json.loads(response.text) if resp_json["errno"] == 0: print("%d回滚封禁策略成功。\n查看操作页面地址:https://service.chegva.com/curfew_strategy/" % curfew_id) else: print(resp_json) def open_mq(self, service_name: str): """打开服务rocketmq流量开关""" url = "https://infra-api.chegva.com/infra-phoenix-console/api/backdoor/xxx/service?serviceIdentity={}".format(service_name) print(url) response = requests.request("GET", url, headers=self.openmq_headers, data=self.payload) print(response.text) def get_same_vlan(self, network_segment: str): """根据输入的网络地址段,获得同网段的所有vlan""" url = "http://vlan.chegva.com/getgwdata/?type={}".format(network_segment) response = requests.request("GET", url, headers=self.headers, data=self.payload) vlan_segment = json.loads(response.text)['gwdata'].values() vlan_list = list(vlan_segment)[0].split('<br>') # print("The same vlan segment:", *vlan_list, sep='\n') same_vlans = [] same_vlans.append(network_segment) same_vlans.append(vlan_list) table_header = ['ip segment', 'same vlan ip segment'] vlan_table = make_table(table_header, align= 'l') vlan_table.add_row([same_vlans[0], '\n'.join(same_vlans[1])] ) print(vlan_table) def get_instance_id(self, service_name: str, instance_name: str, service_env: str): """根据实例名称获取实例id""" url = "https://service.chegva.com/getsrvdata/?servicename={}&online={}".format(service_name, service_env) response = requests.request("GET", url, headers=self.headers, data=self.payload) get_svc_data = json.loads(response.text) # [k for k,v in d.items() if k == value] instances = [instance['id'] for instance in get_svc_data['data'] if instance['name'] == instance_name] instance_id = instances[0] return instance_id def delete_instance(self, service_name: str, instances_name: str, service_env: str): """根据实例id删除服务下的实例""" url = "https://service.chegva.com/postsrvdata/" for instance_name in instances_name.split(','): # 获取删除实例的id instance_id = self.get_instance_id(service_name, instance_name, service_env) # print(instance_id) payload = "action=remove&id%5B%5D={}&servicename={}&online={}".format(instance_id, service_name, service_env) response = requests.request("POST", url, headers=self.headers, data=payload) if response.status_code != 200: print("删除实例失败,请检察!") print(response.text) else: print(" {} 服务删除 {} 实例成功!".format(service_name, instance_name)) # class Queue(Queue): # def randget(self): # from random import randrange # self.queue.rotate(randrange(0,self._qsize()+1)) # return self.get() def get_ip_port(options): """输入服务名扫描指定端口""" PORTS = [5000, 8080] if options.PORTS != None: ports = [int(po) for po in options.PORTS.split(',')] [PORTS.append(po) for po in ports if po not in PORTS] if options.NUM not in (None, 0): NUM = int(options.NUM) local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print('\n%s ==> Scanning IP & Port:%s with %d threads...\n' % (local_time, PORTS, NUM)) svc_dict = {} if options.service_name: for svc in options.service_name.split(','): ip = ServiceInterface(svc).get_service_ip() svc_dict[svc] = ip else: file = '/tmp/svc' if not os.path.exists(file): with open(file, 'a+') as f: f.write("") with open(file, 'r') as f: services = f.readlines() if services: for svc in services: svc = svc.strip('\n').strip() ip = ServiceInterface(svc).get_service_ip() svc_dict[svc] = ip else: print("\033[;31m/tmp/svc文件为空,请将要扫描的服务名写入文件。\033[0m") iplist = list(svc_dict.values()) sq = queue.Queue() lock = threading.Lock() if options.genfile == True: options.genlist = True if options.genlist == True: global ipdict ipdict = {} # global exce_ipdict # exce_ipdict = {} for i in range(NUM): threading.Thread(target=scan_ip_port, args=(sq, options), daemon=True).start() for scanip in iplist: for port in PORTS: sq.put((scanip, port)) sq.join() if options.genlist == True: for port, iplist in ipdict.items(): if options.genfile == True: file = open(str(port) + '.txt', "a") print("\n扫描结果已保存至当前目录{}中。".format(str(port) + '.txt')) else: print("\n========", port, '========') for ip in iplist: svc = [svc for svc, instanece_ip in svc_dict.items() if instanece_ip == ip][0] if options.genfile == True: file.write(svc + "\n") else: print(svc) def scan_ip_port(sq, options): """扫描服务下第一个实例的ip和端口""" global lock while True: # host, port = sq.randget() host, port = sq.get() sd=sk.socket(sk.AF_INET, sk.SOCK_STREAM) sd.settimeout(2) try: sd.connect((host,port)) if options.genlist==True: if port not in ipdict: ipdict[port]=[] ipdict[port].append(host) else: ipdict[port].append(host) else: # lock.acquire() with lock: print("%s:%d OPEN" % (host, port)) # lock.release() sd.close() except: pass # print("%s %d端口扫描异常!" % (host, port)) # if port not in exce_ipdict: # exce_ipdict[port] = [] # exce_ipdict[port].append(host) # print(exce_ipdict) # else: # exce_ipdict[port].append(host) # print(exce_ipdict) sq.task_done() def make_table(header, align=None): """Make a table object for output.""" encoding = "UTF-8" if platform.system().lower() in ('linux', 'darwin') else "GBK" table = prettytable.PrettyTable(encoding=encoding, field_names=header, align=align) table.padding_width = 0 return table def parse_args(argv): """Parse arguments/options. this uses the new argparse module instead of optparse see: <https://docs.python.org/3/library/argparse.html> """ p = argparse.ArgumentParser(description="Call Service Platform Interfaces. <author: anzhihe@chegva.com>") p.add_argument("--service_name", "-sn", metavar="[svc1,svc2,svc3,...]", help="Input service name to query service information,separate with ','", type=str, action="store") p.add_argument("--action", "-fj" , metavar="['curfew', 'rollback'] [service_names|strategy_id]", choices=['curfew', 'rollback'], help=" -fj curfew [svc1,svc2,svc3,...] or -fj rollback strategy_id", type=str, action="store") p.add_argument("services", type=str, nargs='?', help="Operate multiple services,separate with ','") p.add_argument("--open_rocketmq", "-mq", metavar="service_name", help="Open service's rocketmq,separate with ',', -mq [svc1,svc2,svc3,...]", type=str, action="store") p.add_argument("--ip_segment", "-is", metavar="10.xx.xx.xx/24", help="Input ip segment, list all the same vlans", type=str, action="store") # p.add_argument("--delete_instance", "-di", metavar="service_name instances_name online|Test", help="delete service instances", # type=str, nargs="*", action="store") p.add_argument("-t", "--threads", dest="NUM", help="Maximum threads, default 5", type=int, default=5) p.add_argument("-p", "--portlist", dest="PORTS",help="Customize port list, separate with ',' example: 22,5000,8080 ...") p.add_argument("-l", '--genlist', action="store_true", dest="genlist", help="Output a list, ordered by port number") p.add_argument("-L", '--genfile', action="store_true", dest="genfile", help="Put the IP list in separate files named by port number") if len(sys.argv) == 1: p.print_help() sys.exit(1) args = p.parse_args(argv) return args def main(): """主函数入口""" options = parse_args(sys.argv[1:]) if options.service_name: ServiceInterface(options.service_name).get_service_info() elif options.action: if options.action == 'curfew': ServiceInterface(options.services).curfew_strategy() elif options.action == 'rollback': strategy_id = int(options.services) ServiceInterface().rollback_curfew(strategy_id) elif options.open_rocketmq: for sn in options.open_rocketmq.split(','): service_name = "".join(sn) ServiceInterface().open_mq(service_name) time.sleep(1) elif options.ip_segment: network_segment = options.ip_segment ServiceInterface().get_same_vlan(network_segment) # elif options.delete_instance: # service_name = options.delete_instance[0] # instances_name = options.delete_instance[1] # service_env = options.delete_instance[2] # ServiceInterface().delete_instance(service_name, instances_name, service_env) if options.genlist: get_ip_port(options) if __name__ == '__main__': main()
◎使用说明
1、查询服务元信息:svi -sn 服务名,追加-l,可以扫描服务端口信息:svi -sn 服务名 -l
2、扫描服务端口信息:svi -l,默认读取/tmp/svc里的服务名批量扫描端口,默认端口[5000,8080],-t 指定扫描线程数,-p 添加扫描端口,-L 会将扫描结果以 端口.txt 命名保存在当前目录
3、服务添加封禁策略:svi --action curfew 服务名,回滚封禁策略:svi --action rollback 策略id
chegva-op-machine-vm:~/opdir$ svi -h usage: svi.py [-h] [--service_name [svc1,svc2,svc3,...]] [--action ['curfew', 'rollback'] [service_names|strategy_id]] [--open_rocketmq service_name] [--ip_segment 10.xx.xx.xx/24] [-t NUM] [-p PORTS] [-l] [-L] [services] Call Service Platform Interfaces. <author: anzhihe@chegva.com> positional arguments: services Operate multiple services,separate with ',' optional arguments: -h, --help show this help message and exit --service_name [svc1,svc2,svc3,...], -sn [svc1,svc2,svc3,...] Input service name to query service information,separate with ',' --action ['curfew', 'rollback'] [service_names|strategy_id], -fj ['curfew', 'rollback'] [service_names|strategy_id] -fj curfew [svc1,svc2,svc3,...] or -fj rollback strategy_id --open_rocketmq service_name, -mq service_name Open service's rocketmq,separate with ',', -mq [svc1,svc2,svc3,...] --ip_segment 10.xx.xx.xx/24, -is 10.xx.xx.xx/24 Input ip segment, list all the same vlans -t NUM, --threads NUM Maximum threads, default 5 -p PORTS, --portlist PORTS Customize port list, separate with ',' example: 22,5000,8080 ... -l, --genlist Output a list, ordered by port number -L, --genfile Put the IP list in separate files named by port number
◎查看效果
参考:https://github.com/AnthraX1/InsightScan/blob/master/scanner.py