如何解决如何遍历Glue DynamicFrame
您好,IAM正在使用AWS胶水火花。我正在从动态表中获取数据,并从中创建一个动态框架。我希望能够发送该表中的所有数据,并以sqs为单位逐条记录。我看到另一个建议将动态框架转换为spark数据框架。但这将是一个具有数百万条记录的表。转换为数据框可能需要一段时间。我希望能够将动态帧中的所有记录发送到sqs队列。
这是我的代码:
sqs = boto3.resource('sqs')
sqs_queue_url = f"https://sqs.us-east-1.amazonaws.com/{account_id}/my-stream-queue"
queue = sqs.Queue(sqs_queue_url)
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
args = getResolvedOptions(sys.argv,['JOB_NAME'])
job = Job(glueContext)
## @params: [JOB_NAME]
job.init(args['JOB_NAME'],args)
logger = glueContext.get_logger()
df = glueContext.create_dynamic_frame.from_options("dynamodb",connection_options={
"dynamodb.input.tableName": "my_table","dynamodb.throughput.read.percent": "1.5","dynamodb.splits": "500"
},numSlots=2368)
job.commit()
# iterate over dynamic frame and send each record over the sqs queue
for record in df:
queue.send_message(MessageBody=record)
解决方法
我正在做一些非常相似的事情。这是我发现的:
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database="athena",table_name=str(args['value']),transformation_ctx="datasource0")
job.commit()
df = datasource0.toDF()
pandasDF = df.toPandas()
for index,row in pandasDF.iterrows():
message_body = generate_message(
row['bucket'],row['key'],row['version_id'])
send_message(sqs_queue,json.loads(json.dumps(message_body)))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。