参考
https://my.oschina.net/mdxlcj/blog/3096142
https://www.dazhuanlan.com/2019/08/22/5d5dfd4320a5b/
pom.xml
1 2 3 4 5
| <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.3.RELEASE</version> </dependency>
|
rabitmq.properties
1 2 3 4 5 6 7 8 9 10 11 12 13
| mq.host=127.0.0.1 mq.username=admin mq.virtual-host=/ mq.password=123456 mq.port=5672
mq.channel-cache-size=50 mq.concurrentConsumers=3 mq.maxConcurrentConsumers=10
mq.acknowledgeMode=MANUAL
mq.task-executor.pool-size=100
|
spring.xml
1 2
| <context:property-placeholder location="classpath:rabitmq.properties" ignore-unresolvable="true" /> <import resource="spring-rabbitmq.xml"/>
|
spring-rabbitmq.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:task="http://www.springframework.org/schema/task" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- RabbitMQ wiring: connection factory, JSON message converter, template, admin and the
     listener container factory. All ${mq.*} placeholders must be resolved by a property
     placeholder configured outside this file. -->
<!-- Helper beans, plus annotation-driven listener support so that @RabbitListener methods are
     picked up. NOTE(review): QueueUtils is registered here from package
     com.bjtcrj.gms.event.rabbitmq, while the component-scan at the end of this file targets
     com.bjtcrj.scm.event.rabbitmq; confirm which package is intended. -->
<bean name="rabbitUtil" class="com.bjtcrj.gms.common.utils.RabbitUtil"></bean> <bean name="queueUtils" class="com.bjtcrj.gms.event.rabbitmq.QueueUtils"></bean> <rabbit:annotation-driven/>
<!-- Broker connection built from the mq.* properties, and the Jackson-based converter that
     is shared by the template and the listener container factory below. -->
<rabbit:connection-factory id="connectionFactorymq" host="${mq.host}" port="${mq.port}" virtual-host="${mq.virtual-host}" username="${mq.username}" password="${mq.password}"/> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- Template used by producers; payloads go through jsonMessageConverter. -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactorymq" message-converter="jsonMessageConverter" />
<!-- Admin bound to the same connection factory, started automatically with the context. -->
<rabbit:admin connection-factory="connectionFactorymq" auto-startup="true" />
<!-- Container factory with the default bean name looked up for @RabbitListener endpoints.
     Consumer concurrency, acknowledge mode and the executor pool size all come from the
     mq.* properties. NOTE(review): mq.channel-cache-size is not referenced anywhere in this
     file; confirm whether it should be set on the connection factory. -->
<bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory"> <property name="connectionFactory" ref="connectionFactorymq"/> <property name="concurrentConsumers" value="${mq.concurrentConsumers}"/> <property name="maxConcurrentConsumers" value="${mq.maxConcurrentConsumers}"/> <property name="messageConverter" ref="jsonMessageConverter"/> <property name="taskExecutor"> <task:executor id="amqpTaskExecutor" pool-size="${mq.task-executor.pool-size}"/> </property> <property name="acknowledgeMode" value="${mq.acknowledgeMode}"/> </bean> <context:component-scan base-package="com.bjtcrj.scm.event.rabbitmq" /> </beans>
|
队列初始化工具类——QueueUtils.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;
/**
 * Central place for RabbitMQ queue names and the {@link Queue} beans that declare them.
 *
 * <p>Every queue name that producers or listeners reference should be a constant here and
 * have a matching {@code @Bean}, so the name is defined once and the queue is declared as
 * part of the application context.
 */
@Component
public class QueueUtils {

    /** Name of the "back" event queue. */
    public static final String GMS_EVENT_QUEUE_BACK = "gms_event_queue_back";

    /** Name of the "done feedback" event queue. */
    public static final String GMS_EVENT_QUEUE_DONE_FEEDBACK = "gms_event_queue_done_feedback";

    /**
     * Name of the queue that carries {@code Record} messages (used by {@code RecordProducer}).
     *
     * <p>NOTE(review): the literal follows the naming pattern of the sibling constants; confirm
     * it matches the queue name expected on the broker.
     */
    public static final String GMS_EVENT_QUEUE_RECORD = "gms_event_queue_record";

    /**
     * Alias referenced by {@code RecordConsumer}. It deliberately points at the same queue as
     * {@link #GMS_EVENT_QUEUE_RECORD} so that what the producer publishes is what the listener
     * consumes.
     */
    public static final String SCM_EVENT_QUEUE_RECORD = GMS_EVENT_QUEUE_RECORD;

