如何解决如何在Python中使用结构化流实现雪花连接器?
目前,我有接受输入并创建数据流的代码。我的目标是将数据上传到雪花。目前,我正在尝试这种方法,是否有更简单的方法可以解决此问题。还是可以将此内容写入pandas df,然后将pandas df上传到雪花?以前它只能用于结构化流媒体,而无法与雪花连接。
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
sfconn = {
"sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com","sfUser": os.getenv('SNOWFLAKE_USER'),"sfPassword": os.getenv('SNOWFLAKE_PASSWORD'),"sfDatabase": "x","sfSchema": "x","sfWarehouse": "x"
}
spark = SparkSession.builder\
.appName("snowflake-connector")\
.getOrCreate()
df = spark \
.readStream\
.format('json') \
.schema(spark_schemas['x']) \
.load(f"s3a://{x_path}")
out = df \
.writeStream\
.outputMode("append")\
.option("dbtable","scratch_table")\
.options(sfconn)\
.trigger(processingTime='1 minutes')\
.format("snowflake")\
.start()
现在正在显示
options() takes 1 positional argument but 2 were given
和
: java.lang.ClassNotFoundException: Failed to find data source: snowflake.
解决方法
随后的第一个错误-options
接受指定options的可变对数。如果您有选择作为地图,则需要使用**map
语法来“ unpack”,例如:
opts = {'inferSchema': "true","header": "false"}
df = spark.read.options(**opts)
.format("csv")
.schema("ticker String,date Date,price Float")
.load(".../datasets/dow-quotes.csv")
对于第二个错误-您只需要指定连接器的正确名称-net.snowflake.spark.snowflake
而不是snowflake
,并确保在提交作业时指定了Snowflake Spark连接器。有关更多详细信息,请参见雪花documentation。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。