如何解决使用Future和Async处理多个文件IO请求而不会阻塞
我是scala的新手,正在探索如何编写用于从文件读取的非阻塞IO代码。 以下是从缓存文件读取并将JSON输出返回到API端点的方法。
def retrieveCache = {
val source = Source.fromFile(fileName)
val content = try source.mkString
finally source.close()
// Some Parsing JSON Code...
// At the end,clearing the cache from the file
fileName.writeAll("")
}
我阅读了有关使用Future和Async实现无阻塞IO的信息,并做了一些尝试。但是不确定如何做到。
解决方法
通常,只需将阻塞代码转换为基于import os
import glob
import matplotlib as mpl
import matplotlib.pyplot as plt
# import cartopy.crs as ccrs
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.basemap import Basemap
import numpy as np
# The first file in 3 swath files.
FILE_NAME = 'MOD06_L2.A2017126.0655.061.2019226193408.hdf'
GEO_FILE_NAME ='MOD06_L2.A2017126.0655.061.2019226193408.hdf'
DATAFIELD_NAME = 'Brightness_Temperature'
from pyhdf.SD import SD,SDC
i = 0
for file in list(glob.glob('MOD06*.hdf')):
reader = open(file)
hdf = SD(file,SDC.READ)
# Read dataset.
data2D = hdf.select(DATAFIELD_NAME)
data = data2D[:,:].astype(np.double)
hdf_geo = SD(GEO_FILE_NAME,SDC.READ)
# Read geolocation dataset.
lat = hdf_geo.select('Latitude')
latitude = lat[:,:]
lon = hdf_geo.select('Longitude')
longitude = lon[:,:]
# Retrieve attributes.
attrs = data2D.attributes(full=1)
lna=attrs["long_name"]
long_name = lna[0]
aoa=attrs["add_offset"]
add_offset = aoa[0]
fva=attrs["_FillValue"]
_FillValue = fva[0]
sfa=attrs["scale_factor"]
scale_factor = sfa[0]
vra=attrs["valid_range"]
valid_min = vra[0][0]
valid_max = vra[0][1]
ua=attrs["units"]
units = ua[0]
invalid = np.logical_or(data > valid_max,data < valid_min)
invalid = np.logical_or(invalid,data == _FillValue)
data[invalid] = np.nan
data = (data - add_offset) * scale_factor
datam = np.ma.masked_array(data,np.isnan(data))
if i == 0 :
data_m = datam
latitude_m = latitude
longitude_m = longitude
else:
data_m = np.vstack([data_m,datam])
latitude_m = np.vstack([latitude_m,latitude])
longitude_m = np.vstack([longitude_m,longitude])
i = i + 1
m = Basemap(projection='cyl',resolution='l',llcrnrlat=-90,urcrnrlat=90,llcrnrlon=-180,urcrnrlon=180)
m.drawcoastlines(linewidth=0.5)
m.drawparallels(np.arange(-90,91,45))
m.drawmeridians(np.arange(-180,180,45),labels=[True,False,True])
sc = m.scatter(longitude_m,latitude_m,c=data_m,s=0.1,cmap=plt.cm.jet,edgecolors=None,linewidth=0)
cb = m.colorbar()
cb.set_label(units)
# Put title using the first file.
basename = os.path.basename(FILE_NAME)
plt.title('{0}\n{1}'.format(basename,DATAFIELD_NAME))
fig = plt.gcf()
# Save image.
pngfile = "{0}.py.png".format(basename)
fig.savefig(pngfile)
的异步代码,
Future
,然后使用import scala.concurrent.Future
包装代码:
Future.apply
val fut = Future {
// insert code here
}
将使用隐式Future.apply
(基本上是线程池的抽象)运行该块,尽管对于ScalaJS(至少现在是Scala Native),它更像是一个事件-循环)。如果范围内没有隐式ExecutionContext
,则必须指定一个。编译器会建议
ExecutionContext
对于与CPU绑定的操作,隐式全局上下文非常好,但是对于执行阻塞的I / O操作,它不是最优的:该上下文中的线程通常会被阻塞,并且该上下文中的线程数与JVM一样多。检测核心。解决方案是:使用import scala.concurrent.ExecutionContext.Implicits.global // a.k.a. "the implicit global ExecutionContext"
标记正在阻塞的代码部分(这可能使阻塞发生在另一个线程上),或者定义一个blocking
并使用更多线程来进行阻塞I / O,但不用于计算。
要使用ExecutionContext
,这通常就是您所需要的:
blocking
根据我的经验,import scala.concurrent.{ ExecutionContext,Future,blocking }
// Be careful about blindly importing this...
import ExecutionContext.Implicits.global
def retrieveCache = Future {
val content = blocking {
val source = Source.fromFile(fileName)
try {
source.mkString
} finally source.close()
}
// Parse the JSON
blocking {
fileName.writeAll("")
}
}
提供的糖并没有太大的好处:我更喜欢仅使用async
合成。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。