使用RabbitMQ实现未支付订单在30分钟后自动过期

RabbitMQ延迟队列应用

延迟队列可以实现消息在投递到Exchange之后,经过一定的时间之后再路由到相应的Queue。最后被消费者监听消费。即:生产者投递的消息经过一段时间之后再被消费者消费

常见业务场景:订单在30分钟内还未支付则自动取消。

该业务的其他实现方案:

  • 使用Redis,设置过期时间,监听过期事件。
  • 使用RabbitMQ的过期队列与死信队列,设置消息的存活时间,在设置的时间内未被消费,即会投递到死信队列,我们监听死信队列即可。

本文介绍使用RabbitMQ延迟队列来实现。

RabbitMQ延迟队列实现

RabbitMQ实现延迟队列需要依赖插件rabbitmq-delayed-message-exchange

RabbitMQ插件列表: https://www.rabbitmq.com/community-plugins.html
延迟队列插件下载地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

  • 将下载的插件copy到RabbitMQ的plugins目录
  • Mac下的插件路径为/usr/local/rabbitmq/3.8.7/plugins
  • 安装并启用
1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 重启RabbitMQ

业务相关代码编写

实现订单在规定的时间内还未支付则过期

  • 订单实体(仅保留相关字段)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

import java.time.LocalDateTime;

/**
* 订单
*/
@Getter
@Setter
@Builder
@TableName("t_order")
public class Order {

/**
* 主键
*/
@TableId(value = "id", type = IdType.UUID)
private String id;

/**
* 下单用户主键
*/
@TableField("user_id")
private String userId;

/**
* 订单状态
*
* @see OrderStatusEnum
*/
@TableField("status")
private int status;

/**
* 支付时间
*/
@TableField("pay_date_time")
private LocalDateTime payDateTime;

/**
* 订单过期时间
*/
@TableField("expire_date_time")
private LocalDateTime expireDateTime;

/**
* 下单时间
*/
@TableField("create_date_time")
private LocalDateTime createDateTime;
}
  • 订单状态枚举(仅保留相关状态)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* 订单状态
*/
@Getter
@AllArgsConstructor
public enum OrderStatusEnum {

/**
* 100=待支付
*/
TO_BE_PAID(100, "待支付"),

/**
* 200=已过期
*/
EXPIRED(200, "已过期"),

/**
* 300=已支付
*/
PAID(300, "已支付");

/**
* 订单状态
*/
private final int status;

/**
* 描述
*/
private final String description;

}
  • OrderMapper
1
2
3
4
import com.baomidou.mybatisplus.core.mapper.BaseMapper;

public interface OrderMapper extends BaseMapper<Order> {
}
  • 模拟下定的接口OrderController,为了简单起见,省略了Service层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import com.futao.springboot.learn.rabbitmq.doc.delaymessage.entity.Order;
import com.futao.springboot.learn.rabbitmq.doc.delaymessage.entity.OrderStatusEnum;
import com.futao.springboot.learn.rabbitmq.doc.delaymessage.mapper.OrderMapper;
import com.futao.springboot.learn.rabbitmq.doc.delaymessage.rabbitmq.OrderSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/**
* 订单接口
*/
@RestController
@RequestMapping("/order")
public class OrderController {

@Resource
private OrderMapper orderMapper;

@Autowired
private OrderSender orderSender;

/**
* 下单
*/
@PostMapping
public void add() {
Order order = Order
.builder()
.userId("joe")
//待支付
.status(OrderStatusEnum.TO_BE_PAID.getStatus())
.createDateTime(LocalDateTime.now(ZoneOffset.ofHours(8)))
.build();
//订单入库
orderMapper.insert(order);
//投递延迟消息
orderSender.send(order.getId());
}
}

RabbitMQ相关代码编写

  • 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
spring:
rabbitmq:
host: localhost
port: 5672
username: futao
password: 123456789
virtual-host: delay-vh
connection-timeout: 15000
# 发送确认
publisher-confirms: true
# 路由失败回调
publisher-returns: true
template:
# 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
mandatory: true
listener:
simple:
# 每次从RabbitMQ获取的消息数量
prefetch: 1
default-requeue-rejected: false
# 每个队列启动的消费者数量
concurrency: 1
# 每个队列最大的消费者数量
max-concurrency: 1
# 手动签收ACK
acknowledge-mode: manual

app:
rabbitmq:
# 延迟时长设置
delay:
order: 10S
# 队列定义
queue:
order:
delay: order-delay-queue
# 交换机定义
exchange:
order:
delay: order-delay-exchange
  • 延迟交换机,队列定义与绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* 队列,交换机定义与绑定
