为什么在Spark GraphX中执行Pregel时会出现TypeMismatch错误?

如何解决为什么在Spark GraphX中执行Pregel时会出现TypeMismatch错误?

我已经用Spark GraphX中的Pregel编写了算法。但不幸的是,我收到TypeMismatch错误。 我用:val my_graph= GraphLoader.edgeListFile(sc,path)加载图形。因此,开始时节点的结构如下:

(1,1)
(2,1)
(3,1)

以nodeID为键,默认属性为1。

首先在run2函数内部,更改结构,以使每个节点可以存储多个属性。因为我正在研究重叠的社区检测算法,所以属性是标签及其分数。 在run2的第一轮运行中,每个节点的结构如下:

(34,Map(34 -> (1.0,34)))
(13,Map(13 -> (1.0,13)))
(4,Map(4 -> (1.0,4)))
(16,Map(16 -> (1.0,16)))
(22,Map(22 -> (1.0,22)))

这意味着节点34具有标签34,其分数等于1。然后,每个节点可以存储从其邻居接收到的多个属性,并在下一步中将其发送给其邻居。

在算法结束时,每个节点可以包含多个属性或仅包含一个属性,例如以下结构:

(1,Map((2->(0.49,1),(8->(0.9,1)),(13->(0.79,1))))
(2,Map((11->(0.89,2)),(6->(0.68,(10->(0.57,2))))
(3,Map((20->(0.0.8,3)),(1->(0.66,3))))

上面的结构显示,例如,节点1属于社区2,得分为0.49,属于社区8,得分为0.9,属于社区13,得分为0.79。

下面的代码显示了Pregel中定义的不同功能。

def run2[VD,ED: ClassTag](graph: Graph[VD,ED],maxSteps: Int) = {

  val temp_graph = graph.mapVertices { case (vid,_) => mutable.HashMap[VertexId,(Double,VertexId)](vid -> (1,vid)) }

  def sendMessage(e: EdgeTriplet[mutable.HashMap[VertexId,VertexId)],ED]): Iterator[(VertexId,mutable.HashMap[VertexId,VertexId)])] = {
    Iterator((e.srcId,e.dstAttr),(e.dstId,e.srcAttr))
  }

  def mergeMessage(count1: (mutable.HashMap[VertexId,VertexId)]),count2: (mutable.HashMap[VertexId,VertexId)]))= {

    val communityMap = new mutable.HashMap[VertexId,List[(Double,VertexId)]]

    (count1.keySet ++ count2.keySet).map(key => {

      val count1Val = count1.getOrElse(key,(0D,0:VertexId))
      val count2Val = count2.getOrElse(key,0:VertexId))

      communityMap += key->(count1Val::communityMap(key))
      communityMap += key->(count2Val::communityMap(key))

    })
    communityMap
  }

  def vertexProgram(vid: VertexId,attr: mutable.HashMap[VertexId,message: mutable.HashMap[VertexId,VertexId)]]) = {
    if (message.isEmpty)
      attr
    else {
      val labels_score: mutable.HashMap[VertexId,Double] = message.map {
        key =>
          var value_sum = 0D
          var isMemberFlag = 0
          var maxSimilar_result = 0D
          val max_similar = most_similar.filter(x=>x._1==vid)(1)
          if (key._2.exists(x=>x._2==max_similar)) isMemberFlag = 1 else isMemberFlag = 0

          key._2.map {
            values =>
              if (values._2==max_similar) maxSimilar_result = values._1 else maxSimilar_result = 0D

              val temp = broadcastVariable.value(vid)(values._2)._2
              value_sum += values._1 * temp
          }
          value_sum += (beta*value_sum)+((1-beta)*maxSimilar_result)
          (key._1,value_sum) //label list
      }


      val max_value = labels_score.maxBy(x=>x._2)._2.toDouble
      val dividedByMax = labels_score.map(x=>(x._1,x._2/max_value)) // divide by maximum value

      val resultMap: mutable.HashMap[VertexId,Double] = new mutable.HashMap[VertexId,Double]
      dividedByMax.foreach{ row => // select labels more than threshold P = 0.5
        if (row._2 >= p) resultMap += row
      }

      val max_for_normalize= resultMap.values.sum
      val res = resultMap.map(x=>(x._1->(x._2/max_for_normalize,x._1))) // Normalize labels

      res
    }
  }

  val initialMessage = mutable.HashMap[VertexId,VertexId)]()

  val overlapCommunitiesGraph = Pregel(temp_graph,initialMessage,maxIterations = maxSteps)(
    vprog = vertexProgram,sendMsg = sendMessage,mergeMsg = mergeMessage)

  overlapCommunitiesGraph
}

val my_graph= GraphLoader.edgeListFile(sc,path)
val new_updated_graph2 = run2(my_graph,1)

在上面的代码中,p=0.5beta=0.5most_similar是包含每个节点及其最重要节点的RDD。例如,(1,3)表示节点3与节点1最相似。broadcatVariable的结构与以下相同:

(19,Map(33 -> (1.399158675718661,0.6335049099178383),34 -> (1.4267350687130098,0.6427405501408145)))

