如何将视频流从一个 python 传递到另一个?

如何解决如何将视频流从一个 python 传递到另一个?

在我之前的帖子中,我们找到了一种将图像文件从一个 python 传递到另一个 python 的方法: pass video data from one python script to another

我现在正在尝试传递视频(连续图像):

写.py

import sys
import numpy as np
import cv2
from PIL import Image
import io
import time

while True:
    img = cv2.imread('cat.jpg')
    bimg = cv2.imencode('.jpg',img)[1]
    sys.stdout.buffer.write(bimg)
    sys.stdout.flush()
    time.sleep(1)

read.py:

import sys
from PIL import Image
import io
import cv2
import numpy as np
from io import BytesIO
    
while True:
    data = sys.stdin.buffer.read()
    img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(),np.uint8),cv2.IMREAD_UNCHANGED)
    cv2.imshow('image',img_np)
    cv2.waitKey(0)

如果我将 write.py 数据输出到终端,它会打印出来。如果我手动将数据交给 read.py 读取。但是把它们放在一起(python3 write.py | python3 read.py),它就挂了。 write.py 只写入一次,而 read.py 似乎永远不会得到它。

我的猜测是,读代码正在等待写代码“结束”,然后才将数据包打包并称之为图像。虽然如果是这样的话,我认为进行冲洗会解决它。

解决方法

我想我明白了。在 read.py 中,sys.stdin.buffer.read() 会读取并等待 stdin 管道关闭,但由于 write.py 循环,stdout 从未真正关闭其 while True。这个概念验证简化示例有效:

write.py

import sys
import time

sys.stdout.buffer.write(b"Hello world")
sys.stdout.buffer.flush()

# Note if we comment out the code bellow it works again
while True:
    # Keep this alive but don't have `while True:pass`
    # because my computer might crash :D
    time.sleep(10)

read.py

import sys

with open("output.txt","w") as file:
    file.write(sys.stdin.read())

这也将挂起并且永远不会真正向 "output.txt" 写入任何内容。如果我们从 while True 中移除 write.py 循环,代码将不再挂起,"Hello World" 将被写入 "output.py" 因为当 write.py 完成写入它会关闭它的过程,这将关闭管道。要解决此问题,我建议将 read.py 更改为如下所示:

import sys

while True:
    with open("output.txt","a") as file:
        file.write(sys.stdin.read(1))

解决方案:

write.py

import sys
import time

MAX_FILE_SIZE = 16 # bytes

msg = b"Hello world"

# Tell `reader.py` that it needs to read x number of bytes.
length = len(msg)
# We also need to tell `read.py` how many bytes it needs to read.
# This means that we have reached the same problem as before.
# To fix that issue we are always going to send the number of bytes but
# We are going to pad it with `0`s at the start.
# https://stackoverflow.com/a/339013/11106801
length = str(length).zfill(MAX_FILE_SIZE)
sys.stdout.buffer.write(length.encode())

sys.stdout.buffer.write(msg)
sys.stdout.buffer.flush()

# We also need to tell `read.py` that it was the last file that we send
# Sending `1` means that the file has ended
sys.stdout.buffer.write(b"1")
sys.stdout.buffer.flush()

# Note if we comment out the code bellow it works again
while True:
    # Keep this alive but don't have `while True:pass`
    # because my computer might crash :D
    time.sleep(10)

read.py

import sys
import time

MAX_FILE_SIZE = 16 # bytes

while True:
    time.sleep(1) # Make sure `write.py` has sent the data
    # Read `MAX_FILE_SIZE` number of bytes and convert it to an int
    # So that we know the size of the file comming in
    length = int(sys.stdin.buffer.read(MAX_FILE_SIZE))
    time.sleep(1) # Make sure `write.py` has sent the data

    # Here you can switch to a different file every time `writer.py`
    # Sends a new file
    with open("output.txt","wb") as file:
        file.write(sys.stdin.buffer.read(length))

    file_ended = sys.stdin.buffer.read(1)
    if file_ended == b"1":
        # File has ended
        break
    else:
        # We are going to start reading again for the next file:
        pass

编辑: 解决方法如下:

  1. 发送文件大小
  2. 发送实际文件数据
  3. 发送一个字节,告诉 read.py 是否应该等待另一个文件

对于第 1 部分,我们只是将文件的长度编码为一个在前面填充 0 的字符串。注意:确保 MAX_FILE_SIZE 大于最大文件的大小(大数字会稍微降低性能)。对于第 3 部分,如果我们发送 "1",则意味着没有更多文件要发送。否则 reader.py 将等待并接受下一个文件。所以 write.py 将变成:

from math import log
import time
import sys
import cv2


MAX_FILE_SIZE = 62914560 # bytes
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE,2)+1)


