Contents
  1. 1. 一、前言
  2. 2. 二、RabbitMQ RESTful API
  3. 3. 三、zabbix sender
  4. 4. 四、python代码

一、前言

RabbitMQ自己已经有一个挺好的管理监控界面,可以看到非常详细的消息队列信息,但是对于运维人员来说,还缺少了很重要的告警功能。
自己写一个脚本也是很有必要的,在网上也有很多参考,主要思路是利用RabbitMQ自带管理模块的RESTful API取值,发送到zabbix,由zabbix负责告警阀值判断和发送告警

二、RabbitMQ RESTful API

首先要认识一下RabbitMQ的API,看看我们能取到什么东西,推荐看看官方的help
下面的例子我们在程序中会用到

查看MQ总体情况

官方解析:/api/overview Various random bits of information that describe the whole system.
例子:
curl -u guest:guest http://192.168.55.241:15672/api/overview

返回的是json格式的响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
{
"management_version": "3.0.0",
"statistics_level": "fine",
"exchange_types": [
{
"name": "topic",
"description": "AMQP topic exchange, as per the AMQP specification",
"enabled": true
},
{
"name": "fanout",
"description": "AMQP fanout exchange, as per the AMQP specification",
"enabled": true
},
{
"name": "direct",
"description": "AMQP direct exchange, as per the AMQP specification",
"enabled": true
},
{
"name": "headers",
"description": "AMQP headers exchange, as per the AMQP specification",
"enabled": true
}
],
"rabbitmq_version": "3.0.0",
"erlang_version": "R15B01",
"message_stats": {
"publish": 1104833,
"publish_details": {
"rate": 12.262779782167032,
"interval": 298146914019,
"last_event": 1476068131606
},
"ack": 1177571,
"ack_details": {
"rate": 11.230039450092189,
"interval": 57927468985,
"last_event": 1476068130882
},
"deliver": 1177572,
"deliver_details": {
"rate": 11.027518607238914,
"interval": 57927468985,
"last_event": 1476068130882
},
"deliver_get": 1207878,
"deliver_get_details": {
"rate": 11.027518607238914,
"interval": 57927468985,
"last_event": 1476068130882
},
"deliver_no_ack": 30306,
"deliver_no_ack_details": {
"rate": 0,
"interval": 1067361970,
"last_event": 1476068115093
},
"redeliver": 8,
"redeliver_details": {
"rate": 0,
"interval": 115179973,
"last_event": 1476068099504
}
},
"queue_totals": {
"messages": 0,
"messages_ready": 0,
"messages_unacknowledged": 0,
"messages_details": {
"rate": 0,
"interval": 255911999,
"last_event": 1476068131232
},
"messages_ready_details": {
"rate": 0,
"interval": 255911999,
"last_event": 1476068131232
},
"messages_unacknowledged_details": {
"rate": 0,
"interval": 255911999,
"last_event": 1476068131232
}
},
"object_totals": {
"consumers": 70,
"queues": 27,
"exchanges": 13,
"connections": 176,
"channels": 236
},
"node": "rabbit@rabbit01",
"statistics_db_node": "rabbit@rabbit01",
"listeners": [
{
"node": "rabbit@rabbit01",
"protocol": "amqp",
"ip_address": "::",
"port": 5672
}
],
"contexts": [
{
"node": "rabbit@rabbit01",
"description": "RabbitMQ Management",
"path": "/",
"port": 15672
},
{
"node": "rabbit@rabbit01",
"description": "Redirect to port 15672",
"path": "/",
"port": 55672,
"ignore_in_use": true
}
]
}

查看单个队列

官方解析:/api/queues/vhost/name An individual queue. To PUT a queue, you will need a body looking something like this:
{“auto_delete”:false,”durable”:true,”arguments”:[],”node”:”rabbit@smacmullen”}
All keys are optional.

例子:%2F代表vhost为/
curl -u guest:guest http://192.168.55.241:15672/api/queues/%2F/wxq.msgweb.push.wgw.0

