java redis实现消息队列功能

背景:

需求: 业务中需要批量处理任务,且需要每个任务间隔一段时间。最好在不同服务器同时运行不影响每个任务间隔。
部署环境: 没有mq队列,有redis。
秉着尽量不多增加系统复杂度的情况,使用redis来实现队列功能。

首先看一下代码:

1.1.核心代码


import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @program: wys-service
 * @description:
 * @author: wuyuanshn
 * @create: 2023-06-07 14:20
 **/
@Slf4j
@Service
public class ListRedisQueue {
    @Resource
    private RedisServiceUtils redisServiceUtils;


    //队列名 OBJECT可以替换为自己的项目名
    public static final String LOCK_KEY = "OBJECT_LIST_QUEUE_%s";

    @Resource
    private RedisTemplate redisTemplate;

    public void produce(String key, String message) {
        redisTemplate.opsForList().rightPush(key, message);

    }

    public void produce(String key, Object... vs) {
        redisTemplate.opsForList().rightPush(key, vs);
    }

    public void produce(String key, List<String> messageList) {
        redisTemplate.opsForList().rightPushAll(key, messageList);
    }

    public void consume(String key) {
        String lockKey = String.format(LOCK_KEY, key);
        try {
            boolean lock = redisServiceUtils.getLock(lockKey, 1800);
            if (!lock) {
                log.info("ListRedisQueue consume lock ");
                return;
            }

            while (true) {
                String msg = (String) redisTemplate.opsForList().leftPop(key);
                if (StringUtils.isBlank(msg)) {
                    log.info("ListRedisQueue consume 获取 end");
                    //执行队列结束触发任务
                    executeEndByKey(key);
                    return;
                }
                log.info("ListRedisQueue consume 获取消息:{}", msg);
                //执行队列任务
                executeByKey(key, msg);

                //更新key过期时间
                redisTemplate.expire(lockKey, 15, TimeUnit.MINUTES);

                try {
                    Random random = new Random();
                    int i = random.nextInt(2000) + 1000;
                    log.info("ListRedisQueue consume sleep:{}", i);
                    Thread.sleep(i);
                } catch (Exception e) {
                    log.error("ListRedisQueue consume sleep error", e);
                }
            }
        } catch (Exception e) {
            log.error("ListRedisQueue consume error", e);
        } finally {
            redisServiceUtils.releaseLock(lockKey);
        }

    }

    /**
     * 执行队列任务
     *
     * @param key
     * @param value
     */
    public void executeByKey(String key, String value) {
        ListQueueNameEnum structureEnum = ListQueueNameEnum.getStructureEnum(key);
        if (structureEnum == null) {
            return;
        }
        switch (structureEnum) {
            case URL_BASE:
                if (StringUtils.isNotBlank(value)) {
                    try {
                        //业务
                    } catch (Exception e) {
                        log.error("ListRedisQueue executeByKey ERROR ", e);
                    }
                }
                return;
            default:
        }
    }

    /**
     * 执行队列结束触发任务
     *
     * @param key
     */
    public void executeEndByKey(String key) {
        ListQueueNameEnum structureEnum = ListQueueNameEnum.getStructureEnum(key);
        if (structureEnum == null) {
            return;
        }
        switch (structureEnum) {
            case URL_BASE:
				// 结束业务
                return;
            default:

        }


    }


}



代码讲解:

  • 每个任务(相当于mq消息)接收到后需要如何处理,就在此方法中编辑 executeByKey 。
  • 任务(mq消息)处理完毕时,如果需要做相关逻辑操作,如通知某程序等,在executeEndByKey方法编写。
  • 睡眠时间 random.nextInt(2000) + 1000; 可以根据自己的业务时间情况进行修改。
  • redis锁 代码没有粘贴,使用自己项目中的锁即可。如果项目中没有 网上也有很多。 RedisServiceUtils

1.2.可以根据不同业务类型创建不同枚举(相当于mq名称)



import org.apache.commons.lang.StringUtils;

/**
 * @program: wys-service
 * @description: redis队列名称列表
 * @author: wuyuanshn
 * @create: 2023-06-07 17:15
 **/
public enum ListQueueNameEnum {
    TX_URL_BASE("URL_BASE", "信息"),
    ;

    private String code;
    private String value;

    ListQueueNameEnum(String code, String value) {
        this.code = code;
        this.value = value;
    }

    public String getValue() {
        return value;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public void setValue(String value) {
        this.value = value;
    }

    /**
     * 根据统计编号获取对应枚举实例
     *
     * @param value
     * @return
     */
    public static ListQueueNameEnum getStructureEnum(String value) {
        if (StringUtils.isBlank(value)) {
            return null;
        }
        for (ListQueueNameEnum result : ListQueueNameEnum.values()) {
            if (result.name().equals(value)) {
                return result;
            }
        }
        return null;
    }

}


1.3.测试接口

测试只需要调用consume接口 即可。


