SpringBoot使用redis实现延时队列

harry发布

使用Redis实现延迟队列

常见延迟队列实现方式

延迟队列的实现方式有很多种,通过程序的方式实现,例如 JDK 自带的延迟队列 DelayQueue,通过 MQ 框架来实现,例如 RocketMQ、RabbitMQ等,通过 Redis 的方式来实现延迟队列 。

Redis 是通过有序集合(ZSet)的方式来实现延迟消息队列的,ZSet 有一个 Score 属性可以用来存储延迟执行的时间。

优点

  • 灵活方便,Redis 是互联网公司的标配,无序额外搭建相关环境;

  • 可进行消息持久化,大大提高了延迟队列的可靠性;

  • 分布式支持,不像 JDK 自身的 DelayQueue;

  • 高可用性,利用 Redis 本身高可用方案,增加了系统健壮性。

缺点

需要使用无限循环的方式来执行任务检查,会消耗少量的系统资源。

结合以上优缺点,我们决定使用 Redis 来实现延迟队列,具体实现代码如下。

一、Redis实现延迟队列

  • 失效监听

  • redisson实现发布订阅延迟

二、redis失效监听事件 集成KeyExpirationEventMessageListener类实现redis失效监听事件

三、此种实现面临的问题 redis的失效监听事件会存在一定的时间差,并且当数据量越大时,误差会越大。 redis的失效监听事件会将所有key失效都会通知到onMessage,如果针对一个key,分布式业务的场景下,会出现重复消费的问题。(可以增加分布式锁的实现,但是redisson分布式锁提供了另一种延迟队列的实现方式)

代码示例:

POM 依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

消息生产者: MessageProvider


@Component
public class MessageProvider {
    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
    @Resource
    private RedisMq redisMq;
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * 发送消费信息
     *
     * @param message
     * @return
     */
    public boolean sendMessage(@Validated RedisMessage message) {
        Assert.notNull(message);
        //The priority is if there is no creation time
        message.setCreateTime(System.currentTimeMillis());
        message.setId(IdUtil.fastUUID());
        long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
        try {
            redisMq.addMsgPool(message);
            redisMq.enMessage(RedisMq.QUEUE_NAME + message.getGroup(), delayTime, message.getId());
            logger.info("RedisMq发送消费信息{},当前时间:{},消费时间预计{}", message, new Date(), sdf.format(delayTime));
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("RedisMq 消息发送失败,当前时间:{}", new Date());
            return false;
        }
        return true;
    }

    /**
     * 发送消费信息
     *
     * @param body
     * @param group
     * @param delay
     * @return
     */
    public boolean orderDelayQueue(long body, String group, long delay) {
        RedisMessage message = new RedisMessage();
        message.setBody(body);
        message.setGroup(group);
        message.setDelay(delay);
        message.setTtl(delay * 2);
        return sendMessage(message);
    }
}

消息主体:RedisMessage


@Data
@Accessors(chain = true)
public class RedisMessage implements Serializable {

    /**
     * 消息队列组
     */
    private String group;
    /**
     * 消息id
     */
    private String id;

    /**
     * 消息延迟/ 秒
     */
    @NotNull(message = "消息延时时间不能为空")
    private long delay;

    /**
     * 消息存活时间 单位:秒
     */
    @NotNull(message = "消息存活时间不能为空")
    private long ttl;

    /**
     * 消息体,对应业务内容
     */
    private long body;

    /**
     * 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
     * 用来消除时间的影响
     */
    private long createTime;
}

Redis消息队列


@Component
public class RedisMq {

    /**
     * 消息池前缀,以此前缀加上传递的消息id作为key,以消息 MSG_POOL 的消息体body作为值存储
     */
    public static final String MSG_POOL = "Message:Pool:";

    /**
     * zset队列 名称 queue
     */
    public static final String QUEUE_NAME = "Message:Queue:";

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 存入消息池
     *
     * @param message
     * @return
     */
    public boolean addMsgPool(RedisMessage message) {
        if (null != message) {
            ValueOperations<String, Object> valueOperations = redisTemplate.opsForValue();
            valueOperations.set(MSG_POOL + message.getGroup() + message.getId(), JSONUtil.toJsonStr(message), message.getTtl(), TimeUnit.SECONDS);
            return true;
        }
        return false;
    }

