2024年5月3日发(作者:)
KafkaListenerContainerFactory
在 Spring for Apache Kafka 中是一个关键
组件,它用于配置和创建监听器容器,这些容器负责从 Apache Kafka 主题中
消费消息。当您在 Spring Boot 应用程序中使用
@KafkaListener
注解来声明
一个消息处理器方法时,这个工厂将会被用来创建实际处理消息的监听器容
器。
以下是一个详细的使用步骤和分析:
1. 引入依赖与配置
首先确保项目中包含了 Spring Kafka 以及相关 Kafka 客户端的依赖。例如,
在 Maven 或 Gradle 构建脚本中引入 Spring Kafka 依赖:
Xml
1
2
3
4
5
6
7
8
9
implementation ':spring-kafka:最
新版本号'
2. 配置
KafkaListenerContainerFactory
在 Spring Boot 应用程序中,通常会通过 Java 配置或 YAML 配置文件来配
置
KafkaListenerContainerFactory
。以下是一个基于 Java 配置的例子:
Java
1
import erConfig;
2
import ;
3
import
uration;
4
import
rentKafkaListenerC
ontainerFactory;
5
import erFactory;
6
import
tKafkaConsumerFactor
y;
7
8
@Configuration
9
public class KafkaConsumerConfig {
10
11
@Value("${rap-servers}")
12
private String bootstrapServers;
13
14
@Bean
15
public ConsumerFactory
consumerFactory() {
16
Map
HashMap<>();
17
(RAP_SERVERS_CONFIG,
bootstrapServers);
18
(_ID_CONFIG,
"my-consumer-group");
19
(_DESERIALIZER_CLASS_CON
FIG, );
20
(_DESERIALIZER_CLASS_C
ONFIG, );
21
return new
DefaultKafkaConsumerFactory<>(configProps);
22
}
23
24
@Bean
25
public
ConcurrentKafkaListenerContainerFactory
kafkaListenerContainerFactory() {
26
ConcurrentKafkaListenerContainerFactory String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 27 sumerFactory(consumerFactory()); 28 29 // 可以进一步配置如下: 30 // 设置并发消费者数量 31 currency(3); 32 33 // 开启批量消费 34 chListener(true); 35 36 // 设置容器属性,如轮询超时 37 ContainerProperties containerProps = tainerProperties(); 38 lTimeout(3000L); 39 40 return factory; 41 } 42 } 3. 使用 @KafkaListener 注解 现在可以定义一个服务类,其中包含一个使用 @KafkaListener 注解的方法来监 听指定的主题: Java 1 import istener; 2 import e; 3 4 @Service 5 public class KafkaMessageService { 6 7 @KafkaListener(topics = "my-topic") 8 public void listen(String message) { 9 n("Received message: " + message); 10 } 11 12 // 如果开启了批量消费,则接收参数应为 List 13 @KafkaListener(topics = "my-batch-topic", batch = true) 14 public void listenBatch(List 15 h(message -> n("Received batch message: " + message)); 16 } 17 } 这样,当 Kafka 代理中有新的消息发布到“my-topic”主题时, listen 方法就会 被调用来处理消息。如果配置了批量消费, listenBatch 方法则会一次性处理 多个消息。 总结起来, KafkaListenerContainerFactory 是 Kafka 消费者配置的核心部 分,它允许我们对消费者的行为进行细粒度的控制,包括但不限于消费者的并 发数、是否批量消费、消费超时等配置。同时,结合 @KafkaListener 注解, Spring Kafka 提供了一种非常方便的方式来集成 Apache Kafka 的消息消费逻 辑到 Spring 应用程序中。


发布评论