* 延迟队列插件`rabbitmq-delayed-message-exchange`下载地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
*/
@Configuration
public class Declare {

/**
* 订单队列 - 接收延迟投递的订单
*
* @param orderQueue 订单队列名称
* @return
*/
@Bean
public Queue orderDelayQueue(@Value("${app.rabbitmq.queue.order.delay}") String orderQueue) {
return QueueBuilder
.durable(orderQueue)
.build();
}

/**
* 订单交换机-延迟交换机 - 消息延迟一定时间之后再投递到绑定的队列
*
* @param orderExchange 订单延迟交换机
* @return
*/
@Bean
public Exchange orderDelayExchange(@Value("${app.rabbitmq.exchange.order.delay}") String orderExchange) {
Map<String, Object> args = new HashMap<>(1);
args.put("x-delayed-type", "topic");
return new CustomExchange(orderExchange, "x-delayed-message", true, false, args);
}

/**
* 订单队列-交换机 绑定
*
* @param orderQueue 订单队列
* @param orderDelayExchange 订单交换机
* @return
*/
@Bean
public Binding orderBinding(Queue orderDelayQueue, Exchange orderDelayExchange) {
return BindingBuilder
.bind(orderDelayQueue)
.to(orderDelayExchange)
.with("order.delay.*")
.noargs();
}

}

可以看出队列就是普通的队列,重点在交换机的设定上。声明延迟交换机需要设置参数x-delayed-type,值为交换机类型,可以是fanout,topic,direct。并且设置交换机的type为x-delayed-message

定义完成后可以启动Spring Boot应用程序,在RabbitMQ管理后台查看ExchangeQueue

rabbitmq-delay-exchange

可以看到,除了默认的交换机,Spring Boot已经帮我们创建好了延迟交换机order-delay-exchange,并且此时messages delayed为0,因为我们还未向交换机投递消息。

  • 可以继续查看交换机的路由类型与绑定的队列

rabbitmq-exchange-detail

  • 在查看队列,为普通的队列

rabbitmq-queue-detail

  • 回到代码中,定义消息生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.time.Duration;

@Slf4j
@Component
public class OrderSender {

/**
* 订单延迟时长
*/
@Value("${app.rabbitmq.delay.order}")
private Duration delayTime;

/**
* 订单延迟交换机名称
*/
@Value("${app.rabbitmq.exchange.order.delay}")
private String orderExchange;

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 订单信息投递到延迟交换机
*
* @param orderId 订单主键
*/
public void send(String orderId) {
rabbitTemplate.convertAndSend(orderExchange, "order.delay.abc", orderId, message -> {
MessageProperties messageProperties = message.getMessageProperties();
//设置消息的延迟投递时间
messageProperties.setDelay((int) delayTime.toMillis());
return message;
});
log.info("订单[{}]投递到MQ", orderId);
}
}

在消息投递之前为每条消息都设置了延迟时长setDelay()。调用消费者的代码在上面OrderController中,下定之后,订单数据落库,并且向MQ中投递延迟消息。

  • 消费者:监听过期的订单信息,并且将DB中相应的订单设置为已过期。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.futao.springboot.learn.rabbitmq.doc.delaymessage.entity.Order;
import com.futao.springboot.learn.rabbitmq.doc.delaymessage.entity.OrderStatusEnum;
import com.futao.springboot.learn.rabbitmq.doc.delaymessage.mapper.OrderMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

/**
* 延迟订单监听
*/
@Slf4j
@Component
public class OrderConsumer {

@Resource
private OrderMapper orderMapper;

/**
* 延迟订单监听,设置订单为已过期
*
* @param orderId
* @param channel
* @param message
* @throws IOException
*/
@RabbitListener(queues = "${app.rabbitmq.queue.order.delay}")
public void consumer(String orderId, Channel channel, Message message) throws IOException, InterruptedException {
log.info("消费者接收到延迟订单[{}]", orderId);
//将订单状态设置成已超时过期
orderMapper.update(null,
Wrappers.<Order>lambdaUpdate()
.eq(Order::getId, orderId)
//待支付状态
.eq(Order::getStatus, OrderStatusEnum.TO_BE_PAID.getStatus())
//设置成已过期
.set(Order::getStatus, OrderStatusEnum.EXPIRED.getStatus())
//设置过期时间
.set(Order::getExpireDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)))
);
log.info("订单业务处理结束.....进行消息ack签收");
//msg ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

为了方便查看到延迟投递的效果,在消息投递和接收处都打印了日志,测试时可以看到消息投递和消息的时间间隔。

测试

把订单过期时长设置为10S

1
2
3
4
app:
rabbitmq:
delay:
order: 10S

下单

1
2020-05-15 22:22:05.387  INFO 72852  OrderSender       : 订单[666ae86aabe2a1b3120b34bb5f447bbe]投递到MQ

