How to fix F.regexp_extract inside a UDF returning AttributeError: 'NoneType' object has no attribute '_jvm'
I am a complete beginner with Spark and pyspark. I have a huge dataset and a set of keywords that I need to check for and extract from a column.
My code is as follows:
temp_skills = ['sales','it','c']

@F.udf
def lookhere(z) -> str:
    strng = ' '
    for skill in temp_skills:
        strng += F.regexp_extract(z, skill, 0)
    return strng

spark.udf.register("lookhere", lambda z: lookhere(z), returnType=StringType())

DF.withColumn(
    'temp', lookhere(DF.dept_name)
).show(truncate=False)
Original DF:
+------------------+-------+
|         dept_name|dept_id|
+------------------+-------+
|  finance sales it|     10|
|marketing it sales|     20|
|             sales|     30|
|                it|     40|
+------------------+-------+
Desired DF:
+------------------+-------+----------+
|         dept_name|dept_id|      temp|
+------------------+-------+----------+
|  finance sales it|     10|sales it c|
|marketing it sales|     20| sales it |
|             sales|     30|    sales |
|                it|     40|       it |
+------------------+-------+----------+
Error:
---------------------------------------------------------------------------
PythonException Traceback (most recent call last)
<ipython-input-80-0c11f7327f77> in <module>()
      1 DF.withColumn('temp2',
      2 lookintothis(DF.dept_name)
----> 3 ).show(truncate = False)
/content/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.py in show(self,n,truncate,vertical)
440 print(self._jdf.showString(n,20,vertical))
441 else:
--> 442 print(self._jdf.showString(n,int(truncate),vertical))
443
444 def __repr__(self):
/content/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self,*args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer,self.gateway_client,self.target_id,self.name)
1306
1307 for temp_arg in temp_args:
/content/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a,**kw)
135 # Hide where the exception came from that shows a non-Pythonic
136 # JVM exception message.
--> 137 raise_from(converted)
138 else:
139 raise
/content/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py in raise_from(e)
PythonException:
An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py",line 605,in main
process()
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py",line 597,in process
serializer.dump_stream(out_iter,outfile)
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py",line 223,in dump_stream
self.serializer.dump_stream(self._batched(iterator),stream)
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py",line 141,in dump_stream
for obj in iterator:
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py",line 212,in _batched
for item in iterator:
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py",line 450,in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets,f) in udfs)
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py",in <genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets,line 90,in <lambda>
return lambda *a: f(*a)
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py",line 107,in wrapper
return f(*args,**kwargs)
File "<ipython-input-75-31ef5eea3b75>",line 7,in lookintothis
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/functions.py",line 1811,in regexp_extract
jc = sc._jvm.functions.regexp_extract(_to_java_column(str),pattern,idx)
AttributeError: 'NoneType' object has no attribute '_jvm'
Environment: Google Colab, Windows 10, Spark 3.0.0, pyspark 3.0.0
Is my approach wrong, or is it my syntax? Please help me understand this!
Solution
The error comes from calling F.regexp_extract inside the UDF: functions in pyspark.sql.functions are column expressions that go through the driver's SparkContext and the JVM, but the body of a UDF runs on the executors, where no SparkContext exists, hence 'NoneType' object has no attribute '_jvm'. You don't need regex here at all: I simply check whether each skill occurs in dept_name with Python's in operator, so nothing has to be extracted or replaced inside the UDF.
temp_skills = ['sales','it','c']

from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, StringType

# Return the matched keywords as an array of strings, so the UDF needs an
# explicit ArrayType(StringType()) return type (the default is StringType).
@udf(returnType=ArrayType(StringType()))
def lookhere(z) -> list:
    strings = []
    for skill in temp_skills:
        if skill in z:
            strings.append(skill)
    return strings

# Registering the UDF by name is only needed if you also call it from SQL.
spark.udf.register("lookhere", lookhere)
df = spark.read.option("header","true").option("inferSchema","true").csv("test.csv")
df.withColumn('temp',lookhere('dept_name')).show(4,False)
+------------------+-------+--------------+
|dept_name         |dept_id|temp          |
+------------------+-------+--------------+
|finance sales it  |10     |[sales, it, c]|
|marketing it sales|20     |[sales, it]   |
|sales             |30     |[sales]       |
|it                |40     |[it]          |
+------------------+-------+--------------+
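If you want the single space-separated string from the question's desired DF rather than an array, one minimal sketch (reusing the lookhere UDF and df defined above and joining its result with array_join) is:

from pyspark.sql.functions import array_join

# Join the array of matched skills into one space-separated string,
# e.g. ['sales', 'it', 'c'] -> 'sales it c'.
df.withColumn('temp', array_join(lookhere('dept_name'), ' ')).show(4, False)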
An alternative, DataFrame-only approach: split dept_name into words so the keyword comparison is exact, which keeps a short skill such as 'c' from matching inside longer words like 'finance'.
temp_skills = ['sales','it','c']

from pyspark.sql.functions import *

df = spark.read.option("header","true").csv("test.csv")

# Split dept_name into whole words and intersect them with the skills array,
# so only exact word matches survive.
df.withColumn('dept_names', split('dept_name', ' ')) \
  .withColumn('skills', array(*map(lambda c: lit(c), temp_skills))) \
  .withColumn('temp', array_intersect('dept_names', 'skills')) \
  .drop('dept_names', 'skills').show(4, False)
+------------------+-------+-----------+
|dept_name         |dept_id|temp       |
+------------------+-------+-----------+
|finance sales it  |10     |[sales, it]|
|marketing it sales|20     |[it, sales]|
|sales             |30     |[sales]    |
|it                |40     |[it]       |
+------------------+-------+-----------+
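If you still want to use regexp_extract itself, it works fine as a plain column expression outside any UDF; a minimal sketch, assuming the same df and temp_skills as above (keywords that don't match simply contribute an empty string):

from pyspark.sql import functions as F

# One regexp_extract column per keyword, concatenated with spaces;
# everything stays on the JVM side, so no Python UDF and no '_jvm' error.
extracted = [F.regexp_extract('dept_name', skill, 0) for skill in temp_skills]
df.withColumn('temp', F.concat_ws(' ', *extracted)).show(4, False)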