如何解决如何滞后偏移
我有一个给定的数据框,如下所示:
TEST_schema = StructType([StructField("Date",StringType(),True),\
StructField("START",\
StructField("quantity",IntegerType(),\
StructField("col1",StructField("col2",True)])
TEST_data = [('2020-08-15','2020-08-19',1,'2020-08-05','2020-08-09'),('2020-08-16',2,'2020-08-09')\,('2020-08-17',3,'2020-08-06',\
('2020-08-18',4,'2020-08-10','2020-08-11'),('2020-08-19',5,'2020-08-16','2020-08-19'),\
('2020-08-20',6,'2020-08-20','2020-08-25'),('2020-08-21',7,'2020-08-21'),\
('2020-08-22',8,'2020-08-24'),('2020-08-23',9,'2020-08-09')]
rdd3 = sc.parallelize(TEST_data)
TEST_df = sqlContext.createDataFrame(TEST_data,TEST_schema)
TEST_df = TEST_df.withColumn("Date",to_date("Date"))\
.withColumn("START",to_date("START"))\
.withColumn("col1",to_date("col1"))\
.withColumn("col2",to_date("col2"))\
TEST_df.show()
+----------+----------+--------+----------+----------+
| Date| START|quantity| col1| col2|
+----------+----------+--------+----------+----------+
|2020-08-15|2020-08-19| 1|2020-08-05|2020-08-09|
|2020-08-16|2020-08-19| 2|2020-08-05|2020-08-09|
|2020-08-17|2020-08-19| 3|2020-08-06|2020-08-09|
|2020-08-18|2020-08-19| 4|2020-08-10|2020-08-11|
|2020-08-19|2020-08-19| 5|2020-08-16|2020-08-19|
|2020-08-20|2020-08-19| 6|2020-08-20|2020-08-25|
|2020-08-21|2020-08-19| 7|2020-08-20|2020-08-21|
|2020-08-22|2020-08-19| 8|2020-08-19|2020-08-24|
|2020-08-23|2020-08-19| 9|2020-08-05|2020-08-09|
+----------+----------+--------+----------+----------+
其中col1和col2可能不是唯一的,并且Date仅是增量日期,而START是唯一的。
我的逻辑是,如果START == col2,
则lag(quantity,offset= datediff(col2,col1),0)
否则为0。
在这种情况下,datediff(col2,col1)是3天。
尝试1。
from pyspark.sql.functions import when,col,datediff,expr
TEST_df = TEST_df.withColumn('datedifff',datediff(col('col2'),col('col1')))\
.withColumn('want',expr("IF(START == col2,lag(quantity,datedifff,0),0) "))
存在文字错误...
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。