如何解决如何在Akka客户端WebSocket中使用actor
我正在使用akka客户端网络套接字https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html
我有一个接受请求并以json响应的服务器 这是我的请求和回复的模式
request #1
{
"janus" : "create","transaction" : "<random alphanumeric string>"
}
response #1
{
"janus": "success","session_id": 2630959283560140,"transaction": "asqeasd4as3d4asdasddas","data": {
"id": 4574061985075210
}
}
然后基于response #1
,我需要启动request #2
,并且在收到response #2
之后,我需要启动request #3
,依此类推
例如
然后根据ID 4574061985075210
,我将发送请求#2并收到响应
request # 2 {
}
response # 2 {
}
----
我如何使用带有源和接收器的actor并重新使用流程 这是我的初始代码
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
object WebSocketClientFlow {
def main(args: Array[String]) = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
val incoming: Sink[Message,Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
//suppose here based on the server response i need to send another message to the server and so on do i need to repeat this same code here again ?????
}
val outgoing = Source.single(TextMessage("hello world!"))
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
val (upgradeResponse,closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
}
在这里我用了Source.ActorRef
val url = "ws://0.0.0.0:8188"
val req = WebSocketRequest(url,Nil,Option("janus-protocol"))
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
val webSocketFlow = Http().webSocketClientFlow(req)
val messageSource: Source[Message,ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10,OverflowStrategy.fail)
val messageSink: Sink[Message,NotUsed] =
Flow[Message]
.map(message => println(s"Received text message: [$message]"))
.to(Sink.ignore)
val ((ws,upgradeResponse),closed) =
messageSource
.viaMat(webSocketFlow)(Keep.both)
.toMat(messageSink)(Keep.both)
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
val source =
"""{ "janus": "create","transaction":"d1403sa54a5s3d4as3das"}"""
val jsonAst = source.parseJson
ws ! TextMessage.Strict(jsonAst.toString())
现在我需要帮助
我如何在这里发起第二个请求,因为我需要从服务器返回的"id"
来发起请求#2
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。