scala-akka实现简单的分布式RPC通信框架主从监听,消息发送

简介:模拟用akka实现一个RPC分布式通信框架,实现从机向主机发送心跳,主机判断从机是否挂机,以及消息通信的简单功能。

开发平台:IntelliJ IDEA     Maven架构

项目结构及依赖:


代码2个样例类:

case  class Workinfo (val id:String) extends Serializable{
}
case  class SendHearBeat (val id:String) extends Serializable{
}

Master类:

import akka.actor.{Actor,ActorSystem,Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable
class Master extends Actor{
   //保存worker  id
   val workerMessage=new mutable.HashMap[String,Long]
   //worker超时时间
   val WORKEER_TIMEOUT=10 * 1000
   override def preStart(): Unit ={
      //导入隐式转换,用于启动定时器
      import context.dispatcher
     //启动定时
      context.system.scheduler.schedule(0 millis,5000 millis,self,"CheckOfTimeOutWorker")
  }
   override def receive: Receive = {
      //注册woker把workerid 和当前时间存起来
      case  Workinfo(id)=>{
         if(!workerMessage.contains(id)) {
            workerMessage.put(id,System.currentTimeMillis())
         }
         println("注册worker:"+id)
         sender ! "MasterReply"
      }
      //接收worker心跳,更新接收时间
      case SendHearBeat(id)=>{
         workerMessage(id)=System.currentTimeMillis()
         println("接收到worker:"+id+"的心跳报告。")
      }
      //Master自己向自己发送的定期检查超时Worker的消息
      case  "CheckOfTimeOutWorker" =>{
         val currentTime = System.currentTimeMillis()
         //过滤出超时的worker
         val outTimeWorker=workerMessage.filter(m=>currentTime-m._2>WORKEER_TIMEOUT).toArray
         for(worker<-outTimeWorker){
            workerMessage.remove(worker._1)
         }
         println("woker数量:"+outTimeWorker.size+"个,挂了")
      }
   }
}
object  Master{
   def main(args: Array[String]): Unit = {
      //接受参数
      val host="169.254.189.184"
      val port="8888"
      //配置信息
      val configStr=
         s"""
           |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
           |akka.remote.netty.tcp.hostname = "$host"
           |akka.remote.netty.tcp.port = "$port"
         """.stripMargin
     val config=ConfigFactory.parseString(configStr)
      //ActorSystem,辅助创建和监控下面的Actor,单例
      val actorSystem=ActorSystem("MasterSystem",config)
      //执行mastor生命周期方法
      val master= actorSystem.actorOf(Props(new Master),"Master")
      actorSystem.awaitTermination()
   }
}

Worker类:

import java.util.UUID
import akka.actor.{Actor,ActorSelection,Props}
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
class Worker(val masterHost:String,val masterPort:String) extends Actor{
   var master:ActorSelection=_
   //生成uuid作为worker标识
   val id=UUID.randomUUID().toString
   //建立连接,链接Master
   override def preStart(): Unit ={
      //Master就是你前面给master起的名字
      master=context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
     //id和其他信息封装到一个类中,把这个类发送过去,注意这个类要序列化
      master ! Workinfo(id)
   }
   override def receive: Receive = {
      //接收master返回来的信息后启动定时
      case "MasterReply"=>{
         println("a reply from master")
         //导入隐式转换,用于启动定时器
         import context.dispatcher
         //启动定时任务,通过case回调SendHearBeat向master发送心跳(id标识)
         context.system.scheduler.schedule(0 millis,"SendHeartBeat")
      }
      case  "SendHearBeat"=>{
         println("worker send heartbeat")
         master ! SendHearBeat(id)
      }
   }
}
object Worker{
   def main(args: Array[String]): Unit = {
      //接受参数
      val workerHost="169.254.189.184"
      val workerPort="9999"
      val masterHost="169.254.189.184"
      val masterPort="8888"
      //配置信息
      val configStr=
         s"""
            |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
            |akka.remote.netty.tcp.hostname = "$workerHost"
            |akka.remote.netty.tcp.port = "$workerPort"
         """.stripMargin
      val config=ConfigFactory.parseString(configStr)
      //ActorSystem,单例
      val actorSystem=ActorSystem("WorkerSystem",config)
      //启动Actor,woker会被实例化,生命周期方法会被调用
      actorSystem.actorOf(Props(new Worker(masterHost,masterPort)),"Worker")
   }
}

启动Master控制台打印:

"C:\Program Files (x86)\Java\jdk1.7.0_01\bin\java" "-javaagent:D:\Program Files (x86)\IntelliJ IDEA 2017.3.2\lib\idea_rt.jar=54133:D:\Program Files (x86)\IntelliJ IDEA 2017.3.2\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\resources.jar;C:\Program Files (x86)\Java\jdk1.7.0_01\jre\lib\rt.jar;D:\Program Files (x86)\IntelliJ IDEA 2017.3.2\scalaspace\rpcScala\target\classes;C:\Users\Administrator\.m2\repository\org\scala-lang\scala-library\2.10.6\scala-library-2.10.6.jar;C:\Users\Administrator\.m2\repository\com\typesafe\akka\akka-actor_2.10\2.3.14\akka-actor_2.10-2.3.14.jar;C:\Users\Administrator\.m2\repository\com\typesafe\config\1.2.1\config-1.2.1.jar;C:\Users\Administrator\.m2\repository\com\typesafe\akka\akka-remote_2.10\2.3.14\akka-remote_2.10-2.3.14.jar;C:\Users\Administrator\.m2\repository\io\netty\netty\3.8.0.Final\netty-3.8.0.Final.jar;C:\Users\Administrator\.m2\repository\com\google\protobuf\protobuf-java\2.5.0\protobuf-java-2.5.0.jar;C:\Users\Administrator\.m2\repository\org\uncommons\maths\uncommons-maths\1.2.2a\uncommons-maths-1.2.2a.jar" Master
[INFO] [01/06/2018 13:44:54.602] [main] [Remoting] Starting remoting
[INFO] [01/06/2018 13:44:55.338] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://MasterSystem@169.254.189.184:8888]
[INFO] [01/06/2018 13:44:55.341] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://MasterSystem@169.254.189.184:8888]
woker数量:0个,挂了
woker数量:0个,挂了
woker数量:0个,挂了
woker数量:0个,挂了

启动Woker后Master控制台打印:(看见worker注册了)

woker数量:0个,挂了
woker数量:0个,挂了
注册worker:9e8ec9e7-6014-491f-816e-94d0f65eb068
woker数量:0个,挂了
woker数量:0个,挂了
worker控制台打印:

a reply from master

关掉Worker后Master控制台打印:

woker数量:0个,挂了
woker数量:0个,挂了
woker数量:1个,挂了
woker数量:0个,挂了
可以看到关掉worker后Master监控到有一台worker挂掉了。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 TwemojiTwemoji是Twitter开源的其完整的Emoji表情图片。开发者可以去GitHub下载完整的表情库,并把这些表情加入到自己的应用或网页中。使用示例:var i = 0;twemoji.parse(  ’emoji, m\u276
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即两个对象在内存中的首地址;equals比较字符串中所包含的内容是否相同。publicstaticvoidmain(String[]args){​ Strings1="abc"; Strings2=newString("abc");​ System.out.println(s1==s2)
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Scala怎么使用”吧!语法scala...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相信很多人在Scala是一种什么语言问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相信很多人在Scala Trait怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日常操作中,相信很多人在Scala类型检查与模式匹配怎么使用问题上存在疑惑,小编查阅了各式资料,整理...
这篇文章主要介绍“scala中常用但不常见的符号有哪些”,在日常操作中,相信很多人在scala中常用但不常见的符号有哪些问题上存在疑惑,小编查阅了各式资料,整理...
本篇内容主要讲解“Scala基础知识有哪些”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Scala基础知识有哪些”...
本篇内容介绍了“scala基础知识点有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧...
本篇内容介绍了“Scala下划线怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧...
本篇内容主要讲解“Scala提取器怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Scala提取器怎么使用”...
这篇文章主要讲解了“Scala基础语法有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Scala基础语法有...
本篇内容主要讲解“Scala方法与函数怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Scala方法与函数怎...
这篇文章主要讲解了“scala条件控制与循环怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“scala条...
这篇文章主要介绍“scala函数怎么定义和调用”,在日常操作中,相信很多人在scala函数怎么定义和调用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操
这篇文章主要介绍“scala如何声明变量”,在日常操作中,相信很多人在scala如何声明变量问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对...
这篇文章主要讲解了“scala的Map和Tuple怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“scala的Ma...
这篇文章主要介绍“scala的隐式参数有什么作用”,在日常操作中,相信很多人在scala的隐式参数有什么作用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的...
本篇内容主要讲解“Scala怎么进行文件写操作”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Scala怎么进行文件...
这篇文章主要讲解了“Scala怎么声明数组”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Scala怎么声明数组...