Active MQ
Table of Contents
1. Active MQ
Active MQ 是老牌的消息中间件,它支持的协议比较多,如:AMQP、AUTO、MQTT、OpenWire、REST、RSS and Atom、Stomp、WSIF、WS Notification、XMPP 。
1.1. 安装
ActiveMQ 的 Docker 安装方式如下:
$ docker pull rmohr/activemq $ docker run -p 61613:61613 -p 8161:8161 rmohr/activemq
1.2. 客户端
Golang Stomp 客户端实例:https://github.com/go-stomp/stomp/blob/master/examples/client_test/main.go
上面的客户端没有实现连接池,要使用连接池,可以参考:https://github.com/silenceper/pool
2. Topic VS. Queue
Active MQ 中,Topic 和 Queue 的最大区别在于 Topic 是以广播的形式,通知所有在线监听的客户端有新的消息(一对多的模式),没有监听的客户端将收不到消息;而 Queue 则是以点对点的形式通知多个处于监听状态的客户端中的一个。
Topic 的工作模式如图 1 所示。
Figure 1: Topic 的工作模式(一对多)
Queue 的工作模式如图 2 所示。
Figure 2: Queue 的工作模式(一对一)
Queue/Topic 的不同设置,可实现不同的消息安全级别,如表 1 所示。
消息类型 | 消息是否持久化 | 消息是否有 Durable 订阅者 | 消费者延迟启动时,消息是否保留 | Broker 重启时,消息是否保留 |
---|---|---|---|---|
Queue | N | - | Y | N |
Queue | Y | - | Y | Y |
Topic | N | N | N | N |
Topic | N | Y | Y | N |
Topic | Y | N | N | N |
Topic | Y | Y | Y | Y |
3. 延迟消息
要启用延迟消息功能,请在配置文件 activemq.xml 中设置 broker 的 schedulerSupport
属性为 "true"
,如:
<broker schedulerSupport="true">
在发送消息时指定 AMQ_SCHEDULED_DELAY
属性为想要延迟发送的的毫秒数,这样 Active MQ 收到消息后不会马上投递到 topic/queue 中,而是到达指定时间后才投递。
可以在 Active MQ Console(如图 3)的“Scheduled”页面下看到延迟发送的消息。
Figure 3: Active MQ Console
参考:https://activemq.apache.org/delay-and-schedule-message-delivery
4. 重新投递
重新投递分两种:一是 consumer 自己重新投递(客户端机制,这需要 client 库支持),一是 broker 重新投递(服务端机制)。
要启用 broker 重新投递机制,请在 activemq.xml 中设置下面代码:
<plugins> <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true"> <redeliveryPolicyMap> <redeliveryPolicyMap> <redeliveryPolicyEntries> <!-- a destination specific policy --> <redeliveryPolicy queue="SpecialQueue" maximumRedeliveries="4" redeliveryDelay="10000"/> </redeliveryPolicyEntries> <defaultEntry> <!-- the fallback policy for all other destinations --> <redeliveryPolicy maximumRedeliveries="10" initialRedeliveryDelay="5000" redeliveryDelay="10000"/> </defaultEntry> </redeliveryPolicyMap> </redeliveryPolicyMap> </redeliveryPlugin> </plugins>
参考:http://activemq.apache.org/message-redelivery-and-dlq-handling.html
4.1. 死信队列
如果 broker 投递给消费者消息, 消息者没有 ACK 或 NACK, 则会触发重新投递, 投递超过一定次数则会进入死信队列(Dead Letter Queue), 默认只有一个公共的死信队列 ActiveMQ.DLQ, 这不方便调试,往往我们给不同的 topic/queue 分别设置死信队列,这可以通过修改 activemq.xml 来实现,关键配置如下:
<policyEntry queue=">" > <deadLetterStrategy> <!-- Use the prefix 'DLQ.' for the destination name, and make the DLQ a queue rather than a topic --> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/> </deadLetterStrategy> </policyEntry>
5. 保证消息可靠
消息队列的消息可靠性可以从下面三点来保证:
1、发送消息时,要等待服务器返回 Acknowledgement 后,发送方才认为发送成功。
2、发送消息时,设置消息的持久化属性,这样就算 broker 突然宕机也不会丢失消息。
3、订阅消息时,启用消费确认功能。即客户端完成对应业务逻辑后,才向服务器发送 Ack 消息。
如果客户端是使用 golang stomp 库,那么上面三点都有相应的配置来实现,如:
sub, err := stompConn.Subscribe("my_queue1", stomp.AckClientIndividual) // 保证消息可靠,第3点 ... if err := stompConn.Send("my_queue1", "text/plain", msg, stomp.SendOpt.Receipt, // 保证消息可靠,第1点 stomp.SendOpt.Header("persistent", "true")); err != nil { // 保证消息可靠,第2点 return err }