参考
基于 RabbitMQ 的实时消息推送:https://developer.ibm.com/zh/technologies/messaging/articles/os-cn-rabbit-mq/
预置 exchange 名称
Name |
Default pre declared names |
说明 |
Direct exchange |
amq.direct |
完全根据 key 进行投递,只有 key 与绑定时的 routing key 完全一致的消息才会收到消息。也可以都不指定 key |
Fanount exchange |
amq.fanout |
完全不关心 key,直接采取广播的方式进行消息投递,与该交换机绑定的所有队列都会收到消息 |
Topic exchange |
amq.topic |
会根据 key 进行模式匹配然后进行投递,与设置的 routing key 匹配上的队列才能收到消息 |
Heades exchange |
amq.headers |
使用消息头代替 routing key 作为关键字进行路由,不过在实际应用过程中这种类型的 exchange 使用较少 |
安装RabbitMQ并开通 STOMP 通道「Websocket」
参考 《RabbitMQ环境部署》、《RabbitMQ-Spring》
后台工具类-生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.bjtcrj.scm.common.utils;
import com.bjtcrj.scm.common.exception.ApplicationRuntimeException; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component @Slf4j public class RabbitUtil { public static final String SCM_YJHL = "scm_yjhl";
@Autowired private RabbitTemplate rabbitTemplate;
public void sendExchange(String routingkey, String msg) throws ApplicationRuntimeException { try { if(StringEx.isNull(routingkey)) { return; } if ("".equals(msg) || msg==null){ return; } MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); rabbitTemplate.send(SCM_YJHL, routingkey, new Message(msg.getBytes("UTF-8"),messageProperties)); log.info("product send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); log.error("RabbitMQ send error!"); } } }
|
前端-消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| <!DOCTYPE HTML> <html> <head> <title>RabbitMQ + WebSocket Exchange 测试</title> <meta charset="UTF-8"> </head>
<body> <div id="receiveMsg" style="width: 500px; height: 400px;"> </div> <input type="text" id="sendMsg"> <button type="button" id="sendBtn">发送</button> <button type="button" id="leaveBtn">断开</button> </body> <script src="http://www.jq22.com/jquery/1.7.2/jquery.min.js"></script> <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script> <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.js"></script> <script type="text/javascript">
if (typeof WebSocket == 'undefined') { console.log('不支持websocket') }
var url = "ws://127.0.0.1:15674/ws"; const queuename_send = "/exchange/scm_yjhl/willaccept"; const queuename_subscribe = "/exchange/scm_yjhl/willaccept";
var client = Stomp.client(url);
var on_connect = function (x) { client.subscribe(queuename_subscribe, function (data) { console.log("收到数据:" + JSON.stringify(data)); var msg = data.body; $('#receiveMsg').append("<p>对方:" + msg + "</p>"); data.ack(); }, {ack: 'client'}); };
var on_error = function (error) { console.log(error.headers.message); };
client.connect('admin', '123456', on_connect, on_error, '/');
$("#leaveBtn").click(function () { client.disconnect(function () { console.log("See you next time!"); }); });
$("#sendBtn").click(function () { var sendMsg = $("#sendMsg").val(); $('#receiveMsg').append("<p>我自己:" + sendMsg + "</p>"); $("#sendMsg").val(''); client.send(queuename_send, {priority: 9}, sendMsg); });
$(document).keydown(function (event) { if (event.keyCode == 13) { $("#sendBtn").click(); } }); </script> </html>
|
上面提到的 destination 在 RabbitMQ Web STOM 中进行了相关的定义,根据使用场景的不同,主要有以下 4 种:
/exchange/{exchangename}/{routingKey}
exchangename 必须已存在
对于 SUBCRIBE frame,destination 一般为/exchange/exchangename/[/pattern] 的形式。该 destination 会创建一个唯一的、自动删除的、名为queue,并根据 pattern 将该 queue 绑定到所给的 exchange,实现对该队列的消息订阅。
对于 SEND frame,destination 一般为/exchange/exchangename/[/routingKey] 的形式。这种情况下消息就会被发送到定义的 exchange 中,并且指定了 routingKey
/topic/{routingKey}
对于 SUBCRIBE frame,destination 创建出自动删除的、非持久的 queue 并根据 routingkey 为绑定到 amq.topic exchange 上,同时实现对该 queue 的订阅。
对于 SEND frame,消息会被发送到 amq.topic exchange 中
/queue/{queuename}
queuename 可以不存在,订阅后会被自动创建出来
对于 SUBCRIBE frame,destination 会定义的共享 queue「routingKey」,并且实现对该队列的消息订阅。
对于 SEND frame,destination 只会在第一次发送消息的时候会定义的共享 queue。该消息会被发送到默认的 exchange 中,routingKey 即为 /{routingKey},消息最终发送到queuename队列中
/amq/queue/{queuename}
queuename 必须已存在
这种情况下无论是 SUBCRIBE frame 还是 SEND frame 都不会产生 queue。但如果该 queue 不存在,SUBCRIBE frame 会报错。
对于 SUBCRIBE frame,destination 会实现对队列的消息订阅。
对于 SEND frame,消息会通过默认的 exhcange 直接被发送到队列中。
总结:前两种适用于「一对多通信」,不需要确认。后两种适用于「点对点通信」,一般需要手动确认