订单服务消息队列配置

This commit is contained in:
jieyuu 2024-09-23 21:13:06 +08:00
parent 97727917c0
commit d37c46342f
3 changed files with 149 additions and 5 deletions

View File

@ -0,0 +1,128 @@
package net.jieyuu.config;
import lombok.Data;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交换机
*/
@Value("${mqconfig.order_event_exchange}")
private String eventExchange;
/**
* 延迟队列
*/
@Value("${mqconfig.order_close_delay_queue}")
private String orderCloseDelayQueue;
/**
* 关单队列
*/
@Value("${mqconfig.order_close_queue}")
private String orderCloseQueue;
/**
* 进入延迟队列的路由key
*/
@Value("${mqconfig.order_close_delay_routing_key}")
private String orderCloseDelayRoutingKey;
/**
* 进入死信队列的路由key
*/
@Value("${mqconfig.order_close_routing_key}")
private String orderCloseRoutingKey;
/**
* 过期时间
*/
@Value("${mqconfig.ttl}")
private Integer ttl;
/**
* 消息转换器
*
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 自动创建交换机 Topic类型 也可dirct交换机
* 一个微服务一个交换机
*
* @return
*/
@Bean
public Exchange orderEventExchange() {
return new TopicExchange(eventExchange, true, false);
}
/**
* 延迟队列
*
* @return
*/
@Bean
public Queue orderCloseDelayQueue() {
Map<String, Object> args = new HashMap<>(3);
args.put("x-dead-letter-routing-key", orderCloseRoutingKey);
args.put("x-dead-letter-exchange", eventExchange);
args.put("x-message-ttl", ttl);
return new Queue(orderCloseDelayQueue, true, false, false, args);
}
/**
* 死信队列 普通队列用于被监听
*
* @return
*/
@Bean
public Queue orderCloseQueue() {
return new Queue(orderCloseQueue, true, false, false);
}
/**
* 延迟队列建立绑定关系
*
* @return
*/
@Bean
public Binding orderCloseDelayBinding() {
return new Binding(orderCloseDelayQueue, Binding.DestinationType.QUEUE, eventExchange, orderCloseDelayRoutingKey, null);
}
/**
* 死信队列建立绑定关系
*
* @return
*/
@Bean
public Binding orderCloseBinding() {
return new Binding(orderCloseQueue, Binding.DestinationType.QUEUE, eventExchange, orderCloseRoutingKey, null);
}
}

View File

@ -112,7 +112,7 @@ public class ProductOrderServiceImpl extends ServiceImpl<ProductOrderMapper, Pro
//创建订单项 //创建订单项
this.saveproductOrderItems(orderOutTradeNo, productOrderDO.getId(), orderItemList); this.saveproductOrderItems(orderOutTradeNo, productOrderDO.getId(), orderItemList);
// 发送延迟消息用于自动关单 todo // 发送延迟消息用于自动关单 todo
//创建支付 todo //创建支付 todo
@ -146,11 +146,7 @@ public class ProductOrderServiceImpl extends ServiceImpl<ProductOrderMapper, Pro
return itemDO; return itemDO;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
productOrderItemMapper.insertBatch(orderItemDOList); productOrderItemMapper.insertBatch(orderItemDOList);
} }
/** /**

View File

@ -43,4 +43,24 @@ logging:
level: level:
root: INFO root: INFO
#自定义消息队列配置,发送锁定库存消息-》延迟exchange-》lock.queue-》死信exchange-》release.queue
mqconfig:
#延迟队列,不能被监听消费
order_close_delay_queue: order.close.delay.queue
#延迟队列的消息过期后转发的队列
order_close_queue: order.close.queue
#交换机
order_event_exchange: order.event.exchange
#进入延迟队列的路由key
order_close_delay_routing_key: order.close.delay.routing.key
#消息过期进入释放队列的key,进入死信队列的key
order_close_routing_key: order.close.routing.key
#消息过期时间,毫秒,测试改为15秒
ttl: 15000