RPC(Remote Promote Call) 一种进程间通信方式。允许像调用本地服务一样调用远程服务。
RPC框架的主要目标就是让远程服务调用更简单、透明。RPC框架负责屏蔽底层的传输方式(TCP或者UDP)、序列化方式(XML/JSON/二进制)和通信细节。开发人员在使用的时候只需要了解谁在什么位置提供了什么样的远程服务接口即可,并不需要关心底层通信细节和调用过程。
RPC框架实现的几个核心技术点:
(1)远程提供者需要以某种形式提供服务调用相关的信息,包括但不限于服务接口定义、数据结构、或者中间态的服务定义文件。例如Facebook的 Thrift的IDL文件,Web service的WSDL文件;服务的调用者需要通过一定的图景获取远程服务调用相关的信息。
(2)远程代理对象:服务调用者用的服务实际是远程服务的本地代理。说白了就是通过动态代理来实现。
(3)通信:RPC框架与具体的协议无关。
(4)序列化:毕竟是远程通信,需要将对象转化成二进制流进行传输。不同的RPC框架应用的场景不同,在序列化上也会采取不同的技术
图文理解:
上述图示,源自对以下代码的简单理解:(这是代码提供者,博主的链接,有兴趣可以看一下:https://www.cnblogs.com/ChrisMurphy/p/6550184.html)
package Server;
public interface EchoService {
String echo(String ping);
}
package Server;
public class EchoServiceImpl implements EchoService{
@Override
public String echo(String ping) {
// TODO Auto-generated method stub
return ping !=null?ping+"--> I am ok.":"I am bad.";
}
}
package Exporter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* RPC服务端发布者
* 作为服务端,监听客户端的TCP连接,接收到新的客户端连接之后,将其封装成Task,由线程池执行
* 将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果
* 将执行结果对象发序列化,通过Socket发送给客户端
* 远程调用完成之后,释放Socket等连接资源,防止句柄泄露
* @author Administrator
*
*/
public class RpcExporter {
//创建一个可重用固定线程数的线程池
//Runtime.getRuntime().availableProcessors()返回虚拟机可用的处理器数量
static Executor executor=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static void exporter(String hostname,int port) throws IOException {
//创建一个监听特定端口的Serversocket,负责接收客户连接请求
ServerSocket server = new ServerSocket();
//绑定主机名端口号
server.bind(new InetSocketAddress(hostname,port));
try{
while(true)
{
executor.execute(new ExporterTask(server.accept()));
}
}finally
{
server.close();
}
}
private static class ExporterTask implements Runnable{
Socket client=null;
public ExporterTask(Socket client){
this.client=client;
}
@Override
public void run() {
// TODO Auto-generated method stub
ObjectInputStream input=null;
ObjectOutputStream output=null;
try{
//获取输入流
input=new ObjectInputStream(client.getInputStream());
//获取调用的接口名
String interfaceName = input.readUTF();
//加载接口
Class<?> service = Class.forName(interfaceName);
//获取调用的方法名
String methodName = input.readUTF();
//获取方法返回类型
Class<?>[] ParameterTypes = (Class<?>[]) input.readObject();
//获取参数
Object[] arguments = (Object[]) input.readObject();
//通过反射获取方法
Method method = service.getMethod(methodName, ParameterTypes);
//通过反射调用方法
Object result = method.invoke(service.newInstance(), arguments);
output = new ObjectOutputStream(client.getOutputStream());
output.writeObject(result);
}catch(Exception e){
e.printStackTrace();
}
finally{
if(output != null)
try{
output.close();
}catch ( IOException e){
e.printStackTrace();
}
if(input !=null)
try{
input.close();
}catch(IOException e){
e.printStackTrace();
}
if(client != null)
try{
client.close();
}catch (IOException e){
e.printStackTrace();
}
}
}
}
}
package Client;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
*本地服务代理
*将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
*创建Socket客户端,根据指定地址连接远程服务提供者
*将远程服务调用所需要的接口类,方法名,参数列表等编码参数发送给服务提供者
*同步阻塞等待服务端返回应答,获取应答之后返回
* @author Administrator
*
* @param <S>
*/
public class RpcImporter<S> {
@SuppressWarnings("unchecked")
public S importer(final Class<?> serviceClass,final InetSocketAddress addr)
{
return (S) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class<?>[] {serviceClass.getInterfaces()[0]}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// TODO Auto-generated method stub
Socket socket =null;
ObjectOutputStream output = null;
ObjectInputStream input = null;
try{
socket = new Socket();
socket.connect(addr);
//将远程服务调用所需要的接口类,方法名,参数列表等编码参数发送给服务提供者
output = new ObjectOutputStream(socket.getOutputStream());
output.writeUTF(serviceClass.getName());
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
//同步阻塞等待服务端返回应答,获取应答之后返回
input= new ObjectInputStream(socket.getInputStream());
return input.readObject();
}
finally{
if(socket != null)
socket.close();
if(output != null)
output.close();
if(input != null)
input.close();
}
}
});
}
}
package test;
import java.net.InetSocketAddress;
import Client.RpcImporter;
import Exporter.RpcExporter;
import Server.EchoService;
import Server.EchoServiceImpl;
public class run {
public static void main(String[] args) {
// TODO Auto-generated method stub
//创建异步发布服务端的线程并启动,用于接受PRC客户端的请求,根据请求参数调用服务实现类,返回结果给客户端
new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
try{
RpcExporter.exporter("localhost", 8088);
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
//创建客户端服务代理类,构造RPC求情参数,发起RPC调用
RpcImporter<EchoService> importer=new RpcImporter<EchoService>();
EchoService echo = importer.importer(EchoServiceImpl.class, new InetSocketAddress("localhost",8088));
System.out.println(echo.echo("Are u ok?"));
}
}