返回的是json格式的响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
{
"policy": "",
"exclusive_consumer_tag": "",
"messages_ready": 0,
"messages_unacknowledged": 0,
"messages": 0,
"consumers": 1,
"active_consumers": 1,
"memory": 55680,
"backing_queue_status": {
"q1": 0,
"q2": 0,
"delta": [
"delta",
"undefined",
0,
"undefined"
],
"q3": 0,
"q4": 0,
"len": 0,
"pending_acks": 0,
"target_ram_count": "infinity",
"ram_msg_count": 0,
"ram_ack_count": 0,
"next_seq_id": 1,
"persistent_count": 0,
"avg_ingress_rate": 0,
"avg_egress_rate": 0,
"avg_ack_ingress_rate": 0,
"avg_ack_egress_rate": 0
},
"messages_details": {
"rate": 0,
"interval": 10001981,
"last_event": 1476068574166
},
"messages_ready_details": {
"rate": 0,
"interval": 10001981,
"last_event": 1476068574166
},
"messages_unacknowledged_details": {
"rate": 0,
"interval": 10001981,
"last_event": 1476068574166
},
"incoming": [
{
"stats": {
"publish": 763,
"publish_details": {
"rate": 0,
"interval": 75305976,
"last_event": 1476068545922
}
},
"exchange": {
"name": "wxq_appmsg_exchange",
"vhost": "/"
}
}
],
"deliveries": [
{
"stats": {
"deliver_get": 774,
"deliver_get_details": {
"rate": 0,
"interval": 44411017,
"last_event": 1476068470753
},
"deliver_no_ack": 774,
"deliver_no_ack_details": {
"rate": 0,
"interval": 44411017,
"last_event": 1476068470753
}
},
"channel_details": {
"name": "192.168.65.158:64326 -> 192.168.55.241:5672 (1)",
"number": 1,
"connection_name": "192.168.65.158:64326 -> 192.168.55.241:5672",
"peer_port": 64326,
"peer_host": "192.168.65.158"
}
}
],
"message_stats": {
"deliver_get": 774,
"deliver_get_details": {
"rate": 0,
"interval": 44411017,
"last_event": 1476068470753
},
"deliver_no_ack": 774,
"deliver_no_ack_details": {
"rate": 0,
"interval": 44411017,
"last_event": 1476068470753
},
"publish": 763,
"publish_details": {
"rate": 0,
"interval": 75305976,
"last_event": 1476068545922
}
},
"consumer_details": [
{
"channel_details": {
"name": "192.168.65.158:64326 -> 192.168.55.241:5672 (1)",
"number": 1,
"connection_name": "192.168.65.158:64326 -> 192.168.55.241:5672",
"peer_port": 64326,
"peer_host": "192.168.65.158"
},
"queue_details": {
"name": "wxq.msgweb.push.wgw.0",
"vhost": "/"
},
"consumer_tag": "amq.ctag-jKVWrQSs7dbX9VaFfOZiYA",
"exclusive": false,
"ack_required": false
}
],
"name": "wxq.msgweb.push.wgw.0",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {},
"node": "rabbit@rabbit01"
}

三、zabbix sender

区别于在zabbix agent添加item,sender可以主动发送item值到zabbix server
可以先规划好在zabbix server中使用的host和item名字

例如
rabbitmq.[rabbit@wx-pro-server01,messages_ready] 待处理消息总数
rabbitmq.[rabbit@wx-pro-server01,messages_unacknowledged] 未收到ack响应消息总数
rabbitmq.queues[/,queue_messages_ready,wxq_msg_send] wxq_msg_send队列的待处理消息数
rabbitmq.queues[/,queue_active_consumers,wxq_msg_send] wxq_msg_send队列的活动连接数

这样就可以通过python取值,保存在临时文件中,再send到zabbix server。

zabbix_sender使用方法
/usr/bin/zabbix_sender -c /etc/zabbix/zabbix_agentd.conf -i /tmp/tmp5ai4uI -s ser01-rabbitmq
-c agent配置文件
-i 临时文件,用于保存多个item和对应值,每行一个item
-s 在zabbix server上配置的host名字

四、python代码

做好上面的准备工作,就可以开始写脚本了,不多说了直接放代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/python
# coding=utf-8
'''
RabbitMq监控脚本 v1.0
v1.1 监控channel的unacknowledged
v1.0 监控overview、queue
'''
import json
import urllib2
import os
import socket
import tempfile
import logging
import subprocess
import optparse
import time
logging.basicConfig(filename='/var/log/zabbix/rabbitmq_zabbix.log', level=logging.WARNNING, format='%(asctime)s %(levelname)s: %(message)s')
class RabbitMqApi(object):
''' 调用RabbitMQ的API类 '''
######## json.loads() transfer json data to python data
######## json.dump() transfer python data to json data

