Spring Cloud 基于Apache Kafka 的 Stream 实现

如果你的应用使用了Apache Kafka,你需要把它和Spring Cloud进行整合。需要在应用中添加如下依赖。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

然后就是Spring Cloud Stream的标准配置了。需要在@Configuration类上使用@EnableBinding声明需要应用的Binding。

@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
  
}

上面代码定义需要使用的Binding是Source和Sink接口中声明的input和output两个Binding。然后可以在application.properties文件中声明这两个Binding对应的destination,它们对应于kafka的Topic。如果指定的Topic还未创建,默认会自动进行创建。

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group

如果你的Kafka服务器不是本机或者监听端口不是默认的9092,则还需要通过spring.cloud.stream.kafka.binder.brokers指定Kafka的服务地址。

spring.cloud.stream.kafka.binder.brokers=localhost:9092

之后就是照常的使用Spring Cloud Stream的相关API进行操作了。如下是发送消息的示例。

@Component
@Slf4j
public class SourceProducer {
    @Autowired
    private Source source;
    public void sendMessages(String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        log.info("发送了一条消息-{}", msg);
        this.source.output().send(message);
    }
}

如下是监听消息的示例。

@Component
@Slf4j
public class SinkConsumer {
    @StreamListener(Sink.INPUT)
    public void inputConsumer(Message<String> message) {
        String payload = message.getPayload();
        MessageHeaders headers = message.getHeaders();
        log.info("从Binding-{}收到信息-{}, headers:{}", Sink.INPUT, payload, headers);
    }
  
}

由于笔者的上一篇文章——Spring Cloud Stream基于RocketMQ的实现已经介绍了Spring Cloud Stream的一些规范,这里就不再赘述了。

从Kafka服务,也就是从Spring Cloud Stream的Binder的角度来讲可以定义的参数可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties。比较核心的参数有:

  • spring.cloud.stream.kafka.binder.brokers:用来指定Kafka服务的地址,可以是host,也可以是host:port格式,如:spring.cloud.stream.kafka.binder.brokers=localhost,10.10.10.1:9092。默认是localhost。
  • spring.cloud.stream.kafka.binder.defaultBrokerPort:Kafka服务的完整地址应该是host+port,当brokers只定义了host时,将默认取该属性定义的port作为Kafka服务的port,默认是9092。
  • spring.cloud.stream.kafka.binder.autoCreateTopics:指定Topic不存在时是否需要自动创建,默认是true。

Spring Cloud Stream有多种不同的实现,比如RocketMQ/Kafka/RabbitMQ。不同的实现者在Producer和Consumer上也可能是有些差别的,或者是有些特性的。整合Spring Cloud Stream后这些特性的属性也是可以进行配置的。可以通过spring.cloud.stream.kafka.bindings.xxx.consumer.yyy指定名为xxx的这个Consumer角色的yyy属性。可以通过spring.cloud.stream.kafka.bindings.xxx.producer.yyy指定名为xxx的Producer的yyy属性。Kafka实现的Producer的特性属性配置可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties,Consumer的特性属性配置可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties。Consumer特性的参数主要有:

  • autoRebalanceEnabled:默认为true。当设置为true时,会对分区进行负载均衡,有Consumer加入或退出时会对Topic的分区重新分配。设置为false时,每个Consumer分配的Topic分区是固定的,不会变。
  • autoCommitOffset:默认为true。当设置为true时表示消息处理完后会自动提交offset;如果设置为false则会在消息的header中添加一个key为kafka_acknowledgment,value为org.springframework.kafka.support.Acknowledgment类型的对象的header,消费者可以在处理消息后从header中获取该对象进行手动响应消息的处理情况。
  • startOffset:指定新的消费者组加入的时候起始的消费位置,可选值有earliest和latest。默认是null,相当于earliest。