如何解决Spark数据集过滤器元素
我有2个spark数据集: lessonDS和LatestLessonDS;
这是我的Spark数据集POJO:
Lesson class:
private List<SessionFilter> info;
private lessonId;
LatestLesson class:
private String id:
SessionFilter class:
private String id;
private String sessionName;
我想获取所有Lesson数据,其中Lesson类中的info.id不在LatestLesson id中。
类似这样的东西:
lessonDS.filter(explode(col("info.id")).notEqual(latestLessonDS.col("value"))).show();
latestLessonDS contain:
100A
200C
300A
400A
lessonDS contain:
1,[[100A,jon],[200C,natalie]]
2,jon]]
3,[[600A,[400A,Kim]]
result:
3,jon]
解决方法
如果您的数据集的最新数据量足够合理,则可以收集并广播 然后对lessonDS进行简单的过滤器转换即可获得所需的结果。
喜欢
import scala.collection.JavaConversions._
import spark.implicits._
val bc = spark.sparkContext.broadcast(latestLessonDS.collectAsList().toSeq)
lessonDS.mapPartitions(itr => {
val cache = bc.value;
itr.filter(x => {
//check in cache
})
})
,
通常,在连接lessonDs
和latestLessonDs
时,可以使用array_contains函数作为连接条件。但是此功能在这里不起作用,因为联接条件要求lessonDs.info.id
的 all 个元素出现在latestLessonDS
中。
获取结果的一种方法是爆炸lessonDs
,与latestLessonDs
联接,然后检查lessonDs.info
的 all 个元素是否在{{ 1}}通过比较联接前后信息元素的数量而存在:
latestLessonDs
打印
lessonDs
.withColumn("noOfEntries",size('info))
.withColumn("id",explode(col("info.id")))
.join(latestLessonDs,"id" )
.groupBy("lessonId","info","noOfEntries").count()
.filter("noOfEntries = count")
.drop("noOfEntries","count")
.show(false)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。