升级 Spring Cloud Stream Kafka 至 3.x 支持批量消费的几个问题 时间: 2021-04-26 17:45 分类: JAVA Web,Spring,JAVA ####前言 前几年在使用`Spring Cloud Stream Kafka`的时候发现并不支持批量消费,当时在`Github`上看到批量消费已经放在计划中了。 最近看了下在`3.x`版本中已经支持了,于是,就得升级各种依赖了。 ####问题 之前使用的是`Sping Boot 2.1.9.RELEASE`版本 + `Spring Cloud Greenwich.RELEASE`版本。 想要将`Spring Cloud Stream Kafka`升级至`3.x`版本,就得升级`Spring Cloud`的版本。 官网查看一下,最终决定升级至`2020.0.2`版本,升级`Spring Cloud`版本时,注意`Spring Boot`的版本,因为两者可能会有各种兼容问题,最简单直接的办法就是直接打开`Spring Cloud`的官方文档,最最开头会有对应的`Spring Boot`的版本。 我这里`Spring Boot`的版本选择`2.4.3`。 一切准备就绪后,就是改写原来的代码了。 之前的做法是使用`@StreamListener`注解实现消费,查看`3.x`的版本会发现,这个注解已经被标记为`@Deprecated`了。 最新的做法就是函数式编程,具体参见文档吧。 首先配置文件`consumer`属性下`batch-mode: true`开启批量模式。 批量消费直接定义一个返回`Consumer`类型的`Bean`即可,如下: ``` @Bean public Consumer>> handle() { return message -> { Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); List torrents = message.getPayload(); //save to mongodb and index to es torrentService.upsertAndIndex(torrents); //no error, execute acknowledge if (acknowledgment != null) { acknowledgment.acknowledge(); } }; } ``` 他对应的`binder id`为函数名`handle` + `-` + `0`,最后面的`0`是自增的(多个输入,使用`Function`函数时,具体参见文档吧)。 但是,如果我有一个`Topic`,想要两个不同的`Group`消费,注意:这两个`Group`是互不相干的。官方只给出了使用`Function`函数式接收多个输入的做法。 如果你想这还不简单,直接再加个`Consumer`的函数呗,结果你会发现,消费函数失效了,并且控制台并没有打印错误日志。 这个问题困扰了我好久,一直以为是使用不当导致的,后来注意到控制台启动时的`INFO`日志,说不能定义多个函数式`Bean`,要使用的话必须在配置文件中配置。 具体做法就是一个配置`spring.cloud.function.definition=函数名1;函数名2...`。 升级了`Spring Cloud`和`Spring Boot`,项目中的`Spring Data Elasticsearch`、`Spring Data Mongodb`均出现了问题,后面再继续讲怎么填坑吧。 标签: 无