RabbitMQ应用场景:订单自动过期取消

场景描述

场景描述:当用户下单后,状态为待支付,假如在规定的过期时间内尚未支付金额,那么就应该设置订单状态为取消。在不用MQ的情况下,我们可以设置一个定时器,每秒轮询数据库查找超出过期时间且未支付的订单,然后修改状态,但是这种方式会占用很多资源,所以在这里我们可以利用RabbitMQ的死信队列。

死信队列与普通队列一样,在以下情况下会变成死信队列:

  • 消息被拒绝(basic.reject / basic.nack)并且不再重新投递requeue=false
  • 消息过期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
  • 队列超出最大长度

模拟案例

下面是基于Spring Boot的简单模拟案例:

(1)pom.xml文件

1
2
3
4
5
<!-- rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2)application.properties文件

1
2
3
4
5
6
7
8
##rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 消息发送到交换机确认机制,是否确认回调
spring.rabbitmq.publisher-confirms=true

(3)queue以及Exchange的创建与绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

/**
* 订单死信队列交换机标识符 属性值不能改,写死
*/
private static final String ORDER_DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 订单死信队列交换机绑定键 标识符 属性值不能改,写死
*/
private static final String ORDER_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

//----------------------------订单死信定义------------------------------
// 订单过期流程: 消息(创建的订单号)---> 发送到订单死信队列,不消费(设置过期时间)---> (超过设定的过期时间)根据ORDER_DEAD_LETTER_QUEUE_KEY路由死信交换机 ---> 重新消费,根据ORDER_DEAD_LETTER_ROUTING_KEY转发到转发队列(取出消息订单号查找订单,假如仍然未支付就取消订单)---> end

/**
* orderDeadLetterExchange(direct类型交换机)
*/
@Bean("orderDeadLetterExchange")
public Exchange orderDeadLetterExchange() {
return ExchangeBuilder.directExchange("ORDER_DL_EXCHANGE").durable(true).build();
}

/**
* 声明一个订单死信队列.
* x-dead-letter-exchange 对应 死信交换机
* x-dead-letter-routing-key 对应 死信队列
*/
@Bean("orderDeadLetterQueue")
public Queue orderDeadLetterQueue() {
// 参数
Map<String, Object> args = new HashMap<>(2);
// 出现dead letter之后将dead letter重新发送到指定exchange
args.put(ORDER_DEAD_LETTER_QUEUE_KEY, "ORDER_DL_EXCHANGE");
// 出现dead letter之后将dead letter重新按照指定的routing-key发送
args.put(ORDER_DEAD_LETTER_ROUTING_KEY, "RED_KEY");
// name队列名字 durable是否持久化,true保证消息的不丢失, exclusive是否排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除, autoDelete如果该队列没有任何订阅的消费者的话,该队列是否会被自动删除, arguments参数map
return new Queue("ORDER_DL_QUEUE", true, false, false, args);
}

/**
* 定义订单死信队列转发队列.
*/
@Bean("orderRedirectQueue")
public Queue orderRedirectQueue() {
return new Queue("ORDER_REDIRECT_QUEUE", true, false, false);
}

/**
* 死信路由通过 DL_KEY 绑定键绑定到订单死信队列上.
*/
@Bean
public Binding orderDeadLetterBinding() {
return new Binding("ORDER_DL_QUEUE", Binding.DestinationType.QUEUE, "ORDER_DL_EXCHANGE", "DL_KEY", null);

}

/**
* 死信路由通过 RED_KEY 绑定键绑定到订单转发队列上.
*/
@Bean
public Binding orderRedirectBinding() {
return new Binding("ORDER_REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "ORDER_DL_EXCHANGE", "RED_KEY", null);
}
}

(4)消息的发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.UUID;

@RestController
public class OrderController {

@Autowired
private RabbitTemplate rabbitTemplate;

@RequestMapping("/test")
public void sendMessage(@RequestParam String orderNo){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 生成一个消息的唯一id,可不选
// 声明消息处理器 设置消息的编码以及消息的过期时间 时间毫秒值 为字符串
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置编码
messageProperties.setContentEncoding("utf-8");
// 设置过期时间 一分钟
int expiration = 1000 * 20;
messageProperties.setExpiration(String.valueOf(expiration));
return message;
};
// 向ORDER_DL_EXCHANGE 发送消息 形成死信 在OrderQueueReceiver类处理死信交换机转发给转发队列的信息
rabbitTemplate.convertAndSend("ORDER_DL_EXCHANGE", "DL_KEY", orderNo, messagePostProcessor, correlationData);
System.out.println(new Date() + "发送消息,订单号为" + orderNo);
}
}

(5)消息的消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

@Component
public class MesageReceiver {

/**
* 监听转发队列 走逻辑判断,尚未支付且超过过期时间的订单号设置为失效订单
* @param message 信息包装类
* @param channel 通道
*/
@RabbitListener(queues = {"ORDER_REDIRECT_QUEUE"})
public void redirect(Message message, Channel channel) throws IOException {
// 从队列中取出订单号
byte[] body = message.getBody();
String orderNo = new String(body,"UTF-8");
System.out.println(new Date() + "消费消息,订单号为" + orderNo);
// 确认消息有没有被收到,false表示手动确认 在处理完消息时,返回应答状态
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

(6)测试结果

RabbitMQ死信队列测试结果

发送消息时可以看到RabbitMQ管理界面的ORDER_DL_QUEUE队列有一条待消费的消息,然后在20秒过期后变成死信队列发送至ORDER_DL_EXCHANGE交换器,然后交换器根据路由转发到ORDER_REDIRECT_QUEUE队列,并被监听消费掉。

RabbitMQ控制台死信队列监测

Powered by AppBlog.CN     浙ICP备14037229号

Copyright © 2012 - 2020 APP开发技术博客 All Rights Reserved.

访客数 : | 访问量 :