如何解决如何将视频流从一个 python 传递到另一个?
在我之前的帖子中,我们找到了一种将图像文件从一个 python 传递到另一个 python 的方法: pass video data from one python script to another
我现在正在尝试传递视频(连续图像):
写.py
import sys
import numpy as np
import cv2
from PIL import Image
import io
import time
while True:
img = cv2.imread('cat.jpg')
bimg = cv2.imencode('.jpg',img)[1]
sys.stdout.buffer.write(bimg)
sys.stdout.flush()
time.sleep(1)
read.py:
import sys
from PIL import Image
import io
import cv2
import numpy as np
from io import BytesIO
while True:
data = sys.stdin.buffer.read()
img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(),np.uint8),cv2.IMREAD_UNCHANGED)
cv2.imshow('image',img_np)
cv2.waitKey(0)
如果我将 write.py 数据输出到终端,它会打印出来。如果我手动将数据交给 read.py 读取。但是把它们放在一起(python3 write.py | python3 read.py
),它就挂了。 write.py 只写入一次,而 read.py 似乎永远不会得到它。
我的猜测是,读代码正在等待写代码“结束”,然后才将数据包打包并称之为图像。虽然如果是这样的话,我认为进行冲洗会解决它。
解决方法
我想我明白了。在 read.py
中,sys.stdin.buffer.read()
会读取并等待 stdin
管道关闭,但由于 write.py
循环,stdout
从未真正关闭其 while True
。这个概念验证简化示例有效:
write.py
import sys
import time
sys.stdout.buffer.write(b"Hello world")
sys.stdout.buffer.flush()
# Note if we comment out the code bellow it works again
while True:
# Keep this alive but don't have `while True:pass`
# because my computer might crash :D
time.sleep(10)
和read.py
import sys
with open("output.txt","w") as file:
file.write(sys.stdin.read())
这也将挂起并且永远不会真正向 "output.txt"
写入任何内容。如果我们从 while True
中移除 write.py
循环,代码将不再挂起,"Hello World"
将被写入 "output.py"
因为当 write.py
完成写入它会关闭它的过程,这将关闭管道。要解决此问题,我建议将 read.py
更改为如下所示:
import sys
while True:
with open("output.txt","a") as file:
file.write(sys.stdin.read(1))
解决方案:
write.py
import sys
import time
MAX_FILE_SIZE = 16 # bytes
msg = b"Hello world"
# Tell `reader.py` that it needs to read x number of bytes.
length = len(msg)
# We also need to tell `read.py` how many bytes it needs to read.
# This means that we have reached the same problem as before.
# To fix that issue we are always going to send the number of bytes but
# We are going to pad it with `0`s at the start.
# https://stackoverflow.com/a/339013/11106801
length = str(length).zfill(MAX_FILE_SIZE)
sys.stdout.buffer.write(length.encode())
sys.stdout.buffer.write(msg)
sys.stdout.buffer.flush()
# We also need to tell `read.py` that it was the last file that we send
# Sending `1` means that the file has ended
sys.stdout.buffer.write(b"1")
sys.stdout.buffer.flush()
# Note if we comment out the code bellow it works again
while True:
# Keep this alive but don't have `while True:pass`
# because my computer might crash :D
time.sleep(10)
和read.py
import sys
import time
MAX_FILE_SIZE = 16 # bytes
while True:
time.sleep(1) # Make sure `write.py` has sent the data
# Read `MAX_FILE_SIZE` number of bytes and convert it to an int
# So that we know the size of the file comming in
length = int(sys.stdin.buffer.read(MAX_FILE_SIZE))
time.sleep(1) # Make sure `write.py` has sent the data
# Here you can switch to a different file every time `writer.py`
# Sends a new file
with open("output.txt","wb") as file:
file.write(sys.stdin.buffer.read(length))
file_ended = sys.stdin.buffer.read(1)
if file_ended == b"1":
# File has ended
break
else:
# We are going to start reading again for the next file:
pass
编辑: 解决方法如下:
- 发送文件大小
- 发送实际文件数据
- 发送一个字节,告诉
read.py
是否应该等待另一个文件
对于第 1 部分,我们只是将文件的长度编码为一个在前面填充 0 的字符串。注意:确保 MAX_FILE_SIZE
大于最大文件的大小(大数字会稍微降低性能)。对于第 3 部分,如果我们发送 "1"
,则意味着没有更多文件要发送。否则 reader.py
将等待并接受下一个文件。所以 write.py
将变成:
from math import log
import time
import sys
import cv2
MAX_FILE_SIZE = 62914560 # bytes
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE,2)+1)
def write_file(buffer,data,last_file=False):
# Tell `reader.py` that it needs to read x number of bytes.
length = len(data)
# We also need to tell `read.py` how many bytes it needs to read.
# This means that we have reached the same problem as before.
# To fix that issue we are always going to send the number of bytes but
# We are going to pad it with `0`s at the start.
# https://stackoverflow.com/a/339013/11106801
length = str(length).zfill(MAX_FILE_SIZE)
with open("output.txt","w") as file:
file.write(length)
buffer.write(length.encode())
# Write the actual data
buffer.write(data)
# We also need to tell `read.py` that it was the last file that we send
# Sending `1` means that the file has ended
buffer.write(str(int(last_file)).encode())
buffer.flush()
while True:
img = cv2.imread("img.jpg")
bimg = cv2.imencode(".jpg",img)[1]
# Call write_data
write_file(sys.stdout.buffer,bimg,last_file=False)
# time.sleep(1) # Don't need this
和 read.py
将变成:
from io import BytesIO
from math import log
import numpy as np
import time
import cv2
import sys
MAX_FILE_SIZE = 62914560 # bytes
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE,2)+1)
def read(buffer,number_of_bytes):
output = b""
while len(output) < number_of_bytes:
output += buffer.read(number_of_bytes - len(output))
assert len(output) == number_of_bytes,"An error occured."
return output
def read_file(buffer):
# Read `MAX_FILE_SIZE` number of bytes and convert it to an int
# So that we know the size of the file comming in
length = int(read(buffer,MAX_FILE_SIZE))
# Here you can switch to a different file every time `writer.py`
# Sends a new file
data = read(buffer,length)
# Read a byte so that we know if it is the last file
file_ended = read(buffer,1)
return data,(file_ended == b"1")
while True:
print("Reading file")
data,last_file = read_file(sys.stdin.buffer)
img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(),np.uint8),cv2.IMREAD_UNCHANGED)
cv2.imshow("image",img_np)
cv2.waitKey(0)
if last_file:
break;
,
两种解决方案:ZeroMQ |磁盘缓存
使用 ZeroMQ 将帧从一个 Python 文件发送到另一个文件非常容易。
零MQ
通过 PyPI 安装:pip install -U pyzmq
。有多种发送帧的方式。
这是使用 PUBLISHER 和 SUBSCRIBER
# writer | publisher
import base64
import time
import zmq
import cv2
# Prepare our context and publisher
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5563")
CAM_INDEX_OR_URI = 0
capture = cv2.VideoCapture(CAM_INDEX_OR_URI)
assert capture.isOpened(),"Cannot open camera"
while True:
# Write two messages,each with an envelope and content
# capture frame-by-frame
ret,frame = capture.read()
if not ret:
print("[+] No frame received. Stream ended.")
break
# resize the frame
frame = cv2.resize(frame,(640,480))
encoded,buffer = cv2.imencode(".jpg",frame)
# all is good
# cv2.imshow("Frames",frame)
# stop with Esc key (27)
if cv2.waitKey(1) == 27:
break
sent_frame = base64.b64encode(buffer)
publisher.send_multipart([b"camera_A",sent_frame])
time.sleep(0.01)
# We never get here but clean up anyhow
publisher.close()
context.term()
capture.release()
cv2.destroyAllWindows()
# reader.py | subscriber
import numpy as np
import base64
import zmq
import cv2
# Prepare our context and publisher
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5563")
subscriber.setsockopt_string(zmq.SUBSCRIBE,"camera_A")
while True:
# Read envelope with address
[address,contents] = subscriber.recv_multipart()
receive_frame = base64.b64decode(contents)
frame = np.frombuffer(receive_frame,dtype=np.uint8)
frame = cv2.imdecode(frame,1)
cv2.namedWindow("Frames",cv2.WINDOW_NORMAL)
cv2.imshow("Frames",frame)
# stop with Esc key (27)
if cv2.waitKey(1) == 27:
break
subscriber.close()
context.term()
cv2.destroyAllWindows()
磁盘缓存
您也可以考虑使用 diskcache
。它允许通过内存传递 python 对象。它就像Redis,但都是Python,不需要服务器。注意:pip install --upgrade diskcache
。您可以调整以开始从相机发送实时帧 |视频
# writer.py
import time
from pathlib import Path
import diskcache as dc
import cv2
tmp = Path("/tmp/stream")
with dc.Cache(tmp) as cache:
print(f"[+] Ready to push data to {tmp}.")
while True:
img = cv2.imread("cat.jpg")
cache.push(img,expire=5)
time.sleep(10)
# reader.py
import time
from pathlib import Path
import diskcache as dc
import cv2
tmp = Path("/tmp/stream")
with dc.Cache(tmp) as cache:
print(f"[+] Ready to pull data from {tmp}")
while True:
(key,value),_ = cache.pull(expire_time=True)
if key:
cv2.imshow("cat",value)
cv2.waitKey(0)
cv2.destroyAllWindows()
time.sleep(0.1)
我会朝这些方向前进,而不是 sys
,因为您可以完全控制流数据。见diskcache Documentation
您已经提到要发送的图像大小不一致,但我必须假设它是否来自同一台摄像机(对于给定的视频流),原始图像大小不会改变,而只是压缩图像大小.我想您可能有足够的 RAM 一次在内存中存储至少一帧未压缩的帧,而您只是在所有压缩和解压缩过程中引入了处理开销。
鉴于我将使用 multiprocessing.shared_memory
创建一个共享缓冲区,它可以在两个进程之间共享帧(如果你想要真正的幻想,甚至可以创建几个帧的循环缓冲区,并防止屏幕撕裂,但是在我的测试中这不是一个大问题)
鉴于 cv2.VideoCapture().read()
可以直接读入现有数组,并且您可以创建一个使用共享内存作为缓冲区的 numpy 数组,您可以在零额外复制的情况下将数据读入共享内存。使用它,我能够从以 1280x688 分辨率使用 H.264 编码的视频文件每秒读取近 700 帧。
from multiprocessing.shared_memory import SharedMemory
import cv2
from time import sleep
import numpy as np
vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie
#get the first frame to calculate size
cap = cv2.VideoCapture(vid_device)
success,frame = cap.read()
if not success:
raise Exception("error reading from video")
#create a shared memory for sending the frame shape
frame_shape_shm = SharedMemory(name="frame_shape",create=True,size=frame.ndim*4) #4 bytes per dim as long as int32 is big enough
frame_shape = np.ndarray(3,buffer=frame_shape_shm.buf,dtype='i4') #4 bytes per dim as long as int32 is big enough
frame_shape[:] = frame.shape
#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer",size=frame.nbytes)
frame_buffer = np.ndarray(frame_shape,buffer=frame_buffer_shm.buf,dtype=frame.dtype)
input("writer is ready: press enter once reader is ready")
try: #use keyboardinterrupt to quit
while True:
cap.read(frame_buffer) #read data into frame buffer
# sleep(1/24) #limit framerate-ish (hitting actual framerate is more complicated than 1 line)
except KeyboardInterrupt:
pass
#cleanup: IMPORTANT,close this one first so the reader doesn't unlink() the
# shm's before this file has exited. (less important on windows)
cap.release()
frame_buffer_shm.close()
frame_shape_shm.close()
读取器过程看起来非常相似,但我们没有创建视频设备和read
帧,而是构建共享数组和imshow
一堆。 GUI 没有转储数据那么快,所以我们没有达到 700 fps,但高达 500 帧也不错......
from multiprocessing.shared_memory import SharedMemory
import cv2
import numpy as np
#create a shared memory for reading the frame shape
frame_shape_shm = SharedMemory(name="frame_shape")
frame_shape = np.ndarray([3],dtype='i4')
#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer")
#create the framebuffer using the shm's memory
frame_buffer = np.ndarray(frame_shape,dtype='u1')
try:
while True:
cv2.imshow('frame',frame_buffer)
cv2.waitKey(1) #this is needed for cv2 to update the gui
except KeyboardInterrupt:
pass
#cleanup: IMPORTANT the writer process should close before this one,so nothing
# tries to access the shm after unlink() is called. (less important on windows)
frame_buffer_shm.close()
frame_buffer_shm.unlink()
frame_shape_shm.close()
frame_shape_shm.unlink()
编辑: 用户的其他问题表明可能需要使用低于 3.8 的 Python 版本(甚至可以跨版本工作),因此这里有一个就地使用 posix_ipc
的示例multiprocessing.shared_memory
来创建帧缓冲区(以及如何清理它):
#creation
shm = posix_ipc.SharedMemory(name="frame_buf",flags=posix_ipc.O_CREX,#if this fails,cleanup didn't happen properly last time
size=frame.nbytes)
shm_map = mmap.mmap(shm.fd,shm.size)
buf = memoryview(shm_map)
#create the frame buffer
frame_buffer = np.ndarray(frame.shape,buffer=buf,dtype=frame.dtype)
frame_buffer[:] = frame[:] #copy first frame into frame buffer
#cleanup
shm.close_fd() #can happen after opening mmap
buf.release() #must happen after frame_buffer is no longer needed and before closing mmap
shm_map.close()
shm.unlink() #must only call from one of the two processes. unlink tells the os to reclaim the space once all handles are closed.
,
什么与使用 ROS 发布者和订阅者有关。实现起来很简单,也很容易理解。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。