如何解决如何有效连接大型pyspark数据帧和小型python列表以获取数据块上的某些NLP结果
我正在使用SparkNLP和SparkML处理数据块上的NLP。
我使用SparkML的LDA进行主题建模,并获得了以下主题。
这是一个pyspark数据框(df1):
df1:
t_id word_index weights
0 [0,2,3] [0.2105,0.116,0.18]
1 [1,4,6] [0.15,0.05,0.36]
"t_id" is topic id.
"weights" is the weight value of each word with index in "word_index"
The "word_index" in df1 corresponds to the location of each word in the list (lt).
df1 is small with not more than 100 rows.
我有一个单词列表(lt):它是python列表
lt:
['like','book','music','bike','great','pen','laptop']
lt has about 20k words.
我还有另一个大型pyspark数据框(df2),具有超过2000万行。 大小为50+ GB。
df2:
u_id p_id reviews
sra tvs "I like this music" # some english tokens (each token can be found in "lt")
fbs dvr "The book is great"
我想将df1中的“ t_id”(主题)分配给df2的每一行,这样我就可以得到pyspark数据框,例如:
u_id p_id reviews t_id the_highest_weights
sra tvs "I like this music" 1 # the highest of all tokens' weights among all "t_id"s
fbs dvr "The book is great" 4
但是,一个评论可能有多个“ t_id”(主题),因为该评论可能包含多个“ t_id”所涵盖的词。 因此,我必须计算每个“ t_id”的总权重,以便将具有最高总权重的“ t_id”分配给df2中的“评论”。
它表示为最终结果的“ the_highest_weights”。
我不想使用“ for循环”来逐行处理此问题,因为它对于大型数据帧而言效率不高。
如何使用pyspark数据框(而不是熊猫)和矢量化(如果需要)来有效地获得结果?
谢谢
解决方法
我不确定您要计算的确切内容,但是您可以调整此答案以获取所需的内容。假设您要为每个句子找到得分最高的t_id
(由其权重之和得出)。
您可以首先生成一个将每个单词与其索引相关联的数据框。
df_lt = spark.createDataFrame([(i,lt[i]) for i in
range(0,len(lt))],['word_index','w'])
然后,我们将df1展平,以便每行包含一个t_id
索引,一个单词索引和相应的权重。为此,我们可以使用UDF。请注意,在spark> = 2.4中,您可以改用array_union
和create_map
,但是由于df1
很小,因此使用UDF不会有问题。
def create_pairs(index,weights):
return [(index[i],weights[i]) for i in range(0,len(index))]
create_pairs_udf = udf(create_pairs,ArrayType(StructType([
StructField(IntegerType(),'word_index'),StructField(DoubleType(),'weight')
])))
df1_exp = df1\
.select('t_id',explode(create_pairs_udf(df1['word_index'],df1['weights']))
.alias('pair'))\
.select('t_id','pair.word_index','pair.weight')
最后,主要工作是在大型数据帧df2
上完成。我们首先展开句子以每行开始单词(加上u_id
和p_id
)。然后,我们需要与df_lt
一起将单词转换为索引。然后,通过与df1_exp
连接,我们将每个单词索引与其权重相关联。然后,我们将所有索引(包括t_id
)分组以计算权重之和,然后再次分组以为每个句子选择最佳的t_id
。
为加快处理速度,我们可以提示要广播的df_lt
和df1_exp
较小的火花,以避免改组较大的df2
。
代码如下:
df2\
.select("u_id","p_id",explode(split(df2['reviews'],"\\s+")).alias("w"))\
.join(broadcast(df_lt),['w'])\
.drop('w')\
.join(broadcast(df1_exp),['word_index'])\
.groupBy('u_id','p_id','t_id')\
.agg(sum('weight').alias('score'))\
.withColumn('t_id',struct('score','t_id'))\
.groupBy('u_id','p_id')\
.agg(max('t_id').alias('t_id'))\
.select('u_id','t_id.score','t_id.t_id')\
.show()
+----+----+------+----+
|u_id|p_id| score|t_id|
+----+----+------+----+
| fbs| dvr| 0.2| 1|
| sra| tvs|0.3265| 0|
+----+----+------+----+
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。