def __init__(self, user_name='guest', password='guest', host_name='192.168.55.241',
protocol='http', port=15672, senderhostname='ser01-rabbitmq',
zbconf='/etc/zabbix/zabbix_agentd.conf'):
self.user_name = user_name
self.password = password
self.host_name = host_name or socket.gethostname()
self.protocol = protocol
self.port = port
self.senderhostname = senderhostname if senderhostname else host_name
self.conf = zbconf
# 调用RabbitMQ的API类取原始数据,json格式
def call_api(self, path):
'''
All URIs will server only resource of type application/json,and will require HTTP basic authentication. The default username and password is guest/guest. /%sf is encoded for the default virtual host '/'
'''
url = '{0}://{1}:{2}/api/{3}'.format(self.protocol, self.host_name, self.port, path)
password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, url, self.user_name, self.password)
handler = urllib2.HTTPBasicAuthHandler(password_mgr)
logging.debug('Issue a rabbit API call to get data on ' + url)
return json.loads(urllib2.build_opener(handler).open(url).read())

# 调用RabbitMQ的API类取原始数据,json格式
def call_api_queue(self, queue_name):
'''
get queue detail
'''
url = '{0}://{1}:{2}/api/queues/%2F/{3}'.format(self.protocol, self.host_name, self.port, queue_name)
password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, url, self.user_name, self.password)
handler = urllib2.HTTPBasicAuthHandler(password_mgr)
logging.debug('Issue a rabbit API call to get data on ' + url)
return json.loads(urllib2.build_opener(handler).open(url).read())

def call_api_channel(self, channel_name):
'''
get channel detail
'''
url = '{0}://{1}:{2}/api/channels/{3}'.format(self.protocol, self.host_name, self.port, urllib2.quote(channel_name))
password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, url, self.user_name, self.password)
handler = urllib2.HTTPBasicAuthHandler(password_mgr)
logging.debug('Issue a rabbit API call to get data on ' + url)
return json.loads(urllib2.build_opener(handler).open(url).read())

