Spring Cloud 整合 Kafka 时间: 2019-02-21 15:55 分类: Spring,消息中间件,JAVA ###前言 最近在开发一个分布式`DHT网络爬虫`时,需要用到消息中间件,最开始想到的就是非常简单的`ActiveMQ`,但是后来发现它貌似不支持一个主题多个消费者消费,只能点对点的消费,或者发布订阅者模式进行消费,而我现在的需求是:一个主题多个消费者,并且要保证每条消息只被其中某一个消费者消费。 后来又想到`RabbitMQ`,但是在安装的时候发现这东西有点恶心啊,由于它使用`Erlang`开发的,所以安装需要安装`Erlang`环境,进入官网找到安装教程,真的是看的头晕,对于`Linux`操作系统版本与`Erlang`有着严格要求,又继续搜了下关于它的评价,发现并不是很好,所以懒得折腾直接放弃了。 最终选择了`Kafka`。 ###Kafka 安装 [下载][https://kafka.apache.org/downloads]并解压: ``` > tar xvf kafka_2.11-2.1.0.tgz > cd kafka_2.11-2.1.0.tgz ``` Kafka 自带了 Zookeeper,如果自己安装了,就不必启动 Kafka 自带的 Zookeeper 了,下面是启动 Kafka 自带 Zookeeper 以及启动 Kafka 的方法。 在 Windows 上运行: ``` > bin\windows\zookeeper-server-start.bat config\zookeeper.properties > bin\windows\kafka-server-start.bat config\server.properties ``` 在 Linux 或者 Mac 上: ``` > bin/zookeeper-server-start.sh config/zookeeper.properties > bin/kafka-server-start.sh config/server.properties ``` 有关`Kafka`的一些配置在这里就不赘述了,我这里除了修改了分区数量其他全部使用默认。 ###编写 Demo 程序 由于爬虫项目使用的是`Spring cloud`开发,所以直接使用现成的`starter`进行整合。 建立`Maven`父项目(spring-kafka-demo),`pom.xml`如下: ```xml 4.0.0 me._0o0 spring-cloud-kafka-demo pom 1.0-SNAPSHOT kafka-producer kafka-consumer common org.springframework.boot spring-boot-starter-parent 2.1.2.RELEASE UTF-8 UTF-8 1.8 Greenwich.RELEASE true org.projectlombok lombok true org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test test org.springframework.kafka spring-kafka-test test org.springframework.cloud spring-cloud-stream org.springframework.cloud spring-cloud-starter-stream-kafka org.springframework.cloud spring-cloud-dependencies ${spring-cloud.version} pom import ``` 接着新建`kafka-producer`模块,`pom.xml`如下: ```xml spring-cloud-kafka-demo me._0o0 1.0-SNAPSHOT 4.0.0 jar me._0o0 common 1.0-SNAPSHOT compile kafka-producer org.springframework.boot spring-boot-maven-plugin ``` ####定义 Kafka 流对象: ```java public interface OutputMessageService { @Output("message-out") MessageChannel getOutput(); } ``` 需要注意的是: `@Output("message-out")`中的`message-out`并不是指`Kafka`中的主题名称,它就只是一个通道名称而已,在看其他人文章的时候,可能有人会说把它和消费者的`@Input`中的保持一致,但这并不是必须的,后面可以看到我这里就是两个不同的名字。 ####绑定通道 在主程序的类上面添加`@EnableBinding(OutputMessageService.class)`注解即可。 ####application.yml 配置文件 ``` server: port: 8081 spring: cloud: stream: kafka: binder: brokers: 192.168.0.2:9092 bindings: message-out: destination: messages contentType: application/json ``` 配置格式:`spring.cloud.stream.bindings..xxx` `xxx`属性见`org.springframework.cloud.stream.config.BindingProperties`类: ```java public static final MimeType DEFAULT_CONTENT_TYPE; private static final String COMMA = ","; private String destination; //这里才是主题名称 private String group; //group id private String contentType; //传输类型 private String binder; private ConsumerProperties consumer; private ProducerProperties producer; ``` ####创建消息对象 ```java @Getter @Setter @ToString @Builder public class Message { private int id; private String content; } ``` ####添加 REST API 接口 ``` @EnableBinding(OutputMessageService.class) @RestController @SpringBootApplication public class ProducerApplication { private OutputMessageService outputMessageService; public ProducerApplication(OutputMessageService outputMessageService) { this.outputMessageService = outputMessageService; } public static void main(String[] args) { new SpringApplicationBuilder(ProducerApplication.class).run(args); } @GetMapping("message") @ResponseStatus(HttpStatus.ACCEPTED) public void message(String content) { Message msg = Message.builder() .id(new Random().nextInt()) .content(content) .build(); outputMessageService.getOutput() .send(MessageBuilder.withPayload(msg) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build()); } } ``` 添加`kafka-comsumer`消费者模块,`pom.xml`如下: ```xml spring-cloud-kafka-demo me._0o0 1.0-SNAPSHOT 4.0.0 jar me._0o0 common 1.0-SNAPSHOT compile kafka-consumer org.springframework.boot spring-boot-maven-plugin ``` ####定义 Kafka 流对象: ```java public interface InputMessageService { @Input("message-in") MessageChannel input(); } ``` 可以看到这里`@Input("message-in")`与上面的`@Output("message-out")`通道名称是不必一样的。 接着同样也需要在主程序类上添加绑定通道注解: `@EnableBinding(InputMessageService.class)` ####添加主题监听器 ```java @Slf4j @EnableBinding(InputMessageService.class) @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { new SpringApplicationBuilder(ConsumerApplication.class).run(args); } @StreamListener("message-in") public void handleMessage(Message msg) { log.info("Receive message: [{}]", msg); } } ``` ####application.yml 配置文件 ``` server: port: 8080 spring: cloud: stream: kafka: binder: brokers: 192.168.0.2:9092 bindings: message-in: group: group-0 destination: messages contentType: application/json ``` 这里的 Kafka 配置就只有新增了`group`与之前的生产者有所区别。 最后我们就可以测试了,程序跑起来我们可以看到`Spring`打印出来`Kafka`的一些配置参数。 ####验证是否符合自己最初的需求 我最初的需求就是`一个主题对应多个消费者,一条消息只被其中某一个消费者消费`。 一开始我使用`Kafka`默认配置启动,当我跑起两个消费者程序时发现,始终只有一个消费者程序收到消息,原因: 默认配置中`num.partitions`参数是 1,也就是说默认创建主题的分区数是 1,而`Kafka`中一个分区只能对应一个消费者,所以导致了上面的情况发生。 接着修改`num.partitions`参数为 4,删除原先`messages`主题: ``` > bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic messages ``` 重启`Kafka`以及生产者和消费者程序,查看`messages`分区情况: ``` > bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic messages Topic:messages PartitionCount:4 ReplicationFactor:1 Configs: Topic: messages Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: messages Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: messages Partition: 2 Leader: 0 Replicas: 0 Isr: 0 Topic: messages Partition: 3 Leader: 0 Replicas: 0 Isr: 0 ``` 再次调用接口发送消息到`Kafka`,可以发现每个消费者消费两个分区,并且每个消息只会被其中某一个消费者消费,完全符合我的需求。 还有个需要注意的就是两个消费者程序配置文件中的`group`参数必须一致,也就是上面的`group-0`,最开始我没有指定该参数,结果就是`Spring Cloud Stream`会自动随机生成一个,导致两个消费者不在同一个消费组,消息模式将变成`发布-订阅`模式:即同一条消息,两个消费者都会收到。 完整源码下载:[spring-cloud-kafka-demo.zip][1] [1]: https://0o0.me/usr/uploads/2019/02/2064627924.zip 标签: kafka