优惠券微服务延迟消息消费完成

This commit is contained in:
jieyuu 2024-08-27 23:34:38 +08:00
parent 7ded360e7c
commit c9e8e99fe4
7 changed files with 182 additions and 6 deletions

View File

@ -8,7 +8,7 @@ public enum StockTaskStateEnum {
/**
* 完成
*/
FINISHED,
FINISH,
/**
* 取消
*/

View File

@ -0,0 +1,20 @@
package net.jieyuu.feign;
import net.jieyuu.utils.JsonData;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(name = "xdclass-order-service")
public interface ProductOrderFeignService {
/**
* 查询订单状态
*
* @param outTradeNo
* @return
*/
@GetMapping("/api/order/v1/query_state")
JsonData queryProductOrderState(@RequestParam("out_trade_no") String outTradeNo);
}

View File

@ -24,5 +24,13 @@ public interface CouponRecordMapper extends BaseMapper<CouponRecordDO> {
* @param lockCouponRecordIds
* @return
*/
int lockUseStateBatch(@Param("userId") Long id, @Param("useState")String useState, @Param("lockCouponRecordIds")List<Long> lockCouponRecordIds);
int lockUseStateBatch(@Param("userId") Long id, @Param("useState") String useState, @Param("lockCouponRecordIds") List<Long> lockCouponRecordIds);
/**
* 更新优惠券使用记录
*
* @param couponRecordId
* @param state
*/
void updateState(@Param("couponRecordId") Long couponRecordId,@Param("state") String state);
}

View File

@ -0,0 +1,65 @@
package net.jieyuu.mq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import net.jieyuu.model.CouponRecordMessage;
import net.jieyuu.service.CouponRecordService;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
@Slf4j
@Component
@RabbitListener(queues = "#{mqconfig.coupon_release_queue}")
public class CouponMQListener {
@Autowired
private CouponRecordService couponRecordService;
@Autowired
private RedissonClient redissonClient;
/**
* 重复消费幂等性
* 消费失败重新入队最大重试次数
* 消费失败不重新入队而是通过日志后插入数据库
*
* @param recordMessage
* @param message
* @param channel
* @throws IOException
*/
@RabbitHandler
public void releaseCouponRecord(CouponRecordMessage recordMessage, Message message, Channel channel) throws IOException {
log.info("监听到消息:releaseCouponRecord:{}", recordMessage);
long msgTag = message.getMessageProperties().getDeliveryTag();
boolean flag = couponRecordService.releaseCouponRecord(recordMessage);
// 防止同时解锁多任务并发进入
// 串行消费不需要加锁
// Lock lock = redissonClient.getLock("lock:coupon_record_release:" + recordMessage.getTaskId());
// lock.lock();
try {
if (flag) {
// 确认消息消费成功 不需要重新入队
channel.basicAck(msgTag, false);
} else {
log.error("释放优惠券失败 flag=false,{}", recordMessage.toString());
// 选择重新入队
channel.basicReject(msgTag, true);
}
} catch (IOException e) {
log.error("释放优惠券异常:{},msg:{}", e, recordMessage.toString());
// 选择重新入队
// 再发生问题就往外抛
channel.basicReject(msgTag, true);
}
}
}

View File

@ -2,6 +2,7 @@ package net.jieyuu.service;
import net.jieyuu.model.CouponRecordDO;
import com.baomidou.mybatisplus.extension.service.IService;
import net.jieyuu.model.CouponRecordMessage;
import net.jieyuu.request.LockCouponRecordRequest;
import net.jieyuu.utils.JsonData;
import net.jieyuu.vo.CouponRecordVO;
@ -41,4 +42,12 @@ public interface CouponRecordService extends IService<CouponRecordDO> {
* @return
*/
JsonData lockCouponRecords(LockCouponRecordRequest recordRequest);
/**
* 释放优惠券记录
*
* @param recordMessage
* @return
*/
boolean releaseCouponRecord(CouponRecordMessage recordMessage);
}

View File

@ -6,10 +6,9 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import net.jieyuu.config.InterceptorConfig;
import net.jieyuu.config.RabbitMQConfig;
import net.jieyuu.enums.BizCodeEnum;
import net.jieyuu.enums.CouponStateEnum;
import net.jieyuu.enums.StockTaskStateEnum;
import net.jieyuu.enums.*;
import net.jieyuu.exception.BizException;
import net.jieyuu.feign.ProductOrderFeignService;
import net.jieyuu.interceptor.LoginInterceptor;
import net.jieyuu.mapper.CouponTaskMapper;
import net.jieyuu.model.CouponRecordDO;
@ -22,10 +21,13 @@ import net.jieyuu.service.CouponRecordService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import net.jieyuu.utils.JsonData;
import net.jieyuu.vo.CouponRecordVO;
import org.nustaq.serialization.annotations.Flat;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.HashMap;
@ -55,6 +57,9 @@ public class CouponRecordServiceImpl extends ServiceImpl<CouponRecordMapper, Cou
@Autowired
private RabbitMQConfig rabbitMQConfig;
@Autowired
private ProductOrderFeignService orderFeignService;
@Override
public Map<String, Object> page(int page, int size) {
LoginUser loginUser = LoginInterceptor.threadLocal.get();
@ -135,6 +140,68 @@ public class CouponRecordServiceImpl extends ServiceImpl<CouponRecordMapper, Cou
}
}
/**
* 解锁优惠券记录
* 1)查询task是否存在
* 2)查询订单状态
*
* @param recordMessage
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public boolean releaseCouponRecord(CouponRecordMessage recordMessage) {
CouponTaskDO taskDO = couponTaskMapper.selectOne(
new QueryWrapper<CouponTaskDO>().eq("id", recordMessage.getTaskId())
);
if (taskDO == null) {
log.warn("工作单不存在,消息:{}", recordMessage);
// 不需入队
return true;
}
// 判断是否为锁定状态
if (taskDO.getLockState().equalsIgnoreCase(StockTaskStateEnum.LOCK.name())) {
// 锁定状态才处理
JsonData jsonData = orderFeignService.queryProductOrderState(recordMessage.getOutTradeNo());
// 正常响应
if (jsonData.getCode() == 0) {
// 判断状态
String state = jsonData.getData().toString();
if (ProductOrderStateEnum.NEW.name().equalsIgnoreCase(state)) {
// 状态是NEW返回给消息队列重新投递
log.warn("状态是NEW,返回给消息队列,重新投递:{}", recordMessage);
return false;
}
// 订单已支付
if (ProductOrderStateEnum.PAY.name().equalsIgnoreCase(state)) {
// 已经支付修改task状态为finish
taskDO.setLockState(StockTaskStateEnum.FINISH.name());
couponTaskMapper.update(taskDO, new QueryWrapper<CouponTaskDO>().eq("id", taskDO.getId()));
log.info("订单已经支付,修改库存锁定工单为FINISH状态:{}", recordMessage);
return true;
}
}
log.warn("订单不存在,或订单取消,确认消息,修改task状态为CANCLE,恢复优惠券使用记录为NEW,message:{}", recordMessage);
taskDO.setLockState(StockTaskStateEnum.CANCEL.name());
// 修改task状态为CANCEL
couponTaskMapper.update(taskDO, new QueryWrapper<CouponTaskDO>().eq("id", taskDO.getId()));
// 恢复优惠券使用记录为NEW
couponRecordMapper.updateState(taskDO.getCouponRecordId(), CouponStateEnum.NEW.name());
return true;
} else {
// 工单状态不是lock
// 认定为可能为重复消费,不需要重新投递
log.warn("工单状态不为LOCK,state={},消息体:{}", taskDO.getLockState(), recordMessage);
return true;
}
}
private CouponRecordVO beanProcess(CouponRecordDO couponRecordDO) {
CouponRecordVO couponRecordVO = new CouponRecordVO();
BeanUtils.copyProperties(couponRecordDO, couponRecordVO);

View File

@ -24,7 +24,7 @@
, coupon_id, create_time, use_state, user_id, user_name, coupon_title, start_time, end_time, order_id, price, condition_price
</sql>
<!-- 批量更新优惠券 -->
<!-- 批量更新优惠券 -->
<update id="lockUseStateBatch">
update coupon_record
set use_state =#{useState} where user_id = #{userId} and use_state = 'NEW'
@ -34,4 +34,11 @@
</foreach>
</update>
<!-- 更新优惠券状态 -->
<update id="updateState">
update coupon_record
set use_state=#{state}
where id = #{couponRecordId}
</update>
</mapper>