# -*- coding: utf-8 -*-
# The encoding declaration above is required on Python 2 because the
# option-parser error messages in main() contain non-ASCII text.
'''
RabbitMQ monitoring script for Zabbix.

Reads metrics from the RabbitMQ management HTTP API and pushes them to
Zabbix through the zabbix_sender command-line tool.

Changelog:
    v1.1  monitor the unacknowledged message count of each consumer channel
    v1.0  monitor the broker overview and individual queues
'''
import json
import logging
import optparse
import os
import socket
import subprocess
import tempfile
import time

try:  # Python 2
    import urllib2
    from urllib import quote as _url_quote
except ImportError:  # Python 3 exposes the same opener/handler API here
    import urllib.request as urllib2
    from urllib.parse import quote as _url_quote

LOG_FILE = '/var/log/zabbix/rabbitmq_zabbix.log'
LOG_FORMAT = '%(asctime)s %(levelname)s: %(message)s'
ZABBIX_SENDER = '/usr/bin/zabbix_sender'

try:
    logging.basicConfig(filename=LOG_FILE, level=logging.WARNING,
                        format=LOG_FORMAT)
except (IOError, OSError):
    # The log file could not be opened (missing directory, no permission).
    # Keep monitoring alive and log to stderr instead of aborting.
    logging.basicConfig(level=logging.WARNING, format=LOG_FORMAT)


