2024年5月3日发(作者:)

KafkaListenerContainerFactory

在 Spring for Apache Kafka 中是一个关键

组件,它用于配置和创建监听器容器,这些容器负责从 Apache Kafka 主题中

消费消息。当您在 Spring Boot 应用程序中使用

@KafkaListener

注解来声明

一个消息处理器方法时,这个工厂将会被用来创建实际处理消息的监听器容

器。

以下是一个详细的使用步骤和分析:

1. 引入依赖与配置

首先确保项目中包含了 Spring Kafka 以及相关 Kafka 客户端的依赖。例如,

在 Maven 或 Gradle 构建脚本中引入 Spring Kafka 依赖:

Xml

1

2

3

4

spring-kafka

5

最新版本号

6

7

8

9

implementation ':spring-kafka:最

新版本号'

2. 配置

KafkaListenerContainerFactory

在 Spring Boot 应用程序中,通常会通过 Java 配置或 YAML 配置文件来配

KafkaListenerContainerFactory

。以下是一个基于 Java 配置的例子:

Java

1

import erConfig;

2

import ;

3

import

uration;

4

import

rentKafkaListenerC

ontainerFactory;

5

import erFactory;

6

import

tKafkaConsumerFactor

y;

7

8

@Configuration

9

public class KafkaConsumerConfig {

10

11

@Value("${rap-servers}")

12

private String bootstrapServers;

13

14

@Bean

15

public ConsumerFactory

consumerFactory() {

16

Map configProps = new

HashMap<>();

17

(RAP_SERVERS_CONFIG,

bootstrapServers);

18

(_ID_CONFIG,

"my-consumer-group");

19

(_DESERIALIZER_CLASS_CON

FIG, );

20

(_DESERIALIZER_CLASS_C

ONFIG, );

21

return new

DefaultKafkaConsumerFactory<>(configProps);

22

}

23

24

@Bean

25

public

ConcurrentKafkaListenerContainerFactory

kafkaListenerContainerFactory() {

26

ConcurrentKafkaListenerContainerFactory

String> factory = new

ConcurrentKafkaListenerContainerFactory<>();

27

sumerFactory(consumerFactory());

28

29

//

可以进一步配置如下:

30

//

设置并发消费者数量

31

currency(3);

32

33

//

开启批量消费

34

chListener(true);

35

36

//

设置容器属性,如轮询超时

37

ContainerProperties containerProps =

tainerProperties();

38

lTimeout(3000L);

39

40

return factory;

41

}

42

}

3. 使用

@KafkaListener

注解

现在可以定义一个服务类,其中包含一个使用

@KafkaListener

注解的方法来监

听指定的主题:

Java

1

import

istener;

2

import e;

3

4

@Service

5

public class KafkaMessageService {

6

7

@KafkaListener(topics = "my-topic")

8

public void listen(String message) {

9

n("Received message: " +

message);

10

}

11

12

//

如果开启了批量消费,则接收参数应为

List

13

@KafkaListener(topics = "my-batch-topic", batch =

true)

14

public void listenBatch(List messages) {

15

h(message ->

n("Received batch message: " +

message));

16

}

17

}

这样,当 Kafka 代理中有新的消息发布到“my-topic”主题时,

listen

方法就会

被调用来处理消息。如果配置了批量消费,

listenBatch

方法则会一次性处理

多个消息。

总结起来,

KafkaListenerContainerFactory

是 Kafka 消费者配置的核心部

分,它允许我们对消费者的行为进行细粒度的控制,包括但不限于消费者的并

发数、是否批量消费、消费超时等配置。同时,结合

@KafkaListener

注解,

Spring Kafka 提供了一种非常方便的方式来集成 Apache Kafka 的消息消费逻

辑到 Spring 应用程序中。