商品库存消息消费开发

This commit is contained in:
jieyuu 2024-09-02 20:34:20 +08:00
parent dc84ba667a
commit 7a79145dc2
6 changed files with 167 additions and 1 deletions

View File

@ -0,0 +1,19 @@
package net.jieyuu.feign;
import net.jieyuu.utils.JsonData;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(name = "xdclass-order-service")
public interface ProductOrderFeignService {
/**
* 查询订单状态
*
* @param outTradeNo
* @return
*/
@GetMapping("/api/order/v1/query_state")
JsonData queryProductOrderState(@RequestParam("out_trade_no") String outTradeNo);
}

View File

@ -22,4 +22,11 @@ public interface ProductMapper extends BaseMapper<ProductDO> {
* @return
*/
int lockProductStock(@Param("productId") long productId, @Param("buyNum") int buyNum);
/**
* 解锁商品库存
* @param productId
* @param buyNum
*/
void unlockProductStock(@Param("productId")Long productId, @Param("buyNum")Integer buyNum);
}

View File

@ -0,0 +1,60 @@
package net.jieyuu.mq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import net.jieyuu.model.ProductMessage;
import net.jieyuu.service.ProductService;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
@RabbitListener(queues = "${mqconfig.stock_release_queue}")
public class ProductStockMQListener {
@Autowired
private ProductService productService;
@Autowired
private RedissonClient redissonClient;
@RabbitHandler
public void releaseProductStock(ProductMessage productMessage, Message message, Channel channel) throws IOException {
log.info("监听到消息:releaseProductStock", productMessage);
long msgTag = message.getMessageProperties().getDeliveryTag();
boolean flag = productService.releaseProductStock(productMessage);
// 防止同时解锁多任务并发进入
// 串行消费不需要加锁
try {
if (flag) {
// 确认消息消费成功 不需要重新入队
channel.basicAck(msgTag, false);
} else {
log.error("释放商品库存异常 flag=false,{}", productMessage);
// 选择重新入队
channel.basicReject(msgTag, true);
}
} catch (IOException e) {
log.error("释放商品库存异常:{},msg:{}", e, productMessage);
// 选择重新入队
// 再发生问题就往外抛
channel.basicReject(msgTag, true);
}
}
// @RabbitHandler
// public void releaseCouponRecord2(String msg, Message message, Channel channel) throws IOException {
// log.info(msg);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
//
// }
}

View File

@ -1,6 +1,7 @@
package net.jieyuu.service;
import net.jieyuu.model.ProductMessage;
import net.jieyuu.request.LockProductRequest;
import net.jieyuu.utils.JsonData;
import net.jieyuu.vo.ProductVO;
@ -50,4 +51,12 @@ public interface ProductService {
* @return
*/
JsonData lockProductStock(LockProductRequest lockProductRequest);
/**
* 释放商品库存
*
* @param productMessage
* @return
*/
boolean releaseProductStock(ProductMessage productMessage);
}

View File

@ -6,8 +6,11 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import net.jieyuu.config.RabbitMQConfig;
import net.jieyuu.enums.BizCodeEnum;
import net.jieyuu.enums.CouponStateEnum;
import net.jieyuu.enums.ProductOrderStateEnum;
import net.jieyuu.enums.StockTaskStateEnum;
import net.jieyuu.exception.BizException;
import net.jieyuu.feign.ProductOrderFeignService;
import net.jieyuu.mapper.ProductTaskMapper;
import net.jieyuu.model.ProductDO;
import net.jieyuu.mapper.ProductMapper;
@ -54,6 +57,9 @@ public class ProductServiceImpl extends ServiceImpl<ProductMapper, ProductDO> im
@Autowired
private RabbitMQConfig rabbitMQConfig;
@Autowired
private ProductOrderFeignService productOrderFeignService;
/**
* 分页查询商品列表
*
@ -153,7 +159,7 @@ public class ProductServiceImpl extends ServiceImpl<ProductMapper, ProductDO> im
ProductMessage productMessage = new ProductMessage();
productMessage.setOutTradeNo(outTradeNo);
productMessage.setTaskId(productTaskDO.getId());
rabbitTemplate.convertAndSend(rabbitMQConfig.getEventExchange(), rabbitMQConfig.getStockReleaseDelayRoutingKey(), productMessage);
log.info("商品库存锁定信息延迟消息发送成功:{}", productMessage);
}
@ -161,6 +167,65 @@ public class ProductServiceImpl extends ServiceImpl<ProductMapper, ProductDO> im
return null;
}
/**
* 释放商品库存
*
* @param productMessage
* @return
*/
@Override
public boolean releaseProductStock(ProductMessage productMessage) {
// 查询商品任务
ProductTaskDO taskDO = productTaskMapper.selectOne(new QueryWrapper<ProductTaskDO>().eq("id", productMessage.getTaskId()));
if (taskDO == null) {
log.warn("工作单不存在,消息体为:{}", productMessage);
}
// lock状态才处理
if (StockTaskStateEnum.LOCK.name().equalsIgnoreCase(taskDO.getLockState())) {
// 调用feign查询对应订单的状态
// 锁定状态才处理
JsonData jsonData = productOrderFeignService.queryProductOrderState(productMessage.getOutTradeNo());
// 正常响应
if (jsonData.getCode() == 0) {
// 判断状态
String state = jsonData.getData().toString();
if (ProductOrderStateEnum.NEW.name().equalsIgnoreCase(state)) {
// 状态是NEW返回给消息队列重新投递
log.warn("状态是NEW,返回给消息队列,重新投递:{}", productMessage);
return false;
}
// 订单已支付
if (ProductOrderStateEnum.PAY.name().equalsIgnoreCase(state)) {
// 已经支付修改task状态为finish
taskDO.setLockState(StockTaskStateEnum.FINISH.name());
productTaskMapper.update(taskDO, new QueryWrapper<ProductTaskDO>().eq("id", productMessage.getTaskId()));
log.info("订单已经支付,修改库存锁定工单为FINISH状态:{}", productMessage);
return true;
}
}
log.warn("订单不存在,或订单取消,确认消息,修改task状态为CANCLE,恢复商品库存,message:{}", productMessage);
taskDO.setLockState(StockTaskStateEnum.CANCEL.name());
// 修改task状态为CANCEL
productTaskMapper.update(taskDO, new QueryWrapper<ProductTaskDO>().eq("id", productMessage.getTaskId()));
// 恢复商品库存
// 即锁定库存的值减去当前的buyNum
productMapper.unlockProductStock(taskDO.getProductId(),taskDO.getBuyNum());
return true;
} else {
// 工单状态不是lock
// 认定为可能为重复消费,不需要重新投递
log.warn("工单状态不为LOCK,state={},消息体:{}", taskDO.getLockState(), productMessage);
return true;
}
}
private ProductVO beanProcess(ProductDO obj) {
ProductVO productVO = new ProductVO();
BeanUtils.copyProperties(obj, productVO);

View File

@ -28,6 +28,12 @@
where id = #{productId}
and stock - lock_stock >={buyNum}
</update>
<!--解锁商品库存-->
<update id="unlockProductStock">
update product
set lock_stock = lock_stock - #{buyNum}
where id = #{productId}
</update>
</mapper>