Kafka

so-fast提供对分布式消息队列kafka的支持。

如果使用该功能,请现在pom.xml中添加依赖:

<dependency>
    <groupId>com.sofast.cloud</groupId>
    <artifactId>so-fast-kafka-starter</artifactId>
</dependency>

并在配置文件中进行配置:


spring:
  kafka:
      bootstrap-servers: 127.0.0.1:9092

这里只是一个配置示例,具体针对kafka server,消费者,生产者的消息配置可以参考文末sf-kafka.properties文件。

使用示例

简易示例
下面示例创建了一个生产者,default_topic。消费者用@KafkaListener注解进行加监听,topics表示监听的topic,支持同时监听多个,用英文逗号分隔

/**
 * 生产者
 */
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @PostMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("default_topic", normalMessage);
    }
}

/**
 * 消费者
 */
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"default_topic"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
}
生产者-callback使用示例
下面示例创建了一个带callback的生产者,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理

/**
 * 第一种写法
 */
@PostMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("default_topic", callbackMessage).addCallback(success -> {
        // 消息发送到的topic
        String topic = success.getRecordMetadata().topic();
        // 消息发送到的分区
        int partition = success.getRecordMetadata().partition();
        // 消息在分区内的offset
        long offset = success.getRecordMetadata().offset();
        System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
    }, failure -> {
        System.out.println("发送消息失败:" + failure.getMessage());
    });
}

/**
 * 第二种写法
 */
@PostMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("default_topic", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("发送消息失败:"+ex.getMessage());
        }

        @Override
        public void onSuccess(SendResult<String, Object> result) {
            System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                    + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
        }
    });
}
生产者-事务使用示例
/**
 * 如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务
 */
@PostMapping("/kafka/transaction")
public void sendMessage4(){
    // 声明事务:后面报错消息不会发出去
    kafkaTemplate.executeInTransaction(operations -> {
        operations.send("default_topic","test executeInTransaction");
        throw new RuntimeException("fail");
    });

    // 不声明事务:后面报错但前面消息已经发送成功了
   kafkaTemplate.send("default_topic","test executeInTransaction");
   throw new RuntimeException("fail");
}
消费者-指定参数消费使用示例
```
/**
 *  ① id:消费者ID;
 *  ② groupId:消费组ID;
 *  ③ topics:监听的topic,可监听多个;
 *  ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。
 *
 *  监听default_topic的0号分区,同时监听other_topic的0号分区和other_topic的1号分区里面offset从8开始的消息。
 */
@KafkaListener(id = "consumer1",groupId = "sofast-group",topicPartitions = {
        @TopicPartition(topic = "default_topic", partitions = { "0" }),
        @TopicPartition(topic = "other_topic", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
```
消费者-批量消费使用示例
application.yml加入配置:

# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
/**
 * 接收消息时用List来接收
 */
@KafkaListener(id = "consumer2",groupId = "sofast-group", topics = "default_topic")
public void onMessage2(List<ConsumerRecord<?, ?>> records) {
    System.out.println(">>>批量消费一次,records.size()="+records.size());
    for (ConsumerRecord<?, ?> record : records) {
        System.out.println(record.value());
    }
}
消费者-异常处理器
/**
 * 通过异常处理器,我们可以处理consumer在消费时发生的异常。
 * 新建一个异常处理器,用@Bean注入
 */
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
    return (message, exception, consumer) -> {
        System.out.println("消费异常:"+message.getPayload());
        return null;
    };
}

/**
 * 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
 */
@KafkaListener(topics = {"default_topic"},errorHandler = "consumerAwareErrorHandler")
public void onMessage3(ConsumerRecord<?, ?> record) throws Exception {
    throw new Exception("简单消费-模拟异常");
}

/**
 * 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
 */
@KafkaListener(topics = "default_topic",errorHandler="consumerAwareErrorHandler")
public void onMessage4(List<ConsumerRecord<?, ?>> records) throws Exception {
    System.out.println("批量消费一次...");
    throw new Exception("批量消费-模拟异常");
}
消费者-消息过滤器
/**
 * 消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
 * 配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
 */
@Component
public class KafkaConsumer {
    @Autowired
    ConsumerFactory consumerFactory;

    /**
     * 消息过滤器
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 消息过滤策略(过滤奇数和偶数)
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }
            //返回true消息则被过滤
            return true;
        });
        return factory;
    }

    /**
     * 消息过滤监听
     */
    @KafkaListener(topics = {"default_topic"},containerFactory = "filterContainerFactory")
    public void onMessage5(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}
消费者-消息转发
/**
 * 在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。
 */
@KafkaListener(topics = {"default_topic"})
@SendTo("other_topic")
public String onMessage6(ConsumerRecord<?, ?> record) {
    return record.value()+"-forward message";
}

附件:sf-kafka.properties

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=ip:port,ip:port
spring.kafka.security.protocol=SASL_PLAINTEXT
spring.kafka.security.sasl.mechanism=GSSAPI
spring.kafka.security.sasl.kerberos.service.name=kafka

###########【初始化Topic配置】###########
# topic名称
spring.kafka.topic=test
# topic分区数量
spring.kafka.topic.numPartitions=2
# topic副本数量
spring.kafka.topic.replicationFactor=1

###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=xxxxxx.CustomizePartitioner

###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50
# 消费者线程并发数
spring.kafka.consumer.currency=2
Copyright © 2020. 恩梯梯数据(中国)信息技术有限公司. all right reserved,powered by Gitbook该文件修订时间: 2021-03-15 14:41:07

results matching ""

    No results matching ""