如何解决如何使用多处理或多线程加速此循环?
恐怕我没有正确地做多线程操作,所以我来这里是为了寻求智慧。我有两个地址数组,我必须检查第一个数组的地址是否存在于第二个数组中,以防它不寻找数组2中最相似的地址。
具有“官方”地址的数组称为directory
,而我需要验证的数组称为look_address
。
代码如下:
import pandas as pd
import numpy as np
from fuzzywuzzy import fuzz
from fuzzywuzzy import process
from datetime import datetime,timedelta
import threading
import queue
class myThread(threading.Thread):
def __init__(self,threadID,name,q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name=name
self.q = q
def run(self):
print(f"starting {self.name}")
process_data(self.name,self.q)
print(f"ending {self.name}")
locs = []
ratios={}
def process_data(threadName,q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
d = q.get()
queueLock.release()
d = d.strip()
if directory.isin([d]).any():
locs.append(d)
else:
pos = process.extract(d,directory.values,scorer=fuzz.ratio,limit=50)
ratios[d] = pos
else:
queueLock.release()
threadlist = ["T-1","T-2","T-3","T-4","T-5","T-6","T-7","T-8","T-9","T-10"]
nameList = look_address
queueLock = threading.Lock()
workQueue = queue.Queue(len(nameList)+1)
threads=[]
threadID=1
exitFlag=0
for name in threadlist:
thread = myThread(threadID,workQueue)
thread.start()
threads.append(thread)
threadID+=1
queueLock.acquire()
for addr in nameList:
workQueue.put(addr)
queueLock.release()
total_steps = len(workQueue.queue)
tot_sec = 0
t0 = datetime.now()
while not workQueue.empty():
total_seconds =(datetime.now()-t0).total_seconds()
if total_seconds == 0:
total_seconds = 1e-8
progress = 1-len(workQueue.queue)/total_steps
tot_sec+=total_seconds
print("\rProgreso: {pr:.2f}% || Buenas/Errores: {gb}/{bd}".format(
pr = progress*100,its = 1/total_seconds,elap = timedelta(seconds=np.round(tot_sec)),gb=len(locs),bd=len(errors),eta = timedelta(seconds=np.round(total_seconds*(total_steps-len(workQueue.queue))))),end="",flush=True)
exitFlag = 1
for t in threads:
t.join()
print("\nExiting Main Thread")
process.extract
中的每个请求大约需要25s(%timeit
是)。现在,上面的脚本似乎并不能加快数据处理速度。它已经运行了大约2个小时,并且进展了约4.29%。
我的两个问题是:
- 多线程的实现正确吗?
- 如何加快数据处理速度?也许可以在亚马逊或Google的VPS上运行它?
我想了解为什么这么慢以及如何加快速度。
编辑:更改自:
if not workQueue.empty():
d = q.get()
d = d.strip()
if directory.isin([d]).any():
locs.append(d)
else:
pos = process.extract(d,limit=50)
ratios[d] = pos
queueLock.release()
收件人:
if not workQueue.empty():
d = q.get()
queueLock.release()
d = d.strip()
if directory.isin([d]).any():
locs.append(d)
else:
pos = process.extract(d,limit=50)
ratios[d] = pos
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。