(15,0.6427405501408145)))
...

该结构显示了一个节点(作为键)和其邻居(作为值)之间的关系。例如,节点19与节点33和34相邻,并且它们之间的分数显示了这种关系。

在算法中,每个节点发送每个属性Map,其中包含几个标签及其分数。然后在mergeMessage函数中,将具有相同编号的标签的值放入List中,并在每个标签或键的vertexProgram中处理其列表。

已更新

根据下图中的等式,我使用List来收集Label的不同分数,并在vertexProgram函数中对其进行处理。因为我需要P_ji来处理每个节点的标签分数,所以我不知道是否可以在mergeMessage函数中执行它,或者是否需要在vertexProgram中执行它。 P_ji是源节点与其邻居之间的分数,应与标签分数相乘。

enter image description here

我得到的错误显示在vprog = vertexProgram,行的前面,并在此图中显示。任何人都可以通过解决此错误来帮助我吗?

enter image description here

解决方法

主要问题是您对消息使用两种不同类型。初始消息的类型为mutable.HashMap[VertexId,(Double,VertexId)],但是在合并两个消息(使用mergeMessage函数)后,类型变为mutable.HashMap[VertexId,List[(Double,VertexId)]]。这里的问题是,由于类型错误,现在合并的消息无法与其他消息合并。

有两种解决方法:

  1. 将消息类型更改为mutable.HashMap[VertexId,VertexId)]],确保初始消息与此匹配。
  2. 将消息类型保留为mutable.HashMap[VertexId,VertexId)],并将输出类型mergeMessage更改为匹配。

下面是有关这两种选择的可能解决方案的一些草图。由于实际所需的逻辑不是很清楚(代码中有一些未使用的变量,等等),因此它们内部可能会有一些错误。当与其余代码结合使用时,这两个选项都可以运行,并且将返回一个新图形。 >


解决方案1 ​​

您需要调整sendMessagemergeMessageinitialMessage才能处理列表。可以按照以下步骤进行操作:

def sendMessage(e: EdgeTriplet[Map[VertexId,VertexId)],ED]): Iterator[(VertexId,Map[VertexId,VertexId)]])] = {
  val msg1 = e.dstAttr.map{ case (k,v) => (k,List(v)) }
  val msg2 = e.srcAttr.map{ case (k,List(v)) }
  Iterator((e.srcId,msg1),(e.dstId,msg2))
}

def mergeMessage(count1: Map[VertexId,VertexId)]],count2: Map[VertexId,VertexId)]])= {
  val merged = count1.toSeq ++ count2.toSeq
  val new_message = merged.groupBy(_._1).map{case (k,v.map(_._2).flatten.toList)}
  new_message
}

val initialMessage = Map[VertexId,VertexId)]]()

可能还需要调整messages.isEmpty中的vertexProgram返回。

解决方案2

要使用不带列表的消息,您需要将合并逻辑从vertexProgram移至mergeMessage。我对代码进行了一些简化,因此可能需要进行一些测试。

def mergeMessage(count1: (Map[VertexId,VertexId)]),count2: (Map[VertexId,VertexId)]))= {

  val merged = count1.toSeq ++ count2.toSeq
  val grouped = merged.groupBy(_._1)

  val new_message = grouped.map{ case (key,key_values) =>
    val values = key_values.map(_._2)
    
    val max_similar = most_similar.filter(x => x._1 == key).headOption match {
      case Some(x) => x  
      case _ => -1   // What should happen when there is no match?
    }
  
    val maxSimilar_result = values.filter(v => v._2 == max_similar).headOption match {
      case Some(x) => x._1
      case _ => 0.0
    }
    
    val value_sum = values.map{ v => v._1 * broadcastVariable.value(key)(v._2)._2}.sum
    val res = (beta*value_sum)+((1-beta)*maxSimilar_result)
    (key,(res,key))
  }
  
  new_message.toMap
}

def vertexProgram(vid: VertexId,attr: Map[VertexId,messages: Map[VertexId,VertexId)]) = {
  if (messages.isEmpty){
    attr
  } else { 
    val labels_score = messages.map(m => (m._1,m._2._1))
    val max_value = labels_score.maxBy(x => x._2)._2.toDouble
    val dividedByMax = labels_score.map(x => (x._1,x._2 / max_value)) // divide by maximum value

    // select labels more than threshold P = 0.5
    val resultMap = dividedByMax.filter{ row => row._2 >= p }

    val max_for_normalize= resultMap.values.sum
    val res = resultMap.map(x => (x._1 -> (x._2 / max_for_normalize,x._1))) // Normalize labels

    res
  }
}

注释

  1. 当前在sendMessage中,一条消息被发送到两个节点,而与图边缘的方向无关。这是否正确取决于所需的逻辑。
  2. 我将mutable.HashMap更改为普通(不变)Map。如果可能,始终首选使用不可变选项。
  3. 由于vertexProgram中的逻辑非常复杂,因此解决方案1应该更易于使用。那里还有更多的变量当前不执行任何操作,但也许稍后会使用。如果不可能以迭代方式合并消息(并且您需要一次查看所有消息),则可以使用List

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-