How to fix the constructor error when instantiating a Scala/Java PipelineStage that mixes in HasInputCol from pyspark
Using pyspark 2.4.4, Python 3.6.5.
I'm writing a custom PipelineStage in Scala. I prefer to keep the heavy Spark code in Scala, but I also want to expose the API in pyspark.
I noticed that pyspark's PipelineStages use JavaTransformer and JavaEstimator to call the underlying Java objects, so I decided to do the same.
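Roughly, pyspark's JavaTransformer keeps a py4j handle to the JVM object in `_java_obj`, copies the Python-side param values onto it (`_transfer_params_to_java`), and then calls the JVM-side `transform`. The sketch below mimics that delegation in pure Python so it runs without a JVM; `FakeJvmTransformer` and `PyWrapper` are hypothetical stand-ins, not pyspark or py4j APIs.

```python
class FakeJvmTransformer:
    """Hypothetical stand-in for the Scala object a py4j handle would point to."""

    def __init__(self, uid):
        self.uid = uid
        self.params = {}  # params the "JVM side" has actually received

    def set(self, name, value):
        self.params[name] = value

    def transform(self, rows):
        # Raises KeyError if the wrapper never pushed its params across.
        in_col, out_col = self.params["inputCol"], self.params["outputCol"]
        return [{**row, out_col: row[in_col] + " great success"} for row in rows]


class PyWrapper:
    """Hypothetical Python-side wrapper: owns the params, delegates the work."""

    def __init__(self, uid, **params):
        self.uid = uid
        self._param_map = dict(params)
        # Same uid on both sides, as with _new_java_obj(..., self.uid) in the question.
        self._java_obj = FakeJvmTransformer(uid)

    def _transfer_params_to_java(self):
        for name, value in self._param_map.items():
            self._java_obj.set(name, value)

    def transform(self, rows):
        # Push params first, then call the "JVM" transform.
        self._transfer_params_to_java()
        return self._java_obj.transform(rows)


wrapper = PyWrapper("foo", inputCol="x", outputCol="y")
result = wrapper.transform([{"x": "a"}, {"x": "b"}])
print(wrapper._java_obj.params)  # {'inputCol': 'x', 'outputCol': 'y'}
```

The point of the split is that the Python object never does the work itself; it only has to get the JVM object constructed and its params set, which is exactly the step that fails in this question.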
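I'm running into constructor errors. I couldn't find any documentation on how to do this, so any pointers are appreciated, as is help with this specific error.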
Here is my Scala transformer:
```scala
package org.apache.spark.ml

import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructType}

class FooTransformer(val uid: String = "foo") extends Transformer with HasInputCol with HasOutputCol {

  // Appends " great success" to each value of the input column
  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.withColumn(getOutputCol, concat(col(getInputCol), lit(" great success")))

  // defaultCopy builds the copy through the (uid: String) constructor
  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

  override def transformSchema(schema: StructType): StructType =
    schema.add(getOutputCol, StringType, nullable = false)
}
```
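For reference, the result this transform should produce on the two-row DataFrame used below can be written out in plain Python. One detail worth checking, assuming standard Spark SQL semantics: `concat` returns null when any input is null, so declaring the output column `nullable = false` is only accurate if the input column never holds nulls. `expected_foo_output` is a hypothetical helper, not part of Spark.

```python
from typing import Dict, List, Optional

SUFFIX = " great success"


def expected_foo_output(rows: List[Dict[str, Optional[str]]],
                        input_col: str, output_col: str) -> List[Dict[str, Optional[str]]]:
    """Plain-Python model of concat(col(inputCol), lit(" great success"))."""
    result = []
    for row in rows:
        value = row[input_col]
        # Spark's concat yields null if any argument is null.
        new_value = None if value is None else value + SUFFIX
        result.append({**row, output_col: new_value})
    return result


print(expected_foo_output([{"x": "a"}, {"x": "b"}], "x", "y"))
# [{'x': 'a', 'y': 'a great success'}, {'x': 'b', 'y': 'b great success'}]
```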
Here is the Python code for my transformer:
```python
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark import keyword_only
from pyspark.ml.wrapper import JavaTransformer


class FooTransformer(JavaTransformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(FooTransformer, self).__init__()
        self._java_obj = self._new_java_obj("org.apache.spark.ml.FooTransformer", self.uid)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        # Accept both params, since __init__ forwards inputCol and outputCol here
        kwargs = self._input_kwargs
        return self._set(**kwargs)
```
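With `keyword_only`, every keyword that `__init__` forwards must also appear in the signature of `setParams`; a `setParams(self, outputCol=None)` that omits `inputCol` raises `TypeError` as soon as `__init__` passes both. The sketch below re-implements what pyspark 2.4's `keyword_only` does (a simplified copy, so it runs without pyspark) to show both cases. `BrokenStage` and `FixedStage` are hypothetical names, and their `setParams` return the captured kwargs instead of calling `_set`.

```python
from functools import wraps


def keyword_only(func):
    """Simplified copy of pyspark.keyword_only: rejects positional args, stashes kwargs."""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if len(args) > 0:
            raise TypeError("Method %s forces keyword arguments." % func.__name__)
        self._input_kwargs = kwargs
        return func(self, **kwargs)
    return wrapper


class BrokenStage:
    @keyword_only
    def setParams(self, outputCol=None):  # inputCol missing from the signature
        return self._input_kwargs


class FixedStage:
    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        return self._input_kwargs


try:
    BrokenStage().setParams(inputCol="x", outputCol="y")
except TypeError as err:
    print("broken:", err)

print("fixed:", FixedStage().setParams(inputCol="x", outputCol="y"))
```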
Here is the Python code of the application that actually runs Spark (I packaged the jar and then referenced it):
```python
from pyspark.sql import SparkSession
from foo import FooTransformer


def init_spark():
    # 'path to my jar' is a placeholder for the packaged Scala jar
    spark = SparkSession.builder \
        .appName("bar") \
        .config('spark.jars', 'path to my jar') \
        .getOrCreate()
    sc = spark.sparkContext
    return spark, sc


def main():
    spark, sc = init_spark()
    print(spark.version)
    lines = [('a',), ('b',)]
    rdd = sc.parallelize(lines)
    df = rdd.toDF(["x"])
    transformer = FooTransformer(inputCol="x", outputCol="y")
    df0 = transformer.transform(df)
    print("testing a transformer that was created in scala")
    print(df0.collect())


if __name__ == '__main__':
    main()
```
Running the above fails with the following error (after the version is printed successfully):
```
Traceback (most recent call last):
  File "~/foo_main.py", line 28, in <module>
    main()
  File "~/foo_main.py", line 21, in main
    transformer = FooTransformer(inputCol="x", outputCol="y")
  File "~/venv/lib/python3.6/site-packages/pyspark/__init__.py", line 110, in wrapper
    return func(self, **kwargs)
  File "~/foo.py", line 11, in __init__
    self._java_obj = self._new_java_obj("org.apache.spark.ml.FooTransformer", self.uid)
  File "~/venv/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 67, in _new_java_obj
    return java_obj(*java_args)
  File "~/venv/lib/python3.6/site-packages/py4j/java_gateway.py", line 1525, in __call__
    answer, self._gateway_client, None, self._fqn)
  File "~/venv/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "~/venv/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.ml.FooTransformer.
: java.lang.NoSuchMethodError: org.apache.spark.ml.param.shared.HasInputCol.$init$(Lorg/apache/spark/ml/param/shared/HasInputCol;)V
	at org.apache.spark.ml.FooTransformer.<init>(FooTransformer.scala:9)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
```
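The `NoSuchMethodError` line is written in JVM descriptor notation. The small decoder below (a hypothetical helper that covers only the basic descriptor grammar: primitives, object types, arrays) turns it into a readable signature. Read that way, the compiled `FooTransformer` constructor calls a method named `$init$` on `HasInputCol` that takes a `HasInputCol` argument and returns `void`, and the Spark classes loaded at runtime do not provide it.

```python
import re

# Single-character JVM descriptor codes for primitive types and void.
PRIMITIVES = {
    "B": "byte", "C": "char", "D": "double", "F": "float",
    "I": "int", "J": "long", "S": "short", "Z": "boolean", "V": "void",
}


def _parse_type(desc, i):
    """Parse one type starting at desc[i]; return (java_name, next_index)."""
    dims = 0
    while desc[i] == "[":  # each '[' adds one array dimension
        dims += 1
        i += 1
    if desc[i] == "L":  # object type: L<internal/name>;
        end = desc.index(";", i)
        name = desc[i + 1:end].replace("/", ".")
        i = end + 1
    else:
        name = PRIMITIVES[desc[i]]
        i += 1
    return name + "[]" * dims, i


def decode_missing_method(message):
    """Turn 'owner.method(args)ret' from a NoSuchMethodError into a Java-like signature."""
    match = re.match(r"^(.+)\.([^.(]+)(\(.*)$", message.strip())
    if match is None:
        raise ValueError("not a method descriptor: %r" % message)
    owner, method, desc = match.groups()
    i = 1  # skip the opening '('
    params = []
    while desc[i] != ")":
        name, i = _parse_type(desc, i)
        params.append(name)
    ret, _ = _parse_type(desc, i + 1)
    return "%s %s.%s(%s)" % (ret, owner, method, ", ".join(params))


print(decode_missing_method(
    "org.apache.spark.ml.param.shared.HasInputCol.$init$"
    "(Lorg/apache/spark/ml/param/shared/HasInputCol;)V"))
# void org.apache.spark.ml.param.shared.HasInputCol.$init$(org.apache.spark.ml.param.shared.HasInputCol)
```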
If I remove every dependency on HasInputCol and HasOutputCol from FooTransformer in Scala, it works, but I need these mixins...
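One cause worth ruling out, offered as an assumption to check rather than a diagnosis: Scala 2.12 compiles trait initialisation into a static `$init$` method on the trait interface, while Scala 2.11 puts it in a separate `HasInputCol$class` helper. A jar built with Scala 2.12 running on a Spark built with Scala 2.11 (the usual build for prebuilt Spark 2.4.4, including the pip package) would fail exactly when the constructor reaches a Spark trait, which would also fit the observation that removing the mixins makes the error go away. The helpers below compare the `_2.xx` suffix that sbt appends to cross-built jar names with a Scala version string; all three function names are hypothetical, and the jar name in the example is made up because the question's jar path is elided. Inside pyspark, the runtime string can be read with `spark.sparkContext._jvm.scala.util.Properties.versionString()`, which goes through the internal py4j gateway rather than a public API.

```python
import re


def scala_binary_version(version_string):
    """'version 2.11.12' or '2.11.12' -> '2.11' (major.minor is the binary version)."""
    match = re.search(r"(\d+)\.(\d+)(?:\.\d+)?", version_string)
    if match is None:
        raise ValueError("no Scala version in %r" % version_string)
    return "%s.%s" % match.groups()


def jar_scala_suffix(jar_name):
    """'foo_2.12-0.1.jar' -> '2.12'; None if the name has no sbt cross-version suffix."""
    match = re.search(r"_(\d+\.\d+)-[^/]*\.jar$", jar_name)
    return match.group(1) if match else None


def check_jar_matches_runtime(jar_name, runtime_version_string):
    jar_version = jar_scala_suffix(jar_name)
    runtime_version = scala_binary_version(runtime_version_string)
    if jar_version is None:
        return "cannot tell: %s has no _2.xx suffix" % jar_name
    if jar_version != runtime_version:
        return "mismatch: jar built for Scala %s, Spark runs Scala %s" % (jar_version, runtime_version)
    return "ok: both use Scala %s" % runtime_version


# Hypothetical jar name; the runtime string has the shape versionString() returns.
print(check_jar_matches_runtime("target/scala-2.12/foo_2.12-0.1.jar", "version 2.11.12"))
# mismatch: jar built for Scala 2.12, Spark runs Scala 2.11
```

If the versions do differ, rebuilding the jar with the same Scala binary version as the Spark runtime (for example via `scalaVersion` in sbt) is the natural thing to try first.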