Rabbitmq消息ack机制
AI-摘要
Tianli GPT
AI初始化中...
介绍自己
生成本文简介
推荐相关文章
前往主页
前往tianli博客
一、初始化项目
1、引入依赖
<!--RabbitMq自动配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--消息转换器,用于配置消息序列化-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
2、配置文件
server:
port: 8080
spring:
application:
name: rabbitmq-demo
rabbitmq:
host: 127.0.0.1
port: 5673
username: root
password: 123456
3、配置消息转化器
- 在SpringBoot启动类中注入
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
4、RabbitMq配置类
/**
* @Description: mq配置类,配置交换机 队列 和 绑定
* @Date: 2024/3/26
* @Author: tajiaoyezi
*/
@Configuration
public class RabbitConfig {
/**
* 创建 Queue
*/
@Bean
public Queue demoQueue() {
return new Queue(MqMessage.QUEUE, // Queue 名字
true, // durable: 是否持久化
false, // exclusive: 是否排它
false); // autoDelete: 是否自动删除
}
/**
* 创建 Direct Exchange
*/
@Bean
public DirectExchange demoExchange() {
return new DirectExchange(MqMessage.EXCHANGE,
true, // durable: 是否持久化
false); // exclusive: 是否排它
}
/**
* 创建 Binding
* Exchange:MqMessage.EXCHANGE
* Routing key:MqMessage.ROUTING_KEY
* Queue:MqMessage.QUEUE
*/
@Bean
public Binding demoBinding() {
return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(MqMessage.ROUTING_KEY);
}
}
5、消息实体类
/**
* @Description: 消息
* @Date: 2024/3/26
* @Author: tajiaoyezi
*/
@Data
@Builder
public class MqMessage implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 交换机
*/
public static final String EXCHANGE = "DEMO_EXCHANGE";
/**
* 消息队列
*/
public static final String QUEUE = "DEMO_QUEUE";
/**
* routingKey
*/
public static final String ROUTING_KEY = "DEMO_ROUTING_KEY";
/**
* 内容
*/
private String content;
/**
* 类型
*/
private String type;
}
6、生产者
/**
* @Description: 生产者
* @Date: 2024/3/26
* @Author: tajiaoyezi
*/
@Component
@RequiredArgsConstructor
public class DemoProducer {
private final RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
public void sendMessage(String content,String fileType) {
MqMessage mqMessage = MqMessage.builder()
.content(content)
.type(fileType)
.build();
// 发送消息
rabbitTemplate.convertAndSend(MqMessage.EXCHANGE, MqMessage.ROUTING_KEY, mqMessage);
}
}
7、消费者
/**
* @Description: 消费者
* @Date: 2024/3/26
* @Author: tajiaoyezi
*/
@Slf4j
@Component
public class DemoConsumer {
/**
* 消费者
*
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(MqMessage.QUEUE),
exchange = @Exchange(name = MqMessage.EXCHANGE),
key = MqMessage.ROUTING_KEY
), concurrency = "1")
@RabbitHandler
public void onMessage(MqMessage message, Channel channel, Message messageStatus) {
// 获取消息状态信息
long deliveryTag = messageStatus.getMessageProperties().getDeliveryTag();
try {
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().threadId(), message);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
二、自动ack
- 测试自动ack,这里生产者不进行修改,修改配置文件和消费者
1、配置文件
server:
port: 8080
spring:
application:
name: rabbitmq-demo
rabbitmq:
host: 127.0.0.1
port: 5673
username: root
password: 123456
listener:
simple:
default-requeue-rejected: false # 消息拒绝后是否重新入队
retry:
enabled: true # 是否启动消息重试机制
initial-interval: 1000 # 重试间隔 1000毫秒
max-attempts: 3 # 重试次数 3次
max-interval: 10000 # 重试间隔 10000毫秒
multiplier: 2.0 # 重试间隔倍数
stateless: true # 是否无状态,true:无状态,false:有状态
2、消费者
- 手动抛出异常,测试自动ack,重试次数为3
/**
* @Description: 消费者
* @Date: 2024/3/26
* @Author: tajiaoyezi
*/
@Slf4j
@Component
public class DemoConsumer {
/**
* 消费者
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(MqMessage.QUEUE),
exchange = @Exchange(name = MqMessage.EXCHANGE),
key = MqMessage.ROUTING_KEY
), concurrency = "1")
@RabbitHandler
public void onMessage(MqMessage message, Channel channel, Message messageStatus) {
// 获取消息状态信息
long deliveryTag = messageStatus.getMessageProperties().getDeliveryTag();
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().threadId(), message);
// 手动制造异常,模拟消息处理失败
int i = 1 / 0;
}
}
- 这里可以看到消息重试了三次后抛出异常,符合配置文件中的重试次数为3

三、手动ack
1、配置文件
- 新增配置
acknowledge-mode: manual开启手动ack
server:
port: 8080
spring:
application:
name: rabbitmq-demo
rabbitmq:
host: 127.0.0.1
port: 5673
username: root
password: 123456
listener:
simple:
default-requeue-rejected: false # 消息拒绝后是否重新入队
acknowledge-mode: manual # 消息确认模式,manual:手动确认,不设置默认自动
2、消费者
/**
* @Description: 消费者
* @Date: 2024/3/26
* @Author: tajiaoyezi
*/
@Slf4j
@Component
public class DemoConsumer {
/**
* 消费者
*
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(MqMessage.QUEUE),
exchange = @Exchange(name = MqMessage.EXCHANGE),
key = MqMessage.ROUTING_KEY
), concurrency = "1")
@RabbitHandler
public void onMessage(MqMessage message, Channel channel, Message messageStatus) {
// 获取消息状态信息
long deliveryTag = messageStatus.getMessageProperties().getDeliveryTag();
try {
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().threadId(), message);
int a=1/0;
} catch (Exception e) {
// 判断消息状态进行手动ack
if (messageStatus.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收");
try {
channel.basicReject(deliveryTag, false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
// 再拒绝消息后,可以进行一些其他操作,让消息进入死信队列或者其他的消息补偿机制
// ......
} else {
log.info("手动消息重发");
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}
}
- 这里手动进行了一次消息重发,再次失败后禁止了消息重发

本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 leaflei
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果