Akka之简单的自定义RPC框架(乞丐版)

关于Akka

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。它已经成功运用在电信行业。系统几乎不会宕机(高可用性 99.9999999 % 一年只有 31 ms 宕机)。

自定义RPC通信框架(乞丐版)

Akka之简单的自定义RPC框架(乞丐版)

目标

woker能发送成功注册,并定时发送心跳。

master能成功接收注册,并能接收心跳及完成自检。

大体思路

1、提供一个Master,负责woker的任务分配,注册及销毁。

2、提供一个Woker,负责Master分配的任务。需要定时向Master报告状态

3、Master内部提供自检机制,为其检测过期woker并销毁。

大体思路就是这样。下面开始撸代码:

1,消息模板类

  1. package com.itunic.akka
  2. /**
  3.   * Created by itunic.com on 2016/12/12.
  4.   */
  5. trait RemoteMessage extends Serializable
  6. //注册消息
  7. //worker -> master
  8. case class Register(workerId: String, memorys: Int, cores: Int) extends RemoteMessage
  9. //返回注册成功信息 master的连接信息
  10. //master -> worker
  11. case class Registered(ip: String, port: Int) extends RemoteMessage
  12. //报活
  13. //worker -> master
  14. case class TransferHeartbeat(workerId: String) extends RemoteMessage
  15. //heartbeat ->worker
  16. case object SendHeartbeat
  17. //heartbeat->master
  18. case object CheckTimeOutWorker
  19. //master ->worker 重新注册
  20. case object RegisterAgain

2,Woker实体类

  1. package com.itunic.akka
  2. /**
  3.   * Created by itunic.com on 2016/12/13.
  4.   */
  5. class WorkerInfo(workerId: String, memory: Int, cores: Int) {
  6.   //TODO 最后更新日期
  7.   var lastTime: Long = _
  8. }

