Active MQ

Table of Contents

1. Active MQ

Active MQ 是老牌的消息中间件,它支持的协议比较多,如:AMQPAUTOMQTTOpenWireRESTRSS and AtomStompWSIFWS NotificationXMPP

1.1. 安装

ActiveMQ 的 Docker 安装方式如下:

$ docker pull rmohr/activemq
$ docker run -p 61613:61613 -p 8161:8161 rmohr/activemq

1.2. 客户端

Golang Stomp 客户端实例:https://github.com/go-stomp/stomp/blob/master/examples/client_test/main.go
上面的客户端没有实现连接池,要使用连接池,可以参考:https://github.com/silenceper/pool

2. Topic VS. Queue

Active MQ 中,Topic 和 Queue 的最大区别在于 Topic 是以广播的形式,通知所有在线监听的客户端有新的消息(一对多的模式),没有监听的客户端将收不到消息;而 Queue 则是以点对点的形式通知多个处于监听状态的客户端中的一个。

Topic 的工作模式如图 1 所示。

active_mq_topic.gif

Figure 1: Topic 的工作模式(一对多)

Queue 的工作模式如图 2 所示。

active_mq_queue.png

Figure 2: Queue 的工作模式(一对一)

Queue/Topic 的不同设置,可实现不同的消息安全级别,如表 1 所示。

Table 1: Queue/Topic 的不同设置,可实现不同的消息安全级别
消息类型 消息是否持久化 消息是否有 Durable 订阅者 消费者延迟启动时,消息是否保留 Broker 重启时,消息是否保留
Queue N - Y N
Queue Y - Y Y
Topic N N N N
Topic N Y Y N
Topic Y N N N
Topic Y Y Y Y

3. 延迟消息

要启用延迟消息功能,请在配置文件 activemq.xml 中设置 broker 的 schedulerSupport 属性为 "true" ,如:

<broker schedulerSupport="true">

在发送消息时指定 AMQ_SCHEDULED_DELAY 属性为想要延迟发送的的毫秒数,这样 Active MQ 收到消息后不会马上投递到 topic/queue 中,而是到达指定时间后才投递。

可以在 Active MQ Console(如图 3)的“Scheduled”页面下看到延迟发送的消息。

active_mq_console.gif

Figure 3: Active MQ Console

参考:https://activemq.apache.org/delay-and-schedule-message-delivery

4. 重新投递

重新投递分两种:一是 consumer 自己重新投递(客户端机制,这需要 client 库支持),一是 broker 重新投递(服务端机制)。

要启用 broker 重新投递机制,请在 activemq.xml 中设置下面代码:

        <plugins>
            <redeliveryPlugin fallbackToDeadLetter="true"
                              sendToDlqIfMaxRetriesExceeded="true">
                <redeliveryPolicyMap>
                    <redeliveryPolicyMap>
                        <redeliveryPolicyEntries>
                            <!-- a destination specific policy -->
                            <redeliveryPolicy queue="SpecialQueue"
                                              maximumRedeliveries="4"
                                              redeliveryDelay="10000"/>
                        </redeliveryPolicyEntries>

                        <defaultEntry>
                            <!-- the fallback policy for all other destinations -->
                            <redeliveryPolicy maximumRedeliveries="10"
                                              initialRedeliveryDelay="5000"
                                              redeliveryDelay="10000"/>
                        </defaultEntry>
                    </redeliveryPolicyMap>
                </redeliveryPolicyMap>
            </redeliveryPlugin>
        </plugins>

参考:http://activemq.apache.org/message-redelivery-and-dlq-handling.html

4.1. 死信队列

如果 broker 投递给消费者消息, 消息者没有 ACK 或 NACK, 则会触发重新投递, 投递超过一定次数则会进入死信队列(Dead Letter Queue), 默认只有一个公共的死信队列 ActiveMQ.DLQ, 这不方便调试,往往我们给不同的 topic/queue 分别设置死信队列,这可以通过修改 activemq.xml 来实现,关键配置如下:

                <policyEntry queue=">" >
                  <deadLetterStrategy>
                      <!--
                          Use the prefix 'DLQ.' for the destination name, and make
                          the DLQ a queue rather than a topic
                      -->
                      <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
                  </deadLetterStrategy>
                </policyEntry>

5. 保证消息可靠

消息队列的消息可靠性可以从下面三点来保证:
1、发送消息时,要等待服务器返回 Acknowledgement 后,发送方才认为发送成功。
2、发送消息时,设置消息的持久化属性,这样就算 broker 突然宕机也不会丢失消息。
3、订阅消息时,启用消费确认功能。即客户端完成对应业务逻辑后,才向服务器发送 Ack 消息。

如果客户端是使用 golang stomp 库,那么上面三点都有相应的配置来实现,如:

sub, err := stompConn.Subscribe("my_queue1", stomp.AckClientIndividual)  // 保证消息可靠,第3点

...
if err := stompConn.Send("my_queue1", "text/plain", msg,
		stomp.SendOpt.Receipt,                                     // 保证消息可靠,第1点
		stomp.SendOpt.Header("persistent", "true")); err != nil {  // 保证消息可靠,第2点

	return err
}

Author: cig01

Created: <2018-12-18 Tue>

Last updated: <2020-11-05 Thu>

Creator: Emacs 27.1 (Org mode 9.4)