为什么使用Parquet Dataset读取行的一小部分与读取整个文件需要相同的时间?

如何解决为什么使用Parquet Dataset读取行的一小部分与读取整个文件需要相同的时间?

我正在开发一个程序来分析某些资产的某些历史价格。数据被构造和分析为熊猫数据框。列是日期,行是资产。以前,我使用这种移调,但是这种格式给了我更好的阅读时间。我将这些数据保存在一个实木复合地板文件中,现在我想读取一个从A到B的日期间隔和一小部分资产,分析它,然后使用相同的资产重复相同的过程,但是从B + 1至C。 问题是,即使我使用唯一的行,镶木地板的读取时间也与读取整个文件的时间相同。有没有办法改善这种行为?,最好是,一旦它过滤了行,它就可以保存内存中块的位置,以加快下一次读取的速度。我必须写一个过滤了资产的新文件吗?。

为了避免完全读取,我尝试编写具有较少行组和较小数据页大小的镶木地板文件,但这在时间方面并没有取得很好的效果。

我还有其他问题。为什么我们如果使用Parquet数据集读取完整的Parquet文件并且use_legacy_dataset = False,则比用use_legacy_dataset = True读取相同的Parquet数据集要花费更多的时间?

代码示例:

import pandas as pd 
import numpy as np
import time
import pyarrow.parquet as pq

# generating the small data for the example,the file weight like 150MB for this example,the real data 
# has 2 GB
dates = pd.bdate_range('2019-01-01','2020-03-01')
assets = list(range(1000,50000))

historical_prices = pd.DataFrame(np.random.rand(len(assets),len(dates)),assets,dates)
historical_prices.columns = historical_prices.columns.strftime('%Y-%m-%d')

# name of the index
historical_prices.index.name = 'assets'