class RabbitMqApi(object):
    '''
    Query the RabbitMQ management HTTP API and forward metrics to Zabbix.
    '''

    def __init__(self, user_name='guest', password='guest',
                 host_name='192.168.55.241', protocol='http', port=15672,
                 senderhostname='ser01-rabbitmq',
                 zbconf='/etc/zabbix/zabbix_agentd.conf'):
        '''
        Store the connection settings.

        user_name / password: credentials for HTTP basic authentication.
        host_name: API host; an empty value falls back to the local hostname.
        protocol / port: scheme and port of the management API.
        senderhostname: host name passed to zabbix_sender with -s; an empty
            value falls back to the resolved API host name.
        zbconf: Zabbix agent configuration file passed to zabbix_sender -c.
        '''
        self.user_name = user_name
        self.password = password
        self.host_name = host_name or socket.gethostname()
        self.protocol = protocol
        self.port = port
        # Fall back to the *resolved* host name so the sender host is never
        # left empty when both arguments are empty.
        self.senderhostname = senderhostname or self.host_name
        self.conf = zbconf

    def call_api(self, path):
        '''
        GET /api/<path> with HTTP basic authentication and return the
        decoded JSON document.

        path is appended verbatim, so it must already be URL-encoded
        (for example %2F for the default virtual host '/').
        Errors raised by the opener (e.g. HTTP or connection errors) are
        not caught here.
        '''
        url = '{0}://{1}:{2}/api/{3}'.format(self.protocol, self.host_name,
                                             self.port, path)
        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        password_mgr.add_password(None, url, self.user_name, self.password)
        handler = urllib2.HTTPBasicAuthHandler(password_mgr)
        logging.debug('Issue a rabbit API call to get data on %s', url)
        response = urllib2.build_opener(handler).open(url)
        try:
            body = response.read()
        finally:
            response.close()
        # Decode explicitly so json.loads receives text on Python 2 and 3.
        return json.loads(body.decode('utf-8'))

    def call_api_queue(self, queue_name):
        '''
        Return the detail document of a queue in the default vhost ('/').

        queue_name is inserted into the URL as given (not URL-encoded).
        '''
        return self.call_api('queues/%2F/{0}'.format(queue_name))

    def call_api_channel(self, channel_name):
        '''
        Return the detail document of a channel; the name is URL-quoted.
        '''
        return self.call_api('channels/{0}'.format(_url_quote(channel_name)))

    def _write_metric(self, tmpfile, key, value):
        '''Log one "- <key> <value>" sender line and append it to tmpfile.'''
        line = '- %s %s' % (key, value)
        logging.debug('SENDER_DATA: %s', line)
        tmpfile.write(line + '\n')

    def _get_queue_data(self, queue, tmpfile):
        '''
        Write the queue-level metrics of a queue detail document to tmpfile.

        queue must contain 'vhost' and 'name'; any missing metric is
        reported as 0.
        '''
        key_format = '"rabbitmq.queues[{0},{1},{2}]"'
        for item in ('memory', 'messages', 'messages_ready',
                     'messages_unacknowledged', 'consumers',
                     'active_consumers'):
            key = key_format.format(queue['vhost'], 'queue_' + item,
                                    queue['name'])
            self._write_metric(tmpfile, key, queue.get(item, 0))
        stats = queue.get('message_stats') or {}
        for item in ('deliver_get', 'publish'):
            key = key_format.format(queue['vhost'],
                                    'queue_message_stats_' + item,
                                    queue['name'])
            self._write_metric(tmpfile, key, stats.get(item, 0))

    def _send_queue_data(self, tmpfile):
        '''
        Send the key/value lines stored in tmpfile to Zabbix.

        Runs zabbix_sender with the configured agent file and, when set,
        the sender host name.  Returns the process exit status (0 on
        success); 127 is returned when zabbix_sender cannot be started.
        '''
        # An argument list (no shell) keeps host names and paths from being
        # interpreted by a shell.
        args = [ZABBIX_SENDER, '-c', self.conf, '-i', tmpfile.name]
        if self.senderhostname:
            args.extend(['-s', self.senderhostname])
        try:
            process = subprocess.Popen(args, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
        except OSError as exc:
            # Previously the shell reported this case through a non-zero
            # status; keep returning a status instead of raising.
            logging.warning('Unable to run %s: %s', ZABBIX_SENDER, exc)
            return 127
        out, err = process.communicate()
        logging.debug("Finished sending data")
        return_code = process.returncode
        logging.info("Found return code of " + str(return_code))
        if return_code != 0:
            logging.warning(out)
            logging.warning(err)
        else:
            logging.debug(err)
            logging.debug(out)
        return return_code

    def _send_metrics(self, writers, data):
        '''
        Run each writer(data, tmpfile) against a temporary file, send the
        file with zabbix_sender and return its exit status.

        The temporary file is always removed, even when a writer or the
        sender raises.
        '''
        rdatafile = tempfile.NamedTemporaryFile(mode='w', delete=False)
        try:
            try:
                for write in writers:
                    write(data, rdatafile)
            finally:
                # zabbix_sender reads the file by name, so flush and close
                # it before sending.
                rdatafile.close()
            return self._send_queue_data(rdatafile)
        finally:
            os.unlink(rdatafile.name)

    def check_queue(self, queueName):
        '''
        Collect queue and per-channel metrics for queueName and send them.

        Returns the zabbix_sender exit status.
        '''
        queue = self.call_api_queue(queueName)
        return self._send_metrics(
            (self._get_queue_data, self._get_channel_data), queue)

    def check_overview(self):
        '''
        Collect the broker overview metrics and send them.

        Returns the zabbix_sender exit status.
        '''
        overview = self.call_api('overview')
        return self._send_metrics((self._get_overview_data,), overview)

    def _get_overview_data(self, queue, tmpfile):
        '''
        Write message rates and queue totals of an overview document.

        queue must contain 'node'.  Missing 'message_stats', rate detail or
        'queue_totals' sections are reported as 0 instead of raising.
        '''
        stats = queue.get('message_stats') or {}
        for item in ('publish_details', 'ack_details', 'deliver_get_details'):
            key = "rabbitmq.[{0},{1}_rate]".format(queue['node'], item)
            rate = (stats.get(item) or {}).get('rate', 0)
            self._write_metric(tmpfile, key, '{0:.2f}'.format(rate))
        totals = queue.get('queue_totals') or {}
        for item in ('messages', 'messages_ready', 'messages_unacknowledged'):
            key = "rabbitmq.[{0},{1}]".format(queue['node'], item)
            self._write_metric(tmpfile, key, totals.get(item, 0))

    def _get_channel_data(self, queue, tmpfile):
        '''
        For every consumer of the queue, look up the unacknowledged message
        count of its channel and write it as a Zabbix key/value line.

        The key is built from the queue name and the channel name.
        '''
        for consumer in queue.get('consumer_details') or []:
            channel_name = consumer.get('channel_details').get('name',
                                                               'no_name')
            # The key is double-quoted, like the queue keys, because the
            # channel name may contain spaces (call_api_channel URL-quotes
            # it for that reason) and sender-file fields are
            # whitespace-delimited.
            key = '"rabbitmq.[{0},{1}]"'.format(queue['name'], channel_name)
            self._write_metric(tmpfile, key,
                               self._get_channel_ack(channel_name))

    def _get_channel_ack(self, channel_name):
        '''
        Return the channel's messages_unacknowledged value from the API.
        '''
        return self.call_api_channel(channel_name)['messages_unacknowledged']


def main():
    '''Command-line parameters and decoding for Zabbix use/consumption.'''
    parser = optparse.OptionParser()
    parser.add_option('-u', '--username',
                      help='RabbitMQ API username,default=guest',
                      default='guest')
    parser.add_option('-p', '--password',
                      help='RabbitMQ API password,default=guest',
                      default='guest')
    parser.add_option('--mqhost',
                      help='RabbitMQ API hostname,default=hostname',
                      default=socket.gethostname())
    parser.add_option('--protocol',
                      help='RabbitMQ API protocol,default=http',
                      default='http')
    parser.add_option('--port', help='RabbitMQ API port,default=15672',
                      type='int', default=15672)
    parser.add_option('--host',
                      help='Zabbix Host name which the item belongs to,no default')
    parser.add_option('--conf',
                      help='Zabbix agent configure file,default=/etc/zabbix/zabbix_agentd.conf',
                      default='/etc/zabbix/zabbix_agentd.conf')
    parser.add_option('--queue',
                      help='RabbitMQ queue which you want to check,no default')
    parser.add_option('-o', '--overview', help='check RabbitMQ overview',
                      action="store_true")
    parser.add_option('-c', '--count',
                      help='How many times to execute command',
                      type='int', default=1)
    parser.add_option('-i', '--interval', help='Command execute interval',
                      type='int', default=0)
    (options, args) = parser.parse_args()

    if not options.host:
        parser.error("请输入zabbix里定义的hostname")
    elif not options.queue and not options.overview:
        parser.error("请指定-o或者--queue,选择检查mq总体指标或者某个队列指标")

    logging.debug("Started trying to process data")
    api = RabbitMqApi(user_name=options.username, password=options.password,
                      host_name=options.mqhost, protocol=options.protocol,
                      port=options.port, senderhostname=options.host,
                      zbconf=options.conf)

    for run in range(options.count):
        if run:
            # Wait only between runs; no trailing sleep after the last one.
            time.sleep(options.interval)
        # --overview takes precedence when both options are given.
        if options.overview:
            api.check_overview()
        else:
            api.check_queue(options.queue)


if __name__ == '__main__':
    main()