如何解决如何对来自WebSocket网址的视频流进行对象检测
我从我制作的源中获取一个流,以便可以在特定的websocket URL上访问它(不确定是否理想,也将欣赏任何其他体系结构)。我现在需要在此视频流上执行对象检测,并考虑到将通过客户端websocket库(例如通过flask或fastapi制成的服务器中的websocket)连接到websocket URL的体系结构,然后再次流检测到的对象通过另一个websocket URL将视频发送给多个客户端。
问题是,即使连接到websocket URL,我也无法接收任何图像,并且不确定如何在服务器方案中处理run_until_complete行。
任何建议或帮助将不胜感激
服务器代码
import uvicorn
from fastapi import FastAPI,WebSocket
# import websockets
# import asyncio
# init app
app = FastAPI()
async def clientwebsocketconnection():
uri = "wss://<websocket URL here>"
async with websockets.connect(uri) as websocket:
print("reached here")
data = await websocket.recv()
print(f"< {data}")
# Routes
@app.get('/')
async def index():
return {"text": "Its working"}
@app.websocket('/websocketconnection') # FIXME: change this to a websocket endpoint
async def websocketconnection():
return {"text": "Its working"}
if __name__ == '__main__':
uvicorn.run(app,host="127.0.0.1",port=8000)
# asyncio.get_event_loop().run_until_complete(clientwebsocketconnection())
解决方法
我假设您想通过 websocket 发送文本。为此,您需要以下代码:
@app.websocket('/websocketconnection')
async def websocketconnection(websocket: WebSocket) -> None:
await websocket.accept()
await websocket.send_text("It's working!")
await websocket.close()
您可以在 the official FastAPI docs 中找到更多示例。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。