如何解决为什么使用Parquet Dataset读取行的一小部分与读取整个文件需要相同的时间?
我正在开发一个程序来分析某些资产的某些历史价格。数据被构造和分析为熊猫数据框。列是日期,行是资产。以前,我使用这种移调,但是这种格式给了我更好的阅读时间。我将这些数据保存在一个实木复合地板文件中,现在我想读取一个从A到B的日期间隔和一小部分资产,分析它,然后使用相同的资产重复相同的过程,但是从B + 1至C。 问题是,即使我使用唯一的行,镶木地板的读取时间也与读取整个文件的时间相同。有没有办法改善这种行为?,最好是,一旦它过滤了行,它就可以保存内存中块的位置,以加快下一次读取的速度。我必须写一个过滤了资产的新文件吗?。
为了避免完全读取,我尝试编写具有较少行组和较小数据页大小的镶木地板文件,但这在时间方面并没有取得很好的效果。
我还有其他问题。为什么我们如果使用Parquet数据集读取完整的Parquet文件并且use_legacy_dataset = False,则比用use_legacy_dataset = True读取相同的Parquet数据集要花费更多的时间?
代码示例:
import pandas as pd
import numpy as np
import time
import pyarrow.parquet as pq
# generating the small data for the example,the file weight like 150MB for this example,the real data
# has 2 GB
dates = pd.bdate_range('2019-01-01','2020-03-01')
assets = list(range(1000,50000))
historical_prices = pd.DataFrame(np.random.rand(len(assets),len(dates)),assets,dates)
historical_prices.columns = historical_prices.columns.strftime('%Y-%m-%d')
# name of the index
historical_prices.index.name = 'assets'
# writing the parquet file using the lastest version,in the comments are the thigns that I tested
historical_prices.to_parquet(
'historical_prices.parquet',version='2.0',data_page_version='2.0',writer_engine_version='2.0',# row_group_size=100,# compression=None
# use_dictionary=False,# data_page_size=1000,# use_byte_stream_split=True,# flavor='spark',)
# reading the complete parquet dataset
start_time = time.time()
historical_prices_dataset = pq.ParquetDataset(
'historical_prices.parquet',use_legacy_dataset=False
)
historical_prices_dataset.read_pandas().to_pandas()
print(time.time() - start_time)
# Reading only one asset of the parquet dataset
start_time = time.time()
filters = [('assets','=',assets[0])]
historical_prices_dataset = pq.ParquetDataset(
'historical_prices.parquet',filters=filters,use_legacy_dataset=False
)
historical_prices_dataset.read_pandas().to_pandas()
print(time.time() - start_time)
# this is what I want to do,read by intervals.
num_intervals = 5
for i in range(num_intervals):
start = int(i * len(dates) / num_intervals)
end = int((i + 1) * len(dates) / num_intervals)
interval = list(dates[start:end].strftime('%Y-%m-%d'))
historical_prices_dataset.read_pandas(columns=interval).to_pandas()
# Here goes some analyzing process that can't be done in parallel due that the results of every interval
# are used in the next interval
print(time.time() - start_time)
解决方法
我正在使用它的转置,但是这种格式给了我更好的阅读时间。
Parquet支持单个列读取。因此,如果您有10列1万行,而您想要5列,那么您将读取5万个单元格。如果您有10k列的10行,并且想要5列,那么您将读取50个单元格。因此,大概这就是为什么移调为您提供更好的阅读时间的原因。我认为这里没有足够的细节。 Parquet还支持读取单个行组,稍后会对此进行更多介绍。
您大约有49,000个资产和300个日期。我希望您通过使用资产作为列来获得更好的性能,但是有49,000个列是很多的。您有可能不得不读取过多的列元数据,或者正在通过跟踪如此多的列来处理CPU开销。
将日期值或资产ID作为列有点奇怪。更为典型的布局是具有三列:“日期”,“资产ID”和“价格”。
问题在于,即使我使用唯一的一行,镶木地板的读取也需要与读取整个文件相同的时间
是的,如果您有一个行组。 Parquet不支持部分行组读取。我相信这是由于列被压缩的事实。但是,我没有得到与您相同的结果。示例中的中间时间(读取单个资产)通常约为首次读取时间的60-70%。所以它更快。可能是因为到达熊猫所需的转换较少,或者可能是我不知道的一些优化。
问题在于,即使我使用唯一的行,读取实木复合地板的时间也与读取整个文件的时间相同。有没有办法改善这种行为?,最好是,一旦它过滤了行,它就可以保存内存中块的位置,以加快下一次读取的速度。我必须写一个过滤了资产的新文件吗?。
行组可能是您的答案。请参阅下一节。
为了避免完全读取,我尝试编写具有较少行组和较小数据页大小的镶木地板文件,但这在时间方面并没有取得很好的效果。
这可能是您想要的(或者您可以使用多个文件)。 Parquet支持从整个文件中读取一个行组。但是,对于row_group_size,数字100太小。每个行组都会在文件中创建一些元数据,并具有一些处理开销。例如,如果我将其更改为10,000,则中间读取速度快一倍(现在仅读取整个表的30-40%)。
我还有其他问题。为什么我们如果使用Parquet数据集读取完整的Parquet文件并且use_legacy_dataset = False,则比用use_legacy_dataset = True读取相同的Parquet数据集要花费更多的时间?
这个新的数据集API相当新(7月发布的1.0.0版本中是新的)。可能会有更多的开销。您没有做任何可以利用新数据集API的事情(例如,使用扫描或非拼花数据集或新文件系统)。因此,虽然use_legacy_datasets不应较快,但也不应较慢。他们应该花费大致相同的时间。
听起来您有很多资产(成千上万),并且您想阅读其中的一些资产。您还希望将读取分块为较小的读取(使用日期)。
首先,我建议不要使用日期,而建议使用dataset.scan
(https://arrow.apache.org/docs/python/dataset.html)。这样您就可以一次处理一个行组的数据。
第二,有什么方法可以对资产ID进行分组?如果每个资产ID只有一行,则可以忽略此行。但是,例如,如果每个资产ID有500行(每个资产ID /日期对有1行),则可以编写文件,使其看起来像这样...
asset_id date price
A 1 ?
A 2 ?
A 3 ?
B 1 ?
B 2 ?
B 3 ?
如果执行此操作,并且将行组大小设置为合理的值(尝试10k或100k,然后从那里进行细化),则应该能够得到它,以便每个资产ID仅读取1个或2个行组
,我发现了另一种方法,可以为我的特定情况提供更好的时间,当然,这不是一个非常通用的解决方案。它具有一些不是pyarrow的功能,但是当我们多次读取同一行时,它的作用与我认为pyarrow的过滤器相同。当要读取的行组数量增加时,实木复合地板数据集将提供更好的性能。
import pandas as pd
import numpy as np
import time
import pyarrow.parquet as pq
from typing import Dict,Any,List
class PriceGroupReader:
def __init__(self,filename: str,assets: List[int]):
self.price_file = pq.ParquetFile(filename)
self.assets = assets
self.valid_groups = self._get_valid_row_groups()
def _get_valid_row_groups(self):
"""
I don't fine a parquet function to make this row group search,so I did this manual search.
Note: The assets index is sorted,so probably this can be improved a lot.
"""
start_time = time.time()
assets = pd.Index(self.assets)
valid_row_groups = []
index_position = self.price_file.schema.names.index("assets")
for i in range(self.price_file.num_row_groups):
row_group = self.price_file.metadata.row_group(i)
statistics = row_group.column(index_position).statistics
if np.any((statistics.min <= assets) & (assets <= statistics.max)):
valid_row_groups.append(i)
print("getting the row groups: {}".format(time.time() - start_time))
return valid_row_groups
def read_valid_row_groups(self,dates: List[str]):
row_groups = []
for row_group_pos in self.valid_groups:
df = self.price_file.read_row_group(row_group_pos,columns=dates,use_pandas_metadata=True).to_pandas()
df = df.loc[df.index.isin(self.assets)]
row_groups.append(df)
df = pd.concat(row_groups)
"""
# This is another way to read the groups but I think it can consume more memory,probably is faster.
df = self.price_file.read_row_groups(self.valid_groups,use_pandas_metadata=True).to_pandas()
df = df.loc[df.index.isin(self.assets)]
"""
return df
def write_prices(assets: List[int],dates: List[str]):
historical_prices = pd.DataFrame(np.random.rand(len(assets),len(dates)),assets,dates)
# name of the index
historical_prices.index.name = 'assets'
# writing the parquet file using the lastest version,in the comments are the thigns that I tested
historical_prices.to_parquet(
'historical_prices.parquet',version='2.0',data_page_version='2.0',writer_engine_version='2.0',row_group_size=4000,# compression=None
# use_dictionary=False,# data_page_size=1000,# use_byte_stream_split=True,# flavor='spark',)
# generating the small data for the example,the file weight like 150MB,the real data weight 2 GB
total_dates = list(pd.bdate_range('2019-01-01','2020-03-01').strftime('%Y-%m-%d'))
total_assets = list(range(1000,50000))
write_prices(total_assets,total_dates)
# selecting a subset of the whole assets
valid_assets = total_assets[:3000]
# read the price file for the example
price_group_reader = PriceGroupReader('historical_prices.parquet',valid_assets)
# reading all the dates,only as an example
start_time = time.time()
price_group_reader.read_valid_row_groups(total_dates)
print("complete reading: {}".format(time.time() - start_time))
# this is what I want to do,read by intervals.
num_intervals = 5
start_time = time.time()
for i in range(num_intervals):
start = int(i * len(total_dates) / num_intervals)
end = int((i + 1) * len(total_dates) / num_intervals)
interval = list(total_dates[start:end])
df = price_group_reader.read_valid_row_groups(interval)
# print(df)
print("interval reading: {}".format(time.time() - start_time))
filters = [('assets','in',valid_assets)]
price_dataset = pq.ParquetDataset(
'historical_prices.parquet',filters=filters,use_legacy_dataset=False
)
start_time = time.time()
price_dataset.read_pandas(columns=total_dates).to_pandas()
print("complete reading with parquet dataset: {}".format(time.time() - start_time))
start_time = time.time()
for i in range(num_intervals):
start = int(i * len(total_dates) / num_intervals)
end = int((i + 1) * len(total_dates) / num_intervals)
interval = list(total_dates[start:end])
df = price_dataset.read_pandas(columns=interval).to_pandas()
print("interval reading with parquet dataset: {}".format(time.time() - start_time))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。