如何解决多线程使我收到“ ValueError:对已关闭文件的I / O操作”错误为什么?
我正在使用WTForms编写Flask Web应用程序。在其中一种形式中,用户应上传一个csv文件,服务器将分析收到的数据。这是我正在使用的代码。
filename = token_hex(8) + '.csv' # Generate a new filename
form.dataset.data.save('myapp/datasets/' + filename) # Save the received file
dataset = genfromtxt('myapp/datasets/' + filename,delimiter=',') # Open the newly generated file
# analyze 'dataset'
只要我在单线程应用程序中使用此代码,一切就可以正常工作。我尝试在代码中添加线程。这是线程调用的过程(与函数内部相同的代码):
def execute_analysis(form):
filename = token_hex(8) + '.csv' # Generate a new filename
form.dataset.data.save('myapp/datasets/' + filename) # Save the received file
dataset = genfromtxt('myapp/datasets/' + filename,') # Open the newly generated file
# analyze 'dataset'
这是我如何调用线程
import threading
@posts.route("/estimation",methods=['GET','POST'])
@login_required
def estimate_parameters():
form = EstimateForm()
if form.validate_on_submit():
threading.Thread(target=execute_analysis,args=[form]).start()
flash("Your request has been received. Please check the site in again in a few minutes.",category='success')
# return render_template('posts/post.html',title=post.id,post=post)
return render_template('estimations/estimator.html',title='New Analysis',form=form,legend='New Analysis')
但是现在我收到以下错误:
解决方法
我对框架的了解不足以确切说明发生了什么,但是我可以告诉您如何解决它。
只要您拥有由多个线程共享的资源,就使用锁。
Array
(
[0] => WC_Meta_Data Object
(
[current_data:protected] => Array
(
[id] => 646
[key] => yith_booking_data
[value] => Array
(
[from] => 1603152000
[to] => 1603411200
[duration] => 3
[person_types] => Array
(
)
[booking_services] => Array
(
)
[booking_service_quantities] => Array
(
)
[_added-to-cart-timestamp] => 1602161920
)
)
[data:protected] => Array
(
[id] => 646
[key] => yith_booking_data
[value] => Array
(
[from] => 1603152000
[to] => 1603411200
[duration] => 3
[person_types] => Array
(
)
[booking_services] => Array
(
)
[booking_service_quantities] => Array
(
)
[_added-to-cart-timestamp] => 1602161920
)
)
)
[1] => WC_Meta_Data Object
(
[current_data:protected] => Array
(
[id] => 647
[key] => _booking_id
[value] => 882
)
[data:protected] => Array
(
[id] => 647
[key] => _booking_id
[value] => 882
)
)
)
Array
(
[0] => WC_Meta_Data Object
(
[current_data:protected] => Array
(
[id] => 451
[key] => yith_booking_data
[value] => Array
(
[from] => 1600905600
[to] => 1600992000
[duration] => 1
[person_types] => Array
(
)
[booking_services] => Array
(
)
[booking_service_quantities] => Array
(
)
[_added-to-cart-timestamp] => 1600955166
)
)
[data:protected] => Array
(
[id] => 451
[key] => yith_booking_data
[value] => Array
(
[from] => 1600905600
[to] => 1600992000
[duration] => 1
[person_types] => Array
(
)
[booking_services] => Array
(
)
[booking_service_quantities] => Array
(
)
[_added-to-cart-timestamp] => 1600955166
)
)
)
[1] => WC_Meta_Data Object
(
[current_data:protected] => Array
(
[id] => 452
[key] => _booking_id
[value] => 709
)
[data:protected] => Array
(
[id] => 452
[key] => _booking_id
[value] => 709
)
)
)
Documentation on threading.Lock
:
实现原始锁对象的类。线程获得锁后,随后的尝试将其阻塞,直到释放为止
基本上,在线程1调用from threading import Lock
LOCK = Lock()
def process():
LOCK.acquire()
... # open a file,write some data to it etc.
LOCK.release()
# alternatively,use the context manager syntax
with LOCK:
...
threading.Thread(target=process).start()
threading.Thread(target=process).start()
之后,随后的调用例如来自其他线程的操作,将导致这些线程冻结并等待直到有人调用LOCK.acquire()
(通常是线程1,在完成与资源的业务之后)。
如果文件名是随机生成的,那么我不会期望1个线程关闭另一个文件的问题,除非两个文件碰巧都生成了相同的名称。但是也许您可以通过一些实验来弄清楚,例如首先尝试锁定对LOCK.release()
和save
的调用,然后检查是否有帮助。添加一些genfromtxt
语句(或者甚至更好,使用print
)也可能是有意义的,例如检查文件名是否不冲突。
没有进一步的上下文很难说,但我怀疑您可能是从函数返回或退出上下文管理器,这导致某些文件描述符关闭,从而导致save(..)
调用失败ValueError
。
如果是这样,一种直接的解决方法是在返回/关闭文件之前等待线程完成。类似于:
def handle_request(form):
...
analyzer_thread = threading.Thread(target=execute_analysis,args=[form])
analyzer_thread.start()
...
analyzer_thread.join() # wait for completion of execute_analysis
cleanup_context(form)
return
以下是我正在描述的问题的可复制的最小示例:
import threading
SEM = threading.Semaphore(0)
def run(fd):
SEM.acquire() # wait till release
fd.write("This will fail :(")
fd = open("test.txt","w+")
other_thread = threading.Thread(target=run,args=[fd])
other_thread.start()
fd.close()
SEM.release() # release the semaphore,so other_thread will acquire & proceed
other_thread.join()
请注意,根据您的情况,主线程将关闭文件,而另一个线程将在用write
进行ValueError: I/O operation on closed file.
调用时失败。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。