# 把数据与item对应写进临时文件
def _get_queue_data(self, queue, tmpfile):
for item in [ 'memory','messages','messages_ready','messages_unacknowledged','consumers','active_consumers' ]:
#key = rabbitmq.queues[/,queue_memory,queue.helloWorld]
key = '"rabbitmq.queues[{0},queue_{1},{2}]"'.format(queue['vhost'], item, queue['name'])
### if item is in queue,value=queue[item],else value=0
value = queue.get(item, 0)
logging.debug("SENDER_DATA: - %s %s" % (key,value))
tmpfile.write("- %s %s\n" % (key, value))
## This is a non standard bit of information added after the standard items
for item in ['deliver_get', 'publish']:
key = '"rabbitmq.queues[{0},queue_message_stats_{1},{2}]"'.format(queue['vhost'], item, queue['name'])
value = queue.get('message_stats', {}).get(item, 0)
logging.debug("SENDER_DATA: - %s %s" % (key,value))
tmpfile.write("- %s %s\n" % (key, value))
# 使用zabbix_sender把item发送到zabbix server
def _send_queue_data(self, tmpfile):
'''Send the queue data to Zabbix.'''
'''Get key value from temp file. '''
args = '/usr/bin/zabbix_sender -c {0} -i {1}'
if self.senderhostname:
args = args + " -s " + self.senderhostname
return_code = 0
process = subprocess.Popen(args.format(self.conf, tmpfile.name),
shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = process.communicate()
logging.debug("Finished sending data")
return_code = process.wait()
logging.info("Found return code of " + str(return_code))
if return_code != 0:
logging.warning(out)
logging.warning(err)
else:
logging.debug(err)
logging.debug(out)
return return_code

def check_queue(self, queueName):
rdatafile = tempfile.NamedTemporaryFile(delete=False)
return_code = 0
queue = self.call_api_queue(queueName)
self._get_queue_data(queue, rdatafile)
self._get_channel_data(queue, rdatafile)
rdatafile.close()
return_code = self._send_queue_data(rdatafile)
os.unlink(rdatafile.name)
return return_code
def check_overview(self):
return_code = 0
rdatafile = tempfile.NamedTemporaryFile(delete=False)
queue = self.call_api('overview')
self._get_overview_data(queue, rdatafile)
rdatafile.close()
return_code = self._send_queue_data(rdatafile)
os.unlink(rdatafile.name)
return return_code

def _get_overview_data(self, queue, tmpfile):
for item in ['publish_details', 'ack_details', 'deliver_get_details']:
key = "rabbitmq.[{0},{1}_rate]".format(queue['node'], item)
value = queue.get('message_stats').get(item).get('rate', 0)
logging.debug("SENDER_DATA: - {0} {1:.2f}\n".format(key, value))
tmpfile.write("- {0} {1:.2f}\n".format(key, value))
for item in ['messages','messages_ready','messages_unacknowledged']:
key = "rabbitmq.[{0},{1}]".format(queue['node'], item)
value = queue.get('queue_totals').get(item,0)
logging.debug("SENDER_DATA: - %s %s" % (key,value))
tmpfile.write("- %s %s\n" % (key, value))

def _get_channel_data(self, queue, tmpfile):
'''
从queue获取queue name和channel name,组装zabbix的k-v并写进临时文件
'''
for ch_detail in queue['consumer_details']:
key = "rabbitmq.[{},{}]".format(queue['name'], ch_detail.get('channel_details').get('name','no_name'))
value = self._get_channel_ack(ch_detail.get('channel_details').get('name','no_name'))
logging.debug("SENDER_DATA: - %s %s" % (key,value))
tmpfile.write("- %s %s\n" % (key, value))

def _get_channel_ack(self, channel_name):
'''
使用api获取messages_unacknowledged值
'''
d = self.call_api_channel(channel_name)
return d['messages_unacknowledged']

def main():
'''Command-line parameters and decoding for Zabbix use/consumption.'''
#testapi = RabbitMqApi()
#testapi.check_queue('wxq_msg_send')
parser = optparse.OptionParser()
parser.add_option('-u', '--username', help='RabbitMQ API username,default=guest',default='guest')
parser.add_option('-p', '--password', help='RabbitMQ API password,default=guest',default='guest')
parser.add_option('--mqhost', help='RabbitMQ API hostname,default=hostname',default=socket.gethostname())
parser.add_option('--protocol', help='RabbitMQ API protocol,default=http',default='http')
parser.add_option('--port', help='RabbitMQ API port,default=15672',default=15672)
parser.add_option('--host', help='Zabbix Host name which the item belongs to,no default')
parser.add_option('--conf', help='Zabbix agent configure file,default=/etc/zabbix/zabbix_agentd.conf',default='/etc/zabbix/zabbix_agentd.conf')
parser.add_option('--queue', help='RabbitMQ queue which you want to check,no default')
parser.add_option('-o','--overview', help='check RabbitMQ overview',action="store_true")
parser.add_option('-c','--count', help='How many times to execute command', type='int', default=1)
parser.add_option('-i','--interval', help='Command execute interval', type='int', default=0)
(options, args) = parser.parse_args()
if not options.host:
parser.error("请输入zabbix里定义的hostname")
elif (not options.queue and not options.overview):
parser.error("请指定-o或者--queue,选择检查mq总体指标或者某个队列指标")
logging.debug("Started trying to process data")
api = RabbitMqApi(user_name=options.username, password=options.password, host_name=options.mqhost,
protocol=options.protocol, port=options.port, senderhostname=options.host,
zbconf=options.conf)
cnt = 0
while cnt < options.count:
if options.overview:
api.check_overview()
time.sleep(options.interval)
elif options.queue:
api.check_queue(options.queue)
time.sleep(options.interval)
cnt += 1

if __name__ == '__main__':
main()

脚本使用方式如下:

  • 监控mq总体情况: mqMon.py -o –mqhost 192.168.55.241 –host ser01-rabbitmq -c 5 -i 12
  • 监控队列情况: mqMon.py –queue wxq_msg_send –mqhost 192.168.55.241 –host ser01-rabbitmq -c 5 -i 12

–mqhost RabbitMQ服务器ip
–host zabbix server上配置的host名字
–queue 队列名字
-o 获取overview数据
-c 执行5次
-i 间隔12秒
–help 查看帮助,有其他参数可选

代码参考了 http://john88wang.blog.51cto.com/2165294/1745824

Contents
  1. 1. 一、前言
  2. 2. 二、RabbitMQ RESTful API
  3. 3. 三、zabbix sender
  4. 4. 四、python代码