How to compare records between two dataframes in PySpark and find the delta for every column
I have two dataframes, fulldata and deltadata, that need to be compared, producing output according to the following rules:
- If the values of all columns are identical, the record should not appear in the output, e.g. emp1.
- When a record has changed in the delta, the output should contain the changed columns plus the mandatory columns defined in MANDATORY_FIELDS_LOOKUP, e.g. emp2, where the changed column "field2" and the mandatory columns employee_number, record_type, surname, first_forename and date_of_birth appear in the output (field1 and field3 should be empty because they did not change).
- If a mandatory column itself has changed, the record should be included, e.g. emp3, where the changed columns are surname and first_forename.
- New records in the delta should be included, e.g. emp5.
- Any column that has changed to an empty string should be hard-coded to "(blank)", e.g. emp6 (a minimal sketch of this per-column rule follows the list).
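To spell out that per-column rule: every non-mandatory field boils down to one when/otherwise chain comparing the new value against the old one. A minimal standalone sketch (delta_value is just an illustrative name, not part of the actual job below):

from pyspark.sql import functions as F

def delta_value(new_col, old_col):
    # unchanged -> "", changed to an empty string -> "(blank)", else keep the new value
    return (
        F.when(new_col == old_col, F.lit(""))
        .when((new_col != old_col) & (new_col == ""), F.lit("(blank)"))
        .otherwise(new_col)
    )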
I have working code that produces the desired output, but it is very slow: around 10 minutes for 500 records against 250 records. I am running it in a Jupyter notebook connected to a Glue Dev Endpoint with the following configuration:
- Number of workers: 15
- Worker type: G.2X
- Data Processing Units (DPU): 31
Can this be implemented more efficiently? Any help is appreciated.
from functools import reduce
from operator import add

from pyspark.context import SparkContext
from pyspark.sql import DataFrame, SparkSession, SQLContext
from pyspark.sql.functions import col, lag, lit, when
from pyspark.sql.types import StringType, StructField, StructType
from pyspark.sql.window import Window

sc = SparkContext.getOrCreate()
sql_context = SQLContext(sc)
spark_session = SparkSession.builder.enableHiveSupport().getOrCreate()
MANDATORY_FIELDS_LOOKUP = {
    "10PERDET": ["surname", "first_forename", "date_of_birth"],
}
def delta_compare(full_df: DataFrame, delta_df: DataFrame, record_header: str, join_key: list) -> DataFrame:
    all_fields = full_df.columns
    mandatory_fields = MANDATORY_FIELDS_LOOKUP.get(record_header, [])

    # Tag each side so that, within a key, the full row sorts before the delta row.
    delta_df = delta_df.withColumn("isDelta", lit(1))
    full_df = full_df.withColumn("isDelta", lit(0))
    union_df = full_df.union(delta_df)

    # lag() then looks back from the delta row to the matching full row.
    window_spec = Window.partitionBy([col(k) for k in join_key]).orderBy(col("isDelta"))

    compare_fields = [
        fld for fld in all_fields if fld not in ["record_type", *join_key, "isDelta"]
    ]
    df = (
        union_df.withColumn(
            "isChanged",
            # Sum of per-column change flags; 0 means the row equals its old version.
            reduce(
                add,
                [
                    when(col(fld) == lag(col(fld)).over(window_spec), 0).otherwise(1)
                    for fld in compare_fields
                ],
            ),
        )
        .select(
            *join_key,
            col("record_type"),
            *mandatory_fields,
            *[
                # Unchanged -> "", changed to empty string -> "(blank)", else new value.
                when(col(fld) == lag(col(fld)).over(window_spec), "")
                .when(
                    (col(fld) != lag(col(fld)).over(window_spec)) & (col(fld) == ""),
                    "(blank)",
                )
                .otherwise(col(fld))
                .alias(fld)
                for fld in all_fields
                if fld not in ["record_type", "isDelta", *join_key, *mandatory_fields]
            ],
            col("isDelta"),
            col("isChanged"),
        )
        .filter(col("isDelta") == 1)
        .filter(col("isChanged") != 0)
    )
    return df.select(*all_fields).orderBy(*join_key)
schema = StructType(
    [
        StructField("employee_number", StringType()),
        StructField("record_type", StringType()),
        StructField("surname", StringType()),
        StructField("first_forename", StringType()),
        StructField("date_of_birth", StringType()),
        StructField("field1", StringType()),
        StructField("field2", StringType()),
        StructField("field3", StringType()),
    ]
)
fulldata = [
    ["emp1", "10PERDET", "Neil", "Par", "16011980", "10", "20", "30"],
    ["emp2", "10PERDET", "Tom", "Hanks", "11091982", "15", "25", "35"],
    ["emp3", "10PERDET", "jag", "ram", "26121981", "17", "27", "37"],
    ["emp4", "10PERDET", "right", "sam", "26121990", "oldrow", "99", "88"],
    ["emp6", "10PERDET", "coke", "john", "01021985", "29", "39", "49"],
]
full_df = sql_context.createDataFrame(fulldata, schema=schema)
deltadata = [
    ["emp1", "10PERDET", "Neil", "Par", "16011980", "10", "20", "30"],
    ["emp2", "10PERDET", "Tom", "Hanks", "11091982", "15", "new", "35"],
    ["emp3", "10PERDET", "jag_new", "ram_new", "26121981", "new", "27", "37"],
    ["emp5", "10PERDET", "newjohn", "gan", "22022020", "newrow", "01", "02"],
    ["emp6", "10PERDET", "coke", "john", "01021985", "29", "", "49"],
]
delta_df = sql_context.createDataFrame(deltadata, schema=schema)
final_df = delta_compare(full_df, delta_df, "10PERDET", ["employee_number"])
final_df.show()
Output:
+---------------+-----------+-------+--------------+-------------+------+-------+------+
|employee_number|record_type|surname|first_forename|date_of_birth|field1| field2|field3|
+---------------+-----------+-------+--------------+-------------+------+-------+------+
| emp2| 10PERDET| Tom| Hanks| 11091982| | new| |
| emp3| 10PERDET|jag_new| ram_new| 26121981| new| | |
| emp5| 10PERDET|newjohn| gan| 22022020|newrow| 01| 02|
| emp6| 10PERDET| coke| john| 01021985| |(blank)| |
+---------------+-----------+-------+--------------+-------------+------+-------+------+
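One direction I am considering for the performance question (not yet benchmarked) is to replace the union + window with a plain left join on the key, so each delta row is compared against its old row directly and the per-partition sort that lag() requires disappears. A sketch under that assumption, reusing MANDATORY_FIELDS_LOOKUP from above (delta_compare_join is my own name):

from functools import reduce
from operator import add
from pyspark.sql import functions as F

def delta_compare_join(full_df, delta_df, record_header, join_key):
    all_fields = full_df.columns
    mandatory = MANDATORY_FIELDS_LOOKUP.get(record_header, [])
    value_cols = [c for c in all_fields if c not in ["record_type", *join_key]]

    # Keep the old values alongside the new ones via a left join on the key.
    old = full_df.select(*join_key, *[F.col(c).alias("old_" + c) for c in value_cols])
    joined = delta_df.join(old, on=join_key, how="left")

    # A delta row is kept when any value column differs; for brand-new keys the
    # old_* columns are null, so every comparison counts as changed.
    is_changed = reduce(
        add,
        [F.when(F.col(c) == F.col("old_" + c), 0).otherwise(1) for c in value_cols],
    )

    def masked(c):
        # unchanged -> "", changed to an empty string -> "(blank)", else new value
        return (
            F.when(F.col(c) == F.col("old_" + c), "")
            .when((F.col(c) != F.col("old_" + c)) & (F.col(c) == ""), "(blank)")
            .otherwise(F.col(c))
            .alias(c)
        )

    keep_as_is = [*join_key, "record_type", *mandatory]
    out_cols = [F.col(c) if c in keep_as_is else masked(c) for c in all_fields]
    return (
        joined.withColumn("isChanged", is_changed)
        .filter(F.col("isChanged") != 0)
        .select(*out_cols)
        .orderBy(*join_key)
    )

On the sample data this should return the same four rows as above; whether it is actually faster at real volumes would have to be checked against the query plan and the Spark UI.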