3,核心类:Master

  1. package com.itunic.akka
  2. import akka.actor.{Actor, ActorSystem, Props}
  3. import com.typesafe.config.ConfigFactory
  4. import scala.concurrent.duration._
  5. import scala.collection.mutable
  6. /**
  7.   * Created by root
  8.   */
  9. class Master(host: String, prot: Int) extends Actor {
  10.   println("constructor invoked")
  11.   var workerMap = new mutable.HashMap[String, WorkerInfo]()
  12.   //检测时间
  13.   val CHECK_TIME = 15000
  14.   override def preStart(): Unit = {
  15.     println("preStart invoked")
  16.     //启动线程
  17.     import context.dispatcher
  18.     context.system.scheduler.schedule(0 millis, CHECK_TIME millis, self, CheckTimeOutWorker)
  19.   }
  20.   // 用于接收消息
  21.   override def receive: Receive = {
  22.     case Register(workerId, memorys, cores) => {
  23.       println("a client connected")
  24.       //判断map是否包含此worker
  25.       if (!workerMap.contains(workerId)) {
  26.         val workerInfo = new WorkerInfo(workerId, memorys, cores)
  27.         workerMap += (workerId -> workerInfo)
  28.       }
  29.       sender ! Registered(host, prot)
  30.     }
  31.     case TransferHeartbeat(workerId) => {
  32.       if (workerMap.contains(workerId)) {
  33.         val info = workerMap(workerId)
  34.         info.lastTime = System.currentTimeMillis()
  35.       } else {
  36.         sender ! RegisterAgain
  37.       }
  38.     }
  39.     //检测
  40.     case CheckTimeOutWorker => {
  41.       val thisTime = System.currentTimeMillis()
  42.       val removeWorker = workerMap.filter(x => thisTime - x._2.lastTime > CHECK_TIME)
  43.       for (i <- removeWorker) {
  44.         workerMap -= i._1
  45.       }
  46.       println(workerMap.size)
  47.     }
  48.     case "hello" => {
  49.       println("hello")
  50.     }
  51.   }
  52. }
  53. object Master {
  54.   def main(args: Array[String]) {
  55.     val host = args(0)
  56.     val port = args(1).toInt
  57.     // 准备配置
  58.     val configStr =
  59.     s"""
  60.        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
  61.        |akka.remote.netty.tcp.hostname = "$host"
  62.        |akka.remote.netty.tcp.port = "$port"
  63.        """.stripMargin
  64.     val config = ConfigFactory.parseString(configStr)
  65.     //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
  66.     val actorSystem = ActorSystem("MasterSystem", config)
  67.     //创建Actor, 起个名字
  68.     val master = actorSystem.actorOf(Props(new Master(host, port)), "Master"//Master主构造器会执行
  69.     master ! "hello" //发送信息
  70.     actorSystem.awaitTermination() //让进程等待着, 先别结束
  71.   }
  72. }

4,核心类:Woker

  1. package com.itunic.akka
  2. import java.util.UUID
  3. import scala.concurrent.duration._
  4. import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
  5. import com.typesafe.config.ConfigFactory
  6. import scala.language.postfixOps
  7. /**
  8.   * Created by itunic.com on 2016/12/13.
  9.   */
  10. class Worker(val masterHost: String, val masterPort: Int) extends Actor {
  11.   var master: ActorSelection = _
  12.   //worker唯一标识
  13.   val workerId = UUID.randomUUID().toString
  14.   //内存
  15.   val memory = 40000
  16.   //cpu核心数
  17.   val cores = 4
  18.   //心跳时间
  19.   val HEARTBEAT_TIME = 10000
  20.   //建立连接
  21.   override def preStart(): Unit = {
  22.     //在master启动时会打印下面的那个协议, 可以先用这个做一个标志, 连接哪个master
  23.     //继承actor后会有一个context, 可以通过它来连接
  24.     master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master") //需要有/user, Master要和master那边创建的名字保持一致
  25.     master ! Register(workerId, memory, cores)
  26.   }
  27.   override def receive: Receive = {
  28.     //向Master注册。
  29.     case Registered(host, port) => {
  30.       println("a reply form master" + host + ":" + port)
  31.       //启动线程,进行发送心跳
  32.       import context.dispatcher
  33.       context.system.scheduler.schedule(0 millis, HEARTBEAT_TIME millis, self, SendHeartbeat)
  34.     }
  35.     //发送心跳给Master
  36.     case SendHeartbeat => {
  37.       println("from SendHeartbeat by worker")
  38.       master ! TransferHeartbeat(workerId)
  39.     }
  40.     //重新注册
  41.     case RegisterAgain => {
  42.       master ! Register(workerId, memory, cores)
  43.     }
  44.   }
  45. }
  46. object Worker {
  47.   def main(args: Array[String]) {
  48.     val host = args(0)
  49.     val port = args(1).toInt
  50.     val masterHost = args(2)
  51.     val masterPort = args(3).toInt
  52.     // 准备配置
  53.     val configStr =
  54.     s"""
  55.        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
  56.        |akka.remote.netty.tcp.hostname = "$host"
  57.        |akka.remote.netty.tcp.port = "$port"
  58.        """.stripMargin
  59.     val config = ConfigFactory.parseString(configStr)
  60.     //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
  61.     val actorSystem = ActorSystem("WorkerSystem", config)
  62.     actorSystem.actorOf(Props(new Worker(masterHost, masterPort)), "Worker")
  63.     actorSystem.awaitTermination()
  64.   }
  65. }

 

5,Maven pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <groupId>com.itunic.akka</groupId>
  7.     <artifactId>my-rpc</artifactId>
  8.     <version>1.0-SNAPSHOT</version>
  9.     <properties>
  10.         <maven.compiler.source>1.7</maven.compiler.source>
  11.         <maven.compiler.target>1.7</maven.compiler.target>
  12.         <encoding>UTF-8</encoding>
  13.         <scala.version>2.11.8</scala.version>
  14.         <scala.compat.version>2.11</scala.compat.version>
  15.     </properties>
  16.     <dependencies>
  17.         <dependency>
  18.             <groupId>org.scala-lang</groupId>
  19.             <artifactId>scala-library</artifactId>
  20.             <version>${scala.version}</version>
  21.         </dependency>
  22.         <dependency>
  23.             <groupId>com.typesafe.akka</groupId>
  24.             <artifactId>akka-actor_2.11</artifactId>
  25.             <version>2.3.15</version>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>com.typesafe.akka</groupId>
  29.             <artifactId>akka-remote_2.11</artifactId>
  30.             <version>2.3.15</version>
  31.         </dependency>
  32.     </dependencies>
  33.     <build>
  34.         <sourceDirectory>src/main/scala</sourceDirectory>
  35.         <testSourceDirectory>src/test/scala</testSourceDirectory>
  36.         <plugins>
  37.             <plugin>
  38.                 <groupId>net.alchim31.maven</groupId>
  39.                 <artifactId>scala-maven-plugin</artifactId>
  40.                 <version>3.2.2</version>
  41.                 <executions>
  42.                     <execution>
  43.                         <goals>
  44.                             <goal>compile</goal>
  45.                             <goal>testCompile</goal>
  46.                         </goals>
  47.                         <configuration>
  48.                             <args>
  49.                                 <!--<arg>-make:transitive</arg>-->
  50.                                 <arg>-dependencyfile</arg>
  51.                                 <arg>${project.build.directory}/.scala_dependencies</arg>
  52.                             </args>
  53.                         </configuration>
  54.                     </execution>
  55.                 </executions>
  56.             </plugin>
  57.             <plugin>
  58.                 <groupId>org.apache.maven.plugins</groupId>
  59.                 <artifactId>maven-shade-plugin</artifactId>
  60.                 <version>2.4.3</version>
  61.                 <executions>
  62.                     <execution>
  63.                         <phase>package</phase>
  64.                         <goals>
  65.                             <goal>shade</goal>
  66.                         </goals>
  67.                         <configuration>
  68.                             <filters>
  69.                                 <filter>
  70.                                     <artifact>*:*</artifact>
  71.                                     <excludes>
  72.                                         <exclude>META-INF/*.SF</exclude>
  73.                                         <exclude>META-INF/*.DSA</exclude>
  74.                                         <exclude>META-INF/*.RSA</exclude>
  75.                                     </excludes>
  76.                                 </filter>
  77.                             </filters>
  78.                             <transformers>
  79.                                 <transformer
  80.                                         implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  81.                                     <resource>reference.conf</resource>
  82.                                 </transformer>
  83.                                 <transformer
  84.                                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  85.                                     <mainClass>com.itunic.akka.Master</mainClass>
  86.                                 </transformer>
  87.                             </transformers>
  88.                         </configuration>
  89.                     </execution>
  90.                 </executions>
  91.             </plugin>
  92.         </plugins>
  93.     </build>
  94. </project>

 

 

  • Akka之简单的自定义RPC框架(乞丐版)已关闭评论
  • 156 views
  • A+
所属分类:未分类
avatar