Python3上云前置工作检测脚本

  • 生产实践:

    上云前调用接口查看老平台服务的配置信息及执行相关操作

  • 学习技巧:

   Python多线程、prettytable、argparse、pymysql等库使用

  • 脚本内容:      

上云前调用接口查看老平台待迁移服务的配置信息,执行相关操作,如发布封禁操作等,将一些上云中还未完善的人肉操作脚本化。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
 @Name:         svi.py
 @Version:      v1.0
 @Function:     Call Service Platform Interfaces
 @Author:       anzhihe@chegva.com
 @Create Date:  20210725
"""

import requests
import json
import sys,os,time
import prettytable
import argparse
import platform
import socket as sk
import threading
import queue
import pymysql


class ServiceInterface:
   """调用Service平台接口"""

   def __init__(self, service_name = None):
  
      """长时间不用需要更新cookie"""
      self.headers =  {
         'accept': 'application/json, text/javascript, */*; q=0.01',
         'x-requested-with': 'XMLHttpRequest',
         'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
         'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7',
         'content-type': 'application/x-www-form-urlencoded',
         'cookie': 'hahahahaha'
      }
      self.curfew_headers = {
         'accept': 'application/json, text/javascript, */*; q=0.01',
         'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
         'cookie': 'hehehehehhehe',
         'content-type': 'application/json'
      }
      self.openmq_headers = {
         'authority': 'infra-api.chegva.com',
         'cache-control': 'no-cache',
         'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.164 Safari/537.36',
         'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
         'cookie': 'xixixixixi'
      }
      self.payload = {}
      self.service_name = service_name

   def get_service_ip(self):
      """获取服务节点下第一个挂载实例的ip"""

      url = "https://service.chegva.com/getsrvdata/?servicename={}&online=online".format(self.service_name)
      response = requests.request("GET", url, headers=self.headers, data=self.payload)
      get_svc_data = json.loads(response.text)
      if get_svc_data["errno"] == 1:
         print("\033[;31m%s 服务不存在!\033[0m" % self.service_name)
      elif not get_svc_data['data']:
         print("\033[;31m%s 服务没有实例!\033[0m" % self.service_name)
      else:
         return get_svc_data['data'][0]['ip']

   def get_service_info(self):
      """获取服务元数据信息"""

      for sn in self.service_name.split(','):
         url = "https://service.chegva.com/getsrvdata/?servicename={}&online=online".format(sn)
         response = requests.request("GET", url, headers=self.headers, data=self.payload)
         get_svc_data = json.loads(response.text)
         if get_svc_data["errno"] == 1:
            print("%s查询报错:%s" % (sn,get_svc_data['errmsg']))
            exit()

         instance_list = []
         if get_svc_data['data_over']['cpu'] or get_svc_data['data_over']['mem']:
            cpu_mem_str = get_svc_data['data_over']['cpu'] + 'c ' + get_svc_data['data_over']['mem']
         else:
            cpu_mem_str = 'None'
         owner_str = get_svc_data['data_over']['owner']
         owner_list = [id for id in owner_str.split(',')]
         owner_split = [','.join(owner_list[i:i+4]) for i in range(0, len(owner_list), 4)]
         owner = '\n'.join('%s' %owner for owner in owner_split)
         start_command = get_svc_data['data_over']['start_command']

         for instance in get_svc_data['data']:
            instance_str = instance['name'] + " "  + instance['ip'] + " " + instance['host']
            instance_list.append(instance_str)

         dept = get_svc_data['data_over']['dept']
         url_dept = "https://service.chegva.com/getdeptdata/?deptname={}&online=Online".format(dept)
         response_dept = requests.request("GET", url_dept, headers=self.headers, data=self.payload)
         get_dept_data = json.loads(response_dept.text)
         httpport_idc = [str(instance['httpport']) + " " + instance['idc'] for instance in get_dept_data['data']
                     if instance['name'] == sn]

         deploy_list = self.get_deploy_info_by_sn(sn)

         table_header = ['service_name', 'instance & host', 'cpu & mem', 'httpport & idc',
                     'owner', 'deployer']
         service_table = make_table(table_header, align= 'l')
         service_table.add_row([sn, '\n'.join(instance_list), cpu_mem_str, httpport_idc,
                     owner, '\n'.join(deploy_list)])
         print(service_table)
         print('start_command: %s\n' % start_command)
         time.sleep(1)

   def get_deploy_info_by_sn(self, service_name):
      """查询服务部署信息"""

      db = pymysql.connect(
         host="opdb-online-reader",
         db="deployxxx",
         user="deployxxx",
         passwd="deploy@123456",
         port=3306,
         charset="utf8")
      cursor = db.cursor()

      sql = "select * from xxx_deploy where service='%s' and env='online' " \
           "order by start desc limit 3;" % service_name

      try:
         cursor.execute(sql)
         deploy_info = cursor.fetchall()
         if len(deploy_info) == 0:
            deploy_list = ['None']
         else:
            deploy_list = []
            for deploy_under_module in deploy_info:
               deployer = deploy_under_module[3] + " " + str(deploy_under_module[8])
               deploy_list.append(deployer)
         return deploy_list
      except Exception as e:
         print(f'fetch error {e}')
      finally:
         cursor.close()
         db.close()

   def curfew_strategy(self):
      """添加并执行服务发布封禁策略"""

      url = "https://service.chegva.com/api/strategy"
      for sn in self.service_name.split(','):
         payload = {
            "token": "xxxxoooo",
            "env": "online",
            "dept": "*",
            "service": sn
         }
         response = requests.request("POST", url, headers=self.curfew_headers, data=json.dumps(payload))
         resp_json = json.loads(response.text)
         if resp_json["errno"] == 0:
            print("%s添加封禁策略成功, 策略id为%d。" % (sn, resp_json["id"]))
            curfew_id = resp_json["id"]
            # 执行添加地封禁策略
            self.curfew_service(curfew_id)
            time.sleep(2)
         else:
            print(resp_json)

   def curfew_service(self, curfew_id: int):
      """根据传入的封禁id执行封禁策略封禁服务上线"""

      url = "https://service.chegva.com/api/curfew"
      payload = {
         "id": curfew_id,
         "token": "xxxxoooo",
         "action": "run"
      }
      response = requests.request("POST", url, headers=self.curfew_headers, data=json.dumps(payload))
      resp_json = json.loads(response.text)
      if resp_json["errno"] == 0:
         print("策略%d执行封禁成功。\n查看操作页面地址:https://service.chegva.com/curfew_strategy/\n" % curfew_id)
      else:
         print(resp_json)

   def rollback_curfew(self, curfew_id: int):
      """根据传入的封禁id执行回滚封禁策略"""

      url = "https://service.chegva.com/api/curfew"
      payload = {
         "id": curfew_id,
         "token": "xxxxoooo",
         "action": "rollback"
      }
      response = requests.request("POST", url, headers=self.curfew_headers, data=json.dumps(payload))
      resp_json = json.loads(response.text)
      if resp_json["errno"] == 0:
         print("%d回滚封禁策略成功。\n查看操作页面地址:https://service.chegva.com/curfew_strategy/" % curfew_id)
      else:
         print(resp_json)

   def open_mq(self, service_name: str):
      """打开服务rocketmq流量开关"""

      url = "https://infra-api.chegva.com/infra-phoenix-console/api/backdoor/xxx/service?serviceIdentity={}".format(service_name)
      print(url)
      response = requests.request("GET", url, headers=self.openmq_headers, data=self.payload)
      print(response.text)

   def get_same_vlan(self, network_segment: str):
      """根据输入的网络地址段,获得同网段的所有vlan"""

      url = "http://vlan.chegva.com/getgwdata/?type={}".format(network_segment)
      response = requests.request("GET", url, headers=self.headers, data=self.payload)
      vlan_segment =  json.loads(response.text)['gwdata'].values()
      vlan_list = list(vlan_segment)[0].split('<br>')
      # print("The same vlan segment:", *vlan_list, sep='\n')
      same_vlans = []
      same_vlans .append(network_segment)
      same_vlans .append(vlan_list)
      table_header = ['ip segment', 'same vlan ip segment']
      vlan_table = make_table(table_header, align= 'l')
      vlan_table.add_row([same_vlans[0], '\n'.join(same_vlans[1])] )
      print(vlan_table)

   def get_instance_id(self, service_name: str, instance_name: str, service_env: str):
      """根据实例名称获取实例id"""

      url = "https://service.chegva.com/getsrvdata/?servicename={}&online={}".format(service_name, service_env)
      response = requests.request("GET", url, headers=self.headers, data=self.payload)
      get_svc_data = json.loads(response.text)
      # [k for k,v in d.items() if k == value]
      instances = [instance['id'] for instance in get_svc_data['data'] if instance['name'] == instance_name]
      instance_id = instances[0]

      return instance_id

   def delete_instance(self, service_name: str, instances_name: str, service_env: str):
      """根据实例id删除服务下的实例"""

      url = "https://service.chegva.com/postsrvdata/"
      for instance_name in instances_name.split(','):
         # 获取删除实例的id
         instance_id = self.get_instance_id(service_name, instance_name, service_env)
         # print(instance_id)

         payload = "action=remove&id%5B%5D={}&servicename={}&online={}".format(instance_id, service_name, service_env)
         response = requests.request("POST", url, headers=self.headers, data=payload)

         if response.status_code != 200:
            print("删除实例失败,请检察!")
            print(response.text)
         else:
            print(" {} 服务删除 {} 实例成功!".format(service_name, instance_name))

# class Queue(Queue):
#  def randget(self):
#     from random import randrange
#     self.queue.rotate(randrange(0,self._qsize()+1))
#     return self.get()

def get_ip_port(options):
   """输入服务名扫描指定端口"""

   PORTS = [5000, 8080]
   if options.PORTS != None:
      ports = [int(po) for po in options.PORTS.split(',')]
      [PORTS.append(po) for po in ports if po not in PORTS]

   if options.NUM not in (None, 0):
      NUM = int(options.NUM)
      local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
      print('\n%s ==> Scanning IP & Port:%s with %d threads...\n' % (local_time, PORTS, NUM))

   svc_dict = {}

   if options.service_name:
      for svc in options.service_name.split(','):
         ip = ServiceInterface(svc).get_service_ip()
         svc_dict[svc] = ip
   else:
      file = '/tmp/svc'
      if not os.path.exists(file):
         with open(file, 'a+') as f:
            f.write("")
      with open(file, 'r') as f:
         services = f.readlines()
         if services:
            for svc in services:
               svc = svc.strip('\n').strip()
               ip = ServiceInterface(svc).get_service_ip()
               svc_dict[svc] = ip
         else:
            print("\033[;31m/tmp/svc文件为空,请将要扫描的服务名写入文件。\033[0m")

   iplist = list(svc_dict.values())

   sq = queue.Queue()
   lock = threading.Lock()

   if options.genfile == True:
      options.genlist = True
   if options.genlist == True:
      global ipdict
      ipdict = {}
      # global exce_ipdict
      # exce_ipdict = {}

   for i in range(NUM):
      threading.Thread(target=scan_ip_port, args=(sq, options), daemon=True).start()

   for scanip in iplist:
      for port in PORTS:
         sq.put((scanip, port))

   sq.join()

   if options.genlist == True:
      for port, iplist in ipdict.items():
         if options.genfile == True:
            file = open(str(port) + '.txt', "a")
            print("\n扫描结果已保存至当前目录{}中。".format(str(port) + '.txt'))
         else:
            print("\n========", port, '========')

         for ip in iplist:
            svc = [svc for svc, instanece_ip in svc_dict.items() if instanece_ip == ip][0]
            if options.genfile == True:
               file.write(svc + "\n")
            else:
               print(svc)

def scan_ip_port(sq, options):
   """扫描服务下第一个实例的ip和端口"""

   global lock
   while True:
         # host, port = sq.randget()
         host, port = sq.get()
         sd=sk.socket(sk.AF_INET, sk.SOCK_STREAM)
         sd.settimeout(2)
         try:
            sd.connect((host,port))
            if options.genlist==True:
               if port not in ipdict:
                  ipdict[port]=[]
                  ipdict[port].append(host)
               else:
                  ipdict[port].append(host)
            else:
               # lock.acquire()
               with lock:
                  print("%s:%d OPEN" % (host, port))
               # lock.release()
            sd.close()
         except:
            pass
            # print("%s %d端口扫描异常!" % (host, port))
            # if port not in exce_ipdict:
            #  exce_ipdict[port] = []
            #  exce_ipdict[port].append(host)
            #  print(exce_ipdict)
            # else:
            #  exce_ipdict[port].append(host)
            #  print(exce_ipdict)
         sq.task_done()

def make_table(header, align=None):
   """Make a table object for output."""

   encoding = "UTF-8" if platform.system().lower() in ('linux', 'darwin') else "GBK"
   table = prettytable.PrettyTable(encoding=encoding, field_names=header, align=align)
   table.padding_width = 0
   return table

def parse_args(argv):
   """Parse arguments/options.

   this uses the new argparse module instead of optparse
   see: <https://docs.python.org/3/library/argparse.html>
   """

   p = argparse.ArgumentParser(description="Call Service Platform Interfaces. <author: anzhihe@chegva.com>")
   p.add_argument("--service_name", "-sn", metavar="[svc1,svc2,svc3,...]", help="Input service name to query service information,separate with ','", type=str, action="store")
   p.add_argument("--action", "-fj" , metavar="['curfew', 'rollback'] [service_names|strategy_id]", choices=['curfew', 'rollback'],
               help=" -fj curfew [svc1,svc2,svc3,...] or -fj rollback strategy_id", type=str, action="store")
   p.add_argument("services", type=str, nargs='?', help="Operate multiple services,separate with ','")
   p.add_argument("--open_rocketmq",  "-mq", metavar="service_name", help="Open service's rocketmq,separate with ',', -mq [svc1,svc2,svc3,...]", type=str, action="store")
   p.add_argument("--ip_segment",  "-is", metavar="10.xx.xx.xx/24", help="Input ip segment, list all the same vlans",
               type=str, action="store")
   # p.add_argument("--delete_instance",  "-di", metavar="service_name instances_name online|Test", help="delete service instances",
   #              type=str, nargs="*", action="store")
   p.add_argument("-t", "--threads", dest="NUM", help="Maximum threads, default 5",
                type=int, default=5)
   p.add_argument("-p", "--portlist", dest="PORTS",help="Customize port list, separate with ',' example: 22,5000,8080 ...")
   p.add_argument("-l", '--genlist', action="store_true", dest="genlist", help="Output a list, ordered by port number")
   p.add_argument("-L", '--genfile', action="store_true", dest="genfile", help="Put the IP list in separate files named by port number")

   if len(sys.argv) == 1:
      p.print_help()
      sys.exit(1)

   args = p.parse_args(argv)
   return args

def main():
   """主函数入口"""

   options = parse_args(sys.argv[1:])

   if options.service_name:
      ServiceInterface(options.service_name).get_service_info()
   elif options.action:
      if options.action == 'curfew':
         ServiceInterface(options.services).curfew_strategy()
      elif options.action == 'rollback':
         strategy_id = int(options.services)
         ServiceInterface().rollback_curfew(strategy_id)
   elif options.open_rocketmq:
      for sn in options.open_rocketmq.split(','):
         service_name = "".join(sn)
         ServiceInterface().open_mq(service_name)
         time.sleep(1)
   elif options.ip_segment:
      network_segment = options.ip_segment
      ServiceInterface().get_same_vlan(network_segment)
   # elif options.delete_instance:
      # service_name = options.delete_instance[0]
      # instances_name = options.delete_instance[1]
      # service_env = options.delete_instance[2]
      # ServiceInterface().delete_instance(service_name, instances_name, service_env)

   if options.genlist:
      get_ip_port(options)


if __name__  ==  '__main__':
   main()

◎使用说明

1、查询服务元信息svi -sn 服务名追加-l,可以扫描服务端口信息:svi -sn 服务名 -l

2、扫描服务端口信息svi -l,默认读取/tmp/svc里的服务名批量扫描端口,默认端口[5000,8080],-t 指定扫描线程数,-p 添加扫描端口,-L 会将扫描结果以 端口.txt 命名保存在当前目录

3、服务添加封禁策略svi --action curfew 服务名,回滚封禁策略:svi --action rollback 策略id

chegva-op-machine-vm:~/opdir$ svi -h
usage: svi.py [-h] [--service_name [svc1,svc2,svc3,...]] [--action ['curfew', 'rollback'] [service_names|strategy_id]]
              [--open_rocketmq service_name] [--ip_segment 10.xx.xx.xx/24] [-t NUM] [-p PORTS] [-l] [-L]
              [services]
 
Call Service Platform Interfaces. <author: anzhihe@chegva.com>
 
positional arguments:
  services              Operate multiple services,separate with ','
 
optional arguments:
  -h, --help            show this help message and exit
  --service_name [svc1,svc2,svc3,...], -sn [svc1,svc2,svc3,...]
                        Input service name to query service information,separate with ','
  --action ['curfew', 'rollback'] [service_names|strategy_id], -fj ['curfew', 'rollback'] [service_names|strategy_id]
                        -fj curfew [svc1,svc2,svc3,...] or -fj rollback strategy_id
  --open_rocketmq service_name, -mq service_name
                        Open service's rocketmq,separate with ',', -mq [svc1,svc2,svc3,...]
  --ip_segment 10.xx.xx.xx/24, -is 10.xx.xx.xx/24
                        Input ip segment, list all the same vlans
  -t NUM, --threads NUM
                        Maximum threads, default 5
  -p PORTS, --portlist PORTS
                        Customize port list, separate with ',' example: 22,5000,8080 ...
  -l, --genlist         Output a list, ordered by port number
  -L, --genfile         Put the IP list in separate files named by port number

◎查看效果

Python3上云前置工作检测脚本


参考:https://github.com/AnthraX1/InsightScan/blob/master/scanner.py

anzhihe安志合个人博客,版权所有丨 如未注明,均为原创 丨转载请注明转自:https://chegva.com/4799.html | ☆★★每天进步一点点,加油!★★☆

您可能还感兴趣的文章!

发表评论

电子邮件地址不会被公开。 必填项已用*标注