如何读取目录中的多个文件,这些文件都是带有Airflow S3 Hook或boto3的csv.gzip?

如何解决如何读取目录中的多个文件,这些文件都是带有Airflow S3 Hook或boto3的csv.gzip?

我在S3中有一个目录,假设s3://test-bucket/test-folder/2020-08-28/具有这样的文件:

2020-08-28 03:29:13   29397684 data_0_0_0.csv.gz
2020-08-28 03:29:13   29000150 data_0_1_0.csv.gz
2020-08-28 03:29:13   38999956 data_0_2_0.csv.gz
2020-08-28 03:29:13   32079942 data_0_3_0.csv.gz
2020-08-28 03:29:13   34154791 data_0_4_0.csv.gz
2020-08-28 03:29:13   45348128 data_0_5_0.csv.gz
2020-08-28 03:29:13   60904419 data_0_6_0.csv.gz

我正在尝试使用S3钩子(https://airflow.readthedocs.io/en/stable/_modules/airflow/hooks/S3_hook.html)创建一个Airflow运算符,该钩子会将这些文件的内容转储到某个地方。我尝试过:

contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/),contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/data_0_0_0.csv)
contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/data_0_0_0.csv.gz)

这些似乎都不起作用。我注意到这里有s3.select_key,但似乎没有正确的参数,只有输入和输出序列化。有什么方法可以使用S3钩子导入此数据,而无需对文件本身做任何事情?

我的下一个问题是文件夹s3://test-bucket/test-folder/2020-08-28/中有一堆文件。我尝试使用list_keys,但它不喜欢存储桶名称:

keys = s3.list_keys('s3://test-bucket/test-folder/2020-08-28/')

给予

Invalid bucket name "s3://test-bucket/test-folder/2020-08-28/": Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$"

我也尝试过相同的操作,但是删除了“ s3://”。任何时候都不会给我验证错误。当我在上面的.csv.gz调用中插入read_key时,它告诉我

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte

我认为这与gzip压缩的事实有关吗?

那么,我该如何1.从S3中读取已压缩的csv文件的密钥,以及2.如何一次在给定目录中读取所有csv文件?

解决方法

假设您正在从s3://your_bucket/your_directory/YEAR-MONTH-DAY/之类的目录中读取文件。然后您可以做两件事:

  • 读取数据路径。读取每个子目录中.csv.gz文件的路径

  • 加载数据。在此示例中,我们将其加载为pandas.DataFrame,但也可以将其保留为gzip Object。

1.A使用Airflow S3挂钩读取路径

# Initialize the s3 hook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
s3_hook = S3Hook()

# Read the keys from s3 bucket
paths = s3_hook.list_keys(bucket_name='your_bucket_name',prefix='your_directory')

其中,要列出键,请在后面使用分页器。这是我们进入路径列表的第三种形式。

1.B使用分页器读取路径

例如,对于分页器,如果要列出s3_//your_bucket/your_directory/item.csv.gz,...等对象,则分页器的工作方式类似于(取自docs的示例)

client = boto3.client('s3',region_name='us-west-2')
paginator = client.get_paginator('list_objects')
operation_parameters = {'Bucket': 'your_bucket','Prefix': 'your_directory'}
page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
    print(page['Contents'])

,这将输出一个字典列表,您可以从中过滤每个字典的Key以获得要读取的路径列表,也就是说,分页器将抛出类似

的内容
[{'Key': 'your_directoyr/file_1.csv.gz
....},...,{'Key': 'your_directoyr/file_n.csv.gz
....}

现在,我们使用第三种形式进行此操作,与之前的形式相似。

1.C使用Boto 3客户端读取路径

要读取路径,请考虑以下功能

import boto3 

s3_client = boto3.client('s3')

def get_all_s3_objects(s3_client,**base_kwargs):
    continuation_token = None
    while True:
        list_kwargs = dict(MaxKeys=1000,**base_kwargs)
        if continuation_token:
            list_kwargs['ContinuationToken'] = continuation_token
        response = s3_client.list_objects_v2(**list_kwargs)
        yield from response.get('Contents',[])
        if not response.get('IsTruncated'):  # At the end of the list?
            break
        continuation_token = response.get('NextContinuationToken')

当您使用后缀Key和您的存储桶名称调用此函数时,

files = get_all_s3_objects(s3_client,Bucket='your_bucket_name',Prefix=f'your_directory/YEAR-MONTH-DAY')
paths = [f['Key'] for f in files]

通过调用路径,您将获得包含.csv.gz个文件的列表。就您而言,这将是

[data_0_0_0.csv.gz,data_0_1_0.csv.gz,data_0_2_0.csv.gz]

然后,您可以将其用作以下函数的输入,例如,将数据读取为pandas数据框。

2。加载数据

考虑功能

from io import BytesIO
import pandas as pd

def load_csv_gzip(s3_client,bucket,key):
    with BytesIO() as f:
        s3_files = s3_client.download_fileobj(Bucket=bucket,Key=key,Fileobj=f)
        f.seek(0)
        gzip_fd = gzip.GzipFile(fileobj=f)
        return pd.read_csv(gzip_fd)

最后,您将提供一个包含.csv.gz文件的列表,您可以迭代地加载每个路径并将结果连接到pandas数据框,也可以仅加载一个.csv.gz文件。例如,

data = pd.concat([load_csv_gzip(s3_client,'your_bucket',path) for p in paths])

其中路径的每个元素都类似于your_subdirectory/2020-08-28/your_file.csv.gz

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-