如何解决在Spring Boot App中使用Redis Stream通过HTTP长轮询来阻止HTTP响应
我有一个Spring Boot Web应用程序,该应用程序具有更新名为StudioLinking
的实体的功能。此实体描述了两个IoT设备之间的临时,可变,描述性逻辑链接,我的Web应用程序是它们的云服务。这些设备之间的链接本质上是短暂的,但是StudioLinking
实体保留在数据库中以用于报告。 StudioLinking
使用Spring Data / Hibernate以常规方式存储到基于SQL的数据存储中。该StudioLinking实体会不时地使用来自Rest API的新信息进行更新。链接更新后,设备需要响应(更改颜色,音量等)。现在,通过每5秒轮询一次来进行处理,但这会造成人类用户向系统中输入更新以及IoT设备实际更新时的延迟。可能只有一毫秒或最多5秒!显然,增加轮询的频率是不可持续的,并且在绝大多数情况下根本没有更新!
因此,我正在尝试使用HTTP Long Polling在同一应用程序上开发另一个Rest API,该API将在给定StudioLinking实体更新或超时后返回。侦听器不支持WebSocket或类似功能,使我无法进行Long Polling。长轮询可能会导致竞争状态,在这种情况下,您必须考虑以下可能性:对于连续消息,一条消息可能会在HTTP请求之间进入时“丢失”(在关闭和打开连接时,可能会出现新的“更新”如果我使用发布/订阅,则不会被“注意到”。
请注意,此“订阅更新” API仅应返回StudioLinking
的最新和最新版本,但仅当存在实际更新或自最后签到。 “订阅更新”客户端将首先发布API请求,以设置新的侦听会话并将其传递,以便服务器知道它们是谁。因为可能有多个设备需要监视对同一StudioLinking
实体的更新。我相信我可以通过在Redis XREAD中使用单独命名的使用者来完成此操作。 (请在以后的问题中记住这一点)
经过数小时的研究,我相信完成此操作的方法是使用redis流。
我在Spring Data Redis中找到了有关Redis流的以下两个链接:
https://www.vinsguru.com/redis-reactive-stream-real-time-producing-consuming-streams-with-spring-boot/ https://medium.com/@amitptl.in/redis-stream-in-action-using-java-and-spring-data-redis-a73257f9a281
我也已经阅读了有关长轮询的链接,这两个链接在长轮询期间都只有一个睡眠计时器,用于演示,但显然我想做些有用的事情。
https://www.baeldung.com/spring-deferred-result
这两个链接都非常有用。现在,我可以毫无疑问地确定如何将更新发布到Redis流-(这是未经测试的“伪代码”,但我预计实现此目标不会有任何问题)
// In my StudioLinking Entity
@PostUpdate
public void postToRedis() {
StudioLinking link = this;
ObjectRecord<String,StudioLinking> record = StreamRecords.newRecord()
.ofObject(link)
.withStreamKey(streamKey); //I am creating a stream for each individual linking probably?
this.redisTemplate
.opsForStream()
.add(record)
.subscribe(System.out::println);
atomicInteger.incrementAndGet();
}
但是,在订阅上述流时,我会很沮丧:所以基本上我想在这里做-请原谅伪造的伪代码,这仅出于想法目的。我很清楚,该代码绝不表示该语言和框架的实际行为:)
// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
// updateList is a unique token to track individual consumers in Redis
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public DeferredResult<ResponseEntity<?>> subscribeToUpdates(@PathVariable("linkId") Integer linkId,@PathVariable("updatesId") Integer updatesId) {
LOG.info("Received async-deferredresult request");
DeferredResult<ResponseEntity<?>> output = new DeferredResult<>(5000l);
deferredResult.onTimeout(() ->
deferredResult.setErrorResult(
ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
.body("IT WAS NOT UPDATED!")));
ForkJoinPool.commonPool().submit(() -> {
//----------------------------------------------
// Made up stuff... here is where I want to subscribe to a stream and block!
//----------------------------------------------
LOG.info("Processing in separate thread");
try {
// Subscribe to Redis Stream,get any updates that happened between long-polls
// then block until/if a new message comes over the stream
var subscription = listenerContainer.receiveAutoAck(
Consumer.from(studioLinkingID,updateList),StreamOffset.create(studioLinkingID,ReadOffset.lastConsumed()),streamListener);
listenerContainer.start();
} catch (InterruptedException e) {
}
output.setResult("IT WAS UPDATED!");
});
LOG.info("servlet thread freed");
return output;
}
那么我将如何解决这个问题呢?我认为答案就在https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/ReactiveRedisTemplate.html之内,但是我对Spring的熟练程度还不足以真正理解Java Docs中的术语(Spring文档确实很好,但是JavaDocs是用密集的技术语言编写的,对此我深表感谢。但还不太了解)。
我的实施还有两个障碍:
- 我对弹簧的确切了解还不是100%。我还没有到达那一刻,我真的完全理解为什么所有这些bean都在漂浮。我认为这是为什么我不能在这里得到东西的关键... Redis的配置在Spring以太坊中浮动,我不了解如何称呼它。我真的需要继续对此进行调查(这对我来说是一个巨大的障碍)。
- 这些
StudioLinking
寿命很短,因此我也需要进行一些清理。一旦我将整个事情付诸实践,我将稍后实施,我确实知道将需要这样做。
解决方法
为什么不使用阻塞式轮询机制?无需使用spring-data-redis的高级工具。只需使用5秒钟的简单阻塞读取,那么此调用可能需要6秒钟左右的时间。您可以减少或增加阻止超时。
class LinkStatus {
private final boolean updated;
LinkStatus(boolean updated) {
this.updated = updated;
}
}
// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
// updateList is a unique token to track individual consumers in Redis
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public LinkStatus subscribeToUpdates(
@PathVariable("linkId") Integer linkId,@PathVariable("updatesId") Integer updatesId) {
StreamOperations<String,String,String> op = redisTemplate.opsForStream();
Consumer consumer = Consumer.from("test-group","test-consumer");
// auto ack block stream read with size 1 with timeout of 5 seconds
StreamReadOptions readOptions = StreamReadOptions.empty().block(Duration.ofSeconds(5)).count(1);
List<MapRecord<String,String>> records =
op.read(consumer,readOptions,StreamOffset.latest("test-stream"));
return new LinkStatus(!CollectionUtils.isEmpty(records));
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。