admin

Spring Cloud 整合 Kafka
前言最近在开发一个分布式DHT网络爬虫时,需要用到消息中间件,最开始想到的就是非常简单的ActiveMQ,但是后来...
扫描右侧二维码阅读全文
21
2019/02

Spring Cloud 整合 Kafka

前言

最近在开发一个分布式DHT网络爬虫时,需要用到消息中间件,最开始想到的就是非常简单的ActiveMQ,但是后来发现它貌似不支持一个主题多个消费者消费,只能点对点的消费,或者发布订阅者模式进行消费,而我现在的需求是:一个主题多个消费者,并且要保证每条消息只被其中某一个消费者消费。
后来又想到RabbitMQ,但是在安装的时候发现这东西有点恶心啊,由于它使用Erlang开发的,所以安装需要安装Erlang环境,进入官网找到安装教程,真的是看的头晕,对于Linux操作系统版本与Erlang有着严格要求,又继续搜了下关于它的评价,发现并不是很好,所以懒得折腾直接放弃了。
最终选择了Kafka

Kafka 安装

下载并解压:

> tar xvf kafka_2.11-2.1.0.tgz
> cd kafka_2.11-2.1.0.tgz

Kafka 自带了 Zookeeper,如果自己安装了,就不必启动 Kafka 自带的 Zookeeper 了,下面是启动 Kafka 自带 Zookeeper 以及启动 Kafka 的方法。
在 Windows 上运行:

> bin\windows\zookeeper-server-start.bat config\zookeeper.properties
> bin\windows\kafka-server-start.bat config\server.properties

在 Linux 或者 Mac 上:

> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties

有关Kafka的一些配置在这里就不赘述了,我这里除了修改了分区数量其他全部使用默认。

编写 Demo 程序

由于爬虫项目使用的是Spring cloud开发,所以直接使用现成的starter进行整合。
建立Maven父项目(spring-kafka-demo),pom.xml如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me._0o0</groupId>
    <artifactId>spring-cloud-kafka-demo</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>kafka-producer</module>
        <module>kafka-consumer</module>
        <module>common</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
        <skipTests>true</skipTests>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

</project>

接着新建kafka-producer模块,pom.xml如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-cloud-kafka-demo</artifactId>
        <groupId>me._0o0</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>me._0o0</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <artifactId>kafka-producer</artifactId>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

定义 Kafka 流对象:

public interface OutputMessageService {
    @Output("message-out")
    MessageChannel getOutput();
}

需要注意的是:
@Output("message-out")中的message-out并不是指Kafka中的主题名称,它就只是一个通道名称而已,在看其他人文章的时候,可能有人会说把它和消费者的@Input中的保持一致,但这并不是必须的,后面可以看到我这里就是两个不同的名字。

绑定通道

在主程序的类上面添加@EnableBinding(OutputMessageService.class)注解即可。

application.yml 配置文件

server:
  port: 8081
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 192.168.0.2:9092
      bindings:
        message-out:
          destination: messages
          contentType: application/json

配置格式:spring.cloud.stream.bindings.<channelName>.xxx
xxx属性见org.springframework.cloud.stream.config.BindingProperties类:

public static final MimeType DEFAULT_CONTENT_TYPE;
private static final String COMMA = ",";
private String destination;                         //这里才是主题名称
private String group;                               //group id
private String contentType;                         //传输类型
private String binder;
private ConsumerProperties consumer;
private ProducerProperties producer;

创建消息对象

@Getter @Setter @ToString @Builder
public class Message {
    private int id;
    private String content;
}

添加 REST API 接口

@EnableBinding(OutputMessageService.class)
@RestController
@SpringBootApplication
public class ProducerApplication {

    private OutputMessageService outputMessageService;

    public ProducerApplication(OutputMessageService outputMessageService) {
        this.outputMessageService = outputMessageService;
    }

    public static void main(String[] args) {
        new SpringApplicationBuilder(ProducerApplication.class).run(args);
    }

    @GetMapping("message")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void message(String content) {
        Message msg = Message.builder()
                .id(new Random().nextInt())
                .content(content)
                .build();
        outputMessageService.getOutput()
                .send(MessageBuilder.withPayload(msg)
                        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                        .build());
    }
}

添加kafka-comsumer消费者模块,pom.xml如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-cloud-kafka-demo</artifactId>
        <groupId>me._0o0</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>me._0o0</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <artifactId>kafka-consumer</artifactId>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

定义 Kafka 流对象:

public interface InputMessageService {
    @Input("message-in")
    MessageChannel input();
}

可以看到这里@Input("message-in")与上面的@Output("message-out")通道名称是不必一样的。
接着同样也需要在主程序类上添加绑定通道注解:
@EnableBinding(InputMessageService.class)

添加主题监听器

@Slf4j
@EnableBinding(InputMessageService.class)
@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(ConsumerApplication.class).run(args);
    }

    @StreamListener("message-in")
    public void handleMessage(Message msg) {
        log.info("Receive message: [{}]", msg);
    }
}

application.yml 配置文件

server:
  port: 8080
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 192.168.0.2:9092
      bindings:
        message-in:
          group: group-0
          destination: messages
          contentType: application/json

这里的 Kafka 配置就只有新增了group与之前的生产者有所区别。
最后我们就可以测试了,程序跑起来我们可以看到Spring打印出来Kafka的一些配置参数。

验证是否符合自己最初的需求

我最初的需求就是一个主题对应多个消费者,一条消息只被其中某一个消费者消费
一开始我使用Kafka默认配置启动,当我跑起两个消费者程序时发现,始终只有一个消费者程序收到消息,原因:
默认配置中num.partitions参数是 1,也就是说默认创建主题的分区数是 1,而Kafka中一个分区只能对应一个消费者,所以导致了上面的情况发生。
接着修改num.partitions参数为 4,删除原先messages主题:

> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic messages

重启Kafka以及生产者和消费者程序,查看messages分区情况:

> bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic messages
Topic:messages    PartitionCount:4    ReplicationFactor:1    Configs:
Topic: messages    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
Topic: messages    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
Topic: messages    Partition: 2    Leader: 0    Replicas: 0    Isr: 0
Topic: messages    Partition: 3    Leader: 0    Replicas: 0    Isr: 0

再次调用接口发送消息到Kafka,可以发现每个消费者消费两个分区,并且每个消息只会被其中某一个消费者消费,完全符合我的需求。
还有个需要注意的就是两个消费者程序配置文件中的group参数必须一致,也就是上面的group-0,最开始我没有指定该参数,结果就是Spring Cloud Stream会自动随机生成一个,导致两个消费者不在同一个消费组,消息模式将变成发布-订阅模式:即同一条消息,两个消费者都会收到。

完整源码下载:spring-cloud-kafka-demo.zip

Last modification:February 21st, 2019 at 04:06 pm
If you think my article is useful to you, please feel free to appreciate

Leave a Comment