Python 运用Paramiko实现批量巡检

通过封装Paramiko这个SSH模块,我们可以实现远程批量管理Linux主机,在此基础上配合钉钉API接口可实现自动告警机制,定期自动检查设备状态,并推送到钉钉群内。

首先需要配置双网卡模式,我们将无线网卡配置路由让其走外网与钉钉连接,有线网口则负责与内部服务器相连接,只需要配置路由即可实现。

网络目标        网络掩码          网关       接口   跃点数
0.0.0.0          0.0.0.0      132.35.93.1     132.35.93.11     21
0.0.0.0          0.0.0.0    192.168.191.1    192.168.191.3     25 

C:\Windows\system32> route delete 0.0.0.0

# 所有的外网访问走无线网卡,从网关 192.168.191.1 出去
C:\Windows\system32>route -p add 0.0.0.0 mask 0.0.0.0 192.168.191.1

# 如果是132网段,则走内部,网关为:132.35.93.1
C:\Windows\system32>route -p add 132.35.0.0 mask 255.255.0.0 132.35.93.1

封装钉钉接口: 接口的调用需要传入需要通知特定人的手机号,这个模块命名为Ding.py 代码如下。

import requests
import urllib.parse
import datetime,time,hmac,hashlib,base64,json

class DingToken():
    def __init__(self,atAll,atMobiles):
        self.atAll = atAll
        self.atMobiles = atMobiles

    def send_message(self,message):
        timestamp = str(round(time.time() * 1000))
        secret = 'SEC1018485caf7339e38530b'
        secret_enc = secret.encode('utf-8')
        string_to_sign = '{}\n{}'.format(timestamp,secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(secret_enc,string_to_sign_enc,digestmod=hashlib.sha256).digest()
        sign = urllib.parse.quote(base64.b64encode(hmac_code))

        headers={'Content-Type': 'application/json'}
        webhook = 'https://oapi.dingtalk.com/robot/send?access_token=0fe10f&timestamp=' + timestamp + "&sign=" + sign
        data = {
            "msgtype": "text","text": {"content": message },"at": {
                "atMobiles": [ self.atMobiles ],"isAtAll": self.atAll
                }
            }
        requests.post(webhook,data=json.dumps(data),headers=headers)

    # 发送警告信息
    def send_warning(self,platform,person,group,address,send_date,type,message):
        self.send_message(
                    "------------------------------------------------------- \n"
                    "\t\t\t\t\t {0} \n"
                    "------------------------------------------------------- \n"
                    "维护人员: \t {1} \n"
                    "所在分组: \t {2} \n"
                    "系统地址: \t {3} \n"
                    "告警日期: \t {4} \n"
                    "告警类型: \t {5} \n"
                    "------------------------------------------------------- \n"
                    "{6} \n"
                    "-------------------------------------------------------".
                        format(platform,message) )

    # 发送Ping连通性报告
    def send_ping(self,success_len,error_len,error_list):
        self.send_message(
            "------------------------------------------------------- \n"
            "\t\t\t\t\t {0} \n"
            "------------------------------------------------------- \n"
            "[*] 日期: \t {1} \n[+] 连通主机数: \t {2} \n[-] 失败主机数: \t {3} \n"
            "------------------------------------------------------- \n"
            "失败主机列表: \n {4} \n".format(platform,error_list)
        )

if __name__ == "__main__":
    ding = DingToken(False,"15646596977")
    ding.send_warning("总部客服","王瑞","CTI服务组","192.168.1.1","2021:01:01","磁盘异常","C:// \t 100% \n")


封装好的MySSH模块: 接着就是封装一个拉取数据到本地的SSH模块,这个模块命名为 MySSH.py 代码如下。

import paramiko,math,json

class MySSH:
    def __init__(self,username,password,default_port):
        self.address = address
        self.default_port = default_port
        self.username = username
        self.password = password

    def Init(self):
        try:
            self.ssh_obj = paramiko.SSHClient()
            self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_obj.connect(self.address,self.default_port,self.username,self.password,timeout=3,allow_agent=False,look_for_keys=False)
            self.sftp_obj = self.ssh_obj.open_sftp()
        except Exception:
            return False

    def BatchCMD(self,command):
        try:
            stdin,stdout,stderr = self.ssh_obj.exec_command(command,timeout=3)
            result = stdout.read()
            if len(result) != 0:
                result = str(result).replace("\\n","\n")
                result = result.replace("b'","").replace("'","")
                return result
            else:
                return None
        except Exception:
            return None

    def CloseSSH(self):
        try:
            self.sftp_obj.close()
            self.ssh_obj.close()
        except Exception:
            pass

    def GetSystemVersion(self):
        return self.BatchCMD("uname")

    # 测试主机连通率
    def GetPing(self):
        try:
            if self.GetSystemVersion() != None:
                print("{} 已连通.".format(self.address))
                return True
            else:
                return False
        except Exception:
            return False

    # 拉取磁盘数据到本地,并返回字典
    def GetAllDiskSpace(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'","AIX\n": "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version,run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
            print("利用率字典: {}".format(ref_dict))
            return ref_dict
        except Exception:
            return False

    # 拉取内存数据到本地。
    def GetAllMemSpace(self):
        cmd_dict = {"Linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'","AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version,run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n","")) / 1024)
                    mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n","")) / 1024)
                    percentage = 100 - int(mem_free / int(mem_total / 100))
                    print("利用百分比: {}  \t 总内存: {}  \t 剩余内存: {}".format(percentage,mem_total,mem_free))
                    return str(percentage) + " %"
        except Exception:
            return False

    # 获取CPU利用率数据
    def GetCPUPercentage(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'","AIX\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version,run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            each = each.split(":")
                            ref_dict = {"us": each[0],"sys": each[1],"idea": each[2]}
            print("CPU利用率数据: {}".format(ref_dict))
            return ref_dict
        except Exception:
            return False

    # 获取到系统负载利用率 也就是一分钟负载五分钟负载十五分钟负载
    def GetLoadAVG(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "cat /proc/loadavg | awk '{print $1 \":\" $2 \":\" $3}'","AIX\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version,run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            each = each.replace(",","").split(":")
                            ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}
                            print("负载利用率: {}".format(ref_dict))
                            return ref_dict
            return False
        except Exception:
            return False

    # 检测指定进程是否存活
    def CheckProcessStatus(self,processname):
        cmd_dict = {"Linux\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}"),"AIX\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}")
                    }
        try:
            os_version = self.GetSystemVersion()
            for version,run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip())
                    if ret_flag != "0":
                        return True
            return False
        except Exception:
            return "None"

    # 检测指定端口是否存活
    def CheckPortStatus(self,port):
        cmd_dict = {"Linux\n": "netstat -antp | grep {0} | awk {1}".format(port,"{'print $6'}"),"AIX\n": "netstat -ant | grep {0} | head -n 1 | awk {1}".format(port,"{'print $6'}")
                    }
        try:
            os_version = self.GetSystemVersion()
            for version,"").strip())
                    if ret_flag == "LISTEN" or ret_flag == "ESTABLISHED":
                        return True
            return False
        except Exception:
            return False

