Spring Cloud Stream 3.x 使用:Supplier / Consumer / Function 时间: 2021-04-28 16:12 分类: JAVA Web,Spring,JAVA ####前言 之前用的`Spring Cloud Stream 2.x`,一切都没什么问题,最近升级了`3.x`,发现用得比较混乱,不管是国内外,发现用的都比较少,遇到问题搜索几乎都没有答案,可能是这东西还是不够成熟,所以整理一下思路。 ####问题 由于`3.x`直接废弃了很多的类,比如:`@StreamListener`、`@Input`、`@Output`等。 虽然被标记为`Deprecated`依旧可用使用,但是我们还是选择使用最新用法为好。 没有了`@StreamListener`,那么我们用什么替代呢,答案就是函数式编程:`Consumer / Function`。 #####Consumer 顾名思义,它就是用来消费的,之前消费是通过`@StreamListener`注解来实现的,但是`2.x`版本是不支持批量消费的,如果你有批量消费的需求,还是尽快升级到`3.x`版本,并且使用最新特性,因为`3.x`版本`@StreamListener`虽然依旧可用,但还是不支持批量消费的。 用法介绍: ``` @Bean public Consumer> handle() { return list -> { //do something }; } ``` 通过定义如上`Bean`,就是一个消费者,对应的消费通道是`handle-in-0`,规则就是函数名称 + `in` + `递增的数字`。 对应的`binding`配置为:`spring.cloud.stream.bindings.handle-input-0` #####Function 从函数类型来看,它是个接收一个输入、一个输出的函数,因此,它既是生产者也是消费者,用法如下: ``` @Bean public Function uppercase() { return value -> value.toUpperCase(); } ``` 它是一个很简单的程序,就是将输入转为大写然后再输出,对应两个通道:`uppercase-input-0`、`uppercase-output-0`,命名规则同`Consumer`。 #####Supplier 最后再说下`Supplier`,前面两个可以说是用来消费的,虽然`Function`也有生产端,但是很多时候我们是不需要先接收消息,而是直接将消息发送到对应主题上。 用法介绍: ``` @Bean Supplier hhh() { return () -> "ssssss"; } ``` 但是我感觉对于我目前而言,不知道该用在什么地方,因为它是类似于定时器,默认是每秒触发一次,这里简单的返回一个字符串,可以替换成其他动态获取数据的接口,然后将数据发送到`hhh-output-0`通道。 然而我们更多的时候是需要主动的发送消息,类似于`KafkaTemplate.send()`方法直接调用。 但是在这里不得不说下,如果你引入了`Spring Cloud Stream Kafka Binder`那么你是可以直接使用`KafkaTemplate`的。 一旦你使用`KafkaTemplate`,那么配置文件中就要多出一项配置`spring.kafka.bootstrap-server`,注意它不是在`spring.cloud`下,因此可以说它是和`Spring Cloud Stream`完全隔离的,所以你不仅要配置`spring.cloud.stream.kafka.binder.brokers`还要配置`spring.kafka.bootstrap-server`,是不是感觉怪怪的。 之前我从`2.x`升级到`3.x`发送消息就是用的`KafkaTemplate`,结果发现部署的时候,配置了`spring.cloud.stream.kafka.binder.brokers`还一直报`localhost/9092`连不上,明明配置的另一台机器的`ip`,可它还是去连`localhost`,后来才发现是使用`KafkaTemplate`的问题。 上面说了`Supplier`,是不是感觉有点鸡肋,要想类似`KafkaTemplate`发送消息该怎么做呢? #####StreamBridge 用法同`KafkaTemplate`类似,在需要的地方之间`@Autowired`引入即可,发送消息`send`接口: ``` public boolean send(String bindingName, Object data) { return this.send(bindingName, data, MimeTypeUtils.APPLICATION_JSON); } ``` 使用: > streamBridge.send("message-out", "hello world"); 如果你直接这样使用,会报如下错误: ``` Caused by: java.lang.NullPointerException: Cannot invoke "java.util.function.Function.apply(Object)" because "functionToInvoke" is null ``` 我不知道这是目前版本的`BUG`还是哪里没有配置的问题,官方说它能够发送任何对象消息,但我测试任何类型都发送不出去。 虽然它默认的`contentType`为`application/json`,并且也有默认的`MappingJackson2MessageConverter`消息转换器。 定位错误日志,发现它会去找一个`Function functionToInvoke`,为什么去找一个`Function`这点我没弄明白,因为前面我们知道了`Function`是个接收一个输入一个输出的函数。 如果我们类似地去定义一个`Function`的`Bean`,发现不再报错了,但是一旦定义一个`Function`的`Bean`,就多了一个输入与一个输出主题,会自动地创建在`Kafka`中,我们只是简单地想调用`StreamBridge.send`发送消息到指定通道而已,如果说`Function`作为一个类型转换器,那么大可不必要,`MappingJackson2MessageConverter`是干嘛的? 好了,暂且不说这个类,就当它是个`BUG`或者使用不当吧,搜索也搜索不出个什么答案。那么退而求其次吧,还有个办法能让它正常工作,它有个配置叫`spring.cloud.stream.kafka.bindings.channel-name.producer.useNativeEncoding`,默认值为`false`,我们将它改为`true`。 然后`streamBridge`如下发送消息: ``` streamBridge.send("message-out", "aaaa".getBytes()); ``` 其中`aaaa`一般为对象序列化后的`json`字符串。 ####总结 主要还是将`Spring Cloud Stream`与`Spring kafka`区分开来使用,统一使用其中一种,如实混合使用的话总感觉怪怪的,但是`StreamBridge`到底该如何直接发送对象消息,官方说要做额外配置,但具体又没有给出,并且官方`Demo`也没看到做什么配置,真是脑瓜子疼啊。 最后说下一个项目中定义多个`Supplier / Consumer / Function`的问题,如果不做配置,那么定义多个`Supplier / Consumer / Function`时,你会发现一个都不生效了,`Spring`会把他们都当成普通的`Bean`处理。 因此,在整个项目只有一个`Supplier / Consumer / Function`的时候,`Spring`会把它默认地当做消息函数来处理,如果有多个,为了能更好的区分消息与普通的`Bean`,需要在配置文件中指定:`spring.cloud.function.definition=aaa;bbb;ccc` 其中`aaa`、`bbb`、`ccc`为消息处理函数。 标签: 无
现在好像可以不定义supplier而直接通过streamBridge.send(BINDING_NAME, payload)中的BINDING_NAME在配置中找到对应的topic。
但有一个BINGING没有定义的类好像又感觉怪怪的。。