def write_file(buffer,data,last_file=False):
   # Tell `reader.py` that it needs to read x number of bytes.
   length = len(data)
   # We also need to tell `read.py` how many bytes it needs to read.
   # This means that we have reached the same problem as before.
   # To fix that issue we are always going to send the number of bytes but
   # We are going to pad it with `0`s at the start.
   # https://stackoverflow.com/a/339013/11106801
   length = str(length).zfill(MAX_FILE_SIZE)
   with open("output.txt","w") as file:
      file.write(length)
   buffer.write(length.encode())

   # Write the actual data
   buffer.write(data)

   # We also need to tell `read.py` that it was the last file that we send
   # Sending `1` means that the file has ended
   buffer.write(str(int(last_file)).encode())
   buffer.flush()


while True:
    img = cv2.imread("img.jpg")
    bimg = cv2.imencode(".jpg",img)[1]
    # Call write_data
    write_file(sys.stdout.buffer,bimg,last_file=False)
    # time.sleep(1) # Don't need this

read.py 将变成:

from io import BytesIO
from math import log
import numpy as np
import time
import cv2
import sys


MAX_FILE_SIZE = 62914560 # bytes
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE,2)+1)


def read(buffer,number_of_bytes):
    output = b""
    while len(output) < number_of_bytes:
        output += buffer.read(number_of_bytes - len(output))
    assert len(output) == number_of_bytes,"An error occured."
    return output


def read_file(buffer):
    # Read `MAX_FILE_SIZE` number of bytes and convert it to an int
    # So that we know the size of the file comming in
    length = int(read(buffer,MAX_FILE_SIZE))

    # Here you can switch to a different file every time `writer.py`
    # Sends a new file
    data = read(buffer,length)

    # Read a byte so that we know if it is the last file
    file_ended = read(buffer,1)

    return data,(file_ended == b"1")


while True:
    print("Reading file")
    data,last_file = read_file(sys.stdin.buffer)
    img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(),np.uint8),cv2.IMREAD_UNCHANGED)
    cv2.imshow("image",img_np)
    cv2.waitKey(0)

    if last_file:
        break;
,

两种解决方案:ZeroMQ |磁盘缓存

使用 ZeroMQ 将帧从一个 Python 文件发送到另一个文件非常容易。

零MQ


通过 PyPI 安装:pip install -U pyzmq。有多种发送帧的方式。 这是使用 PUBLISHER 和 SUBSCRIBER

的示例
# writer | publisher
import base64
import time
import zmq
import cv2


# Prepare our context and publisher
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5563")

CAM_INDEX_OR_URI = 0
capture = cv2.VideoCapture(CAM_INDEX_OR_URI)
assert capture.isOpened(),"Cannot open camera"

while True:
    # Write two messages,each with an envelope and content

    # capture frame-by-frame
    ret,frame = capture.read()
    if not ret:
        print("[+] No frame received. Stream ended.")
        break

    # resize the frame
    frame = cv2.resize(frame,(640,480))
    encoded,buffer = cv2.imencode(".jpg",frame)

    #  all is good
    # cv2.imshow("Frames",frame)

    # stop with Esc key (27)
    if cv2.waitKey(1) == 27:
        break

    sent_frame = base64.b64encode(buffer)
    publisher.send_multipart([b"camera_A",sent_frame])

    time.sleep(0.01)
  

# We never get here but clean up anyhow
publisher.close()
context.term()

capture.release()
cv2.destroyAllWindows()
# reader.py | subscriber

import numpy as np
import base64
import zmq
import cv2

# Prepare our context and publisher
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5563")
subscriber.setsockopt_string(zmq.SUBSCRIBE,"camera_A")

while True:
    # Read envelope with address
    [address,contents] = subscriber.recv_multipart()

    receive_frame = base64.b64decode(contents)
    frame = np.frombuffer(receive_frame,dtype=np.uint8)
    frame = cv2.imdecode(frame,1)

 
    cv2.namedWindow("Frames",cv2.WINDOW_NORMAL)
    cv2.imshow("Frames",frame)
   

    # stop with Esc key (27) 
    if cv2.waitKey(1) == 27:
        break

subscriber.close()
context.term()
cv2.destroyAllWindows()

磁盘缓存


您也可以考虑使用 diskcache。它允许通过内存传递 python 对象。它就像Redis,但都是Python,不需要服务器。注意:pip install --upgrade diskcache。您可以调整以开始从相机发送实时帧 |视频

# writer.py
import time
from pathlib import Path
import diskcache as dc
import cv2

tmp = Path("/tmp/stream")

with dc.Cache(tmp) as cache:
    print(f"[+] Ready to push data to {tmp}.")
    while True:
        img = cv2.imread("cat.jpg")
        cache.push(img,expire=5)
        time.sleep(10)


# reader.py

import time
from pathlib import Path
import diskcache as dc
import cv2

