在Spring Boot App中使用Redis Stream通过HTTP长轮询来阻止HTTP响应

如何解决在Spring Boot App中使用Redis Stream通过HTTP长轮询来阻止HTTP响应

我有一个Spring Boot Web应用程序,该应用程序具有更新名为StudioLinking的实体的功能。此实体描述了两个IoT设备之间的临时,可变,描述性逻辑链接,我的Web应用程序是它们的云服务。这些设备之间的链接本质上是短暂的,但是StudioLinking实体保留在数据库中以用于报告。 StudioLinking使用Spring Data / Hibernate以常规方式存储到基于SQL的数据存储中。该StudioLinking实体会不时地使用来自Rest API的新信息进行更新。链接更新后,设备需要响应(更改颜色,音量等)。现在,通过每5秒轮询一次来进行处理,但这会造成人类用户向系统中输入更新以及IoT设备实际更新时的延迟。可能只有一毫秒或最多5秒!显然,增加轮询的频率是不可持续的,并且在绝大多数情况下根本没有更新!

因此,我正在尝试使用HTTP Long Polling在同一应用程序上开发另一个Rest API,该API将在给定StudioLinking实体更新或超时后返回。侦听器不支持WebSocket或类似功能,使我无法进行Long Polling。长轮询可能会导致竞争状态,在这种情况下,您必须考虑以下可能性:对于连续消息,一条消息可能会在HTTP请求之间进入时“丢失”(在关闭和打开连接时,可能会出现新的“更新”如果我使用发布/订阅,则不会被“注意到”。

请注意,此“订阅更新” API仅应返回StudioLinking的最新和最新版本,但仅当存在实际更新或自最后签到。 “订阅更新”客户端将首先发布API请求,以设置新的侦听会话并将其传递,以便服务器知道它们是谁。因为可能有多个设备需要监视对同一StudioLinking实体的更新。我相信我可以通过在Redis XREAD中使用单独命名的使用者来完成此操作。 (请在以后的问题中记住这一点)

经过数小时的研究,我相信完成此操作的方法是使用redis流。

我在Spring Data Redis中找到了有关Redis流的以下两个链接:

https://www.vinsguru.com/redis-reactive-stream-real-time-producing-consuming-streams-with-spring-boot/ https://medium.com/@amitptl.in/redis-stream-in-action-using-java-and-spring-data-redis-a73257f9a281

我也已经阅读了有关长轮询的链接,这两个链接在长轮询期间都只有一个睡眠计时器,用于演示,但显然我想做些有用的事情。

https://www.baeldung.com/spring-deferred-result

这两个链接都非常有用。现在,我可以毫无疑问地确定如何将更新发布到Redis流-(这是未经测试的“伪代码”,但我预计实现此目标不会有任何问题)

// In my StudioLinking Entity

@PostUpdate
public void postToRedis() {
    StudioLinking link = this;
    ObjectRecord<String,StudioLinking> record = StreamRecords.newRecord()
            .ofObject(link)
            .withStreamKey(streamKey); //I am creating a stream for each individual linking probably?
    this.redisTemplate
            .opsForStream()
            .add(record)
            .subscribe(System.out::println);
    atomicInteger.incrementAndGet();
}

但是,在订阅上述流时,我会很沮丧:所以基本上我想在这里做-请原谅伪造的伪代码,这仅出于想法目的。我很清楚,该代码绝不表示该语言和框架的实际行为:)

// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
// updateList is a unique token to track individual consumers in Redis
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public DeferredResult<ResponseEntity<?>> subscribeToUpdates(@PathVariable("linkId") Integer linkId,@PathVariable("updatesId") Integer updatesId) {
    LOG.info("Received async-deferredresult request");
    DeferredResult<ResponseEntity<?>> output = new DeferredResult<>(5000l);

    deferredResult.onTimeout(() -> 
      deferredResult.setErrorResult(
        ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
          .body("IT WAS NOT UPDATED!")));
    
    ForkJoinPool.commonPool().submit(() -> {
        //----------------------------------------------
        // Made up stuff... here is where I want to subscribe to a stream and block!
        //----------------------------------------------
        LOG.info("Processing in separate thread");
        try {
            // Subscribe to Redis Stream,get any updates that happened between long-polls
            // then block until/if a new message comes over the stream
            var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(studioLinkingID,updateList),StreamOffset.create(studioLinkingID,ReadOffset.lastConsumed()),streamListener);
            listenerContainer.start();
        } catch (InterruptedException e) {
        }
        output.setResult("IT WAS UPDATED!");
    });
    
    LOG.info("servlet thread freed");
    return output;
}

那么我将如何解决这个问题呢?我认为答案就在https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/ReactiveRedisTemplate.html之内,但是我对Spring的熟练程度还不足以真正理解Java Docs中的术语(Spring文档确实很好,但是JavaDocs是用密集的技术语言编写的,对此我深表感谢。但还不太了解)。

我的实施还有两个障碍:

  1. 我对弹簧的确切了解还不是100%。我还没有到达那一刻,我真的完全理解为什么所有这些bean都在漂浮。我认为这是为什么我不能在这里得到东西的关键... Redis的配置在Spring以太坊中浮动,我不了解如何称呼它。我真的需要继续对此进行调查(这对我来说是一个巨大的障碍)。
  2. 这些StudioLinking寿命很短,因此我也需要进行一些清理。一旦我将整个事情付诸实践,我将稍后实施,我确实知道将需要这样做。

解决方法

为什么不使用阻塞式轮询机制?无需使用spring-data-redis的高级工具。只需使用5秒钟的简单阻塞读取,那么此调用可能需要6秒钟左右的时间。您可以减少或增加阻止超时。

class LinkStatus {
    private final boolean updated;

    LinkStatus(boolean updated) {
      this.updated = updated;
    }
  }



// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
  // updateList is a unique token to track individual consumers in Redis
  @GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
  public LinkStatus subscribeToUpdates(
      @PathVariable("linkId") Integer linkId,@PathVariable("updatesId") Integer updatesId) {
    StreamOperations<String,String,String> op = redisTemplate.opsForStream();
    
    Consumer consumer = Consumer.from("test-group","test-consumer");
    // auto ack block stream read with size 1 with timeout of 5 seconds
    StreamReadOptions readOptions = StreamReadOptions.empty().block(Duration.ofSeconds(5)).count(1);
    List<MapRecord<String,String>> records =
        op.read(consumer,readOptions,StreamOffset.latest("test-stream"));
    return new LinkStatus(!CollectionUtils.isEmpty(records));
  }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-