生产实践:
上云前调用接口查看老平台服务的配置信息及执行相关操作
学习技巧:
Python多线程、prettytable、argparse、pymysql等库使用
脚本内容:
上云前调用接口查看老平台待迁移服务的配置信息,执行相关操作,如发布封禁操作等,将一些上云中还未完善的人肉操作脚本化。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Name: svi.py
@Version: v1.0
@Function: Call Service Platform Interfaces
@Author: anzhihe@chegva.com
@Create Date: 20210725
"""
import requests
import json
import sys,os,time
import prettytable
import argparse
import platform
import socket as sk
import threading
import queue
import pymysql
class ServiceInterface:
"""调用Service平台接口"""
def __init__(self, service_name = None):
"""长时间不用需要更新cookie"""
self.headers = {
'accept': 'application/json, text/javascript, */*; q=0.01',
'x-requested-with': 'XMLHttpRequest',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7',
'content-type': 'application/x-www-form-urlencoded',
'cookie': 'hahahahaha'
}
self.curfew_headers = {
'accept': 'application/json, text/javascript, */*; q=0.01',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
'cookie': 'hehehehehhehe',
'content-type': 'application/json'
}
self.openmq_headers = {
'authority': 'infra-api.chegva.com',
'cache-control': 'no-cache',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.164 Safari/537.36',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'cookie': 'xixixixixi'
}
self.payload = {}
self.service_name = service_name
def get_service_ip(self):
"""获取服务节点下第一个挂载实例的ip"""
url = "https://service.chegva.com/getsrvdata/?servicename={}&online=online".format(self.service_name)
response = requests.request("GET", url, headers=self.headers, data=self.payload)
get_svc_data = json.loads(response.text)
if get_svc_data["errno"] == 1:
print("\033[;31m%s 服务不存在!\033[0m" % self.service_name)
elif not get_svc_data['data']:
print("\033[;31m%s 服务没有实例!\033[0m" % self.service_name)
else:
return get_svc_data['data'][0]['ip']
def get_service_info(self):
"""获取服务元数据信息"""
for sn in self.service_name.split(','):
url = "https://service.chegva.com/getsrvdata/?servicename={}&online=online".format(sn)
response = requests.request("GET", url, headers=self.headers, data=self.payload)
get_svc_data = json.loads(response.text)
if get_svc_data["errno"] == 1:
print("%s查询报错:%s" % (sn,get_svc_data['errmsg']))
exit()
instance_list = []
if get_svc_data['data_over']['cpu'] or get_svc_data['data_over']['mem']:
cpu_mem_str = get_svc_data['data_over']['cpu'] + 'c ' + get_svc_data['data_over']['mem']
else:
cpu_mem_str = 'None'
owner_str = get_svc_data['data_over']['owner']
owner_list = [id for id in owner_str.split(',')]
owner_split = [','.join(owner_list[i:i+4]) for i in range(0, len(owner_list), 4)]
owner = '\n'.join('%s' %owner for owner in owner_split)
start_command = get_svc_data['data_over']['start_command']
for instance in get_svc_data['data']:
instance_str = instance['name'] + " " + instance['ip'] + " " + instance['host']
instance_list.append(instance_str)
dept = get_svc_data['data_over']['dept']
url_dept = "https://service.chegva.com/getdeptdata/?deptname={}&online=Online".format(dept)
response_dept = requests.request("GET", url_dept, headers=self.headers, data=self.payload)
get_dept_data = json.loads(response_dept.text)
httpport_idc = [str(instance['httpport']) + " " + instance['idc'] for instance in get_dept_data['data']
if instance['name'] == sn]
deploy_list = self.get_deploy_info_by_sn(sn)
table_header = ['service_name', 'instance & host', 'cpu & mem', 'httpport & idc',
'owner', 'deployer']
service_table = make_table(table_header, align= 'l')
service_table.add_row([sn, '\n'.join(instance_list), cpu_mem_str, httpport_idc,
owner, '\n'.join(deploy_list)])
print(service_table)
print('start_command: %s\n' % start_command)
time.sleep(1)
def get_deploy_info_by_sn(self, service_name):
"""查询服务部署信息"""
db = pymysql.connect(
host="opdb-online-reader",
db="deployxxx",
user="deployxxx",
passwd="deploy@123456",
port=3306,
charset="utf8")
cursor = db.cursor()
sql = "select * from xxx_deploy where service='%s' and env='online' " \
"order by start desc limit 3;" % service_name
try:
cursor.execute(sql)
deploy_info = cursor.fetchall()
if len(deploy_info) == 0:
deploy_list = ['None']
else:
deploy_list = []
for deploy_under_module in deploy_info:
deployer = deploy_under_module[3] + " " + str(deploy_under_module[8])
deploy_list.append(deployer)
return deploy_list
except Exception as e:
print(f'fetch error {e}')
finally:
cursor.close()
db.close()
def curfew_strategy(self):
"""添加并执行服务发布封禁策略"""
url = "https://service.chegva.com/api/strategy"
for sn in self.service_name.split(','):
payload = {
"token": "xxxxoooo",
"env": "online",
"dept": "*",
"service": sn
}
response = requests.request("POST", url, headers=self.curfew_headers, data=json.dumps(payload))
resp_json = json.loads(response.text)
if resp_json["errno"] == 0:
print("%s添加封禁策略成功, 策略id为%d。" % (sn, resp_json["id"]))
curfew_id = resp_json["id"]
# 执行添加地封禁策略
self.curfew_service(curfew_id)
time.sleep(2)
else:
print(resp_json)
def curfew_service(self, curfew_id: int):
"""根据传入的封禁id执行封禁策略封禁服务上线"""
url = "https://service.chegva.com/api/curfew"
payload = {
"id": curfew_id,
"token": "xxxxoooo",
"action": "run"
}
response = requests.request("POST", url, headers=self.curfew_headers, data=json.dumps(payload))
resp_json = json.loads(response.text)
if resp_json["errno"] == 0:
print("策略%d执行封禁成功。\n查看操作页面地址:https://service.chegva.com/curfew_strategy/\n" % curfew_id)
else:
print(resp_json)
def rollback_curfew(self, curfew_id: int):
"""根据传入的封禁id执行回滚封禁策略"""
url = "https://service.chegva.com/api/curfew"
payload = {
"id": curfew_id,
"token": "xxxxoooo",
"action": "rollback"
}
response = requests.request("POST", url, headers=self.curfew_headers, data=json.dumps(payload))
resp_json = json.loads(response.text)
if resp_json["errno"] == 0:
print("%d回滚封禁策略成功。\n查看操作页面地址:https://service.chegva.com/curfew_strategy/" % curfew_id)
else:
print(resp_json)
def open_mq(self, service_name: str):
"""打开服务rocketmq流量开关"""
url = "https://infra-api.chegva.com/infra-phoenix-console/api/backdoor/xxx/service?serviceIdentity={}".format(service_name)
print(url)
response = requests.request("GET", url, headers=self.openmq_headers, data=self.payload)
print(response.text)
def get_same_vlan(self, network_segment: str):
"""根据输入的网络地址段,获得同网段的所有vlan"""
url = "http://vlan.chegva.com/getgwdata/?type={}".format(network_segment)
response = requests.request("GET", url, headers=self.headers, data=self.payload)
vlan_segment = json.loads(response.text)['gwdata'].values()
vlan_list = list(vlan_segment)[0].split('<br>')
# print("The same vlan segment:", *vlan_list, sep='\n')
same_vlans = []
same_vlans.append(network_segment)
same_vlans.append(vlan_list)
table_header = ['ip segment', 'same vlan ip segment']
vlan_table = make_table(table_header, align= 'l')
vlan_table.add_row([same_vlans[0], '\n'.join(same_vlans[1])] )
print(vlan_table)
def get_instance_id(self, service_name: str, instance_name: str, service_env: str):
"""根据实例名称获取实例id"""
url = "https://service.chegva.com/getsrvdata/?servicename={}&online={}".format(service_name, service_env)
response = requests.request("GET", url, headers=self.headers, data=self.payload)
get_svc_data = json.loads(response.text)
# [k for k,v in d.items() if k == value]
instances = [instance['id'] for instance in get_svc_data['data'] if instance['name'] == instance_name]
instance_id = instances[0]
return instance_id
def delete_instance(self, service_name: str, instances_name: str, service_env: str):
"""根据实例id删除服务下的实例"""
url = "https://service.chegva.com/postsrvdata/"
for instance_name in instances_name.split(','):
# 获取删除实例的id
instance_id = self.get_instance_id(service_name, instance_name, service_env)
# print(instance_id)
payload = "action=remove&id%5B%5D={}&servicename={}&online={}".format(instance_id, service_name, service_env)
response = requests.request("POST", url, headers=self.headers, data=payload)
if response.status_code != 200:
print("删除实例失败,请检察!")
print(response.text)
else:
print(" {} 服务删除 {} 实例成功!".format(service_name, instance_name))
# class Queue(Queue):
# def randget(self):
# from random import randrange
# self.queue.rotate(randrange(0,self._qsize()+1))
# return self.get()
def get_ip_port(options):
"""输入服务名扫描指定端口"""
PORTS = [5000, 8080]
if options.PORTS != None:
ports = [int(po) for po in options.PORTS.split(',')]
[PORTS.append(po) for po in ports if po not in PORTS]
if options.NUM not in (None, 0):
NUM = int(options.NUM)
local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print('\n%s ==> Scanning IP & Port:%s with %d threads...\n' % (local_time, PORTS, NUM))
svc_dict = {}
if options.service_name:
for svc in options.service_name.split(','):
ip = ServiceInterface(svc).get_service_ip()
svc_dict[svc] = ip
else:
file = '/tmp/svc'
if not os.path.exists(file):
with open(file, 'a+') as f:
f.write("")
with open(file, 'r') as f:
services = f.readlines()
if services:
for svc in services:
svc = svc.strip('\n').strip()
ip = ServiceInterface(svc).get_service_ip()
svc_dict[svc] = ip
else:
print("\033[;31m/tmp/svc文件为空,请将要扫描的服务名写入文件。\033[0m")
iplist = list(svc_dict.values())
sq = queue.Queue()
lock = threading.Lock()
if options.genfile == True:
options.genlist = True
if options.genlist == True:
global ipdict
ipdict = {}
# global exce_ipdict
# exce_ipdict = {}
for i in range(NUM):
threading.Thread(target=scan_ip_port, args=(sq, options), daemon=True).start()
for scanip in iplist:
for port in PORTS:
sq.put((scanip, port))
sq.join()
if options.genlist == True:
for port, iplist in ipdict.items():
if options.genfile == True:
file = open(str(port) + '.txt', "a")
print("\n扫描结果已保存至当前目录{}中。".format(str(port) + '.txt'))
else:
print("\n========", port, '========')
for ip in iplist:
svc = [svc for svc, instanece_ip in svc_dict.items() if instanece_ip == ip][0]
if options.genfile == True:
file.write(svc + "\n")
else:
print(svc)
def scan_ip_port(sq, options):
"""扫描服务下第一个实例的ip和端口"""
global lock
while True:
# host, port = sq.randget()
host, port = sq.get()
sd=sk.socket(sk.AF_INET, sk.SOCK_STREAM)
sd.settimeout(2)
try:
sd.connect((host,port))
if options.genlist==True:
if port not in ipdict:
ipdict[port]=[]
ipdict[port].append(host)
else:
ipdict[port].append(host)
else:
# lock.acquire()
with lock:
print("%s:%d OPEN" % (host, port))
# lock.release()
sd.close()
except:
pass
# print("%s %d端口扫描异常!" % (host, port))
# if port not in exce_ipdict:
# exce_ipdict[port] = []
# exce_ipdict[port].append(host)
# print(exce_ipdict)
# else:
# exce_ipdict[port].append(host)
# print(exce_ipdict)
sq.task_done()
def make_table(header, align=None):
"""Make a table object for output."""
encoding = "UTF-8" if platform.system().lower() in ('linux', 'darwin') else "GBK"
table = prettytable.PrettyTable(encoding=encoding, field_names=header, align=align)
table.padding_width = 0
return table
def parse_args(argv):
"""Parse arguments/options.
this uses the new argparse module instead of optparse
see: <https://docs.python.org/3/library/argparse.html>
"""
p = argparse.ArgumentParser(description="Call Service Platform Interfaces. <author: anzhihe@chegva.com>")
p.add_argument("--service_name", "-sn", metavar="[svc1,svc2,svc3,...]", help="Input service name to query service information,separate with ','", type=str, action="store")
p.add_argument("--action", "-fj" , metavar="['curfew', 'rollback'] [service_names|strategy_id]", choices=['curfew', 'rollback'],
help=" -fj curfew [svc1,svc2,svc3,...] or -fj rollback strategy_id", type=str, action="store")
p.add_argument("services", type=str, nargs='?', help="Operate multiple services,separate with ','")
p.add_argument("--open_rocketmq", "-mq", metavar="service_name", help="Open service's rocketmq,separate with ',', -mq [svc1,svc2,svc3,...]", type=str, action="store")
p.add_argument("--ip_segment", "-is", metavar="10.xx.xx.xx/24", help="Input ip segment, list all the same vlans",
type=str, action="store")
# p.add_argument("--delete_instance", "-di", metavar="service_name instances_name online|Test", help="delete service instances",
# type=str, nargs="*", action="store")
p.add_argument("-t", "--threads", dest="NUM", help="Maximum threads, default 5",
type=int, default=5)
p.add_argument("-p", "--portlist", dest="PORTS",help="Customize port list, separate with ',' example: 22,5000,8080 ...")
p.add_argument("-l", '--genlist', action="store_true", dest="genlist", help="Output a list, ordered by port number")
p.add_argument("-L", '--genfile', action="store_true", dest="genfile", help="Put the IP list in separate files named by port number")
if len(sys.argv) == 1:
p.print_help()
sys.exit(1)
args = p.parse_args(argv)
return args
def main():
"""主函数入口"""
options = parse_args(sys.argv[1:])
if options.service_name:
ServiceInterface(options.service_name).get_service_info()
elif options.action:
if options.action == 'curfew':
ServiceInterface(options.services).curfew_strategy()
elif options.action == 'rollback':
strategy_id = int(options.services)
ServiceInterface().rollback_curfew(strategy_id)
elif options.open_rocketmq:
for sn in options.open_rocketmq.split(','):
service_name = "".join(sn)
ServiceInterface().open_mq(service_name)
time.sleep(1)
elif options.ip_segment:
network_segment = options.ip_segment
ServiceInterface().get_same_vlan(network_segment)
# elif options.delete_instance:
# service_name = options.delete_instance[0]
# instances_name = options.delete_instance[1]
# service_env = options.delete_instance[2]
# ServiceInterface().delete_instance(service_name, instances_name, service_env)
if options.genlist:
get_ip_port(options)
if __name__ == '__main__':
main()◎使用说明
1、查询服务元信息:svi -sn 服务名,追加-l,可以扫描服务端口信息:svi -sn 服务名 -l
2、扫描服务端口信息:svi -l,默认读取/tmp/svc里的服务名批量扫描端口,默认端口[5000,8080],-t 指定扫描线程数,-p 添加扫描端口,-L 会将扫描结果以 端口.txt 命名保存在当前目录
3、服务添加封禁策略:svi --action curfew 服务名,回滚封禁策略:svi --action rollback 策略id
chegva-op-machine-vm:~/opdir$ svi -h usage: svi.py [-h] [--service_name [svc1,svc2,svc3,...]] [--action ['curfew', 'rollback'] [service_names|strategy_id]] [--open_rocketmq service_name] [--ip_segment 10.xx.xx.xx/24] [-t NUM] [-p PORTS] [-l] [-L] [services] Call Service Platform Interfaces. <author: anzhihe@chegva.com> positional arguments: services Operate multiple services,separate with ',' optional arguments: -h, --help show this help message and exit --service_name [svc1,svc2,svc3,...], -sn [svc1,svc2,svc3,...] Input service name to query service information,separate with ',' --action ['curfew', 'rollback'] [service_names|strategy_id], -fj ['curfew', 'rollback'] [service_names|strategy_id] -fj curfew [svc1,svc2,svc3,...] or -fj rollback strategy_id --open_rocketmq service_name, -mq service_name Open service's rocketmq,separate with ',', -mq [svc1,svc2,svc3,...] --ip_segment 10.xx.xx.xx/24, -is 10.xx.xx.xx/24 Input ip segment, list all the same vlans -t NUM, --threads NUM Maximum threads, default 5 -p PORTS, --portlist PORTS Customize port list, separate with ',' example: 22,5000,8080 ... -l, --genlist Output a list, ordered by port number -L, --genfile Put the IP list in separate files named by port number
◎查看效果
参考:https://github.com/AnthraX1/InsightScan/blob/master/scanner.py
