历代文学网 历代文学
收录来自古今中外 20 多个朝代,近 60个 国家的作者超 3万 人,诗词曲赋、文言文等作品数近 60万 个,名句超 10万 条,著作超 2万 部。

Python实战-从菜鸟到大牛的进阶之路 作者:极客学院(近现代)

章节目录树

8 python 调用 zabbix 的 api 接口添加主机、查询组、主机、模板

上一章 下一章

 #!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Small interactive helpers built on the Zabbix JSON-RPC API.

The module can look up, create and delete hosts and list host groups and
templates.  Results are printed with ANSI colour escapes.

Every ``print(...)`` call receives exactly one pre-formatted string, so the
statement means the same thing under the Python 2 print statement and the
Python 3 print function.
"""
import contextlib
import json
import sys

try:  # Python 2 names, as used by the original listing
    from urllib2 import Request, URLError, urlopen
except ImportError:  # Python 3 moved them to urllib.request / urllib.error
    from urllib.error import URLError
    from urllib.request import Request, urlopen

try:
    _read_line = raw_input  # Python 2
except NameError:
    _read_line = input  # Python 3: input() already returns the raw string

_DEFAULT_URL = "http://192.168.100.200/zabbix/api_jsonrpc.php"

# ANSI colour selectors, spliced into "\033[1;<code>;40m".
_RED = "31"
_GREEN = "32"
_MAGENTA = "35"


def _emit(line):
    """Print one pre-formatted line.

    On Python 2 unicode text is encoded to GBK before printing, which is the
    encoding the original listing applied to host, group and template names.
    On Python 3 the text is handed to print() unchanged.
    """
    if sys.version_info[0] < 3 and not isinstance(line, bytes):
        line = line.encode("GBK")
    print(line)


class zabbixtools(object):
    """Client for a handful of Zabbix JSON-RPC calls.

    Attributes:
        url: Address of ``api_jsonrpc.php``.
        header: HTTP headers sent with every request.
        user, password: Credentials used by ``user_login``.
        authID: Token returned by ``user.login``, or ``None`` when the
            login failed.
    """

    def __init__(self, url=_DEFAULT_URL, user="Admin", password="zabbix"):
        """Store the connection settings and log in immediately.

        The defaults reproduce the values that were hard-coded in the
        original listing, so ``zabbixtools()`` behaves as before.
        """
        self.url = url
        self.header = {"Content-Type": "application/json"}
        self.user = user
        self.password = password
        self.authID = self.user_login()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _paint(text, color):
        """Wrap *text* in the bold ANSI colour sequence used for output."""
        return "\033[1;%s;40m%s\033[0m" % (color, text)

    @staticmethod
    def _id_list(raw, key):
        """Turn ``"1, 2,3"`` into ``[{key: "1"}, {key: "2"}, {key: "3"}]``.

        Whitespace around each id is dropped and empty items are skipped so
        that a trailing comma does not produce an invalid entry.
        """
        return [{key: item.strip()} for item in raw.split(",") if item.strip()]

    @staticmethod
    def _extract_result(response):
        """Return ``response["result"]``, or ``None`` when there is none.

        ``get_data`` returns 0 on a transport failure and the API answers
        with an ``error`` member instead of ``result`` when a call is
        rejected; both cases yield ``None`` here.
        """
        if isinstance(response, dict) and "result" in response:
            return response["result"]
        return None

    @staticmethod
    def _error_detail(response):
        """Return the most specific error description found in *response*."""
        if isinstance(response, dict) and isinstance(response.get("error"), dict):
            error = response["error"]
            return error.get("data", error)
        return response

    def _build_payload(self, method, params, request_id=1):
        """Serialise an authenticated JSON-RPC request body."""
        return json.dumps({
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "auth": self.authID,
            "id": request_id,
        })

    def _post(self, data):
        """POST *data* to the API and return the decoded JSON reply.

        ``URLError`` is left to propagate so that each caller can report it
        in its own words.
        """
        if not isinstance(data, bytes):
            # urllib on Python 3 only accepts a bytes body.
            data = data.encode("utf-8")
        request = Request(self.url, data)
        for key, value in self.header.items():
            request.add_header(key, value)
        # closing() releases the connection even if read() or parsing fails.
        with contextlib.closing(urlopen(request)) as result:
            return json.loads(result.read().decode("utf-8"))

    def _print_listing(self, method, id_key, count_label, id_label,
                       name_label, error_message):
        """Fetch every object returned by *method* and print id and name."""
        data = self._build_payload(method, {"output": "extend"})
        items = self._extract_result(self.get_data(data))
        if items is None:
            _emit(error_message)
            return
        _emit(" ".join([
            self._paint(count_label, _GREEN),
            self._paint(len(items), _RED),
        ]))
        for item in items:
            _emit(" ".join([
                "\t", id_label, item[id_key], "\t", name_label, item["name"],
            ]))
        _emit("")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def user_login(self):
        """Log in with the stored credentials.

        Returns:
            The authentication token, or ``None`` when the server could not
            be reached or rejected the credentials.
        """
        data = json.dumps({
            "jsonrpc": "2.0",
            "method": "user.login",
            "params": {"user": self.user, "password": self.password},
            "id": 0,
        })
        prefix = "Auth Failed, Please Check Your Name And Password:"
        try:
            response = self._post(data)
        except URLError as e:
            # Only HTTPError carries .code; a plain URLError has .reason.
            _emit("%s %s" % (prefix, getattr(e, "code", getattr(e, "reason", e))))
            return None
        token = self._extract_result(response)
        if token is None:
            _emit("%s %s" % (prefix, self._error_detail(response)))
        return token

    def get_data(self, data, hostip=""):
        """Send a prepared JSON-RPC body and return the decoded reply.

        Args:
            data: JSON text of the request.
            hostip: Unused; kept so existing callers keep working.

        Returns:
            The decoded reply, or 0 when the request could not be completed.
        """
        try:
            return self._post(data)
        except URLError as e:
            if hasattr(e, "reason"):
                _emit("We failed to reach a server.")
                _emit("Reason:  %s" % (e.reason,))
            elif hasattr(e, "code"):
                _emit("The server could not fulfill the request.")
                _emit("Error code:  %s" % (e.code,))
            return 0

    def host_get(self, hostip):
        """Look up a host by its technical name and print its status.

        Returns:
            The host id when the host exists, otherwise 0.
        """
        data = self._build_payload("host.get", {
            "output": ["hostid", "name", "status", "host"],
            "filter": {"host": [hostip]},
        })
        hosts = self._extract_result(self.get_data(data))
        if not hosts:
            _emit("\t " + self._paint(
                "Get Host Error or cannot find this host,please check !", _RED))
            return 0

        host = hosts[0]
        # status "1" = not monitored (red), "0" = monitored (green).
        states = {
            "1": (_RED, u"未在监控状态"),
            "0": (_GREEN, u"在监控状态"),
        }
        if host["status"] in states:
            color, state = states[host["status"]]
            _emit(" ".join([
                "\t",
                self._paint("Host_IP:", color),
                self._paint(host["host"].ljust(15), color),
                "\t",
                self._paint("Host_Name:", color),
                self._paint(host["name"], color),
                "\t",
                self._paint(state, color),
            ]))
        else:
            _emit("")
        # The host exists whatever its status, so always hand back its id.
        return host["hostid"]

    def host_del(self):
        """Prompt for a host and delete it, exiting if it is unknown."""
        hostip = _read_line(
            self._paint("Enter Your Check Host:Host_ip :", _MAGENTA))
        hostid = self.host_get(hostip)
        if hostid == 0:
            _emit("\t " + self._paint(
                "This host cannot find in zabbix,please check it !", _RED))
            sys.exit()
        data = self._build_payload("host.delete", [{"hostid": hostid}])
        result = self._extract_result(self.get_data(data))
        if isinstance(result, dict) and "hostids" in result:
            _emit("\t " + self._paint(
                "Delet Host:%s success !" % hostip, _GREEN))
        else:
            _emit("\t " + self._paint(
                "Delet Host:%s failure !" % hostip, _RED))

    def hostgroup_get(self):
        """Print the id and name of every host group."""
        self._print_listing(
            "hostgroup.get", "groupid", "Number Of Group: ",
            "HostGroup_id:", "HostGroup_Name:",
            "Get HostGroup Error,please check !")

    def template_get(self):
        """Print the id and name of every template."""
        self._print_listing(
            "template.get", "templateid", "Number Of Template: ",
            "Template_id:", "Template_Name:",
            "Get Template Error,please check !")

    def host_create(self):
        """Prompt for an address, group ids and template ids; create a host.

        Group and template ids are entered as comma-separated lists.
        """
        hostip = _read_line(self._paint("Enter your:Host_ip :", _MAGENTA))
        groupid = _read_line(self._paint("Enter your:Group_id :", _MAGENTA))
        templateid = _read_line(
            self._paint("Enter your:Tempate_id :", _MAGENTA))

        # Validate before building anything from the input.
        if not (hostip and groupid and templateid):
            _emit(self._paint(
                "Enter Error: ip or groupid or tempateid is NULL,"
                "please check it !", _RED))
            return

        data = self._build_payload("host.create", {
            "host": hostip,
            "interfaces": [{
                "type": 1,
                "main": 1,
                "useip": 1,
                "ip": hostip,
                "dns": "",
                "port": "10050",
            }],
            "groups": self._id_list(groupid, "groupid"),
            "templates": self._id_list(templateid, "templateid"),
        })
        response = self.get_data(data, hostip)
        result = self._extract_result(response)
        if isinstance(result, dict) and "hostids" in result:
            _emit(self._paint("Create host success", _GREEN))
        else:
            _emit(self._paint(
                "Create host failure: %s" % (self._error_detail(response),),
                _RED))


def main():
    """Log in and run one operation; swap the call below to try others."""
    test = zabbixtools()
    if test.authID is None:
        # Every later call needs the token, so stop after a failed login.
        sys.exit(1)
    # Other available operations:
    #   test.template_get()
    #   test.hostgroup_get()
    #   test.host_get("192.168.100.10")
    #   test.host_create()
    test.host_del()


if __name__ == "__main__":
    main()

zabbix 有一个 API 接口,可以调用这些接口来自动添加主机,查询 zabbix 中监控的主机、监控的模板、监控的主机组等信息,使用也非常方便。以下是用 python 调用 zabbix 的 API 接口来实现上述功能:

相关的材料可以参考官方文档。这个只是一些功能模块,包含获取主机、主机组、模板以及删除主机等功能,可以根据需要进行调整,实现 zabbix 的批量化和自动化管理。因为是在 linux 运行,所以设置了输出终端的字体颜色方便区分,如果不需要,自行删除即可。

上一章 下一章