Original article
[MQ Series] Integrating the Kafka Message Queue with Spring Boot (blog of 九七年生于初夏, CSDN)
Steps
Add the dependency
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
Update the configuration file
    spring:
      kafka:
        # Replace "ip" with the broker host
        bootstrap-servers: ip:9092
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: test-consumer-group
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
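For readers more familiar with the plain Kafka client, the Spring Boot keys above correspond to the client's own property names. The following is a minimal sketch of that mapping using only `java.util.Properties`. The class and method names (`KafkaClientProps`, `producerProps`, `consumerProps`) are hypothetical and do not appear in the original article, and the broker address is passed in as a placeholder.

```java
import java.util.Properties;

// Hypothetical helper (not from the original article): builds the raw Kafka
// client properties that correspond to the spring.kafka.* settings above.
class KafkaClientProps {

    // Mirrors spring.kafka.bootstrap-servers and spring.kafka.producer.*
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Mirrors spring.kafka.bootstrap-servers and spring.kafka.consumer.*
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "ip:9092" is the same placeholder used in the YAML above.
        consumerProps("ip:9092", "test-consumer-group")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With Spring Boot these properties are assembled automatically from the YAML file, so a helper like this would only be needed when creating a Kafka client by hand.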
Add the consumer
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.annotation.KafkaHandler;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    // Listens on the "mingyue" topic as a member of the test-consumer-group group.
    @Component
    @Slf4j
    @KafkaListener(topics = {"mingyue"}, groupId = "test-consumer-group")
    public class MsgConsumer {

        // Handles String payloads. The log text means
        // "The message received by the consumer is:".
        @KafkaHandler
        public void receive(String message) {
            log.info("消费者接收到的消息是:{}", message);
        }
    }
Add the producer
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class MsgProducerController {

        // Typed to match the String serializers configured for the producer.
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        // A request to /send?message=... publishes the message to the "mingyue" topic.
        @RequestMapping("/send")
        public void send(String message) {
            kafkaTemplate.send("mingyue", message);
        }
    }
Test
Endpoint: http://localhost:8080/send?message=111
Backend log (the logged text means "The message received by the consumer is: 111"):
    11:06:21.920 INFO 18036 --- [ntainer#0-0-C-1] c.c.mingyue.kafka.consumer.MsgConsumer : 消费者接收到的消息是:111
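The test request can also be issued from code instead of a browser. The sketch below uses the JDK's built-in `java.net.http.HttpClient` (Java 11 or later) and assumes the application is running on `localhost:8080` as in the test above. The class name `SendClient` and its methods are hypothetical and not part of the original article.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical test client (not from the original article) for the /send endpoint.
class SendClient {

    // Builds the request URI, URL-encoding the message so that spaces and
    // non-ASCII characters are safe to pass as a query parameter.
    static URI buildSendUri(String baseUrl, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/send?message=" + encoded);
    }

    // Sends a GET request to the endpoint and returns the HTTP status code.
    static int send(String baseUrl, String message)
            throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildSendUri(baseUrl, message))
                .GET()
                .build();
        HttpResponse<Void> response =
                client.send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) throws InterruptedException {
        String baseUrl = "http://localhost:8080"; // assumed local address
        System.out.println("Requesting " + buildSendUri(baseUrl, "111"));
        try {
            System.out.println("HTTP status: " + send(baseUrl, "111"));
        } catch (IOException e) {
            System.out.println("Request failed (is the application running?): " + e);
        }
    }
}
```

If the request succeeds, the consumer should log the message in the same form as the backend log above.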