参考

https://my.oschina.net/mdxlcj/blog/3096142

https://www.dazhuanlan.com/2019/08/22/5d5dfd4320a5b/

pom.xml

1
2
3
4
5
<!-- Spring AMQP RabbitMQ module: supplies the org.springframework.amqp classes
     (Queue, RabbitTemplate, @RabbitListener, Jackson2JsonMessageConverter) used by the
     XML and Java snippets in this document. -->
<!-- NOTE(review): Jackson2JsonMessageConverter and the Lombok annotations on Record need
     their own dependencies (presumably jackson-databind and lombok), which are not shown
     here; confirm they are declared in the real pom.xml. -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.3.RELEASE</version>
</dependency>

rabitmq.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
# Broker connection settings, read through ${mq.*} placeholders in spring-rabbitmq.xml.
mq.host=127.0.0.1
mq.username=admin
mq.virtual-host=/
mq.password=123456
mq.port=5672

mq.channel-cache-size=50
mq.concurrentConsumers=3
mq.maxConcurrentConsumers=10
# Acknowledge mode:
#   MANUAL - the listener must ack/nack each message itself through the Channel
#   AUTO   - the listener container acks (or nacks on failure) on the listener's behalf
#   NONE   - no acks are sent; the broker treats a message as acknowledged once delivered
mq.acknowledgeMode=MANUAL
# Sizing rule used by the author: thread pool size = concurrency * number of listeners
mq.task-executor.pool-size=100

spring.xml

1
2
<!-- Load rabitmq.properties from the classpath so ${mq.*} placeholders can be resolved.
     ignore-unresolvable="true" makes this configurer skip placeholders it cannot resolve
     instead of failing, leaving them for any other placeholder configurer. -->
<context:property-placeholder location="classpath:rabitmq.properties" ignore-unresolvable="true" />
<!-- Pull in the RabbitMQ bean definitions. -->
<import resource="spring-rabbitmq.xml"/>

spring-rabbitmq.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/task
           http://www.springframework.org/schema/task/spring-task.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Helper beans registered explicitly. -->
    <!-- NOTE(review): QueueUtils is registered here from package com.bjtcrj.gms.event.rabbitmq,
         while the component-scan at the bottom targets com.bjtcrj.scm.event.rabbitmq.
         Confirm both package names match where the classes really live. -->
    <bean name="rabbitUtil" class="com.bjtcrj.gms.common.utils.RabbitUtil"></bean>
    <bean name="queueUtils" class="com.bjtcrj.gms.event.rabbitmq.QueueUtils"></bean>

    <!-- Enable @RabbitListener annotation processing. -->
    <rabbit:annotation-driven/>

    <!-- Connection factory. channel-cache-size applies the mq.channel-cache-size setting,
         which was defined in the properties file but previously never used. -->
    <rabbit:connection-factory id="connectionFactorymq"
                               host="${mq.host}"
                               port="${mq.port}"
                               virtual-host="${mq.virtual-host}"
                               username="${mq.username}"
                               password="${mq.password}"
                               channel-cache-size="${mq.channel-cache-size}"/>

    <!-- Converts message payloads to and from JSON. -->
    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- RabbitTemplate used for sending and receiving messages. -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactorymq" message-converter="jsonMessageConverter" />

    <!-- RabbitAdmin: declares the exchanges and queues defined as beans on the broker automatically. -->
    <rabbit:admin connection-factory="connectionFactorymq" auto-startup="true" />

    <!-- Listener container factory used by the annotation-driven listeners. -->
    <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="connectionFactorymq"/>
        <!-- Initial number of concurrent consumers. -->
        <property name="concurrentConsumers" value="${mq.concurrentConsumers}"/>
        <!-- Upper bound on concurrent consumers. -->
        <property name="maxConcurrentConsumers" value="${mq.maxConcurrentConsumers}"/>
        <!-- Payload conversion for incoming messages. -->
        <property name="messageConverter" ref="jsonMessageConverter"/>
        <!-- Thread pool that runs the consumers. -->
        <property name="taskExecutor">
            <task:executor id="amqpTaskExecutor" pool-size="${mq.task-executor.pool-size}"/>
        </property>
        <!-- Acknowledge mode (MANUAL in the sample properties: listeners ack explicitly). -->
        <property name="acknowledgeMode" value="${mq.acknowledgeMode}"/>
    </bean>

    <!-- Scan for consumer components so their listener methods are bound to queues. -->
    <context:component-scan base-package="com.bjtcrj.scm.event.rabbitmq" />
</beans>

队列初始化工具类——QueueUtils.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Declares the queue names used by the application and exposes each queue as a
 * Spring bean so it can be declared on the broker.
 */
