RabbitMQ作为一个工业级的消息中间件,肯定是缺少不了监控的,RabbitMQ提供了WEB版的页面监控(访问地址:http://server-name:15672/,默认端口号是15672。原文:The web UI is located at: http://server-name:15672/),类似于如下:
当然,使用该功能的前提是已经启用了管理插件:rabbitmq-plugins enable rabbitmq_management。
如果只是小规模使用,这个web管理界面就够了;如果公司有专门的团队(比如中间件团队)来负责一些基础组件,那么必然会有自身的一套生态环境,独立的且可以和公司其他系统接入的监控系统自然必不可少,没有监控的代码那是两眼一抹黑的。
要构建独立的监控系统,可以利用RabbitMQ提供的restful http api接口(原文:The HTTP API and its documentation are both located at: http://server-name:15672/api/ (or view our latest HTTP API documentation here).)。当然这个接口的作用远不止于调取一些监控数据,也可以通过api来操作RabbitMQ进行添加、删除等操作(GET,PUT,DELETE,POST)。
引用RabbitMQ官网的例子,比如列出所有的vhosts:
$ curl -i -u guest:guest http://localhost:15672/api/vhosts
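再比如创建一个新的vhost:
$ curl -i -u guest:guest -H "content-type:application/json" -XPUT http://localhost:15672/api/vhosts/foo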
采用RabbitMQ提供的restful http api来做监控其实很简单,只需调用(比如HttpClient工具):http://server-ip:15672/api/nodes即可。下面展示下博主这里的某些监控指标:broker节点的内存占用,磁盘剩余空间,Socket句柄,Broker子进程数,文件句柄数。监控示例图分别如下:
通过http://server-ip:15672/api/nodes获取到的数据如下:
[ { "cluster_links": [ { "peer_addr": "10.198.197.73", "peer_port": 25672, "sock_addr": "10.198.198.10", "sock_port": 29226, "stats": { "send_bytes": 49847976, "send_bytes_details": { "rate": 0 }, "recv_bytes": 2714074, "recv_bytes_details": { "rate": 0 } }, "name": "rabbit@zhuzhonghua2-fqawb" } ], "disk_free": 36541222912, "disk_free_details": { "rate": 0 }, "fd_used": 28, "fd_used_details": { "rate": 0 }, "io_read_avg_time": 0, "io_read_avg_time_details": { "rate": 0 }, "io_read_bytes": 3784705, "io_read_bytes_details": { "rate": 0 }, "io_read_count": 3, "io_read_count_details": { "rate": 0 }, "io_seek_avg_time": 0.054, "io_seek_avg_time_details": { "rate": 0 }, "io_seek_count": 4, "io_seek_count_details": { "rate": 0 }, "io_sync_avg_time": 0, "io_sync_avg_time_details": { "rate": 0 }, "io_sync_count": 2, "io_sync_count_details": { "rate": 0 }, "io_write_avg_time": 0, "io_write_avg_time_details": { "rate": 0 }, "io_write_bytes": 524, "io_write_bytes_details": { "rate": 0 }, "io_write_count": 2, "io_write_count_details": { "rate": 0 }, "mem_used": 53393008, "mem_used_details": { "rate": 0 }, "mnesia_disk_tx_count": 2, "mnesia_disk_tx_count_details": { "rate": 0 }, "mnesia_ram_tx_count": 34, "mnesia_ram_tx_count_details": { "rate": 0 }, "proc_used": 181, "proc_used_details": { "rate": 0 }, "queue_index_journal_write_count": 1, "queue_index_journal_write_count_details": { "rate": 0 }, "queue_index_read_count": 1, "queue_index_read_count_details": { "rate": 0 }, "queue_index_write_count": 1, "queue_index_write_count_details": { "rate": 0 }, "sockets_used": 1, "sockets_used_details": { "rate": 0 }, "partitions": [], "os_pid": "795", "fd_total": 1024, "sockets_total": 829, "mem_limit": 3301929779, "mem_alarm": false, "disk_free_limit": 50000000, "disk_free_alarm": false, "proc_total": 1048576, "rates_mode": "basic", "uptime": 100629724, "run_queue": 0, "processors": 4, "exchange_types": [ { "name": "headers", "description": "AMQP headers exchange, as per the AMQP 
specification", "enabled": true }, { "name": "topic", "description": "AMQP topic exchange, as per the AMQP specification", "enabled": true }, { "name": "direct", "description": "AMQP direct exchange, as per the AMQP specification", "enabled": true }, { "name": "fanout", "description": "AMQP fanout exchange, as per the AMQP specification", "enabled": true } ], "auth_mechanisms": [ { "name": "PLAIN", "description": "SASL PLAIN authentication mechanism", "enabled": true }, { "name": "AMQPLAIN", "description": "QPid AMQPLAIN mechanism", "enabled": true }, { "name": "RABBIT-CR-DEMO", "description": "RabbitMQ Demo challenge-response authentication mechanism", "enabled": false } ], "applications": [ { "name": "amqp_client", "description": "RabbitMQ AMQP Client", "version": "3.5.7" }, { "name": "inets", "description": "INETS CXC 138 49", "version": "6.3.3" }, { "name": "kernel", "description": "ERTS CXC 138 10", "version": "5.1" }, { "name": "mnesia", "description": "MNESIA CXC 138 12", "version": "4.14.1" }, { "name": "mochiweb", "description": "MochiMedia Web Server", "version": "2.7.0-rmq3.5.7-git680dba8" }, { "name": "os_mon", "description": "CPO CXC 138 46", "version": "2.4.1" }, { "name": "rabbit", "description": "RabbitMQ", "version": "3.5.7" }, { "name": "rabbitmq_management", "description": "RabbitMQ Management Console", "version": "3.5.7" }, { "name": "rabbitmq_management_agent", "description": "RabbitMQ Management Agent", "version": "3.5.7" }, { "name": "rabbitmq_web_dispatch", "description": "RabbitMQ Web Dispatcher", "version": "3.5.7" }, { "name": "sasl", "description": "SASL CXC 138 11", "version": "3.0.1" }, { "name": "stdlib", "description": "ERTS CXC 138 10", "version": "3.1" }, { "name": "webmachine", "description": "webmachine", "version": "1.10.3-rmq3.5.7-gite9359c7" }, { "name": "xmerl", "description": "XML parser", "version": "1.3.12" } ], "contexts": [ { "description": "RabbitMQ Management", "path": "/", "port": "15672" } ], "log_file": 
"/opt/rabbitmq/sbin/../var/log/rabbitmq/rabbit@hiddenzhu-8drdc.log", "sasl_log_file": "/opt/rabbitmq/sbin/../var/log/rabbitmq/rabbit@hiddenzhu-8drdc-sasl.log", "db_dir": "/opt/rabbitmq/sbin/../var/lib/rabbitmq/mnesia/rabbit@hiddenzhu-8drdc", "config_files": [ "/opt/rabbitmq/sbin/../etc/rabbitmq/rabbitmq.config (not found)" ], "net_ticktime": 60, "enabled_plugins": [ "rabbitmq_management" ], "name": "rabbit@hiddenzhu-8drdc", "type": "disc", "running": true }, { "cluster_links": [ { "peer_addr": "10.198.198.10", "peer_port": 29226, "sock_addr": "10.198.197.73", "sock_port": 25672, "stats": { "send_bytes": 2714033, "send_bytes_details": { "rate": 8.2 }, "recv_bytes": 49847976, "recv_bytes_details": { "rate": 491.6 } }, "name": "rabbit@hiddenzhu-8drdc" } ], "disk_free": 34482147328, "disk_free_details": { "rate": -2457.6 }, "fd_used": 36, "fd_used_details": { "rate": 0.4 }, "io_read_avg_time": 0, "io_read_avg_time_details": { "rate": 0 }, "io_read_bytes": 479585, "io_read_bytes_details": { "rate": 0 }, "io_read_count": 6, "io_read_count_details": { "rate": 0 }, "io_seek_count": 1, "io_seek_count_details": { "rate": 0 }, "io_sync_avg_time": 0, "io_sync_avg_time_details": { "rate": 0 }, "io_write_avg_time": 0, "io_write_avg_time_details": { "rate": 0 }, "mem_used": 44401848, "mem_used_details": { "rate": 7947.2 }, "mnesia_disk_tx_count": 6, "mnesia_disk_tx_count_details": { "rate": 0 }, "mnesia_ram_tx_count": 44, "mnesia_ram_tx_count_details": { "rate": 0 }, "proc_used": 194, "proc_used_details": { "rate": 0 }, "queue_index_read_count": 1, "queue_index_read_count_details": { "rate": 0 }, "sockets_used": 1, "sockets_used_details": { "rate": 0 }, "partitions": [], "os_pid": "3806", "fd_total": 1024, "sockets_total": 829, "mem_limit": 3301929779, "mem_alarm": false, "disk_free_limit": 50000000, "disk_free_alarm": false, "proc_total": 1048576, "rates_mode": "basic", "uptime": 100632422, "run_queue": 0, "processors": 4, "exchange_types": [ { "name": "headers", "description": 
"AMQP headers exchange, as per the AMQP specification", "enabled": true }, { "name": "topic", "description": "AMQP topic exchange, as per the AMQP specification", "enabled": true }, { "name": "direct", "description": "AMQP direct exchange, as per the AMQP specification", "enabled": true }, { "name": "fanout", "description": "AMQP fanout exchange, as per the AMQP specification", "enabled": true } ], "auth_mechanisms": [ { "name": "PLAIN", "description": "SASL PLAIN authentication mechanism", "enabled": true }, { "name": "AMQPLAIN", "description": "QPid AMQPLAIN mechanism", "enabled": true }, { "name": "RABBIT-CR-DEMO", "description": "RabbitMQ Demo challenge-response authentication mechanism", "enabled": false } ], "applications": [ { "name": "amqp_client", "description": "RabbitMQ AMQP Client", "version": "3.5.7" }, { "name": "inets", "description": "INETS CXC 138 49", "version": "6.3.3" }, { "name": "kernel", "description": "ERTS CXC 138 10", "version": "5.1" }, { "name": "mnesia", "description": "MNESIA CXC 138 12", "version": "4.14.1" }, { "name": "mochiweb", "description": "MochiMedia Web Server", "version": "2.7.0-rmq3.5.7-git680dba8" }, { "name": "os_mon", "description": "CPO CXC 138 46", "version": "2.4.1" }, { "name": "rabbit", "description": "RabbitMQ", "version": "3.5.7" }, { "name": "rabbitmq_management", "description": "RabbitMQ Management Console", "version": "3.5.7" }, { "name": "rabbitmq_management_agent", "description": "RabbitMQ Management Agent", "version": "3.5.7" }, { "name": "rabbitmq_web_dispatch", "description": "RabbitMQ Web Dispatcher", "version": "3.5.7" }, { "name": "sasl", "description": "SASL CXC 138 11", "version": "3.0.1" }, { "name": "stdlib", "description": "ERTS CXC 138 10", "version": "3.1" }, { "name": "webmachine", "description": "webmachine", "version": "1.10.3-rmq3.5.7-gite9359c7" }, { "name": "xmerl", "description": "XML parser", "version": "1.3.12" } ], "contexts": [ { "description": "RabbitMQ Management", "path": "/", 
"port": "15672" } ], "log_file": "/opt/rabbitmq/sbin/../var/log/rabbitmq/rabbit@zhuzhonghua2-fqawb.log", "sasl_log_file": "/opt/rabbitmq/sbin/../var/log/rabbitmq/rabbit@zhuzhonghua2-fqawb-sasl.log", "db_dir": "/opt/rabbitmq/sbin/../var/lib/rabbitmq/mnesia/rabbit@zhuzhonghua2-fqawb", "config_files": [ "/opt/rabbitmq/sbin/../etc/rabbitmq/rabbitmq.config (not found)" ], "net_ticktime": 60, "enabled_plugins": [ "rabbitmq_management" ], "name": "rabbit@zhuzhonghua2-fqawb", "type": "disc", "running": true } ]
这段json中的mem_used, disk_free, sockets_used, proc_used, fd_used分别对应上面监控图中的内存占用,磁盘剩余空间,Socket句柄数,Broker子进程数以及文件句柄数。
下面是一个demo代码(Python实现,通过继承Resource基类封装了对RabbitMQ HTTP API的调用);如果使用Java,也可以借助HttpClient以及jackson来调用接口并解析相关参数。
相关代码(有点长):
# -*- coding: utf-8 -*-
"""
File Name rabbitmq_monitor
Created on 2018/7/4
@author: jj

RabbitMQ monitoring information: a thin client for the RabbitMQ Management
HTTP API.
"""
from six.moves import urllib

from util.resource_base import Resource


def _quote(value):
    """Percent-encode ``value`` for use as a single URL path segment."""
    return urllib.parse.quote_plus(value)


def _bool_flag(value):
    """
    Render a truthy/falsy value as the lowercase string ``true``/``false``.

    A Python ``bool`` placed directly in a query string is serialised as
    ``True``/``False``, which the management API does not recognise as a
    set flag.
    """
    return 'true' if value else 'false'


class AdminAPI(Resource):
    """
    The entrypoint for interacting with the RabbitMQ Management HTTP API.

    Every method builds an ``/api/...`` path and delegates the request to a
    helper inherited from ``Resource`` (``_api_get``, ``_api_put``,
    ``_api_post``, ``_api_delete`` or ``_get``). Names that end up in the
    URL path (vhosts, users, exchanges, ...) are percent-encoded first, so
    values such as the default vhost ``/`` can be passed verbatim.
    """

    # ------------------------------------------------------------------
    # Cluster / nodes
    # ------------------------------------------------------------------
    def overview(self):
        """
        Various random bits of information that describe the whole system.
        """
        return self._api_get('/api/overview')

    def get_cluster_name(self):
        """
        Name identifying this RabbitMQ cluster.
        """
        # NOTE(review): unlike every other method this calls ``_get``
        # directly with the full URL, headers and auth. Presumably this is
        # equivalent to ``self._api_get('/api/cluster-name')`` -- confirm
        # against ``Resource`` before unifying.
        return self._get(
            url=self.url + '/api/cluster-name',
            headers=self.headers,
            auth=self.auth
        )

    def list_nodes(self):
        """
        A list of nodes in the RabbitMQ cluster.
        """
        return self._api_get('/api/nodes')

    def get_node(self, name, memory=False, binary=False):
        """
        An individual node in the RabbitMQ cluster.

        :param name: The node name; it is inserted into the URL as given
        :type name: str
        :param memory: Set to ``True`` to get memory statistics
        :type memory: bool
        :param binary: Set to ``True`` to get a breakdown of binary memory
            use (may be expensive if there are many small binaries in the
            system)
        :type binary: bool
        """
        # Send lowercase "true"/"false": a raw bool would be encoded as
        # "True"/"False" and the flag would never take effect.
        return self._api_get(
            '/api/nodes/{0}'.format(name),
            params={
                'binary': _bool_flag(binary),
                'memory': _bool_flag(memory),
            },
        )

    def list_extensions(self):
        """
        A list of extensions to the management plugin.
        """
        return self._api_get('/api/extensions')

    # ------------------------------------------------------------------
    # Definitions
    # ------------------------------------------------------------------
    def get_definitions(self):
        """
        The server definitions - exchanges, queues, bindings, users, virtual
        hosts, permissions and parameters. Everything apart from messages.

        This method can be used for backing up the configuration of a server
        or cluster.
        """
        return self._api_get('/api/definitions')

    def post_definitions(self, data):
        """
        Upload an existing set of server definitions - exchanges, queues,
        bindings, users, virtual hosts, permissions and parameters.
        Everything apart from messages.

        Note that:

        - The definitions are merged. Anything already existing on the
          server but not in the uploaded definitions is untouched.
        - Conflicting definitions on immutable objects (exchanges, queues
          and bindings) will cause an error.
        - Conflicting definitions on mutable objects will cause the object
          in the server to be overwritten with the object from the
          definitions.
        - In the event of an error you will be left with a part-applied set
          of definitions.

        This method can be used for restoring the configuration of a server
        or cluster.

        :param data: The definitions for a RabbitMQ server
        :type data: dict
        """
        self._api_post('/api/definitions', data=data)

    # ------------------------------------------------------------------
    # Connections / channels / consumers
    # ------------------------------------------------------------------
    def list_connections(self):
        """
        A list of all open connections.
        """
        return self._api_get('/api/connections')

    def get_connection(self, name):
        """
        An individual connection.

        :param name: The connection name
        :type name: str
        """
        return self._api_get('/api/connections/{0}'.format(_quote(name)))

    def delete_connection(self, name, reason=None):
        """
        Closes an individual connection. Give an optional reason.

        :param name: The connection name
        :type name: str
        :param reason: An optional reason why the connection was deleted;
            when given it is sent in the ``X-Reason`` header
        :type reason: str
        """
        headers = {'X-Reason': reason} if reason else {}
        self._api_delete(
            '/api/connections/{0}'.format(_quote(name)),
            headers=headers,
        )

    def list_connection_channels(self, name):
        """
        List of all channels for a given connection.

        :param name: The connection name
        :type name: str
        """
        return self._api_get(
            '/api/connections/{0}/channels'.format(_quote(name))
        )

    def list_channels(self):
        """
        A list of all open channels.
        """
        return self._api_get('/api/channels')

    def get_channel(self, name):
        """
        Details about an individual channel.

        :param name: The channel name
        :type name: str
        """
        return self._api_get('/api/channels/{0}'.format(_quote(name)))

    def list_consumers(self):
        """
        A list of all consumers.
        """
        return self._api_get('/api/consumers')

    def list_consumers_for_vhost(self, vhost):
        """
        A list of all consumers in a given virtual host.

        :param vhost: The vhost name
        :type vhost: str
        """
        return self._api_get('/api/consumers/{0}'.format(_quote(vhost)))

    # ------------------------------------------------------------------
    # Exchanges
    # ------------------------------------------------------------------
    def list_exchanges(self):
        """
        A list of all exchanges.
        """
        return self._api_get('/api/exchanges')

    def list_exchanges_for_vhost(self, vhost):
        """
        A list of all exchanges in a given virtual host.

        :param vhost: The vhost name
        :type vhost: str
        """
        return self._api_get('/api/exchanges/{0}'.format(_quote(vhost)))

    def get_exchange_for_vhost(self, exchange, vhost):
        """
        An individual exchange.

        :param exchange: The exchange name
        :type exchange: str
        :param vhost: The vhost name
        :type vhost: str
        """
        return self._api_get(
            '/api/exchanges/{0}/{1}'.format(_quote(vhost), _quote(exchange))
        )

    def create_exchange_for_vhost(self, exchange, vhost, body):
        """
        Create an individual exchange.

        The body should look like:
        ::

            {
                "type": "direct",
                "auto_delete": false,
                "durable": true,
                "internal": false,
                "arguments": {}
            }

        The type key is mandatory; other keys are optional.

        :param exchange: The exchange name
        :type exchange: str
        :param vhost: The vhost name
        :type vhost: str
        :param body: A body for the exchange.
        :type body: dict
        """
        self._api_put(
            '/api/exchanges/{0}/{1}'.format(_quote(vhost), _quote(exchange)),
            data=body
        )

    def delete_exchange_for_vhost(self, exchange, vhost, if_unused=False):
        """
        Delete an individual exchange.

        You can add the parameter ``if_unused=True``. This prevents the
        delete from succeeding if the exchange is bound to a queue or as a
        source to another exchange.

        :param exchange: The exchange name
        :type exchange: str
        :param vhost: The vhost name
        :type vhost: str
        :param if_unused: Set to ``True`` to only delete if it is unused
        :type if_unused: bool
        """
        # The flag must be the lowercase string "true"; a raw bool would be
        # encoded as "True" and the safety check silently skipped.
        self._api_delete(
            '/api/exchanges/{0}/{1}'.format(_quote(vhost), _quote(exchange)),
            params={
                'if-unused': _bool_flag(if_unused)
            },
        )

    # ------------------------------------------------------------------
    # Bindings
    # ------------------------------------------------------------------
    def list_bindings(self):
        """
        A list of all bindings.
        """
        return self._api_get('/api/bindings')

    def list_bindings_for_vhost(self, vhost):
        """
        A list of all bindings in a given virtual host.

        :param vhost: The vhost name
        :type vhost: str
        """
        return self._api_get('/api/bindings/{0}'.format(_quote(vhost)))

    # ------------------------------------------------------------------
    # Virtual hosts
    # ------------------------------------------------------------------
    def list_vhosts(self):
        """
        A list of all vhosts.
        """
        return self._api_get('/api/vhosts')

    def get_vhost(self, name):
        """
        Details about an individual vhost.

        :param name: The vhost name
        :type name: str
        """
        return self._api_get('/api/vhosts/{0}'.format(_quote(name)))

    def delete_vhost(self, name):
        """
        Delete a vhost.

        :param name: The vhost name
        :type name: str
        """
        self._api_delete('/api/vhosts/{0}'.format(_quote(name)))

    def create_vhost(self, name, tracing=False):
        """
        Create an individual vhost.

        :param name: The vhost name
        :type name: str
        :param tracing: Set to ``True`` to enable tracing
        :type tracing: bool
        """
        # The ``tracing`` key is only sent when tracing is requested.
        data = {'tracing': True} if tracing else {}
        self._api_put(
            '/api/vhosts/{0}'.format(_quote(name)),
            data=data,
        )

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------
    def list_users(self):
        """
        A list of all users.
        """
        return self._api_get('/api/users')

    def get_user(self, name):
        """
        Details about an individual user.

        :param name: The user's name
        :type name: str
        """
        return self._api_get('/api/users/{0}'.format(_quote(name)))

    def delete_user(self, name):
        """
        Delete a user.

        :param name: The user's name
        :type name: str
        """
        self._api_delete('/api/users/{0}'.format(_quote(name)))

    def create_user(self, name, password, password_hash=None, tags=None):
        """
        Create a user.

        :param name: The user's name
        :type name: str
        :param password: The user's password. Set to "" if no password is
            desired. Takes precedence if ``password_hash`` is also set.
        :type password: str
        :param password_hash: An optional password hash for the user.
        :type password_hash: str
        :param tags: A list of tags for the user. Currently recognised tags
            are "administrator", "monitoring" and "management". If no tags
            are supplied, the user will have no permissions.
        :type tags: list of str
        """
        data = {
            'tags': ', '.join(tags or [])
        }
        if password:
            data['password'] = password
        elif password_hash:
            data['password_hash'] = password_hash
        else:
            # Neither a password nor a hash: send an empty hash.
            data['password_hash'] = ""

        self._api_put(
            '/api/users/{0}'.format(_quote(name)),
            data=data,
        )

    def list_user_permissions(self, name):
        """
        A list of all permissions for a given user.

        :param name: The user's name
        :type name: str
        """
        return self._api_get(
            '/api/users/{0}/permissions'.format(_quote(name))
        )

    def whoami(self):
        """
        Details of the currently authenticated user.
        """
        return self._api_get('/api/whoami')

    # ------------------------------------------------------------------
    # Permissions
    # ------------------------------------------------------------------
    def list_permissions(self):
        """
        A list of all permissions for all users.
        """
        return self._api_get('/api/permissions')

    def get_user_permission(self, vhost, name):
        """
        An individual permission of a user and virtual host.

        :param vhost: The vhost name
        :type vhost: str
        :param name: The user's name
        :type name: str
        """
        return self._api_get(
            '/api/permissions/{0}/{1}'.format(_quote(vhost), _quote(name))
        )

    def delete_user_permission(self, name, vhost):
        """
        Delete an individual permission of a user and virtual host.

        :param name: The user's name
        :type name: str
        :param vhost: The vhost name
        :type vhost: str
        """
        self._api_delete(
            '/api/permissions/{0}/{1}'.format(_quote(vhost), _quote(name))
        )

    def create_user_permission(self,
                               name,
                               vhost,
                               configure=None,
                               write=None,
                               read=None):
        """
        Create a user permission.

        :param name: The user's name
        :type name: str
        :param vhost: The vhost to assign the permission to
        :type vhost: str
        :param configure: A regex for the user permission. Default is ``.*``
        :type configure: str
        :param write: A regex for the user permission. Default is ``.*``
        :type write: str
        :param read: A regex for the user permission. Default is ``.*``
        :type read: str
        """
        # Any falsy value (None or "") falls back to the match-all regex.
        data = {
            'configure': configure or '.*',
            'write': write or '.*',
            'read': read or '.*',
        }
        self._api_put(
            '/api/permissions/{0}/{1}'.format(_quote(vhost), _quote(name)),
            data=data
        )

    # ------------------------------------------------------------------
    # Policies
    # ------------------------------------------------------------------
    def list_policies(self):
        """
        A list of all policies.
        """
        return self._api_get('/api/policies')

    def list_policies_for_vhost(self, vhost):
        """
        A list of all policies for a vhost.

        :param vhost: The vhost name
        :type vhost: str
        """
        return self._api_get('/api/policies/{0}'.format(_quote(vhost)))

    def get_policy_for_vhost(self, vhost, name):
        """
        Get a specific policy for a vhost.

        :param vhost: The virtual host the policy is for
        :type vhost: str
        :param name: The name of the policy
        :type name: str
        """
        return self._api_get(
            '/api/policies/{0}/{1}'.format(_quote(vhost), _quote(name))
        )

    def create_policy_for_vhost(
            self, vhost, name, definition, pattern=None, priority=0,
            apply_to='all'):
        """
        Create a policy for a vhost.

        :param vhost: The virtual host the policy is for
        :type vhost: str
        :param name: The name of the policy
        :type name: str
        :param definition: The definition of the policy. Required
        :type definition: dict
        :param pattern: The pattern of resource names to apply the policy
            to. The value is sent to the server as given, including the
            default ``None``
        :type pattern: str
        :param priority: The priority of the policy. Defaults to 0
        :type priority: int
        :param apply_to: What resource type to apply the policy to.
            Usually "exchanges", "queues", or "all". Defaults to "all"
        :type apply_to: str

        Example ::

            # Makes all queues and exchanges on vhost "/" highly available
            >>> api.create_policy_for_vhost(
            ...     vhost="/",
            ...     name="ha-all",
            ...     definition={"ha-mode": "all"},
            ...     pattern="",
            ...     apply_to="all")

        """
        data = {
            "pattern": pattern,
            "definition": definition,
            "priority": priority,
            "apply-to": apply_to
        }
        self._api_put(
            '/api/policies/{0}/{1}'.format(_quote(vhost), _quote(name)),
            data=data,
        )

    def delete_policy_for_vhost(self, vhost, name):
        """
        Delete a specific policy for a vhost.

        :param vhost: The virtual host of the policy
        :type vhost: str
        :param name: The name of the policy
        :type name: str
        """
        # Same URL shape as get/create: no trailing slash after the name.
        self._api_delete(
            '/api/policies/{0}/{1}'.format(_quote(vhost), _quote(name))
        )

    # ------------------------------------------------------------------
    # Health
    # ------------------------------------------------------------------
    def is_vhost_alive(self, vhost):
        """
        Declares a test queue, then publishes and consumes a message.
        Intended for use by monitoring tools.

        :param vhost: The vhost name to check
        :type vhost: str
        """
        return self._api_get('/api/aliveness-test/{0}'.format(_quote(vhost)))