定义巡检过程: 这个模块是最重要的一个模块,主要负责解析JSON文件并巡检,该模块我们就命名为system.py,代码如下:

from Ding import DingToken
from MySSH import MySSH
import os,sys,datetime,json

# --------------------------------------------------------------------------------------------------
# 对所有主机设备进行
def switch_ping():
    with open("./config.json","r",encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        system = ptr.get("system")
        success,error = [],[]
        try:
            for system_list in system:
                for k,v in system_list.items():
                    for item in v:
                        now = datetime.datetime.now()
                        print("[+] 监控范围: Ping测试 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,datetime.datetime.strftime(now,'%Y-%m-%d %H:%M:%S'),item[0]))
                        #InspectCPU(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,ptr.get("cpu_limit"))
                        ssh = MySSH(item[0],ptr.get("default-port"))
                        ssh.Init()
                        ref = ssh.GetPing()
                        if ref == True:
                            success.append(item[0])
                        else:
                            error.append(item[0])
            ding = DingToken(False,ptr.get("telephone"))
            ding.send_ping(ptr.get("platform"),len(success),len(error),error)
        except Exception:
            pass

# --------------------------------------------------------------------------------------------------
# 磁盘告警流程
# 手机号 地址 账号 密码 端口 系统名称 姓名 分组 磁盘最大值
def InspectDisk(telephone,port,disk_limit):
    ding = DingToken(False,telephone)
    ssh = MySSH(address,port)
    ssh.Init()
    ret = ssh.GetAllDiskSpace()
    ssh.CloseSSH()
    if ret != False:
        for k,v in ret.items():
            try:
                space = eval(v.split("%")[0])
                if space >= int(disk_limit):
                    now = datetime.datetime.now()
                    strnow = datetime.datetime.strftime(now,'%Y-%m-%d %H:%M:%S')
                    ding.send_warning(platform,strnow,"磁盘过载","[ 分区: {} \t | \t 负载率: {} ]\n".format(k,v))
            except Exception:
                pass

def switch_inspect_disk():
    with open("./config.json",encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        system = ptr.get("system")
        for system_list in system:
            # 循环所有字典
            for k,v in system_list.items():
                # 循环每个分组内的主机
                for item in v:
                    now = datetime.datetime.now()
                    print("[+] 监控范围: 磁盘检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,item[0]))
                    # InspectDisk("15646596977","192.168.191.4","root","1233","22","总部客服(呼叫中心平台)","CTI 3.4",80)
                    InspectDisk(ptr.get("telephone"),ptr.get("disk_limit"))

# --------------------------------------------------------------------------------------------------
# 内存告警流程
# InspectMemory("15646596977","192.168.191.3",22,"总部客服系统","CTI组",内存阈值)
def InspectMemory(telephone,memory_limit):
    ding = DingToken(False,port)
    ssh.Init()
    ret = ssh.GetAllMemSpace()
    ssh.CloseSSH()
    if ret != False:
        try:
            space = eval(ret.split("%")[0])
            if space >= int(memory_limit):
                now = datetime.datetime.now()
                strnow = datetime.datetime.strftime(now,'%Y-%m-%d %H:%M:%S')
                ding.send_warning(platform,"内存过载","[ 负载率: \t | \t {} ]\n".format(ret))
        except Exception:
            pass

def switch_inspect_memory():
    with open("./config.json",encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        system = ptr.get("system")
        for system_list in system:
            for k,v in system_list.items():
                for item in v:
                    now = datetime.datetime.now()
                    print("[+] 监控范围: 内存检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,item[0]))
                    InspectMemory(ptr.get("telephone"),ptr.get("memory_limit"))


# --------------------------------------------------------------------------------------------------
# 巡检CPU利用率
def InspectCPU(telephone,cpu_limit):
    ding = DingToken(False,port)
    ssh.Init()
    ret = ssh.GetCPUPercentage()
    ssh.CloseSSH()
    if ret != False:
        try:
            space = int(100 - int(ret.get("idea")))
            us = ret.get("us") + "%"
            sys = ret.get("sys") + "%"
            if space >= int(cpu_limit):
                print("CPU 总利用率: {}".format(space))
                now = datetime.datetime.now()
                strnow = datetime.datetime.strftime(now,"CPU 利用率过载","[ 用户态: \t | \t {} ]\n[ 内核态: \t | \t {} ]\n[ 总利用率: \t | \t {}% ]\n".format(us,space))
        except Exception:
            pass

def switch_inspect_cpu():
    with open("./config.json",v in system_list.items():
                for item in v:
                    now = datetime.datetime.now()
                    print("[+] 监控范围: CPU检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,item[0]))
                    InspectCPU(ptr.get("telephone"),ptr.get("cpu_limit"))

# --------------------------------------------------------------------------------------------------
# 巡检系统平均负载
def InspectLoadAvg(telephone,load_avg_limit):
    ding = DingToken(False,port)
    ssh.Init()
    ret = ssh.GetLoadAVG()
    ssh.CloseSSH()
    if ret != False:
        try:
            if float(ret.get("1avg")) >= float(load_avg_limit[0]) or float(ret.get("5avg")) >= float(load_avg_limit[1]) or float(ret.get("15avg")) >= float(load_avg_limit[2]):
                now = datetime.datetime.now()
                strnow = datetime.datetime.strftime(now,"LoadAvg 过载","[ 一分钟负载: \t | \t {} ]\n[ 五分钟负载: \t | \t {} ]\n[ 十五分钟负载: \t | \t {} ]\n".
                                  format(ret.get("1avg"),ret.get("5avg"),ret.get("15avg")))
        except Exception:
            pass

def switch_inspect_avg():
    with open("./config.json",v in system_list.items():
                for item in v:
                    now = datetime.datetime.now()
                    print("[+] 监控范围: 系统负载检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,item[0]))
                    InspectLoadAvg(ptr.get("telephone"),list(ptr.get("load_avg_limit")))

# --------------------------------------------------------------------------------------------------
# 巡检系统端口是否开放
# 首先传入一个IP地址,解析出其用户名密码表
def GetAddressAsPasswd(address):
    with open("./config.json",encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        for each_list in ptr.get("system"):
            for k,v in each_list.items():
                for ser in v:
                    if ser[0] == address:
                        return ser
    return False

# 拼接字符串数据
def append_string(src_str,append_str):
    for x in range(len(append_str)):
        src_str += append_str[x]
    return src_str

# 检查端口是否被关闭了,如果关闭了则提示关闭
def InspectPort(telephone,person):
    with open("./config.json",encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        port = ptr.get("port")
        default_port = ptr.get("default-port")
        for each_list in port:
            for k,v in each_list.items():
                for item in v:
                    this_user_passwd = GetAddressAsPasswd(item[0])
                    if this_user_passwd != False:
                        try:
                            now = datetime.datetime.now()
                            print("[+] 监控范围: 端口状态检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,datetime.datetime.strftime(
                                                                                                      now,item[0]))
                            ssh = MySSH(this_user_passwd[0],this_user_passwd[1],this_user_passwd[2],default_port)
                            ssh.Init()

                            close_port_list = []
                            for check_port in range(1,len(item)):
                                ref = ssh.CheckPortStatus(item[check_port])
                                print("端口ID: {} \t 状态位: {} 存活端口: {}".format(check_port,ref,item[check_port]))
                                if ref == False or ref == "None":
                                    close_port_list.append(item[check_port])
                                    print("端口ID: {} \t 状态位: {} 关闭端口: {}".format(check_port,item[check_port]))

                            ding = DingToken(False,telephone)
                            send = ""
                            for i in close_port_list:
                                im = "[ 端口: {0} \t\t | \t 状态: 已关闭 ]\n".format(i)
                                send = append_string(send,im)

                            if(send != ""):
                                ding.send_warning(platform,this_user_passwd[0],datetime.datetime.strftime(datetime.datetime.now(),"端口关闭",send)
                            ssh.CloseSSH()
                        except Exception:
                            pass

def switch_inspect_port():
    with open("./config.json",encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        telephone = ptr.get("telephone")
        platform = ptr.get("platform")
        person = ptr.get("person")
        InspectPort(telephone,person)


# --------------------------------------------------------------------------------------------------
# 巡检系统是否开放指定进程
def InspectProcess(telephone,encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        process = ptr.get("process")
        default_port = ptr.get("default-port")
        for each_list in process:
            for k,v in each_list.items():
                for item in v:
                    this_user_passwd = GetAddressAsPasswd(item[0])
                    if this_user_passwd != False:
                        try:
                            now = datetime.datetime.now()
                            print("[+] 监控范围: 进程状态检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,default_port)
                            ssh.Init()

                            close_process_list = []
                            for check_process in range(1,len(item)):
                                ref = ssh.CheckProcessStatus(item[check_process])
                                print("进程ID: {} \t 状态位: {} 存活进程: {}".format(check_process,item[check_process]))
                                if ref == False or ref == "None":
                                    close_process_list.append(item[check_process])
                                    print("进程ID: {} \t 状态位: {} 关闭进程: {}".format(check_process,item[check_process]))

                            ding = DingToken(False,telephone)
                            send = ""
                            for i in close_process_list:
                                im = "[ 进程: {0} \t\t | \t 状态: 已关闭 ]\n".format(i)
                                send = append_string(send,"进程退出",send)
                            ssh.CloseSSH()
                        except Exception:
                            pass

def switch_inspect_process():
    with open("./config.json",encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        telephone = ptr.get("telephone")
        platform = ptr.get("platform")
        person = ptr.get("person")
        InspectProcess(telephone,person)

以磁盘为例,当超过了我们定义好的阈值时,则会通过钉钉提示用户告警,告警提示如下。

定义配置文件: 配置文件则是巡检时需要解析的内容,我们需要依次写入账号密码等信息。

{
    "system_config":
    [
        {"ping_": "True"},{"disk_": "True"},{"cpu_": "False"},{"loadavg_": "False"},{"memory_": "False"},{"process_": "False"},{"port_": "False"}
    ],"platform": "总部客服系统(呼叫中心)","person": "王瑞","telephone": "18264825669","e-mail": "admin@lyshark.com","default-port": "22","set-timeout": "3600","disk_limit": 75,"memory_limit": 95,"cpu_limit": 90,"load_avg_limit": ["5.0","8.0","10.0"],"system":
    [
        {
        "Centos 服务器组":
            [
                ["192.168.1.1","1233"],["192.168.1.1","1233"]
            ],"AIX 服务器组":
            [
                ["192.168.1.1","1233"]
            ]
        }
    ],"process":
    [
        {
        "CTI进程组":
            [
                ["192.168.1.1","icdcomm","aplogic","ctilink","cnfgsvr","mcp"],"mcp","ccsapp","nis","oas","ivr","ctiserver"]
            ]
        }
    ],"port":
    [
        {
        "CTI端口检测":
            [
                ["192.168.1.1","111","631","8888","19001","25","2556","5150","5600","34902","10000","34901","10005"],"8080","10004","10005","60661"]
            ]
        }
    ]
}

定义main入口代码: 入口代码主要负责解析参数与巡检,由于怕影响服务器性能,所有没加多线程支持,不过也够用了。

import system
import json,time

if __name__ == "__main__":
    with open("./config.json",encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())

        timeout = ptr.get("set-timeout")
        configure = ptr.get("system_config")

        # 默认开关全部关闭
        disk_ = "False"
        cpu_ = "False"
        memory_ = "False"
        loadavg_ = "False"
        process_ = "False"
        port_ = "False"
        ping_ = "False"

        # 判断配置文件开关是否开启,如果开启了则将内存的开关开启
        for dic in configure:
            for k,v in dic.items():
                if k == "ping_" and v == "True":
                    ping_ = "True"
                if k == "disk_" and v == "True":
                    disk_ = "True"
                if k == "cpu_" and v == "True":
                    cpu_ = "True"
                if k == "memory_" and v == "True":
                    memory_ = "True"
                if k == "loadavg_" and v == "True":
                    loadavg_ = "True"
                if k == "process_" and v == "True":
                    process_ = "True"
                if k == "port_" and v == "True":
                    port_ = "True"

        while True:
            if ping_ == "True":
                system.switch_ping()
            if disk_ == "True":
                system.switch_inspect_disk()
            if cpu_ == "True":
                system.switch_inspect_cpu()
            if memory_ == "True":
                system.switch_inspect_memory()
            if loadavg_ == "True":
                system.switch_inspect_avg()
            if process_ == "True":
                system.switch_inspect_process()
            if port_ == "True":
                system.switch_inspect_port()
            time.sleep(int(timeout))

实现正则解析: 前面的配置无法实现交互式问答,智能机器人推送数据,很被动,我们需要开启交互机器人,自己实现业务逻辑。

import os,re

def participle(text):
    ref = re.findall(r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)',text)
    address = ".".join(ref[0])
    select = re.findall('查询',text)

    if(len(address) !=0 and len(select) != 0):
        if len (re.findall('内存',text)) != 0:
            return [address,"内存"]
        elif len (re.findall('磁盘',"磁盘"]
        elif len (re.findall('负载',"负载"]

if __name__ == "__main__":
    text = "帮我查询192.168.1.1主机的内存负载情况"
    text2 = "帮我查询 192.168.1.200 这 台设 备的 磁盘 使用率"
    text3 = "需要查询192.168.1.200 设备中负载使用情况,请反馈结果"
    text4 = "发现负载出现告警,请立即查询192.168.1.200这台设备的负载使用率,并立即反馈"

    print(participle(text))
    print(participle(text2))
    print(participle(text3))
    print(participle(text4))

    txt = '''
    主编单位:总部客服系统
    '''
    addr = re.findall('(主编单位:.*?)\s',txt)[0]
    print(addr)

这个案例,从文本中提取可用信息,并将其返回为列表格式。

接着配置钉钉开发者平台机器人,此处需要有公网地址,作用是,钉钉群有人at机器人时,机器人会将请求post发送到我们的django应用服务上。

使用django配置接收请求并处理即可,处理代码如下所示。

from django.http import HttpResponse,JsonResponse
import json,base64
import sys,os,re

# 机器人 app_secret
app_secret = "LcKEf0nlqe"

# 根据IP查询操作使用
def select_participle(text):
    ref = re.findall(r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)',text)

    # 判断如果地址不为空,则进行查询等操作
    if len(ref) != 0:
        address = ".".join(ref[0])
        select = re.findall('查询',text)
        if(len(address) !=0 and len(select) != 0):
            if len (re.findall('内存',text)) != 0:
                return [address,"内存"]
            elif len (re.findall('负载',"负载"]
    else:
        if len(re.findall('主机列表',text)) != 0:
            return ["主机列表"]

def index(request):
    if request.method == "POST":
        HTTP_SIGN = request.META.get("HTTP_SIGN")
        HTTP_TIMESTAMP = request.META.get("HTTP_TIMESTAMP")
        res = json.loads(request.body)

        # 接收用户的输入并做处理
        sendnick = res.get("senderNick")
        is_admin = res.get("isAdmin")
        conver = res.get("conversationTitle")
        content = res.get("text").get("content")
        print("发送人: {} 是否管理员: {} 所在组: {} 接收数据: {}".format(sendnick,is_admin,conver,content))

        # 数据的加密解密
        string_to_sign = '{}\n{}'.format(HTTP_TIMESTAMP,app_secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(app_secret.encode("utf-8"),digestmod=hashlib.sha256).digest()
        sign = base64.b64encode(hmac_code).decode("utf-8")

        ref_select_text = select_participle(content)

        if len(ref_select_text) == 2:
            if sign == HTTP_SIGN:
                if "负载" in ref_select_text[1]:
                    return JsonResponse(
                    {"msgtype": "text","text": {
                             "content": "经过查询,主机: {} 负载率为: 10% 当前为正常状态".format(ref_select_text[0])
                         }
                    })

                if "内存" in ref_select_text[1]:
                    return JsonResponse\
                    (
                        {"msgtype": "text","text":
                            {
                             "content": "经过查询,主机: {} 内存占用率为: 70% 当前为正常状态".format(ref_select_text[0])
                            }
                         }
                    )
        elif len(ref_select_text) == 1:
            if "主机列表" in ref_select_text[0]:
                return JsonResponse \
                        (
                        {"msgtype": "text","text":
                             {
                                 "content": "目前您所管理的主机,总结: 205台"
                             }
                         }
                    )
        return JsonResponse({"error": "没有权限"})
    if request.method == "GET":
        return HttpResponse("没有权限")

django运行后,我们回到钉钉群内,问机器人一些问题看看吧,前提是这些问题已经定义了正则解析。

django 后台则能够接收到,这些数据,并做出回应。

原文地址:https://www.cnblogs.com/LyShark

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用OpenCV实现视频去抖 整体步骤: 设置输入输出视频 寻找帧之间的移动:使用opencv的特征检测器,检测前一帧的特征,并使用Lucas-Kanade光流算法在下一帧跟踪这些特征,根据两组点,将前一个坐标系映射到当前坐标系完成刚性(欧几里得)变换,最后使用数组纪录帧之间的运动。 计算帧之间的平
前言 对中文标题使用余弦相似度算法和编辑距离相似度分析进行相似度分析。 准备数据集part1 本次使用的数据集来源于前几年的硕士学位论文,可根据实际需要更换。结构如下所示: 学位论文题名 基于卷积神经网络的人脸识别研究 P2P流媒体视频点播系统设计和研究 校园网安全体系的设计与实现 无线传感器网络中
前言 之前尝试写过一个爬虫,那时对网页请求还不够熟练,用的原理是:爬取整个html文件,然后根据标签页筛选有效信息。 现在看来这种方式无疑是吃力不讨好,因此现在重新写了一个爬取天气的程序。 准备工作 网上能轻松找到的是 101010100 北京这种编号,而查看中国气象局URL,他们使用的是北京545
前言 本文使用Python实现了PCA算法,并使用ORL人脸数据集进行了测试并输出特征脸,简单实现了人脸识别的功能。 1. 准备 ORL人脸数据集共包含40个不同人的400张图像,是在1992年4月至1994年4月期间由英国剑桥的Olivetti研究实验室创建。此数据集包含40个类,每个类含10张图
前言 使用opencv对图像进行操作,要求:(1)定位银行票据的四条边,然后旋正。(2)根据版面分析,分割出小写金额区域。 图像校正 首先是对图像的校正 读取图片 对图片二值化 进行边缘检测 对边缘的进行霍夫曼变换 将变换结果从极坐标空间投影到笛卡尔坐标得到倾斜角 根据倾斜角对主体校正 import
天气预报API 功能 从中国天气网抓取数据返回1-7天的天气数据,包括: 日期 天气 温度 风力 风向 def get_weather(city): 入参: 城市名,type为字符串,如西安、北京,因为数据引用中国气象网,因此只支持中国城市 返回: 1、列表,包括1-7的天气数据,每一天的分别为一个
数据来源:House Prices - Advanced Regression Techniques 参考文献: Comprehensive data exploration with Python 1. 导入数据 import pandas as pd import warnings warnin
同步和异步 同步和异步是指程序的执行方式。在同步执行中,程序会按顺序一个接一个地执行任务,直到当前任务完成。而在异步执行中,程序会在等待当前任务完成的同时,执行其他任务。 同步执行意味着程序会阻塞,等待任务完成,而异步执行则意味着程序不会阻塞,可以同时执行多个任务。 同步和异步的选择取决于你的程序需
实现代码 import time import pydirectinput import keyboard if __name__ == '__main__': revolve = False while True: time.sleep(0.1) if keyboard.is_pr
本文从多个角度分析了vi编辑器保存退出命令。我们介绍了保存和退出vi编辑器的命令,以及如何撤销更改、移动光标、查找和替换文本等实用命令。希望这些技巧能帮助你更好地使用vi编辑器。
Python中的回车和换行是计算机中文本处理中的两个重要概念,它们在代码编写中扮演着非常重要的角色。本文从多个角度分析了Python中的回车和换行,包括回车和换行的概念、使用方法、使用场景和注意事项。通过本文的介绍,读者可以更好地理解和掌握Python中的回车和换行,从而编写出更加高效和规范的Python代码。
SQL Server启动不了错误1067是一种比较常见的故障,主要原因是数据库服务启动失败、权限不足和数据库文件损坏等。要解决这个问题,我们需要检查服务日志、重启服务器、检查文件权限和恢复数据库文件等。在日常的数据库运维工作中,我们应该时刻关注数据库的运行状况,及时发现并解决问题,以确保数据库的正常运行。
信息模块是一种可重复使用的、可编程的、可扩展的、可维护的、可测试的、可重构的软件组件。信息模块的端接需要从接口设计、数据格式、消息传递、函数调用等方面进行考虑。信息模块的端接需要满足高内聚、低耦合的原则,以保证系统的可扩展性和可维护性。
本文从电脑配置、PyCharm版本、Java版本、配置文件以及程序冲突等多个角度分析了Win10启动不了PyCharm的可能原因,并提供了解决方法。
本文主要从多个角度分析了安装SQL Server 2012时可能出现的错误,并提供了解决方法。
Pycharm是一款非常优秀的Python集成开发环境,它可以让Python开发者更加高效地进行代码编写、调试和测试。在Pycharm中设置解释器非常简单,我们可以通过创建新项目、修改项目解释器、设置全局解释器等多种方式进行设置。
Python中有多种方法可以将字符串转换为整数,包括使用int()函数、try-except语句、正则表达式、map()函数、ord()函数和reduce()函数。在实际应用中,应根据具体情况选择最合适的方法。
本文介绍了导入CSV文件的多种方法,包括使用Excel、Python和R等工具。同时,还介绍了导入CSV文件时需要注意的一些细节和问题。CSV文件是数据处理和分析中不可或缺的一部分,希望本文能够对读者有所帮助。
mongodb是一种新型的数据库,它采用了面向文档的数据模型,具有灵活性、高性能和高可用性等优势。但是,mongodb也存在数据结构混乱、安全性和学习成本高等问题。
当Python运行不了时,我们应该从代码、Python环境、操作系统和硬件设备等多个角度来排查问题,并采取相应的解决措施。