如何解决Akka流Http singleRequest在等待响应时会阻止整个流
我正在尝试将Akka Http集成到我的Akka流中,但是在极少数情况下,该流会卡住。
implicit val system: ActorSystem = ActorSystem("actor-system")
Source(0 to 10)
.mapAsync(10)(i ⇒ {
val url =
if (i == 1) "http://run.mocky.io/v3/40ff086f-1389-4ca5-ace8-1f0b3ac75582?mocky-delay=10s"
else "http://google.com"
Http().singleRequest(HttpRequest(uri = url))
})
.runForeach(r ⇒ println(s"${System.currentTimeMillis()}: ${r._1}"))
此代码将在第一个输出1599480226827: 301 Moved Permanently
之后停留10秒钟,然后在几乎相同的时间刷新其余代码。
输出将是:
1599480226827: 301 Moved Permanently
1599480236826: 302 Found
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
我希望它能按顺序输出所有内容,但延迟的内容除外。
为什么我的信息流被此类请求阻止?以及如何避免呢?
解决方法
来自mapAsync
的scaladocs
将并行运行的期货数量作为mapAsync的第一个参数给出。这些期货可以以任何顺序完成,但向下游发出的元素与从上游接收的元素顺序相同。
您的请求是并行发送的,但是runForeach
下的函数是按特定顺序调用的,这会导致输出结果的延迟。它正在等待第二个响应可用。
您可以使用mapAsyncUnordered
尽快处理响应。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。