阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

重试机制为Kafka带来春天

29次阅读
没有评论

共计 2336 个字符,预计需要花费 6 分钟才能阅读完成。

导读 最近业务上用到了 Spring Kafka,所以系统性的探索了下 Spring Kafka 的各种用法,发现了很多实用的特性,下面介绍下 Spring Kafka 的消息重试机制。

哈喽,大家好,我是指北君。

最近业务上用到了 Spring Kafka,所以系统性的探索了下 Spring Kafka 的各种用法,发现了很多实用的特性,下面介绍下 Spring Kafka 的消息重试机制。

0. 前言

原生 Kafka 是不支持消息重试的。但是 Spring Kafka 2.7+ 封装了 Retry Topic 这个功能。

1. @RetryableTopic

使用注解的方式启用 Retry Topic,在 @KafkaListener 方法上添加 @RetryableTopic 即可:

@Slf4j
@Component
public class DemoConsumer {
    @RetryableTopic
    @KafkaListener(topics = "topic1", groupId = "group1")
    public void onMsg(ConsumerRecord record){log.info("topic: {}", record.topic());
        throw new RuntimeException("kafka exception");
    }

}

这样就开启了 Spring Kafka 的消息重试机制:默认重试 3 次,间隔为 1 秒。

我们在方法里模拟了抛出异常,运行后可以发现打印了 3 条日志,间隔时间大约为 1 秒,重试的 topic 为原 topic 加上后缀“-retry”

2022-11-12 12:14:10.230  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1
2022-11-12 12:14:11.315  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-0  
2022-11-12 12:14:12.310  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-1
2. DLT 死信队列

如果 3 次重试后依旧失败,会将消息发送到 DLT,

默认情况,消息被发送到死信队列后,会输出一条日志。

2022-11-12 12:14:13.324  INFO 1023 --- [ner#3-dlt-0-C-1] o.s.k.retrytopic.RetryTopicConfigurer    : Received message in dlt listener: topic1-dlt@233

DLT 的 topic 为原 topic 加上后缀“-dlt”

我们可以使用 @DltHandler 注解来定义进入死信队列后的操作:

@DltHandler
public void dltHandler(ConsumerRecord record){log.info("topic:{}, key:{}, value:{}", record.topic(), record.key(), record.value());
}
3. 自定义 @RetryableTopic

可以自定义重试次数、延迟时间、topic 命名策略等等,支持使用 Spring EL 表达式读取配置。

@Slf4j
@Component
public class DemoConsumer {
    @RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = "5000", multiplier = "2"),
            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC
    )
    @KafkaListener(topics = "topic2", groupId = "group1")
    public void onMsg2(ConsumerRecord record){log.info("topic: {}", record.topic());
        throw new RuntimeException("kafka exception");
    }

}

注解属性说明:

attempts:重试次数,默认为 3。

@Backoff delay:消费延迟时间,单位为毫秒。

@Backoff multiplier:延迟时间系数,此例中 attempts = 4,delay = 5000,multiplier = 2,则间隔时间依次为 5s、10s、20s、40s,最大延迟时间受 maxDelay 限制。

fixedDelayTopicStrategy:可选策略包括:SINGLE_TOPIC、MULTIPLE_TOPICS

4. 配置类

以上介绍的是注解的方式,只对注解下的方法有效。如果想让多个方法都用相同的消息重试配置,那么可以使用配置类方式:

@Bean
public RetryTopicConfiguration retryTopic(KafkaTemplate template){
    return RetryTopicConfigurationBuilder
            .newInstance()
            .maxAttempts(4)
            .fixedBackOff(5000)
            .includeTopic("topic1")
            .create(template);
}
小结

以上就是 Spring Kafka 消息重试机制的简单应用~ 希望能够帮助那些正在使用 Spring Kafka 或即将使用的人少走一些弯路、少踩一点坑。

阿里云 2 核 2G 服务器 3M 带宽 61 元 1 年,有高配

腾讯云新客低至 82 元 / 年,老客户 99 元 / 年

代金券:在阿里云专用满减优惠券

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2024-07-24发表,共计2336字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中