tmp = Path("/tmp/stream")

with dc.Cache(tmp) as cache:
    print(f"[+] Ready to pull data from {tmp}")
    while True:
        (key,value),_ = cache.pull(expire_time=True)
        if key:
            cv2.imshow("cat",value)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        time.sleep(0.1)

我会朝这些方向前进,而不是 sys,因为您可以完全控制流数据。见diskcache Documentation

,

您已经提到要发送的图像大小不一致,但我必须假设它是否来自同一台摄像机(对于给定的视频流),原始图像大小不会改变,而只是压缩图像大小.我想您可能有足够的 RAM 一次在内存中存储至少一帧未压缩的帧,而您只是在所有压缩和解压缩过程中引入了处理开销。

鉴于我将使用 multiprocessing.shared_memory 创建一个共享缓冲区,它可以在两个进程之间共享帧(如果你想要真正的幻想,甚至可以创建几个帧的循环缓冲区,并防止屏幕撕裂,但是在我的测试中这不是一个大问题)

鉴于 cv2.VideoCapture().read() 可以直接读入现有数组,并且您可以创建一个使用共享内存作为缓冲区的 numpy 数组,您可以在零额外复制的情况下将数据读入共享内存。使用它,我能够从以 1280x688 分辨率使用 H.264 编码的视频文件每秒读取近 700 帧。

from multiprocessing.shared_memory import SharedMemory
import cv2
from time import sleep
import numpy as np

vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie

#get the first frame to calculate size
cap = cv2.VideoCapture(vid_device)
success,frame = cap.read()
if not success:
    raise Exception("error reading from video")

#create a shared memory for sending the frame shape
frame_shape_shm = SharedMemory(name="frame_shape",create=True,size=frame.ndim*4) #4 bytes per dim as long as int32 is big enough
frame_shape = np.ndarray(3,buffer=frame_shape_shm.buf,dtype='i4')  #4 bytes per dim as long as int32 is big enough
frame_shape[:] = frame.shape

#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer",size=frame.nbytes)
frame_buffer = np.ndarray(frame_shape,buffer=frame_buffer_shm.buf,dtype=frame.dtype)

input("writer is ready: press enter once reader is ready")

try: #use keyboardinterrupt to quit
    while True:
        cap.read(frame_buffer) #read data into frame buffer
        # sleep(1/24) #limit framerate-ish (hitting actual framerate is more complicated than 1 line)
except KeyboardInterrupt:
    pass

#cleanup: IMPORTANT,close this one first so the reader doesn't unlink() the 
#  shm's before this file has exited. (less important on windows)
cap.release()
frame_buffer_shm.close()
frame_shape_shm.close()

读取器过程看起来非常相似,但我们没有创建视频设备和read帧,而是构建共享数组和imshow一堆。 GUI 没有转储数据那么快,所以我们没有达到 700 fps,但高达 500 帧也不错......

from multiprocessing.shared_memory import SharedMemory
import cv2
import numpy as np

#create a shared memory for reading the frame shape
frame_shape_shm = SharedMemory(name="frame_shape")
frame_shape = np.ndarray([3],dtype='i4')

#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer")

#create the framebuffer using the shm's memory
frame_buffer = np.ndarray(frame_shape,dtype='u1')
try:
    while True:
        cv2.imshow('frame',frame_buffer)
        cv2.waitKey(1) #this is needed for cv2 to update the gui
except KeyboardInterrupt:
    pass

#cleanup: IMPORTANT the writer process should close before this one,so nothing 
#  tries to access the shm after unlink() is called. (less important on windows)
frame_buffer_shm.close()
frame_buffer_shm.unlink()
frame_shape_shm.close()
frame_shape_shm.unlink()

编辑: 用户的其他问题表明可能需要使用低于 3.8 的 Python 版本(甚至可以跨版本工作),因此这里有一个就地使用 posix_ipc 的示例multiprocessing.shared_memory 来创建帧缓冲区(以及如何清理它):

#creation
shm = posix_ipc.SharedMemory(name="frame_buf",flags=posix_ipc.O_CREX,#if this fails,cleanup didn't happen properly last time
                             size=frame.nbytes)
shm_map = mmap.mmap(shm.fd,shm.size)
buf = memoryview(shm_map)
#create the frame buffer
frame_buffer = np.ndarray(frame.shape,buffer=buf,dtype=frame.dtype)
frame_buffer[:] = frame[:] #copy first frame into frame buffer

#cleanup
shm.close_fd() #can happen after opening mmap
buf.release() #must happen after frame_buffer is no longer needed and before closing mmap
shm_map.close()
shm.unlink() #must only call from one of the two processes. unlink tells the os to reclaim the space once all handles are closed.
,

什么与使用 ROS 发布者和订阅者有关。实现起来很简单,也很容易理解。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-