Original article

[MQ Series] Integrating the Kafka Message Queue with Spring Boot, 九七年生于初夏's blog on CSDN

Steps

Add the dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Update the configuration file

spring:
  kafka:
    # Kafka broker address (replace "ip" with the broker's host)
    bootstrap-servers: ip:9092
    producer:
      # Serializer for the producer's message keys
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer for the producer's message values
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Consumer group ID
      group-id: test-consumer-group
      # Deserializer for the consumer's message keys
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer for the consumer's message values
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
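To make the serializer settings above more concrete, here is a minimal sketch of what a string serializer and deserializer pair does conceptually: text is turned into UTF-8 bytes before it goes to the broker and turned back into text on the consumer side. The class `StringCodecSketch` and its methods are hypothetical stand-ins written for illustration, not the Kafka classes themselves, and the sketch assumes the default UTF-8 encoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for illustration only; it is NOT Kafka's StringSerializer/StringDeserializer.
final class StringCodecSketch {

    // Producer side: convert text to UTF-8 bytes (a null value stays null).
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: convert UTF-8 bytes back to text (null bytes stay null).
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("111");
        System.out.println(wire.length);       // prints 3
        System.out.println(deserialize(wire)); // prints 111
    }
}
```

The producer and consumer must agree on the format: if the producer writes strings, the consumer should be configured with a matching string deserializer, as in the configuration above.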

Add a consumer

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Consumer: listens on the "mingyue" topic and logs each message.
 *
 * @author Strive
 * @date 2022/4/29 10:38
 */
@Component
@Slf4j
@KafkaListener(
    topics = {"mingyue"},
    groupId = "test-consumer-group")
public class MsgConsumer {

  // Invoked for each record whose payload is a String.
  @KafkaHandler
  public void receive(String message) {
    // The log text means "The message received by the consumer is: <message>".
    log.info("消费者接收到的消息是:{}", message);
  }
}

Add a producer

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Producer: exposes an HTTP endpoint that publishes messages to Kafka.
 *
 * @author Strive
 * @date 2022/4/29 10:39
 */
@RestController
public class MsgProducerController {

  // Typed to match the String key/value serializers in the configuration.
  @Autowired private KafkaTemplate<String, String> kafkaTemplate;

  // Publishes the "message" request parameter to the "mingyue" topic.
  @RequestMapping("/send")
  public void send(String message) {
    kafkaTemplate.send("mingyue", message);
  }
}

Test

Endpoint: http://localhost:8080/send?message=111

Backend log (the logged text means "The message received by the consumer is: 111"):

11:06:21.920  INFO 18036 --- [ntainer#0-0-C-1] c.c.mingyue.kafka.consumer.MsgConsumer   : 消费者接收到的消息是:111
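
The same request can also be sent from Java instead of a browser. The following is a rough sketch using the JDK's built-in HTTP client (Java 11 or later); the class name `SendEndpointClient` and its helper methods are made up for illustration, and it assumes the application is running locally on port 8080 as in the URL above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the original project.
final class SendEndpointClient {

    // Base URL taken from the test step; adjust host and port for your environment.
    static final String BASE_URL = "http://localhost:8080/send";

    // Builds the request URI, URL-encoding the message so spaces and non-ASCII text are safe.
    static URI buildSendUri(String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + "?message=" + encoded);
    }

    // Sends a GET request to the endpoint and returns the HTTP status code.
    static int send(String message) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildSendUri(message)).GET().build();
        HttpResponse<String> response =
            client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(buildSendUri("111"));
        try {
            System.out.println("HTTP status: " + send("111"));
        } catch (IOException e) {
            // Typically means the Spring Boot application is not running or not reachable.
            System.out.println("Could not reach the application: " + e);
        }
    }
}
```

If the request succeeds, the consumer should log a line like the one above shortly afterwards.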