可以看到,打印出了投递日志,订单主键为666ae86aabe2a1b3120b34bb5f447bbe的订单在2020-05-15 22:22:05.387进行了投递,此时数据库中该订单的状态为100,待支付。

此时查看rabbitMQ控制台Exchange详情可以发现,messages delayed:1,即目前有一条消息处于延迟状态。

等待10S后

1
2
2020-05-15 22:22:05.387  OrderSender       : 订单[666ae86aabe2a1b3120b34bb5f447bbe]投递到MQ
2020-05-15 22:22:15.320 OrderConsumer : 消费者接收到延迟订单[666ae86aabe2a1b3120b34bb5f447bbe]

可以看到OrderConsumer在10S后2020-05-15 22:22:15.320接收到了主键为666ae86aabe2a1b3120b34bb5f447bbe的订单消息。距离投递时间2020-05-15 22:22:05.387为10S。此时查看DB中订单状态,订单状态为200已过期,且过期时间为2020-05-15 22:22:15

达到了订单在我们指定的时间后过期。

严重风险提示

在实际业务使用中,如果消费者的消费能力比较低下,会存在已经过期的消息阻塞积压在队列,无法在指定的时间内过期,导致业务出现异常。实际上,按照我们业务意图,队里Queue里是不应该有大量消息存在的,因为投递到过期队列的消息已经是过期了的,应该立即被消费掉。

为了降低消费者的消费能力,进行如下处理:

(1)设置消费者的最大并发数为1,并进行手动签收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
spring:
rabbitmq:
...
listener:
simple:
# 每个队列启动的消费者数量
concurrency: 1
# 每个队列最大的消费者数量
max-concurrency: 1
acknowledge-mode: manual

app:
rabbitmq:
delay:
# 订单过期时间为1分钟
order: 1M

(2)消费者在处理消息时休眠10S

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
RabbitListener(queues = "${app.rabbitmq.queue.order.delay}")
public void consumer(String orderId, Channel channel, Message message) throws IOException, InterruptedException {
log.info("消费者接收到延迟订单[{}]", orderId);
//将订单状态设置成已超时过期
orderMapper.update(null,
Wrappers.<Order>lambdaUpdate()
.eq(Order::getId, orderId)
//待支付状态
.eq(Order::getStatus, OrderStatusEnum.TO_BE_PAID.getStatus())
//设置成已过期
.set(Order::getStatus, OrderStatusEnum.EXPIRED.getStatus())
//设置过期时间
.set(Order::getExpireDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)))
);
TimeUnit.SECONDS.sleep(10); //休眠10S
log.info("订单业务处理结束.....进行消息ack签收");
//msg ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

(3)向MQ投递两条消息,预期两条消息都在1分钟后正常过期

(4)输出日志:

1
2
3
4
5
6
7
8
2020-05-15 20:18:05.269  OrderSender       : 订单[d6fd965b11f8db0fafb762d305db83b0]投递到MQ
2020-05-15 20:18:05.765 OrderSender : 订单[77ceb7f1bfbbcaf627224ac75e96b0e5]投递到MQ

2020-05-15 20:19:05.279 OrderConsumer : 消费者接收到延迟订单[d6fd965b11f8db0fafb762d305db83b0]
2020-05-15 20:19:15.316 OrderConsumer : 订单业务处理结束.....进行消息ack签收

2020-05-15 20:19:15.318 OrderConsumer : 消费者接收到延迟订单[77ceb7f1bfbbcaf627224ac75e96b0e5]
2020-05-15 20:19:25.330 OrderConsumer : 订单业务处理结束.....进行消息ack签收

第一个订单d6fd965b11f8db0fafb762d305db83b0投递时间为2020-05-15 20:18:05.269。1分钟后2020-05-15 20:19:05.279接收到了通知,并且处理了10S后进行了签收ack 第二个订单77ceb7f1bfbbcaf627224ac75e96b0e5投递时间为2020-05-15 20:18:05.765。1分钟过后并没有收到通知,而是在第一个订单处理完毕之后,2020-05-15 20:19:15.318才收到了通知,比预期的时间长了10秒,实际延迟时间为1分钟+10秒。出现了业务异常

导致这个问题的原因就是消费者无法及时消费消息并更新订单状态。所以我们在进行开发时,需要考虑实际的数据量大小,消费者消费能力。及时关注队列消息积压情况,灵活调整消费者并发数量,优化消费者代码,提高消费者消费能力,保证代码的执行结果符合我们的预期。

Powered by AppBlog.CN     浙ICP备14037229号

Copyright © 2012 - 2021 APP开发技术博客 All Rights Reserved.

访客数 : | 访问量 :