PySpark groupby-apply UDF within a class
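TL;DR: When doing a groupby-apply UDF inside a class in PySpark, I can't figure out how to keep the information in the bioID column.
Details:
I am creating a class for a pipeline that needs to compare counts between subject 1 and subject 2.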
import pandas as pd

# `spark` is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame({
    "comparison": ["subject1_v_subject2", "subject1_v_subject2", "subject1_v_subject2"],
    "bioID": ["AAG", "ATT", "ATG"],
    "subject1": [12, 15, 17],
    "subject3": [123, 107, 110],
    "pvalue": [.01, .015, 0.112],
}, index=[1, 2, 3]))
   comparison           bioID  subject1  subject3  pvalue
1  subject1_v_subject   "AAG"  212       123       0.010
2  subject1_v_subject2  "ATT"  15        107       0.015
3  subject1_v_subject2  "ATG"  17        110       0.112
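I need to take this DataFrame, group it by the comparison column, and run a Benjamini-Hochberg (BH) calculation over all the p-values within each comparison: sort the p-values, rank them, and then return a boolean flag, based on self.alpha, indicating whether each p-value is significant.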
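For reference, the BH step described above can be sketched in plain Python. This is a minimal sketch of the standard step-up rule (find the largest rank k with p_(k) <= alpha * k / m and flag the k smallest p-values), not the code from my pipeline. The function name `benjamini_hochberg_flags` is hypothetical. It returns the flags in the same order as the input rather than in sorted order, so each flag still lines up with the row it came from.

```python
def benjamini_hochberg_flags(pvals, alpha):
    """Return a 0/1 flag per p-value, in the same order as the input.

    Hypothetical helper: standard BH step-up rule, assuming `pvals`
    holds all p-values of one comparison group.
    """
    m = len(pvals)
    if m == 0:
        return []
    # Positions of the p-values, ordered by ascending p-value.
    order = sorted(range(m), key=lambda i: pvals[i])
    # Largest 1-based rank k with p_(k) <= alpha * k / m (0 if none).
    k_max = 0
    for rank, idx in enumerate(order, start=1):
        if pvals[idx] <= alpha * rank / m:
            k_max = rank
    # Flag the k_max smallest p-values, written back at their original positions.
    flags = [0] * m
    for idx in order[:k_max]:
        flags[idx] = 1
    return flags


pvals = [0.010, 0.015, 0.112]
print(benjamini_hochberg_flags(pvals, alpha=0.01))  # [0, 0, 0]
print(benjamini_hochberg_flags(pvals, alpha=0.05))  # [1, 1, 0]
```

With alpha = 0.01 and m = 3 the smallest threshold is 0.01 * 1 / 3, which is below 0.010, so nothing is flagged for the example data.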
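I tried creating a separate class and calling it from the main class, but I can't figure out how to keep the bioID column so that I can join the results back to the original DataFrame: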
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

class BenjaminiHochbergFDR():
    __name__ = "BenjaminiHochbergFDR"

    # any input that is static across all rows can be set at initialization time
    def __init__(self, alpha):
        self.alpha = alpha

    # any input that's based on a single row's value goes here
    # note: pvals is assumed to be sorted in ascending order
    def __call__(self, pvals):
        m = len(pvals)
        k = -1
        # the bound check keeps the loop from indexing past the end of pvals
        while k + 1 < m and pvals[k + 1] <= (self.alpha * (k + 2) / (1.0 * m)):
            k += 1
        return [1] * (k + 1) + [0] * (m - (k + 1))

class DifferentialAbundanceAnalysis():
    def __init__(self, spark, input_df, alpha=.01):
        # set user input variables
        self.spark = spark
        self.input_df = input_df
        self.alpha = alpha

    def run_bh(self, alpha):
        # BH FDR correction UDF to apply function to pval_df
        # (the callable returns ints, so the element type is IntegerType)
        bh_udf = F.udf(BenjaminiHochbergFDR(self.alpha), ArrayType(IntegerType()))
        # apply bh correction to pval df and run bh udf on list of pvals for each comparison
        sig_df = self.input_df.groupBy("comparison") \
            .agg(F.collect_list("pvalue").alias("pvals")) \
            .withColumn("significant", bh_udf("pvals")) \
            .withColumn('significant', F.explode(F.col('significant'))) \
            .withColumn('pvalue', F.explode(F.col('pvals'))).drop_duplicates()
        # this does not work as expected
        final_df = self.input_df.join(sig_df, on=["comparison", "pvalue"], how='left').drop("pvals")
        return final_df
Desired output:
   comparison           bioID  subject1  subject3  pvalue  significant
1  subject1_v_subject   "AAG"  212       123       0.010   0
2  subject1_v_subject2  "ATT"  15        107       0.015   0
3  subject1_v_subject2  "ATG"  17        110       0.112   0
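One possible way to get this shape of output is to carry bioID alongside each p-value through the grouping step, instead of collecting bare p-values. The sketch below shows the idea in plain Python on a list of dicts rather than a Spark DataFrame. The names `bh_significant_ids` and `flag_rows` are hypothetical, the BH rule used is the standard step-up rule, and it assumes bioID is unique within each comparison.

```python
from collections import defaultdict


def bh_significant_ids(id_pval_pairs, alpha):
    """Return {bioID: 0 or 1} for one comparison group (BH step-up rule).

    Hypothetical helper; assumes bioID is unique within the group.
    """
    ranked = sorted(id_pval_pairs, key=lambda pair: pair[1])
    m = len(ranked)
    # Largest 1-based rank k with p_(k) <= alpha * k / m (0 if none).
    k_max = 0
    for rank, (_, p) in enumerate(ranked, start=1):
        if p <= alpha * rank / m:
            k_max = rank
    return {
        bio_id: int(rank <= k_max)
        for rank, (bio_id, _) in enumerate(ranked, start=1)
    }


def flag_rows(rows, alpha):
    """Group rows by comparison, run BH per group, attach the flag to each row."""
    groups = defaultdict(list)
    for row in rows:
        # Keep the ID paired with its p-value so the flag can be mapped back.
        groups[row["comparison"]].append((row["bioID"], row["pvalue"]))
    flags = {comp: bh_significant_ids(pairs, alpha) for comp, pairs in groups.items()}
    return [dict(row, significant=flags[row["comparison"]][row["bioID"]]) for row in rows]


rows = [
    {"comparison": "subject1_v_subject2", "bioID": "AAG", "pvalue": 0.010},
    {"comparison": "subject1_v_subject2", "bioID": "ATT", "pvalue": 0.015},
    {"comparison": "subject1_v_subject2", "bioID": "ATG", "pvalue": 0.112},
]
for row in flag_rows(rows, alpha=0.01):
    print(row["bioID"], row["pvalue"], row["significant"])
```

In PySpark the same idea would presumably mean collecting `F.struct("bioID", "pvalue")` with `F.collect_list`, having the UDF return (bioID, significant) pairs, exploding once, and joining back on ["comparison", "bioID"] rather than on pvalue. I have not verified that variant here.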