Spark Cassandra连接器+加入超时

如何解决Spark Cassandra连接器+加入超时

我需要加入两个Spark数据帧,然后将结果返回给Hive。以下是数据框:

数据框1:Cassandra表-分区和聚类键:(ID,PART_NBR)

val df1 = spark.read.format("org.apache.spark.sql.cassandra")
    .option("keyspace","mykeyspace")
    .option("table","mytable")
    .load

** Dataframe2:从另一个来源获得的键(属于上表中的ID列的分区键)的数据框-该表中不同键的数量约为15万个**

val df2 = spark.read
    .format("jdbc")
    .option("driver","com.mysql.jdbc.Driver")
    .option("url","****")
    .option("dbtable","table")
    .option("user","username")
    .option("password","password123")
    .load()

  val joinExpr = df1.col("ID") === df2.col("ID")

  val res = df1.join(df2,joinExpr)

  res.write.mode(SaveMode.Append).format("orc")
    .saveAsTable("targetTable")

现在,此代码始终会导致“ com.datastax.oss.driver.api.core.servererrors.ReadFailureException:一致性为LOCAL_ONE的读取查询期间Cassandra失败(需要1个响应,但仅响应0个副本,1个失败)”

即使失败也将LOCAL_ONE更改为QUORUM。

我甚至尝试将密钥数据帧拆分为20个KEYS(数据帧中20个ID值)的批处理,然后与cassandra表联接-即使失败。

我什至尝试了IN子句,尽管它有效,但DBA限制了我们在加载Cassandra时运行它的操作,并导致CPU峰值。

在检查Cassandra DBA时,他们要求进行有针对性的查询,因为上述查询会导致较大的令牌范围扫描,并且会导致失败。但是,单独的有针对性的查询将导致与Cassandra的往返15万次(这需要花费几个小时才能完成),而且费用太高。

为什么会产生如此巨大的令牌范围扫描?我们该如何解决?我还有什么选择?

pom.xml依赖性

 <scala.version>2.11.12</scala.version>
 <spark.version>2.2.0</spark.version>

        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.5.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>



尝试了以下操作,但未进行直接联接。我有什么想念吗?

spark-submit --class ExampleCassandra --deploy-mode client --num-executors 15 --executor-memory 4g  --driver-memory=1g  --conf spark.sql.shuffle.partitions=25 --conf spark.executor.heartbeatInterval=100s --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --jars spark-sql_2.11-2.4.0.jar,spark-core_2.11-2.4.0.jar,spark-hive_2.11-2.4.0.jar,mysql-connector-java-8.0.18.jar,spark-cassandra-connector_2.11-2.5.1.jar ExampleCassandra-bundled-1.0-SNAPSHOT.jar

代码中印有= Spark.sparkContext.version = 2.4.0的火花版本

产生的计划

== Physical Plan ==
*(8) SortMergeJoin [item_nbr#31],[item_nbr#24],Inner
:- *(2) Sort [item_nbr#31 ASC NULLS FIRST],false,0
:  +- Exchange hashpartitioning(item_nbr#31,25)
:     +- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [item_nbr#31,planNum#32,strN#33,currTail#34,currTailTy#35,hor#36,prNbr#37,revSce#38,stckHnad#39] PushedFilters: [],ReadSchema: struct<item_nbr:int,planNum:int,strN:int,currTail:decimal(38,18),currTailTy:s...
+- *(7) Sort [item_nbr#24 ASC NULLS FIRST],0
   +- Exchange hashpartitioning(item_nbr#24,25)
      +- *(6) HashAggregate(keys=[item_nbr#21],functions=[])
         +- Exchange hashpartitioning(item_nbr#21,25)
            +- *(5) HashAggregate(keys=[item_nbr#21],functions=[])
               +- *(5) Filter (NOT (trim(lower(item_nbr#21),None) = null) && isnotnull(cast(trim(item_nbr#21,None) as int)))
                  +- Generate explode(split(items#4,)),[item_nbr#21]
                     +- *(4) Project [items#4]
                        +- *(4) BroadcastHashJoin [planNum#0],[planNum#2],Inner,BuildRight
                           :- *(4) Scan JDBCRelation(( select planNum from QAMdPlans.Plan where plan_type = 'MBM' order by planNum desc ) t) [numPartitions=1] [planNum#0] PushedFilters: [*IsNotNull(planNum)],ReadSchema: struct<planNum:int>
                           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0,int,false] as bigint)))
                              +- *(3) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [planNum#2,items#4] PushedFilters: [],ReadSchema: struct<planNum:int,items:string>

解决方法

问题在于版本2.0.5不能优化Dataframe的联接-如果您执行res.explain,则会看到Spark将执行从Cassandra读取所有数据的操作,然后对火花级别。优化的联接仅在RDD API中以leftJoinWithCassandraTablejoinWithCassandraTable的形式提供。

release of the Spark Cassandra Connector 2.5的情况已经改变,现在包括针对Dataframe API的优化联接(但是您需要启用Spark SQL扩展才能使其工作)。因此,您需要将连接器升级到2.5.latest(目前为2.5.1),或使用RDD API中的加入功能。

P.S。我最近wrote a detailed blog post介绍了同时使用Dataframe和RDD API与Spark中的Cassandra表中的数据进行有效连接的方法。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?