Kafka
so-fast提供对分布式消息队列kafka的支持。
如果使用该功能,请现在pom.xml中添加依赖:
<dependency>
<groupId>com.sofast.cloud</groupId>
<artifactId>so-fast-kafka-starter</artifactId>
</dependency>
并在配置文件中进行配置:
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
这里只是一个配置示例,具体针对kafka server,消费者,生产者的消息配置可以参考文末sf-kafka.properties文件。
使用示例
简易示例
下面示例创建了一个生产者,default_topic。消费者用@KafkaListener注解进行加监听,topics表示监听的topic,支持同时监听多个,用英文逗号分隔
/**
* 生产者
*/
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@PostMapping("/kafka/normal/{message}")
public void sendMessage1(@PathVariable("message") String normalMessage) {
kafkaTemplate.send("default_topic", normalMessage);
}
}
/**
* 消费者
*/
@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"default_topic"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
生产者-callback使用示例
下面示例创建了一个带callback的生产者,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理
/**
* 第一种写法
*/
@PostMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("default_topic", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
/**
* 第二种写法
*/
@PostMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("default_topic", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
生产者-事务使用示例
/**
* 如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务
*/
@PostMapping("/kafka/transaction")
public void sendMessage4(){
// 声明事务:后面报错消息不会发出去
kafkaTemplate.executeInTransaction(operations -> {
operations.send("default_topic","test executeInTransaction");
throw new RuntimeException("fail");
});
// 不声明事务:后面报错但前面消息已经发送成功了
kafkaTemplate.send("default_topic","test executeInTransaction");
throw new RuntimeException("fail");
}
消费者-指定参数消费使用示例
```
/**
* ① id:消费者ID;
* ② groupId:消费组ID;
* ③ topics:监听的topic,可监听多个;
* ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。
*
* 监听default_topic的0号分区,同时监听other_topic的0号分区和other_topic的1号分区里面offset从8开始的消息。
*/
@KafkaListener(id = "consumer1",groupId = "sofast-group",topicPartitions = {
@TopicPartition(topic = "default_topic", partitions = { "0" }),
@TopicPartition(topic = "other_topic", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
```
消费者-批量消费使用示例
application.yml加入配置:
# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
/**
* 接收消息时用List来接收
*/
@KafkaListener(id = "consumer2",groupId = "sofast-group", topics = "default_topic")
public void onMessage2(List<ConsumerRecord<?, ?>> records) {
System.out.println(">>>批量消费一次,records.size()="+records.size());
for (ConsumerRecord<?, ?> record : records) {
System.out.println(record.value());
}
}
消费者-异常处理器
/**
* 通过异常处理器,我们可以处理consumer在消费时发生的异常。
* 新建一个异常处理器,用@Bean注入
*/
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, exception, consumer) -> {
System.out.println("消费异常:"+message.getPayload());
return null;
};
}
/**
* 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
*/
@KafkaListener(topics = {"default_topic"},errorHandler = "consumerAwareErrorHandler")
public void onMessage3(ConsumerRecord<?, ?> record) throws Exception {
throw new Exception("简单消费-模拟异常");
}
/**
* 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
*/
@KafkaListener(topics = "default_topic",errorHandler="consumerAwareErrorHandler")
public void onMessage4(List<ConsumerRecord<?, ?>> records) throws Exception {
System.out.println("批量消费一次...");
throw new Exception("批量消费-模拟异常");
}
消费者-消息过滤器
/**
* 消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
* 配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
*/
@Component
public class KafkaConsumer {
@Autowired
ConsumerFactory consumerFactory;
/**
* 消息过滤器
*/
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);
// 消息过滤策略(过滤奇数和偶数)
factory.setRecordFilterStrategy(consumerRecord -> {
if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
return false;
}
//返回true消息则被过滤
return true;
});
return factory;
}
/**
* 消息过滤监听
*/
@KafkaListener(topics = {"default_topic"},containerFactory = "filterContainerFactory")
public void onMessage5(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}
}
消费者-消息转发
/**
* 在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。
*/
@KafkaListener(topics = {"default_topic"})
@SendTo("other_topic")
public String onMessage6(ConsumerRecord<?, ?> record) {
return record.value()+"-forward message";
}
附件:sf-kafka.properties
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=ip:port,ip:port
spring.kafka.security.protocol=SASL_PLAINTEXT
spring.kafka.security.sasl.mechanism=GSSAPI
spring.kafka.security.sasl.kerberos.service.name=kafka
###########【初始化Topic配置】###########
# topic名称
spring.kafka.topic=test
# topic分区数量
spring.kafka.topic.numPartitions=2
# topic副本数量
spring.kafka.topic.replicationFactor=1
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=xxxxxx.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50
# 消费者线程并发数
spring.kafka.consumer.currency=2