如何解决如何在Akka Streams中的Akka客户端WebSocket中使用Flow.map
我有一个WebSocket服务器,我需要通过已建立的WebSocket连接向其发送和接收消息,因为Akka客户端WebSocket不能像Java中那样为我们提供常规的websocket.send()
功能
所以我找到了这个解决方案How to use Akka-HTTP client websocket send message
效果很好,但是我是Akka流中的初学者,我正在尝试实现以下方案
在我的案例中,WebSocket服务器在ws://0.0.0.0:8188上运行的示例
首先,我将向服务器发送一条消息以启动sessionID
request# 1
{
"janus" : "create","transaction" : "<random alphanumeric string>"
}
服务器将使用会话ID进行响应
response #1
{
"janus": "success","session_id": 2630959283560140,"transaction": "asqeasd4as3d4asdasddas","data": {
"id": 4574061985075210
}
}
然后基于ID 4574061985075210,我将发送另一条消息并接收更多信息
request # 02 {
"janus": "attach","session_id":${sessionId},"plugin":"janus.plugin.echotest","transaction":"asqeasd4as3d4asdasddas"
}
response # 02 {
}
----
到目前为止,我可以显示request #1
的响应,但我不知道如何使用从请求#1获得的sessionID发起请求#2并显示其响应
这是我的代码
def main(args: Array[String]): Unit = {
val url = "ws://0.0.0.0:8188"
val req = WebSocketRequest(url,Nil,Option("janus-protocol"))
implicit val materializer = ActorMaterializer()
import system.dispatcher
val webSocketFlow = Http().webSocketClientFlow(req)
val messageSource: Source[Message,ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10,OverflowStrategy.fail)
val messageSink: Sink[Message,NotUsed] =
Flow[Message]
.map{message =>
println(s"Received text message: [$message]")
val strJson = message.toString
val jsonResponse = strJson.parseJson
val jsonObj = jsonResponse.asJsObject
val janus = jsonObj.fields("janus").convertTo[String]
val data = jsonObj.fields("data").asJsObject
val sessionID = data.fields("id")
// i need to take this SessionId and send to the websocket established connection and receive its response
}
.to(Sink.ignore)
val ((ws,upgradeResponse),closed) =
messageSource
.viaMat(webSocketFlow)(Keep.both)
.toMat(messageSink)(Keep.both)
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
val source =
"""{ "janus": "create","transaction":"d1403sa54a5s3d4as3das"}"""
val jsonAst = source.parseJson
ws ! TextMessage.Strict(jsonAst.toString())
}
任何帮助将不胜感激
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。