# writing the parquet file using the lastest version,in the comments are the thigns that I tested
historical_prices.to_parquet(
    'historical_prices.parquet',version='2.0',data_page_version='2.0',writer_engine_version='2.0',# row_group_size=100,# compression=None
    # use_dictionary=False,# data_page_size=1000,# use_byte_stream_split=True,# flavor='spark',)


# reading the complete parquet dataset 
start_time = time.time()

historical_prices_dataset = pq.ParquetDataset(
    'historical_prices.parquet',use_legacy_dataset=False
)
historical_prices_dataset.read_pandas().to_pandas()

print(time.time() - start_time)


# Reading only one asset of the parquet dataset
start_time = time.time()


filters = [('assets','=',assets[0])]
historical_prices_dataset = pq.ParquetDataset(
    'historical_prices.parquet',filters=filters,use_legacy_dataset=False
)

historical_prices_dataset.read_pandas().to_pandas()

print(time.time() - start_time)

# this is what I want to do,read by intervals.
num_intervals = 5

for i in range(num_intervals):
    start = int(i * len(dates) / num_intervals)
    end = int((i + 1) * len(dates) / num_intervals)
    interval = list(dates[start:end].strftime('%Y-%m-%d'))
    historical_prices_dataset.read_pandas(columns=interval).to_pandas()

    # Here goes some analyzing process that can't be done in parallel due that the results of every interval
    # are used in the next interval

print(time.time() - start_time)


解决方法

我正在使用它的转置,但是这种格式给了我更好的阅读时间。

Parquet支持单个列读取。因此,如果您有10列1万行,而您想要5列,那么您将读取5万个单元格。如果您有10k列的10行,并且想要5列,那么您将读取50个单元格。因此,大概这就是为什么移调为您提供更好的阅读时间的原因。我认为这里没有足够的细节。 Parquet还支持读取单个行组,稍后会对此进行更多介绍。

您大约有49,000个资产和300个日期。我希望您通过使用资产作为列来获得更好的性能,但是有49,000个列是很多的。您有可能不得不读取过多的列元数据,或者正在通过跟踪如此多的列来处理CPU开销。

将日期值或资产ID作为列有点奇怪。更为典型的布局是具有三列:“日期”,“资产ID”和“价格”。

问题在于,即使我使用唯一的一行,镶木地板的读取也需要与读取整个文件相同的时间

是的,如果您有一个行组。 Parquet不支持部分行组读取。我相信这是由于列被压缩的事实。但是,我没有得到与您相同的结果。示例中的中间时间(读取单个资产)通常约为首次读取时间的60-70%。所以它更快。可能是因为到达熊猫所需的转换较少,或者可能是我不知道的一些优化。

问题在于,即使我使用唯一的行,读取实木复合地板的时间也与读取整个文件的时间相同。有没有办法改善这种行为?,最好是,一旦它过滤了行,它就可以保存内存中块的位置,以加快下一次读取的速度。我必须写一个过滤了资产的新文件吗?。

行组可能是您的答案。请参阅下一节。

为了避免完全读取,我尝试编写具有较少行组和较小数据页大小的镶木地板文件,但这在时间方面并没有取得很好的效果。

这可能是您想要的(或者您可以使用多个文件)。 Parquet支持从整个文件中读取一个行组。但是,对于row_group_size,数字100太小。每个行组都会在文件中创建一些元数据,并具有一些处理开销。例如,如果我将其更改为10,000,则中间读取速度快一倍(现在仅读取整个表的30-40%)。

我还有其他问题。为什么我们如果使用Parquet数据集读取完整的Parquet文件并且use_legacy_dataset = False,则比用use_legacy_dataset = True读取相同的Parquet数据集要花费更多的时间?

这个新的数据集API相当新(7月发布的1.0.0版本中是新的)。可能会有更多的开销。您没有做任何可以利用新数据集API的事情(例如,使用扫描或非拼花数据集或新文件系统)。因此,虽然use_legacy_datasets不应较快,但也不应较慢。他们应该花费大致相同的时间。


听起来您有很多资产(成千上万),并且您想阅读其中的一些资产。您还希望将读取分块为较小的读取(使用日期)。

首先,我建议不要使用日期,而建议使用dataset.scanhttps://arrow.apache.org/docs/python/dataset.html)。这样您就可以一次处理一个行组的数据。

第二,有什么方法可以对资产ID进行分组?如果每个资产ID只有一行,则可以忽略此行。但是,例如,如果每个资产ID有500行(每个资产ID /日期对有1行),则可以编写文件,使其看起来像这样...

asset_id  date  price
A         1     ?
A         2     ?
A         3     ?
B         1     ?
B         2     ?
B         3     ?

如果执行此操作,并且将行组大小设置为合理的值(尝试10k或100k,然后从那里进行细化),则应该能够得到它,以便每个资产ID仅读取1个或2个行组

,

我发现了另一种方法,可以为我的特定情况提供更好的时间,当然,这不是一个非常通用的解决方案。它具有一些不是pyarrow的功能,但是当我们多次读取同一行时,它的作用与我认为pyarrow的过滤器相同。当要读取的行组数量增加时,实木复合地板数据集将提供更好的性能。

import pandas as pd
import numpy as np
import time
import pyarrow.parquet as pq
from typing import Dict,Any,List


class PriceGroupReader:
    def __init__(self,filename: str,assets: List[int]):
        self.price_file = pq.ParquetFile(filename)
        self.assets = assets
        self.valid_groups = self._get_valid_row_groups()

    def _get_valid_row_groups(self):
        """
        I don't fine a parquet function to make this row group search,so I did this manual search.
        Note: The assets index is sorted,so probably this can be improved a lot.
        """
        start_time = time.time()
        assets = pd.Index(self.assets)
        valid_row_groups = []
        index_position = self.price_file.schema.names.index("assets")

        for i in range(self.price_file.num_row_groups):
            row_group = self.price_file.metadata.row_group(i)
            statistics = row_group.column(index_position).statistics
            if np.any((statistics.min <= assets) & (assets <= statistics.max)):
                valid_row_groups.append(i)

        print("getting the row groups: {}".format(time.time() - start_time))
        return valid_row_groups

    def read_valid_row_groups(self,dates: List[str]):
        
        row_groups = []
        for row_group_pos in self.valid_groups:
            df = self.price_file.read_row_group(row_group_pos,columns=dates,use_pandas_metadata=True).to_pandas()
            df = df.loc[df.index.isin(self.assets)]
            row_groups.append(df)

        df = pd.concat(row_groups)
    

        """
        # This is another way to read the groups but I think it can consume more memory,probably is faster.
        df = self.price_file.read_row_groups(self.valid_groups,use_pandas_metadata=True).to_pandas()
        df = df.loc[df.index.isin(self.assets)]
        """
        
        return df


def write_prices(assets: List[int],dates: List[str]):
    historical_prices = pd.DataFrame(np.random.rand(len(assets),len(dates)),assets,dates)

    # name of the index
    historical_prices.index.name = 'assets'

    # writing the parquet file using the lastest version,in the comments are the thigns that I tested
    historical_prices.to_parquet(
        'historical_prices.parquet',version='2.0',data_page_version='2.0',writer_engine_version='2.0',row_group_size=4000,# compression=None
        # use_dictionary=False,# data_page_size=1000,# use_byte_stream_split=True,# flavor='spark',)


# generating the small data for the example,the file weight like 150MB,the real data weight 2 GB
total_dates = list(pd.bdate_range('2019-01-01','2020-03-01').strftime('%Y-%m-%d'))
total_assets = list(range(1000,50000))
write_prices(total_assets,total_dates)

# selecting a subset of the whole assets
valid_assets = total_assets[:3000]

# read the price file for the example
price_group_reader = PriceGroupReader('historical_prices.parquet',valid_assets)

# reading all the dates,only as an example
start_time = time.time()
price_group_reader.read_valid_row_groups(total_dates)
print("complete reading: {}".format(time.time() - start_time))

# this is what I want to do,read by intervals.
num_intervals = 5

start_time = time.time()
for i in range(num_intervals):
    start = int(i * len(total_dates) / num_intervals)
    end = int((i + 1) * len(total_dates) / num_intervals)
    interval = list(total_dates[start:end])
    df = price_group_reader.read_valid_row_groups(interval)
    # print(df)

print("interval reading: {}".format(time.time() - start_time))


filters = [('assets','in',valid_assets)]
price_dataset = pq.ParquetDataset(
    'historical_prices.parquet',filters=filters,use_legacy_dataset=False
)

start_time = time.time()
price_dataset.read_pandas(columns=total_dates).to_pandas()
print("complete reading with parquet dataset: {}".format(time.time() - start_time))

start_time = time.time()
for i in range(num_intervals):
    start = int(i * len(total_dates) / num_intervals)
    end = int((i + 1) * len(total_dates) / num_intervals)
    interval = list(total_dates[start:end])
    df = price_dataset.read_pandas(columns=interval).to_pandas()

print("interval reading with parquet dataset: {}".format(time.time() - start_time))

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-