关于Akka
Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。它已经成功运用在电信行业。系统几乎不会宕机(高可用性 99.9999999 % 一年只有 31 ms 宕机)。
自定义RPC通信框架(乞丐版)
目标
woker能发送成功注册,并定时发送心跳。
master能成功接收注册,并能接收心跳及完成自检。
大体思路
1、提供一个Master,负责woker的任务分配,注册及销毁。
2、提供一个Woker,负责Master分配的任务。需要定时向Master报告状态
3、Master内部提供自检机制,为其检测过期woker并销毁。
大体思路就是这样。下面开始撸代码:
1,消息模板类
- package com.itunic.akka
- /**
- * Created by itunic.com on 2016/12/12.
- */
- trait RemoteMessage extends Serializable
- //注册消息
- //worker -> master
- case class Register(workerId: String, memorys: Int, cores: Int) extends RemoteMessage
- //返回注册成功信息 master的连接信息
- //master -> worker
- case class Registered(ip: String, port: Int) extends RemoteMessage
- //报活
- //worker -> master
- case class TransferHeartbeat(workerId: String) extends RemoteMessage
- //heartbeat ->worker
- case object SendHeartbeat
- //heartbeat->master
- case object CheckTimeOutWorker
- //master ->worker 重新注册
- case object RegisterAgain
2,Woker实体类
- package com.itunic.akka
- /**
- * Created by itunic.com on 2016/12/13.
- */
- class WorkerInfo(workerId: String, memory: Int, cores: Int) {
- //TODO 最后更新日期
- var lastTime: Long = _
- }
3,核心类:Master
- package com.itunic.akka
- import akka.actor.{Actor, ActorSystem, Props}
- import com.typesafe.config.ConfigFactory
- import scala.concurrent.duration._
- import scala.collection.mutable
- /**
- * Created by root
- */
- class Master(host: String, prot: Int) extends Actor {
- println("constructor invoked")
- var workerMap = new mutable.HashMap[String, WorkerInfo]()
- //检测时间
- val CHECK_TIME = 15000
- override def preStart(): Unit = {
- println("preStart invoked")
- //启动线程
- import context.dispatcher
- context.system.scheduler.schedule(0 millis, CHECK_TIME millis, self, CheckTimeOutWorker)
- }
- // 用于接收消息
- override def receive: Receive = {
- case Register(workerId, memorys, cores) => {
- println("a client connected")
- //判断map是否包含此worker
- if (!workerMap.contains(workerId)) {
- val workerInfo = new WorkerInfo(workerId, memorys, cores)
- workerMap += (workerId -> workerInfo)
- }
- sender ! Registered(host, prot)
- }
- case TransferHeartbeat(workerId) => {
- if (workerMap.contains(workerId)) {
- val info = workerMap(workerId)
- info.lastTime = System.currentTimeMillis()
- } else {
- sender ! RegisterAgain
- }
- }
- //检测
- case CheckTimeOutWorker => {
- val thisTime = System.currentTimeMillis()
- val removeWorker = workerMap.filter(x => thisTime - x._2.lastTime > CHECK_TIME)
- for (i <- removeWorker) {
- workerMap -= i._1
- }
- println(workerMap.size)
- }
- case "hello" => {
- println("hello")
- }
- }
- }
- object Master {
- def main(args: Array[String]) {
- val host = args(0)
- val port = args(1).toInt
- // 准备配置
- val configStr =
- s"""
- |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- |akka.remote.netty.tcp.hostname = "$host"
- |akka.remote.netty.tcp.port = "$port"
- """.stripMargin
- val config = ConfigFactory.parseString(configStr)
- //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
- val actorSystem = ActorSystem("MasterSystem", config)
- //创建Actor, 起个名字
- val master = actorSystem.actorOf(Props(new Master(host, port)), "Master") //Master主构造器会执行
- master ! "hello" //发送信息
- actorSystem.awaitTermination() //让进程等待着, 先别结束
- }
- }
4,核心类:Woker
- package com.itunic.akka
- import java.util.UUID
- import scala.concurrent.duration._
- import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
- import com.typesafe.config.ConfigFactory
- import scala.language.postfixOps
- /**
- * Created by itunic.com on 2016/12/13.
- */
- class Worker(val masterHost: String, val masterPort: Int) extends Actor {
- var master: ActorSelection = _
- //worker唯一标识
- val workerId = UUID.randomUUID().toString
- //内存
- val memory = 40000
- //cpu核心数
- val cores = 4
- //心跳时间
- val HEARTBEAT_TIME = 10000
- //建立连接
- override def preStart(): Unit = {
- //在master启动时会打印下面的那个协议, 可以先用这个做一个标志, 连接哪个master
- //继承actor后会有一个context, 可以通过它来连接
- master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master") //需要有/user, Master要和master那边创建的名字保持一致
- master ! Register(workerId, memory, cores)
- }
- override def receive: Receive = {
- //向Master注册。
- case Registered(host, port) => {
- println("a reply form master" + host + ":" + port)
- //启动线程,进行发送心跳
- import context.dispatcher
- context.system.scheduler.schedule(0 millis, HEARTBEAT_TIME millis, self, SendHeartbeat)
- }
- //发送心跳给Master
- case SendHeartbeat => {
- println("from SendHeartbeat by worker")
- master ! TransferHeartbeat(workerId)
- }
- //重新注册
- case RegisterAgain => {
- master ! Register(workerId, memory, cores)
- }
- }
- }
- object Worker {
- def main(args: Array[String]) {
- val host = args(0)
- val port = args(1).toInt
- val masterHost = args(2)
- val masterPort = args(3).toInt
- // 准备配置
- val configStr =
- s"""
- |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- |akka.remote.netty.tcp.hostname = "$host"
- |akka.remote.netty.tcp.port = "$port"
- """.stripMargin
- val config = ConfigFactory.parseString(configStr)
- //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
- val actorSystem = ActorSystem("WorkerSystem", config)
- actorSystem.actorOf(Props(new Worker(masterHost, masterPort)), "Worker")
- actorSystem.awaitTermination()
- }
- }
5,Maven pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.itunic.akka</groupId>
- <artifactId>my-rpc</artifactId>
- <version>1.0-SNAPSHOT</version>
- <properties>
- <maven.compiler.source>1.7</maven.compiler.source>
- <maven.compiler.target>1.7</maven.compiler.target>
- <encoding>UTF-8</encoding>
- <scala.version>2.11.8</scala.version>
- <scala.compat.version>2.11</scala.compat.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-actor_2.11</artifactId>
- <version>2.3.15</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-remote_2.11</artifactId>
- <version>2.3.15</version>
- </dependency>
- </dependencies>
- <build>
- <sourceDirectory>src/main/scala</sourceDirectory>
- <testSourceDirectory>src/test/scala</testSourceDirectory>
- <plugins>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.2.2</version>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- <configuration>
- <args>
- <!--<arg>-make:transitive</arg>-->
- <arg>-dependencyfile</arg>
- <arg>${project.build.directory}/.scala_dependencies</arg>
- </args>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.4.3</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>reference.conf</resource>
- </transformer>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>com.itunic.akka.Master</mainClass>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </project>