使用多处理程序在后台运行功能,直到其他功能停止运行

如何解决使用多处理程序在后台运行功能,直到其他功能停止运行

我有一些代码(我也在StackOverflow上找到了,并做了一些修改以满足我的需要),这些代码连续不断地从模数转换器读取数据流。我希望能够

  • 通过调用函数轻松开始测量和绘图
  • 连续记录所有传入值
  • 通过调用另一个函数轻松停止测量

今天,我将以下代码重写为一个类,因为在主函数中更易于阅读,该函数还控制其他测量设备,然后尝试使用多处理使测量在后台运行,只是注意到多处理并没有不能使用实例方法。

当前的实现使用线程,但是我想使用多处理,因为代码的不同部分并行运行非常重要,因为代码的其他部分(例如泵)也将必须在其中执行代码。在代码持续运行的同时不断地后台运行。 我要实现的示例实现如下所示:

initialize_measurement_device1(params)
global recorded_data
start_measurement_device1(params) # continuously plot & record incoming data
# start,control,stop other devices such as
pump1 = Pump(params)
valve1 = Valve(params)
pump1.infuse(5,"mL")
valve1.open()
sleep(5)
valve1.close()
# finally stop the measurement
stop_measurement_device1()

我找到的代码(有效)如下:

# Analog data acquisition via National Instruments' cDAQ unit
# The following assumes:

# TODO:
# Implement median filter

# Imports
import matplotlib.pyplot as plt
import numpy as np

import nidaqmx
from nidaqmx.stream_readers import AnalogMultiChannelReader
from nidaqmx import constants
# from nidaqmx import stream_readers  # not needed in this script
# from nidaqmx import stream_writers  # not needed in this script

import threading
import multiprocessing
import pickle
from datetime import datetime
import scipy.io

# Parameters
sampling_freq_in = 1000  # in Hz
buffer_in_size = 100
bufsize_callback = buffer_in_size
buffer_in_size_cfg = round(buffer_in_size * 1)  # clock configuration
chans_in = 1  # set to number of active OPMs (x2 if By and Bz are used,but that is not recommended)
refresh_rate_plot = 10  # in Hz
crop = 10  # number of seconds to drop at acquisition start before saving
my_filename = 'test_3_opms'  # with full path if target folder different from current folder (do not leave trailing /)

# Initialize data placeholders
buffer_in = np.zeros((chans_in,buffer_in_size))
data = np.zeros((chans_in,1))  # will contain a first column with zeros but that's fine

# Definitions of basic functions
def ask_user():
    global running
    input("Press ENTER/RETURN to stop acquisition and coil drivers.")
    running = False

def stop():
    global running
    running = False

def cfg_read_task(acquisition):  # uses above parameters
    acquisition.ai_channels.add_ai_voltage_chan("Dev1/ai0")  # has to match with chans_in
    acquisition.timing.cfg_samp_clk_timing(rate=sampling_freq_in,sample_mode=constants.AcquisitionType.CONTINUOUS,samps_per_chan=buffer_in_size_cfg)

def reading_task_callback(task_idx,event_type,num_samples,callback_data):  # bufsize_callback is passed to num_samples
    global data
    global buffer_in

    if running:
        # It may be wiser to read slightly more than num_samples here,to make sure one does not miss any sample,# see: https://documentation.help/NI-DAQmx-Key-Concepts/contCAcqGen.html
        buffer_in = np.zeros((chans_in,num_samples))  # double definition ???
        stream_in.read_many_sample(buffer_in,timeout=constants.WAIT_INFINITELY)

        data = np.append(data,buffer_in,axis=1)  # appends buffered data to total variable data

    return 0  # Absolutely needed for this callback to be well defined (see nidaqmx doc).

if __name__ == '__main__':
    # Configure and setup the tasks
    task_in = nidaqmx.Task()
    cfg_read_task(task_in)
    stream_in = AnalogMultiChannelReader(task_in.in_stream)
    task_in.register_every_n_samples_acquired_into_buffer_event(bufsize_callback,reading_task_callback)

    # Start threading to prompt user to stop
    thread_user = threading.Thread(target=ask_user)
    thread_user.start()

    # Main loop
    running = True
    time_start = datetime.now()
    task_in.start()

    # Plot a visual feedback for the user's mental health
    f,ax1 = plt.subplots(1,1,sharex='all',sharey='none')
    while running:  # make this adapt to number of channels automatically
        ax1.clear()
        ax1.plot(data[0,-sampling_freq_in * 5:].T)  # 5 seconds rolling window
        # Label and axis formatting
        ax1.set_xlabel('time [s]')
        ax1.set_ylabel('voltage [V]')
        xticks = np.arange(0,data[0,-sampling_freq_in * 5:].size,sampling_freq_in)
        xticklabels = np.arange(0,xticks.size,1)
        ax1.set_xticks(xticks)
        ax1.set_xticklabels(xticklabels)

        plt.pause(1/refresh_rate_plot)  # required for dynamic plot to work (if too low,nulling performance bad)


    # Close task to clear connection once done
    task_in.close()
    duration = datetime.now() - time_start

    # Some messages at the end
    num_samples_acquired = data[0,:].size
    print("\n")
    print("Acquisition ended.\n")
    print("Acquisition duration: {}.".format(duration))
    print("Acquired samples: {}.".format(num_samples_acquired - 1))


    # Final plot of whole time course the acquisition
    plt.close('all')
    f_tot,sharey='none')
    ...

我将如何以可以在后台开始运行测量并保持运行直到决定终止的方式实施多处理?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-