    @Resource
    private ListRedisQueue listRedisQueue;

 @PostMapping(value = "/produce")
    public void produce() {
        for (int i = 0; i < 1; i++) {
            listRedisQueue.produce(ListQueueNameEnum.URL_BASE.getCode(), "xxxx", "1233", "345566");
        }
    }

/** 测试只需要调用consume 此方法即可。
*/
    @PostMapping(value = "/consume")
    public void consume() {
      produce(); 
      log.info("生产消息完毕");
        listRedisQueue.consume(ListQueueNameEnum.URL_BASE.getCode());
    }

总结

是使用redis代替mq功能,终究还是比较复杂,虽然实现了多项目间公用一个队列的需求。但是并不完美。如果可以还是尽量用合适的服务如mq等来实现业务。

原文地址:https://blog.csdn.net/wuyuanshun/article/details/131480263

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读1.3k次。在 Redis 中,键(Keys)是非常重要的概念,它们代表了存储在数据库中的数据的标识符。对键的有效管理和操作是使用 Redis 数据库的关键一环,它直接影响到数据的存取效率、系统的稳定性和开发的便利性。本文将深入探讨 Redis 中键的管理和操作,包括键的命名规范、常用的键操作命令以及一些最佳实践。我们将详细介绍如何合理命名键、如何使用键的过期和持久化特性、如何批量删除键等技巧,旨在帮助读者更好地理解并灵活运用 Redis 中的键,从而提高数据管理和操作的效率和可靠性。
文章浏览阅读3.3k次,点赞44次,收藏88次。本篇是对单节点的应用,但从中我们也能推断出一些关于集群的应用,不过大多数公司能搞个主从就已经是不错了,所以你能学会这个已经算是很有用了,关于ES,博主前面也讲过一些基础应用,创建一个工具类利用ES的数据模型进行存储就可以达到一个canal同时对Redis和ES的同步,如果担心出问题,可以把Canal搞成集群的形式,这个后续有时间博主再给大家做讲解。今天就到这里了,觉得不错就支持一下吧。_canal redis
文章浏览阅读8.4k次,点赞8次,收藏18次。Spring Boot 整合Redis实现消息队列,RedisMessageListenerContainer的使用,Pub/Sub模式的优缺点_springboot redis 消息队列
文章浏览阅读978次,点赞25次,收藏21次。在Centos上安装Redis5.0保姆级教程!_centos7 安装redis5.0服务器
文章浏览阅读1.2k次,点赞21次,收藏22次。Docker-Compose部署Redis(v7.2)主从模式首先需要有一个redis主从集群,才能接着做redis哨兵模式。_warning: sentinel was not able to save the new configuration on disk!!!: dev
文章浏览阅读2.2k次,点赞59次,收藏38次。合理的JedisPool资源池参数设置能为业务使用Redis保驾护航,本文将对JedisPool的使用、资源池的参数进行详细说明,最后给出“最合理”配置。_jedispool资源池优化
文章浏览阅读1.9k次。批量删除指定前缀的Key有两中方法,一种是借助 redis-cli,另一种是通过 SCAN命令来遍历所有匹配前缀的 key,并使用 DEL命令逐个删除它们。_redis删除前缀的key
文章浏览阅读890次,点赞18次,收藏20次。1. Redis时一个key-cakye的数据库,key一般是String类型,不过value类型有很多。eg.String Hash List Set SortedSet (基本) | GEO BitMap HyperLog (特殊)2.Redis为了方便学习,将操作不同类型的命令做了分组,在官网可以进行查询。
文章浏览阅读1.1k次,点赞19次,收藏26次。若不使用Redisson,而是用synchronized(this),此时会造成对服务器的加锁,若开始大量查询ID为1的商品,每台机器都会先跑一遍加个锁,然后在查询ID为2的数据,此时需要等待ID为1的锁释放,所以需要将this对象调整为全局商品ID。若在执行bgsave命令时,还有其他redis命令被执行(主线程数据修改),此时会对数据做个副本,然后bgsave命令执行这个副本数据写入rdb文件,此时主线程还可以继续修改数据。在当前redis目录下会生成aof文件,对redis修改数据的命令进行备份。
文章浏览阅读1.5k次,点赞39次,收藏24次。本文全面剖析Redis集群在分布式环境下的数据一致性问题,从基础原理到高级特性,涵盖主从复制、哨兵模式、持久化策略等关键点,同时也分享了关于监控、故障模拟与自适应写一致性策略的实践经验。_redis集群一致性
文章浏览阅读1k次。RDB因为是二进制文件,在保存的时候体积也是比较小的,它恢复的比较快,但是它有可能会丢数据,我们通常在项目中也会使用AOF来恢复数据,虽然AOF恢复的速度慢一些,但是它丢数据的风险要小很多,在AOF文件中可以设置刷盘策略,我们当时设置的就是每秒批量写入一次命令。AOF的含义是追加文件,当redis操作写命令的时候,都会存储这个文件中,当redis实例宕机恢复数据的时候,会从这个文件中再次执行一遍命令来恢复数据。:在Redis中提供了两种数据持久化的方式:1、RDB 2、AOF。
文章浏览阅读1k次,点赞24次,收藏21次。NoSQL(No only SQL)数据库,泛指非关系型数据库,实现对于传统数据库而言的。NoSQL 不依赖业务逻辑方式进行存储,而以简单的 key-value 模式存储。因此大大增加了数据库的扩展能力。不遵循SQL标准不支持ACID远超于SQL的性能Redis是当前比较热门的NOSQL系统之一,它是一个开源的使用ANSI c语言编写的key-value存储系统(区别于MySQL的二维表格的形式存储。
文章浏览阅读988次,点赞17次,收藏19次。在上面的步骤中,我们已经开启了 MySQL 的远程访问功能,但是,如果使用 MySQL 管理工具 navicat 连接 MySQL 服务端时,还是可能会出现连接失败的情况。在实际工作中,如果我们需要从其他地方访问和管理 MySQL 数据库,就需要开启 MySQL 的远程访问功能并设置相应的权限。这对于我们的工作效率和数据安全都有很大的帮助。通过查看 MySQL 用户表,我们可以看到’host’为’%’,说明 root 用户登录 MySQL 的时候,可以允许任意的 IP 地址访问 MySQL 服务端。
文章浏览阅读956次。Redis Desktop Manager(RDM)是一款用于管理和操作Redis数据库的图形化界面工具。提供了简单易用的界面,使用户能够方便地执行各种Redis数据库操作,并且支持多个Redis服务器的连接_redisdesktopmanager安装包
文章浏览阅读1.9k次,点赞52次,收藏27次。缓存击穿指的是数据库有数据,缓存本应该也有数据,但是缓存过期了,Redis 这层流量防护屏障被击穿了,请求直奔数据库。缓存穿透指的是数据库本就没有这个数据,请求直奔数据库,缓存系统形同虚设。缓存雪崩指的是大量的热点数据无法在 Redis 缓存中处理(大面积热点数据缓存失效、Redis 宕机),流量全部打到数据库,导致数据库极大压力。
文章浏览阅读1.2k次。一次命令时间(borrow|return resource + Jedis执行命令(含网络) )的平均耗时约为1ms,一个连接的QPS大约是1000,业务期望的QPS是50000,那么理论上需要的资源池大小是50000 / 1000 = 50个,实际maxTotal可以根据理论值合理进行微调。JedisPool默认的maxTotal=8,下面的代码从JedisPool中借了8次Jedis,但是没有归还,当第9次(jedisPool.getResource().ping())3、发生异常可能的情况。_redis.clients.jedis.exceptions.jedisconnectionexception: could not get a res
文章浏览阅读1k次,点赞27次,收藏18次。在这篇文章中,你将了解到如何在 CentOS 系统上安装 Redis 服务,并且掌握通过自定义域名来访问 Redis 服务的技巧。通过使用自定义域名,你可以方便地管理和访问你的 Redis 数据库,提高工作效率。无论你是开发者、系统管理员还是对 Redis 感兴趣的读者,这篇文章都会为你提供清晰的指导和实用的技巧。阅读本文,轻松搭建自己的 Redis 服务,并体验自定义域名带来的便捷!_redis怎么自定义域名
文章浏览阅读1.1k次,点赞15次,收藏18次。我们post请求,拦截器要预先读取HtppServletRequest里面的body的数据,是通过io的方式,都知道io读取完毕之后,之前的数据是变为null的,但是,当我么后面的接口来委派的时候,也是通过io读取body。我们要考虑一个事情,就是我们要验证数据的重复提交: 首先第一次提交的数据肯定是要被存储的,当而第二次往后,每次提交数据都会与之前的数据产生比对从而验证数据重复提交,我们要具体判断数据是否重复提交的子类。发现数据是成功存入的,剩余7s过期,在10s之内,也就是数据没过期之前,在发送一次。_json.parseobject(str, clazz, auto_type_filter);
文章浏览阅读3.9k次,点赞3次,收藏7次。PHP使用Redis实战实录系列:我们首先检查$redis->connect()方法的返回值来确定是否成功连接到Redis服务器。如果连接失败,我们可以输出相应的错误信息。如果连接成功,我们再执行一些操作,如$redis->set()、$redis->get()等,并检查每个操作的返回结果来判断是否发生了异常。_php redis
文章浏览阅读1.5w次,点赞23次,收藏51次。Redis(Remote Dictionary Server ),即远程字典服务,是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。Redis 是一个高性能的key-value数据库。redis的出现,很大程度补偿了memcached这类key/value存储的不足,在部 分场合可以对关系数据库起到很好的补充作用。_redisdesktopmanager下载