    /**
     * 从消息池中删除消息
     *
     * @param group
     * @param id
     */
    public void delMsgPool(String group, String id) {
        redisTemplate.delete(MSG_POOL + group + id);
    }

    /**
     * 向队列中添加消息
     *
     * @param key
     * @param score 优先级
     * @param val
     * @return 返回消息id
     */
    public void enMessage(String key, long score, String val) {
        ZSetOperations<String, Object> zSetOperations = redisTemplate.opsForZSet();
        zSetOperations.add(key, val, score);
    }

    /**
     * 从队列删除消息
     *
     * @param id
     * @return
     */
    public void delMessage(String key, String id) {
        ZSetOperations<String, Object> zSetOperations = redisTemplate.opsForZSet();
        zSetOperations.remove(key, id);
    }

}

Redis消息消费者


@Slf4j
@Component
public class RedisMqConsumer {

    @Resource
    private RedisMq redisMq;

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private MessageProvider provider;

    @Resource
    private List<RedisMqExecute> mqExecutes; // 定义一个接口,定义 getQueueName名称

    @Scheduled(cron = "* * * * * ? ")
    public void baseMonitor() {
        long current = System.currentTimeMillis();
        log.info("baseMonitor===》方法进来了,now time:{}", DateUtil.date(current));

        for (RedisMqExecute mqExecute : mqExecutes) {
            // 消息队列名称
            String queueName = RedisMq.QUEUE_NAME + mqExecute.getQueueName();
            // 消息池前缀,以此前缀加上传递的消息id作为key
            String msgPool = RedisMq.MSG_POOL + mqExecute.getQueueName();

            //查询当前已过期的消息队列
            Set<Object> set = redisTemplate.opsForZSet().rangeByScore(queueName, 0, current);
            if (null != set) {
                for (Object id : set) {
                    long score = redisTemplate.opsForZSet().score(queueName, id.toString()).longValue();
                    // 确认到期,然后执行消费
                    if (current >= score) {
                        String message;
                        RedisMessage redisMessage = null;
                        try {
                            message = (String) redisTemplate.opsForValue().get(msgPool + id);
                            log.debug("RedisMessage success now Time:{}", DateUtil.date(current));
                            if (null == message) {
                                return;
                            }
                            redisMessage = JSONUtil.toBean(message, RedisMessage.class);
                            mqExecute.execute(redisMessage);
                        } catch (Exception e) {
                            e.printStackTrace();
                            // 如果发生异常,将其放回队列
                            log.error("RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}", new Date());
                            provider.sendMessage(redisMessage);
                        } finally {
                            redisMq.delMessage(queueName, id.toString());
                            if (redisMessage != null) {
                                redisMq.delMsgPool(redisMessage.getGroup(), id.toString());
                            }
                        }
                    }
                }
            }
        }
    }
}

消费主体类型:RedisMqExecute


public interface RedisMqExecute {

    /**
     * 获取队列名称 可定义枚举值 QueueNameEnums
     * @return
     */
    String getQueueName();
    /**
     * 统一的通过执行期执行
     *
     * @param message
     * @return
     */
    boolean execute(RedisMessage message);
}

消费实现:OrderCarInMqExecuteImpl


@Slf4j
@Service
public class OrderCarInMqExecuteImpl implements RedisMqExecute {
    @Override
    public String getQueueName() {
        return QueueNameEnums.IN.getKey();
    }

    @Override
    public boolean execute(RedisMessage message) {
        log.info("执行了 CarInMq ======》{} ,message.getBody:{}, 消费时间:{}", message,message.getBody(), DateUtil.date());
        // 业务逻辑........
        return true;
    }
}

0 条评论

发表回复

Avatar placeholder

您的电子邮箱地址不会被公开。 必填项已用*标注