如何解决在RDD元素上评估pyspark中的Xquery
我们正在尝试读取大量XML,并在pyspark中对它们运行Xquery,例如book xml。我们正在使用 spark-xml-utils 库。
- 我们要将包含xml的目录提供给pyspark。
- 对所有这些都运行Xquery以得到我们的结果。
参考答案:Calling scala code in pyspark for XSLT transformations
xquery处理器的定义,其中xquery是xquery的字符串:
proc = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(xquery)
我们正在使用以下方法读取目录中的文件:
sc.wholeTextFiles("xmls/test_files")
这为我们提供了一个RDD,其中包含所有文件作为元组列表:
[(Filename1,FileContentAsAString),(Filename2,File2ContentAsAString)]
如果我们在字符串(FileContentAsAString)上运行,则xquery会评估并给我们结果
whole_files = sc.wholeTextFiles("xmls/test_files").collect()
proc.evaluate(whole_files[1][1])
# Prints proper xquery result for that file
问题:
如果我们尝试使用lambda函数在RDD上运行proc.evaluate(),则会失败。
test_file = sc.wholeTextFiles("xmls/test_files")
test_file.map(lambda x: proc.evaluate(x[1])).collect()
# Should give us a list of xquery results
错误:
PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
这些功能以某种方式工作,但以上评估无效:
打印应用了xquery的内容
test_file.map(lambda x: x[1]).collect()
# Outputs the content. if x[0],gives us the list of filenames
返回内容中的字符长度
test_file.map(lambda x: len(x[1])).collect()
# Output: [15274,13689,13696]
可供参考的图书示例:
books_xquery = """for $x in /bookstore/book
where $x/price>30
return $x/title/data()"""
proc_books = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(books_xquery)
books_xml = sc.wholeTextFiles("xmls/books.xml")
books_xml.map(lambda x: proc_books.evaluate(x[1])).collect()
# Error
# I can share the stacktrace if you guys want
解决方法
不幸的是,不可能在从Python代码进行的映射调用中直接调用Java / Scala库。 This answer很好地解释了为什么没有简单的方法可以做到这一点。简而言之,原因是Py4J网关(将Python调用“转换”为JVM世界所必需的)仅位于驱动程序节点上,而您尝试执行的map调用则在执行程序节点上运行。 >
解决该问题的一种方法是将XQuery函数包装在Scala UDF中(解释为here),但是仍然有必要编写几行Scala代码。
EDIT :如果您能够从XQuery切换到XPath,则可能更简单的选择是更改(XPath)库。 ElementTree是用Python编写的XML库,也是XPath。
代码
xmls = spark.sparkContext.wholeTextFiles("xmls/test_files")
import xml.etree.ElementTree as ET
xpathquery = "...your query..."
xmls.flatMap(lambda x: ET.fromstring(x[1]).findall(xpathquery)) \
.map(lambda x: x.text) \
.foreach(print)
将针对从目录xpathquery
加载的所有文档打印运行xmls/test_files
的所有结果。
首先,将flatMap用作findall调用,以返回每个文档中所有匹配的elements的列表。通过使用flatMap
,此列表将变平(结果每个文件可能包含多个元素)。在第二个map
调用中,将元素映射到它们的text以获得可读的输出。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。