参考

基于 RabbitMQ 的实时消息推送:https://developer.ibm.com/zh/technologies/messaging/articles/os-cn-rabbit-mq/

预置 exchange 名称

Name Default pre declared names 说明
Direct exchange amq.direct 完全根据 key 进行投递,只有 key 与绑定时的 routing key 完全一致的消息才会收到消息。也可以都不指定 key
Fanount exchange amq.fanout 完全不关心 key,直接采取广播的方式进行消息投递,与该交换机绑定的所有队列都会收到消息
Topic exchange amq.topic 会根据 key 进行模式匹配然后进行投递,与设置的 routing key 匹配上的队列才能收到消息
Heades exchange amq.headers 使用消息头代替 routing key 作为关键字进行路由,不过在实际应用过程中这种类型的 exchange 使用较少

安装RabbitMQ并开通 STOMP 通道「Websocket」

参考 《RabbitMQ环境部署》、《RabbitMQ-Spring》

后台工具类-生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.bjtcrj.scm.common.utils;

import com.bjtcrj.scm.common.exception.ApplicationRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class RabbitUtil {
public static final String SCM_YJHL = "scm_yjhl";

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendExchange(String routingkey, String msg) throws ApplicationRuntimeException {
try {
if(StringEx.isNull(routingkey)) {
return;
}
if ("".equals(msg) || msg==null){
return;
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
// 3.exchangename 为空时默认exchange(AMQP default)
rabbitTemplate.send(SCM_YJHL, routingkey, new Message(msg.getBytes("UTF-8"),messageProperties));
log.info("product send a msg: " + msg);
} catch (IOException e) {
e.printStackTrace();
log.error("RabbitMQ send error!");
}
}
}

前端-消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
<!DOCTYPE HTML>
<html>
<head>
<title>RabbitMQ + WebSocket Exchange 测试</title>
<meta charset="UTF-8">
</head>

<body>
<div id="receiveMsg" style="width: 500px; height: 400px;">
</div>
<input type="text" id="sendMsg">
<button type="button" id="sendBtn">发送</button>
<button type="button" id="leaveBtn">断开</button>
</body>
<script src="http://www.jq22.com/jquery/1.7.2/jquery.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.js"></script>
<script type="text/javascript">

if (typeof WebSocket == 'undefined') {
console.log('不支持websocket')
}

// 获得Stomp client对象
var url = "ws://127.0.0.1:15674/ws";
const queuename_send = "/exchange/scm_yjhl/willaccept";
//收到数据后会自动确认-适用于「一对多通信」
//exchangename 必须已存在
const queuename_subscribe = "/exchange/scm_yjhl/willaccept";
// const queuename_subscribe = "/exchange/rabbitmq_direct";
// const queuename_subscribe = "/exchange/rabbitmq_direct/123";
// const queuename_subscribe = "/exchange/amq.direct/123";
// const queuename_subscribe = "/exchange/amq.fanout";
// const queuename_subscribe = "/exchange/amq.topic";
// const queuename_subscribe = "/topic/123";

//收到数据后需要手动确认-适用于「点对点通信」
//queue 222 可以不存在
// const queuename_subscribe = "/queue/222";
//queue 666 必须存在
// const queuename_subscribe = "/amq/queue/666";
var client = Stomp.client(url);

// 定义连接成功回调函数
var on_connect = function (x) {
//data.body是接收到的数据
client.subscribe(queuename_subscribe, function (data) {
console.log("收到数据:" + JSON.stringify(data));
var msg = data.body;
$('#receiveMsg').append("<p>对方:" + msg + "</p>");
//确认收到消息:确认后销毁
data.ack();
}, {ack: 'client'});
};

// 定义错误时回调函数
var on_error = function (error) {
console.log(error.headers.message);
};

// 连接RabbitMQ
client.connect('admin', '123456', on_connect, on_error, '/');

$("#leaveBtn").click(function () {
client.disconnect(function () {
console.log("See you next time!");
});
});

$("#sendBtn").click(function () {
var sendMsg = $("#sendMsg").val();
$('#receiveMsg').append("<p>我自己:" + sendMsg + "</p>");
$("#sendMsg").val('');
client.send(queuename_send, {priority: 9}, sendMsg);
});

$(document).keydown(function (event) {
if (event.keyCode == 13) {
$("#sendBtn").click();
}
});
</script>
</html>

上面提到的 destination 在 RabbitMQ Web STOM 中进行了相关的定义,根据使用场景的不同,主要有以下 4 种:

  1. /exchange/{exchangename}/{routingKey}

    • exchangename 必须已存在

    • 对于 SUBCRIBE frame,destination 一般为/exchange/exchangename/[/pattern] 的形式。该 destination 会创建一个唯一的、自动删除的、名为queue,并根据 pattern 将该 queue 绑定到所给的 exchange,实现对该队列的消息订阅。

      对于 SEND frame,destination 一般为/exchange/exchangename/[/routingKey] 的形式。这种情况下消息就会被发送到定义的 exchange 中,并且指定了 routingKey

  2. /topic/{routingKey}

    对于 SUBCRIBE frame,destination 创建出自动删除的、非持久的 queue 并根据 routingkey 为绑定到 amq.topic exchange 上,同时实现对该 queue 的订阅。

    对于 SEND frame,消息会被发送到 amq.topic exchange 中

  3. /queue/{queuename}

    • queuename 可以不存在,订阅后会被自动创建出来

    • 对于 SUBCRIBE frame,destination 会定义的共享 queue「routingKey」,并且实现对该队列的消息订阅。

      对于 SEND frame,destination 只会在第一次发送消息的时候会定义的共享 queue。该消息会被发送到默认的 exchange 中,routingKey 即为 /{routingKey},消息最终发送到queuename队列中

  4. /amq/queue/{queuename}

    • queuename 必须已存在

    • 这种情况下无论是 SUBCRIBE frame 还是 SEND frame 都不会产生 queue。但如果该 queue 不存在,SUBCRIBE frame 会报错。

      对于 SUBCRIBE frame,destination 会实现对队列的消息订阅。

      对于 SEND frame,消息会通过默认的 exhcange 直接被发送到队列中。

总结:前两种适用于「一对多通信」,不需要确认。后两种适用于「点对点通信」,一般需要手动确认