概述
RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC跨越了传输层和应用层,RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
现在RPC通信在内部分布式集群环境中已经很常见了。现在的开源的分布式框架已经提供了相应的实现,但仅停留在用的层面是远远不够的,这不符合一只码畜的追求。所以为了弄清RPC到底是个啥玩意,就查阅了部分资料,并针对其所述实现了一版最基础的RPC。
实现思路
大体思路是这样的:
首先Consumer通过JDK动态代理的机制去创建socket,让socket连接Producer的SocketServer,内部利用ObjectOutputStream
将请求信息(接口信息,方法,参数)封装,通过socket传输。
其次Producer接到ObjectInputStream
,将信息拆包(接口信息,方法,参数)。利用反射将接口实现类实例化(这就是为什么RPC框架客户端和服务端都需要有一致的接口类)。
最后Producer利用反射将业务处理完毕后,用ObjectOutputStream
将结果封装,通过socket返回数据。Consumer接收到返回数据。
具体代码实现
- //RPCServer实现
- RPCServer.java
- package com.itunic.rpc;
- import java.io.IOException;
- import java.io.ObjectInputStream;
- import java.io.ObjectOutputStream;
- import java.lang.reflect.Method;
- import java.net.InetSocketAddress;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- /**
- *
- * RPC服务端<br>
- * 基于SocketServer&反射实现
- *
- * @ClassName RPCServer
- * @author yinbin
- * @website https://itunic.com
- * @Date 2017年6月23日 上午10:59:50
- * @version 1.0.0
- */
- public class RPCServer {
- private static final Map<String, Class<?>> serviceMap = new ConcurrentHashMap<String, Class<?>>();
- private static final ExecutorService exec = Executors
- .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
- private Integer port;
- private Boolean isRunning = true;
- public RPCServer(int port) {
- this.port = port;
- }
- public void register(Class<?> serviceInterface, Class<?> impl) {
- serviceMap.put(serviceInterface.getName(), impl);
- }
- public void start() throws IOException {
- ServerSocket ss = new ServerSocket();
- try {
- ss.bind(new InetSocketAddress(port));
- while (isRunning) {
- exec.execute(new ServiceTask(ss.accept()));
- }
- } finally {
- ss.close();
- }
- }
- private class ServiceTask implements Runnable {
- Socket socket = null;
- public ServiceTask(Socket socket) {
- this.socket = socket;
- }
- @Override
- public void run() {
- ObjectInputStream input = null;
- ObjectOutputStream output = null;
- try {
- input = new ObjectInputStream(socket.getInputStream());
- String serviceName = input.readUTF();
- String methodName = input.readUTF();
- Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
- Object[] arguments = (Object[]) input.readObject();
- Class<?> serviceClass = serviceMap.get(serviceName);
- if (serviceClass == null) {
- throw new ClassNotFoundException(serviceName + " not found");
- }
- Method method = serviceClass.getMethod(methodName, parameterTypes);
- Object result = method.invoke(serviceClass.newInstance(), arguments);
- output = new ObjectOutputStream(socket.getOutputStream());
- output.writeObject(result);
- output.flush();
- } catch (Exception e) {
- // TODO 自动生成的 catch 块
- e.printStackTrace();
- } finally {
- if (null != output) {
- try {
- output.close();
- } catch (IOException e) {
- // TODO 自动生成的 catch 块
- e.printStackTrace();
- }
- }
- if (input != null) {
- try {
- input.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if (socket != null) {
- try {
- socket.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
- //RPCClient实现
- RPCClient.java
- package com.itunic.rpc;
- import java.io.ObjectInputStream;
- import java.io.ObjectOutputStream;
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Method;
- import java.lang.reflect.Proxy;
- import java.net.InetSocketAddress;
- import java.net.Socket;
- /**
- *
- * RPC客户端 基于动态代理&socket实现
- *
- * @ClassName RPCClient
- * @author yinbin
- * @website https://itunic.com
- * @Date 2017年6月23日 上午10:59:04
- * @version 1.0.0
- */
- public class RPCClient {
- @SuppressWarnings("unchecked")
- public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
- return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface },
- new InvocationHandler() {
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- Socket socket = null;
- ObjectOutputStream output = null;
- ObjectInputStream input = null;
- try {
- socket = new Socket();
- socket.connect(addr);
- output = new ObjectOutputStream(socket.getOutputStream());
- output.writeUTF(serviceInterface.getName());
- output.writeUTF(method.getName());
- output.writeObject(method.getParameterTypes());
- output.writeObject(args);
- input = new ObjectInputStream(socket.getInputStream());
- return input.readObject();
- } finally {
- if (socket != null)
- socket.close();
- if (output != null)
- output.close();
- if (input != null)
- input.close();
- }
- }
- });
- }
- }
- package com.itunic.rpc;
- import java.io.IOException;
- /**
- *
- * RPC服务端启动
- * @ClassName RPCServerAction
- * @author c
- * @Date 2017年6月23日 下午5:12:27
- * @version 1.0.0
- */
- public class RPCServerAction {
- public static void main(String[] args) throws IOException {
- RPCServer server = new RPCServer(8888);
- /**
- * 注册相关的接口实现类
- */
- server.register(TestHello.class, TestHelloImpl.class);
- server.start();
- }
- }
总结
RPC原理还是挺简单的,但可以有更好的实现。例如:socket用高并发框架netty替代,支持更多协议等。