@Component
public class QueueUtils {
    public static final String GMS_EVENT_QUEUE_BACK = "gms_event_queue_back";
    public static final String GMS_EVENT_QUEUE_DONE_FEEDBACK = "gms_event_queue_done_feedback";
    /**
     * Queue that carries {@code Record} messages. RecordProducer references this
     * constant, but it was missing from the class.
     * NOTE(review): the value follows the naming of the sibling constants; confirm it
     * matches the queue name expected by the broker and other services.
     */
    public static final String GMS_EVENT_QUEUE_RECORD = "gms_event_queue_record";

    /** @return the queue named {@link #GMS_EVENT_QUEUE_BACK} */
    @Bean
    public Queue directQueue1() {
        return new Queue(GMS_EVENT_QUEUE_BACK);
    }

    /** @return the queue named {@link #GMS_EVENT_QUEUE_DONE_FEEDBACK} */
    @Bean
    public Queue directQueue2() {
        return new Queue(GMS_EVENT_QUEUE_DONE_FEEDBACK);
    }

    /** @return the queue named {@link #GMS_EVENT_QUEUE_RECORD} */
    @Bean
    public Queue recordQueue() {
        return new Queue(GMS_EVENT_QUEUE_RECORD);
    }
}

生产者 RecordProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Publishes {@link Record} messages through the injected {@link RabbitTemplate}.
 */
@Component
public class RecordProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts the given record with the template's message converter and sends it,
     * using the record queue name as the routing key.
     *
     * @param record the message payload to publish
     */
    public void send(Record record) {
        final String routingKey = QueueUtils.GMS_EVENT_QUEUE_RECORD;
        this.rabbitTemplate.convertAndSend(routingKey, record);
    }
}

消费者 RecordConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
 * Consumes {@link Record} messages and acknowledges each one manually.
 */
@Component
public class RecordConsumer {
    /**
     * Listens on the same queue that RecordProducer publishes to.
     *
     * @param msg     the raw AMQP message; supplies the delivery tag used for the ack
     * @param record  the payload converted from the message body
     * @param channel the channel the message arrived on; used to acknowledge it
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = QueueUtils.GMS_EVENT_QUEUE_RECORD)
    public void receive(Message msg, Record record, Channel channel) throws IOException {
        System.out.println("scm_event_record_queue: " + record.getTasknum());

        // TODO: business processing

        long deliveryTag = msg.getMessageProperties().getDeliveryTag();

        // Second argument is "multiple": false acks only this delivery, true acks every
        // unacknowledged delivery on the channel up to and including this tag. Use false
        // so a message whose handling failed before its ack is not silently confirmed
        // by the next successful one.
        channel.basicAck(deliveryTag, false);

        // Alternatives when processing fails:
        //   channel.basicNack(deliveryTag, false, true)  - negatively ack this one message
        //                                                  and put it back on the queue
        //   channel.basicReject(deliveryTag, false)      - reject this message; the second
        //                                                  argument is "requeue", so false
        //                                                  does not return it to the queue
    }
}

消息实体类 Record

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * Message payload exchanged between RecordProducer and RecordConsumer.
 *
 * <p>Lombok generates the accessors, builder and constructors from the fields below.
 * Field names and their declaration order are part of the generated API (accessor
 * names, builder methods, all-args constructor parameter order), so they are kept
 * exactly as declared even where they do not follow camelCase.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Record implements Serializable {
    /** Explicit version id so the serialized form does not depend on a compiler-computed default. */
    private static final long serialVersionUID = 1L;

    private String tasknum;
    private String eventtype;
    private String maintype;
    private String processtime;
    private String Case_grade;
    private String street_code;
    private String streetname;
    private String community_code;
    private String communityname;
    private String grid_code;
    private String grid_codename;
    private String casePosDesc;
    private String caseDesc;
    private String posX;
    private String posY;
    private String registrant;
    private String username;
    private String Telephone;
    private String imgs;
    // NOTE(review): "oprator" looks like a misspelling of the "operator" field below;
    // confirm whether both are really needed before removing either.
    private String oprator;
    private String operator;
    private String suggestion;
    private String img_record;
}

使用

1
2
3
4
5
6
7
8
9
10
11
/** Producer used by the demo endpoint below. */
@Autowired
private RecordProducer recordProducer;

/**
 * Demo endpoint: publishes ten records with task numbers "0" to "9", pausing one
 * second after each send.
 *
 * @param request  the incoming HTTP request (unused)
 * @param response the HTTP response (unused)
 * @throws Exception if sending fails or the pause is interrupted
 */
@RequestMapping(value = "/test")
public void test(HttpServletRequest request, HttpServletResponse response) throws Exception{
    final int total = 10;
    int seq = 0;
    while (seq < total) {
        String taskNumber = String.valueOf(seq);
        Record payload = Record.builder().tasknum(taskNumber).build();
        recordProducer.send(payload);
        // Space the messages one second apart.
        Thread.sleep(1000);
        seq++;
    }
}