如何解决pySpark流式传输:当您读取stream.json时,可以设置最小批处理大小吗?
当我的批量达到n时,我需要在Spark Streaming上专门执行聚合:
例如,当我运行此代码时:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.readStream.option('cleanSource','delete')\
.option('maxFilesPerTrigger',n)\
.json('data/stream/')
def func(batch_df,batch_id):
results = batch_df.select(aggregation_code).collect()[0]
json.dump(results.asDict(),open(f'data/aggregates/{batch_id}.json','w'))
df.writeStream.foreachBatch(func)
当我只能保证每个批次的大小不大于n时,我需要确保每个批次的大小恰好为n
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。