如何解决如何读取目录中的多个文件,这些文件都是带有Airflow S3 Hook或boto3的csv.gzip?
我在S3中有一个目录,假设s3://test-bucket/test-folder/2020-08-28/
具有这样的文件:
2020-08-28 03:29:13 29397684 data_0_0_0.csv.gz
2020-08-28 03:29:13 29000150 data_0_1_0.csv.gz
2020-08-28 03:29:13 38999956 data_0_2_0.csv.gz
2020-08-28 03:29:13 32079942 data_0_3_0.csv.gz
2020-08-28 03:29:13 34154791 data_0_4_0.csv.gz
2020-08-28 03:29:13 45348128 data_0_5_0.csv.gz
2020-08-28 03:29:13 60904419 data_0_6_0.csv.gz
我正在尝试使用S3钩子(https://airflow.readthedocs.io/en/stable/_modules/airflow/hooks/S3_hook.html)创建一个Airflow运算符,该钩子会将这些文件的内容转储到某个地方。我尝试过:
contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/),contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/data_0_0_0.csv)
contents = s3.read_key(key=s3://test-bucket/test-folder/2020-08-28/data_0_0_0.csv.gz)
这些似乎都不起作用。我注意到这里有s3.select_key
,但似乎没有正确的参数,只有输入和输出序列化。有什么方法可以使用S3钩子导入此数据,而无需对文件本身做任何事情?
我的下一个问题是文件夹s3://test-bucket/test-folder/2020-08-28/
中有一堆文件。我尝试使用list_keys
,但它不喜欢存储桶名称:
keys = s3.list_keys('s3://test-bucket/test-folder/2020-08-28/')
给予
Invalid bucket name "s3://test-bucket/test-folder/2020-08-28/": Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$"
我也尝试过相同的操作,但是删除了“ s3://”。任何时候都不会给我验证错误。当我在上面的.csv.gz
调用中插入read_key
时,它告诉我
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
我认为这与gzip压缩的事实有关吗?
那么,我该如何1.从S3中读取已压缩的csv文件的密钥,以及2.如何一次在给定目录中读取所有csv文件?
解决方法
假设您正在从s3://your_bucket/your_directory/YEAR-MONTH-DAY/
之类的目录中读取文件。然后您可以做两件事:
-
读取数据路径。读取每个子目录中
.csv.gz
文件的路径 -
加载数据。在此示例中,我们将其加载为
pandas.DataFrame
,但也可以将其保留为gzip Object。
1.A使用Airflow S3挂钩读取路径
# Initialize the s3 hook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
s3_hook = S3Hook()
# Read the keys from s3 bucket
paths = s3_hook.list_keys(bucket_name='your_bucket_name',prefix='your_directory')
其中,要列出键,请在后面使用分页器。这是我们进入路径列表的第三种形式。
1.B使用分页器读取路径
例如,对于分页器,如果要列出s3_//your_bucket/your_directory/item.csv.gz
,...等对象,则分页器的工作方式类似于(取自docs的示例)
client = boto3.client('s3',region_name='us-west-2')
paginator = client.get_paginator('list_objects')
operation_parameters = {'Bucket': 'your_bucket','Prefix': 'your_directory'}
page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
print(page['Contents'])
,这将输出一个字典列表,您可以从中过滤每个字典的Key
以获得要读取的路径列表,也就是说,分页器将抛出类似
[{'Key': 'your_directoyr/file_1.csv.gz
....},...,{'Key': 'your_directoyr/file_n.csv.gz
....}
现在,我们使用第三种形式进行此操作,与之前的形式相似。
1.C使用Boto 3客户端读取路径
要读取路径,请考虑以下功能
import boto3
s3_client = boto3.client('s3')
def get_all_s3_objects(s3_client,**base_kwargs):
continuation_token = None
while True:
list_kwargs = dict(MaxKeys=1000,**base_kwargs)
if continuation_token:
list_kwargs['ContinuationToken'] = continuation_token
response = s3_client.list_objects_v2(**list_kwargs)
yield from response.get('Contents',[])
if not response.get('IsTruncated'): # At the end of the list?
break
continuation_token = response.get('NextContinuationToken')
当您使用后缀Key和您的存储桶名称调用此函数时,
files = get_all_s3_objects(s3_client,Bucket='your_bucket_name',Prefix=f'your_directory/YEAR-MONTH-DAY')
paths = [f['Key'] for f in files]
通过调用路径,您将获得包含.csv.gz
个文件的列表。就您而言,这将是
[data_0_0_0.csv.gz,data_0_1_0.csv.gz,data_0_2_0.csv.gz]
然后,您可以将其用作以下函数的输入,例如,将数据读取为pandas数据框。
2。加载数据
考虑功能
from io import BytesIO
import pandas as pd
def load_csv_gzip(s3_client,bucket,key):
with BytesIO() as f:
s3_files = s3_client.download_fileobj(Bucket=bucket,Key=key,Fileobj=f)
f.seek(0)
gzip_fd = gzip.GzipFile(fileobj=f)
return pd.read_csv(gzip_fd)
最后,您将提供一个包含.csv.gz
文件的列表,您可以迭代地加载每个路径并将结果连接到pandas数据框,也可以仅加载一个.csv.gz
文件。例如,
data = pd.concat([load_csv_gzip(s3_client,'your_bucket',path) for p in paths])
其中路径的每个元素都类似于your_subdirectory/2020-08-28/your_file.csv.gz
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。