利用动态代理&反射&socket实现简单的RPC通信

摘 要

利用动态代理&反射&socket实现简单的RPC通信

概述

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC跨越了传输层和应用层,RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

现在RPC通信在内部分布式集群环境中已经很常见了。现在的开源的分布式框架已经提供了相应的实现,但仅停留在用的层面是远远不够的,这不符合一只码畜的追求。所以为了弄清RPC到底是个啥玩意,就查阅了部分资料,并针对其所述实现了一版最基础的RPC。

实现思路

大体思路是这样的:

首先Consumer通过JDK动态代理的机制去创建socket,让socket连接Producer的SocketServer,内部利用ObjectOutputStream将请求信息(接口信息,方法,参数)封装,通过socket传输。

其次Producer接到ObjectInputStream,将信息拆包(接口信息,方法,参数)。利用反射将接口实现类实例化(这就是为什么RPC框架客户端和服务端都需要有一致的接口类)。

最后Producer利用反射将业务处理完毕后,用ObjectOutputStream将结果封装,通过socket返回数据。Consumer接收到返回数据。

具体代码实现

  1. //RPCServer实现
  2. RPCServer.java
  1. package com.itunic.rpc;
  2. import java.io.IOException;
  3. import java.io.ObjectInputStream;
  4. import java.io.ObjectOutputStream;
  5. import java.lang.reflect.Method;
  6. import java.net.InetSocketAddress;
  7. import java.net.ServerSocket;
  8. import java.net.Socket;
  9. import java.util.Map;
  10. import java.util.concurrent.ConcurrentHashMap;
  11. import java.util.concurrent.ExecutorService;
  12. import java.util.concurrent.Executors;
  13. /**
  14.  * 
  15.  * RPC服务端<br>
  16.  * 基于SocketServer&反射实现
  17.  * 
  18.  * @ClassName RPCServer
  19.  * @author yinbin
  20.  * @website https://itunic.com
  21.  * @Date 2017年6月23日 上午10:59:50
  22.  * @version 1.0.0
  23.  */
  24. public class RPCServer {
  25.     private static final Map<String, Class<?>> serviceMap = new ConcurrentHashMap<String, Class<?>>();
  26.     private static final ExecutorService exec = Executors
  27.             .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  28.     private Integer port;
  29.     private Boolean isRunning = true;
  30.     public RPCServer(int port) {
  31.         this.port = port;
  32.     }
  33.     public void register(Class<?> serviceInterface, Class<?> impl) {
  34.         serviceMap.put(serviceInterface.getName(), impl);
  35.     }
  36.     public void start() throws IOException {
  37.         ServerSocket ss = new ServerSocket();
  38.         try {
  39.             ss.bind(new InetSocketAddress(port));
  40.             while (isRunning) {
  41.                 exec.execute(new ServiceTask(ss.accept()));
  42.             }
  43.         } finally {
  44.             ss.close();
  45.         }
  46.     }
  47.     private class ServiceTask implements Runnable {
  48.         Socket socket = null;
  49.         public ServiceTask(Socket socket) {
  50.             this.socket = socket;
  51.         }
  52.         @Override
  53.         public void run() {
  54.             ObjectInputStream input = null;
  55.             ObjectOutputStream output = null;
  56.             try {
  57.                 input = new ObjectInputStream(socket.getInputStream());
  58.                 String serviceName = input.readUTF();
  59.                 String methodName = input.readUTF();
  60.                 Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
  61.                 Object[] arguments = (Object[]) input.readObject();
  62.                 Class<?> serviceClass = serviceMap.get(serviceName);
  63.                 if (serviceClass == null) {
  64.                     throw new ClassNotFoundException(serviceName + " not found");
  65.                 }
  66.                 Method method = serviceClass.getMethod(methodName, parameterTypes);
  67.                 Object result = method.invoke(serviceClass.newInstance(), arguments);
  68.                 output = new ObjectOutputStream(socket.getOutputStream());
  69.                 output.writeObject(result);
  70.                 output.flush();
  71.             } catch (Exception e) {
  72.                 // TODO 自动生成的 catch 块
  73.                 e.printStackTrace();
  74.             } finally {
  75.                 if (null != output) {
  76.                     try {
  77.                         output.close();
  78.                     } catch (IOException e) {
  79.                         // TODO 自动生成的 catch 块
  80.                         e.printStackTrace();
  81.                     }
  82.                 }
  83.                 if (input != null) {
  84.                     try {
  85.                         input.close();
  86.                     } catch (IOException e) {
  87.                         e.printStackTrace();
  88.                     }
  89.                 }
  90.                 if (socket != null) {
  91.                     try {
  92.                         socket.close();
  93.                     } catch (IOException e) {
  94.                         e.printStackTrace();
  95.                     }
  96.                 }
  97.             }
  98.         }
  99.     }
  100. }
  1. //RPCClient实现
  2. RPCClient.java
  1. package com.itunic.rpc;
  2. import java.io.ObjectInputStream;
  3. import java.io.ObjectOutputStream;
  4. import java.lang.reflect.InvocationHandler;
  5. import java.lang.reflect.Method;
  6. import java.lang.reflect.Proxy;
  7. import java.net.InetSocketAddress;
  8. import java.net.Socket;
  9. /**
  10.  * 
  11.  * RPC客户端 基于动态代理&socket实现
  12.  * 
  13.  * @ClassName RPCClient
  14.  * @author yinbin
  15.  * @website https://itunic.com
  16.  * @Date 2017年6月23日 上午10:59:04
  17.  * @version 1.0.0
  18.  */
  19. public class RPCClient {
  20.     @SuppressWarnings("unchecked")
  21.     public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
  22.         return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface },
  23.                 new InvocationHandler() {
  24.                     @Override
  25.                     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  26.                         Socket socket = null;
  27.                         ObjectOutputStream output = null;
  28.                         ObjectInputStream input = null;
  29.                         try {
  30.                             socket = new Socket();
  31.                             socket.connect(addr);
  32.                             output = new ObjectOutputStream(socket.getOutputStream());
  33.                             output.writeUTF(serviceInterface.getName());
  34.                             output.writeUTF(method.getName());
  35.                             output.writeObject(method.getParameterTypes());
  36.                             output.writeObject(args);
  37.                             input = new ObjectInputStream(socket.getInputStream());
  38.                             return input.readObject();
  39.                         } finally {
  40.                             if (socket != null)
  41.                                 socket.close();
  42.                             if (output != null)
  43.                                 output.close();
  44.                             if (input != null)
  45.                                 input.close();
  46.                         }
  47.                     }
  48.                 });
  49.     }
  50. }
  1. package com.itunic.rpc;
  2. import java.io.IOException;
  3. /**
  4.  * 
  5.  *  RPC服务端启动
  6.  * @ClassName RPCServerAction
  7.  * @author c
  8.  * @Date 2017年6月23日 下午5:12:27
  9.  * @version 1.0.0
  10.  */
  11. public class RPCServerAction {
  12.     public static void main(String[] args) throws IOException {
  13.         RPCServer server = new RPCServer(8888);
  14.         /**
  15.          * 注册相关的接口实现类
  16.          */
  17.         server.register(TestHello.class, TestHelloImpl.class);
  18.         server.start();
  19.     }
  20. }

总结

RPC原理还是挺简单的,但可以有更好的实现。例如:socket用高并发框架netty替代,支持更多协议等。

  • 利用动态代理&反射&socket实现简单的RPC通信已关闭评论
  • 201 views
  • A+
所属分类:Java
avatar