    /** Declares the {@link #GMS_EVENT_QUEUE_BACK} queue. */
    @Bean
    public Queue directQueue1() {
        return new Queue(GMS_EVENT_QUEUE_BACK);
    }

    /** Declares the {@link #GMS_EVENT_QUEUE_DONE_FEEDBACK} queue. */
    @Bean
    public Queue directQueue2() {
        return new Queue(GMS_EVENT_QUEUE_DONE_FEEDBACK);
    }

    /**
     * Declares the {@link #GMS_EVENT_QUEUE_RECORD} queue so it exists before a listener
     * subscribes to it or a producer publishes to it.
     */
    @Bean
    public Queue recordQueue() {
        return new Queue(GMS_EVENT_QUEUE_RECORD);
    }
}
|
生产者 RecordProducer
1 2 3 4 5 6 7 8 9 10 11 12 13
| import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
/**
 * Publishes {@code Record} messages through the injected {@link RabbitTemplate}.
 */
@Component
public class RecordProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts the given record with the template's message converter and sends it using the
     * record queue name as the routing key.
     *
     * @param record the payload to publish
     */
    public void send(Record record) {
        final String routingKey = QueueUtils.GMS_EVENT_QUEUE_RECORD;
        rabbitTemplate.convertAndSend(routingKey, record);
    }
}
|
消费者 RecordConsumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException;
/**
 * Listener for {@code Record} messages that acknowledges each delivery manually.
 */
@Component
public class RecordConsumer {

    /**
     * Handles one record delivery and settles it on the channel.
     *
     * <p>The listener subscribes to the same queue constant that {@code RecordProducer}
     * publishes to. The delivery is acknowledged individually ({@code multiple = false}) so
     * that acknowledging this message never implicitly acknowledges other outstanding
     * deliveries on the channel. If handling fails, the delivery is negatively acknowledged
     * without requeue instead of being left unacknowledged, and the failure is rethrown.
     *
     * @param msg     the raw AMQP message; only its delivery tag is used
     * @param record  the converted payload
     * @param channel the channel the message was delivered on
     * @throws IOException if the ack or nack cannot be sent to the broker
     */
    @RabbitListener(queues = QueueUtils.GMS_EVENT_QUEUE_RECORD)
    public void receive(Message msg, Record record, Channel channel) throws IOException {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("scm_event_record_queue: " + record.getTasknum());
        } catch (RuntimeException e) {
            // requeue=false: a message that cannot be handled would otherwise be redelivered
            // in a loop; it is dropped (or dead-lettered if the queue is configured for that).
            channel.basicNack(deliveryTag, false, false);
            throw e;
        }
        channel.basicAck(deliveryTag, false);
    }
}
|
消息实体类 Record
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
 * Message payload exchanged between {@code RecordProducer} and {@code RecordConsumer}.
 *
 * <p>All fields are plain strings. Accessors, builder and constructors are generated by
 * Lombok, so field names (including their unconventional casing) and field order are part of
 * the generated API and are kept exactly as they are.
 */
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Data
public class Record implements Serializable {

    private String tasknum;
    private String eventtype;
    private String maintype;
    private String processtime;
    private String Case_grade;

    private String street_code;
    private String streetname;
    private String community_code;
    private String communityname;
    private String grid_code;
    private String grid_codename;

    private String casePosDesc;
    private String caseDesc;
    private String posX;
    private String posY;

    private String registrant;
    private String username;
    private String Telephone;
    private String imgs;

    // NOTE(review): "oprator" and "operator" are both declared; confirm whether both are
    // really needed by the message schema or one is a misspelling of the other.
    private String oprator;
    private String operator;
    private String suggestion;
    private String img_record;
}
|
使用
1 2 3 4 5 6 7 8 9 10 11
| @Autowired private RecordProducer recordProducer;
/**
 * Demo endpoint: publishes ten records whose {@code tasknum} values are "0" through "9",
 * pausing one second after each send.
 *
 * @param request  the incoming request (not used)
 * @param response the outgoing response (not used)
 * @throws Exception if sending fails or the pause is interrupted
 */
@RequestMapping(value = "/test")
public void test(HttpServletRequest request, HttpServletResponse response) throws Exception {
    int sent = 0;
    while (sent < 10) {
        Record record = Record.builder()
                .tasknum(String.valueOf(sent))
                .build();
        recordProducer.send(record);
        Thread.sleep(1000);
        sent++;
    }
}
|