Why is the full DAG re-evaluated on every iteration of a loop in PySpark even when using cache/persist, and how can I avoid it?
When I run with any single rule, processing 2.5 million records takes about 3 minutes, but running all 5 rules takes more than 3 hours. Each additional rule adds time because the DAG appears to be evaluated from scratch on every iteration.
from pyspark.sql.functions import col, concat

def match_rows(dataframe):
    '''
    In this function, I am trying to match all the rows of the dataframe with each other.
    '''
    # some logic here
    return dataframe

df = spark.read.format("snowflake").options(**options).option("query", "SELECT * FROM TABLE").load()
# some basic transformations here
rules_list = ["rule1", "rule2", "rule3", "rule4", "rule5"]
for rule in rules_list:
    df.cache()
    # MATCH_STR is a concatenation of two or three columns, chosen based on each rule
    df = df.withColumn("MATCH_STR", concat(*some_columns))
    df = df.repartition(col("MATCH_STR"))
    df = spark.createDataFrame(df.rdd.mapPartitions(match_rows), schema=df.schema)
    df.take(1)  # also tried count
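For illustration, here is a minimal sketch of one variation I am considering, which truncates the lineage each iteration with an eager localCheckpoint instead of relying on cache (this assumes Spark 2.3+ and the same match_rows / some_columns as above; it is a sketch, not a confirmed fix for this case):

for rule in rules_list:
    df = df.withColumn("MATCH_STR", concat(*some_columns))
    df = df.repartition(col("MATCH_STR"))
    df = spark.createDataFrame(df.rdd.mapPartitions(match_rows), schema=df.schema)
    # eager localCheckpoint materializes this iteration's result and cuts the
    # logical plan, so the next rule would not re-plan back to the Snowflake read
    df = df.localCheckpoint(eager=True)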