如何解决使用多处理程序在后台运行功能,直到其他功能停止运行
我有一些代码(我也在StackOverflow上找到了,并做了一些修改以满足我的需要),这些代码连续不断地从模数转换器读取数据流。我希望能够
- 通过调用函数轻松开始测量和绘图
- 连续记录所有传入值
- 通过调用另一个函数轻松停止测量
今天,我将以下代码重写为一个类,因为在主函数中更易于阅读,该函数还控制其他测量设备,然后尝试使用多处理使测量在后台运行,只是注意到多处理并没有不能使用实例方法。
当前的实现使用线程,但是我想使用多处理,因为代码的不同部分并行运行非常重要,因为代码的其他部分(例如泵)也将必须在其中执行代码。在代码持续运行的同时不断地后台运行。 我要实现的示例实现如下所示:
initialize_measurement_device1(params)
global recorded_data
start_measurement_device1(params) # continuously plot & record incoming data
# start,control,stop other devices such as
pump1 = Pump(params)
valve1 = Valve(params)
pump1.infuse(5,"mL")
valve1.open()
sleep(5)
valve1.close()
# finally stop the measurement
stop_measurement_device1()
我找到的代码(有效)如下:
# Analog data acquisition via National Instruments' cDAQ unit
# The following assumes:
# TODO:
# Implement median filter
# Imports
import matplotlib.pyplot as plt
import numpy as np
import nidaqmx
from nidaqmx.stream_readers import AnalogMultiChannelReader
from nidaqmx import constants
# from nidaqmx import stream_readers # not needed in this script
# from nidaqmx import stream_writers # not needed in this script
import threading
import multiprocessing
import pickle
from datetime import datetime
import scipy.io
# Parameters
sampling_freq_in = 1000 # in Hz
buffer_in_size = 100
bufsize_callback = buffer_in_size
buffer_in_size_cfg = round(buffer_in_size * 1) # clock configuration
chans_in = 1 # set to number of active OPMs (x2 if By and Bz are used,but that is not recommended)
refresh_rate_plot = 10 # in Hz
crop = 10 # number of seconds to drop at acquisition start before saving
my_filename = 'test_3_opms' # with full path if target folder different from current folder (do not leave trailing /)
# Initialize data placeholders
buffer_in = np.zeros((chans_in,buffer_in_size))
data = np.zeros((chans_in,1)) # will contain a first column with zeros but that's fine
# Definitions of basic functions
def ask_user():
global running
input("Press ENTER/RETURN to stop acquisition and coil drivers.")
running = False
def stop():
global running
running = False
def cfg_read_task(acquisition): # uses above parameters
acquisition.ai_channels.add_ai_voltage_chan("Dev1/ai0") # has to match with chans_in
acquisition.timing.cfg_samp_clk_timing(rate=sampling_freq_in,sample_mode=constants.AcquisitionType.CONTINUOUS,samps_per_chan=buffer_in_size_cfg)
def reading_task_callback(task_idx,event_type,num_samples,callback_data): # bufsize_callback is passed to num_samples
global data
global buffer_in
if running:
# It may be wiser to read slightly more than num_samples here,to make sure one does not miss any sample,# see: https://documentation.help/NI-DAQmx-Key-Concepts/contCAcqGen.html
buffer_in = np.zeros((chans_in,num_samples)) # double definition ???
stream_in.read_many_sample(buffer_in,timeout=constants.WAIT_INFINITELY)
data = np.append(data,buffer_in,axis=1) # appends buffered data to total variable data
return 0 # Absolutely needed for this callback to be well defined (see nidaqmx doc).
if __name__ == '__main__':
# Configure and setup the tasks
task_in = nidaqmx.Task()
cfg_read_task(task_in)
stream_in = AnalogMultiChannelReader(task_in.in_stream)
task_in.register_every_n_samples_acquired_into_buffer_event(bufsize_callback,reading_task_callback)
# Start threading to prompt user to stop
thread_user = threading.Thread(target=ask_user)
thread_user.start()
# Main loop
running = True
time_start = datetime.now()
task_in.start()
# Plot a visual feedback for the user's mental health
f,ax1 = plt.subplots(1,1,sharex='all',sharey='none')
while running: # make this adapt to number of channels automatically
ax1.clear()
ax1.plot(data[0,-sampling_freq_in * 5:].T) # 5 seconds rolling window
# Label and axis formatting
ax1.set_xlabel('time [s]')
ax1.set_ylabel('voltage [V]')
xticks = np.arange(0,data[0,-sampling_freq_in * 5:].size,sampling_freq_in)
xticklabels = np.arange(0,xticks.size,1)
ax1.set_xticks(xticks)
ax1.set_xticklabels(xticklabels)
plt.pause(1/refresh_rate_plot) # required for dynamic plot to work (if too low,nulling performance bad)
# Close task to clear connection once done
task_in.close()
duration = datetime.now() - time_start
# Some messages at the end
num_samples_acquired = data[0,:].size
print("\n")
print("Acquisition ended.\n")
print("Acquisition duration: {}.".format(duration))
print("Acquired samples: {}.".format(num_samples_acquired - 1))
# Final plot of whole time course the acquisition
plt.close('all')
f_tot,sharey='none')
...
我将如何以可以在后台开始运行测量并保持运行直到决定终止的方式实施多处理?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。