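Introduction: What is RPC?
RPC stands for Remote Procedure Call. Put simply, it lets you call a service on a remote machine as if it were a local one.
RPC can run over HTTP or TCP. Web Service is an HTTP-based form of RPC: it is highly portable across platforms, but it is generally slower than RPC over raw TCP. Two factors directly affect RPC performance: the transport and the serialization format.
How does Netty implement lightweight RPC?
This article walks through building a lightweight distributed RPC framework. The framework runs over TCP, uses NIO, provides an efficient serialization scheme, and supports service registration and discovery.
First, the technology stack we need:
- Spring: a widely used dependency injection framework and a de facto industry standard.
- Netty: makes NIO programming easier and hides the low-level details of Java NIO.
- Protostuff: a serialization framework built on Protobuf that works directly with POJOs, so no .proto files are needed.
- ZooKeeper: provides service registration and discovery, a common choice for distributed systems, and it supports clustering natively.
1. Writing the RPC service interface
An RPC service interface is no different from an ordinary interface: it is a plain service interface that takes parameters and returns data.
For example: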
/**
* @author twjitm- [Created on 2018-08-20 10:24]
* @jdk java version "1.8.0_77"
*/
public interface IHelloWorld {
String getHelloWorld(int number);
}
The implementation class:
/**
* @author twjitm - [Created on 2018-08-20 10:26]
* @jdk java version "1.8.0_77"
* Note: the annotation value must be the business interface, not the implementation class.
*/
@NettyRpcServiceAnnotation(IHelloWorld.class)
@Repository
public class HelloWorldImpl implements IHelloWorld {
private Logger logger = LoggerUtils.getLogger(HelloWorldImpl.class);
@Override
public String getHelloWorld(int number) {
StringBuilder builder = new StringBuilder();
for (int i = number; i > 0; i--) {
builder.append("helloworld");
builder.append(i);
}
logger.info("RPC method invoked successfully; returning the result to the remote client");
return builder.toString();
}
}
The NettyRpcServiceAnnotation annotation marks a class as a remotely callable service. Its value is the Class object of the service interface. NettyRpcServiceAnnotation.java is defined as follows:
/**
* Created by IntelliJ IDEA.
* User: twjitm Date: 2018/8/19 Time: 13:19
* https://blog.csdn.net/baidu_23086307
* rpc service annotation
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface NettyRpcServiceAnnotation {
Class<?> value();
}
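To make the role of the annotation concrete, here is a minimal JDK-only sketch of how a server could turn annotated beans into a lookup table keyed by interface name. It is an illustration under assumptions, not the framework's actual registry: `RpcServiceSketch`, `GreetingSketch`, `GreetingSketchImpl` and `RpcServiceRegistrySketch` are hypothetical names, and Spring's bean scanning is replaced by an explicit `register` call.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for NettyRpcServiceAnnotation (no Spring dependency).
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface RpcServiceSketch {
    Class<?> value();
}

// Hypothetical service interface and implementation.
interface GreetingSketch {
    String greet(String name);
}

@RpcServiceSketch(GreetingSketch.class)
class GreetingSketchImpl implements GreetingSketch {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}

// Maps the interface named in the annotation to the implementing bean.
class RpcServiceRegistrySketch {
    private final Map<String, Object> services = new HashMap<>();

    void register(Object bean) {
        RpcServiceSketch marker = bean.getClass().getAnnotation(RpcServiceSketch.class);
        if (marker == null) {
            throw new IllegalArgumentException(bean.getClass() + " is not an RPC service");
        }
        // Key by interface name, because that is all a client-side proxy knows.
        services.put(marker.value().getName(), bean);
    }

    Object lookup(String interfaceName) {
        return services.get(interfaceName);
    }
}
```

Keying by the interface rather than the implementation class is why the annotation value has to be the interface: the caller only ever sees the interface type.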
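2. Writing the RPC protocol codec
Netty ships with many good codecs that let us build the network layer quickly and focus on business logic. In the same spirit, we write a codec pair tailored to our own RPC protocol.
- 2.1 Serialization and the decoder
As mentioned above, we use Protostuff, which is built on Protobuf, works directly with POJOs, and needs no .proto files.
So we start with a serialization helper.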
/**
* Created by IntelliJ IDEA.
* User: 文江 Date: 2018/8/19 Time: 9:23
* https://blog.csdn.net/baidu_23086307
* netty net rpc serialize ,use rpc message
*/
@Service
public class NettyProtoBufRpcSerialize implements INettyRpcSerialize {
Logger logger = LoggerUtils.getLogger(NettyProtoBufRpcSerialize.class);
private Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();
private Objenesis objenesis = new ObjenesisStd(true);
@SuppressWarnings("unchecked")
private <T> Schema<T> getSchema(Class<T> cls) {
Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
if (schema == null) {
schema = RuntimeSchema.createFrom(cls);
if (schema != null) {
cachedSchema.put(cls, schema);
}
}
return schema;
}
/**
* Serialize (object -> byte array)
*/
@Override
@SuppressWarnings("unchecked")
public <T> byte[] serialize(T obj) {
Class<T> cls = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(cls);
logger.info("SERIALIZE RPC MESSAGE SUCCESSFUL CLASS IS " +obj.getClass());
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
/**
* Deserialize (byte array -> object)
*/
@Override
public <T> T deserialize(byte[] data, Class<T> cls) {
try {
T message = objenesis.newInstance(cls);
Schema<T> schema = getSchema(cls);
ProtostuffIOUtil.mergeFrom(data, message, schema);
// Log the target class itself; cls.getClass() would always print java.lang.Class.
logger.info("DESERIALIZE RPC MESSAGE SUCCESSFUL CLASS IS " + cls.getName());
return message;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
/**
* Create an instance of the given class (via Objenesis, without calling a constructor)
*/
public <T> T newInstance(Class<T> cls) {
try {
T message = (T) objenesis.newInstance(cls);
return message;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
The RPC protocol decoder, NettyNetMessageRPCDecoder:
/**
* @author twjitm- [Created on 2018-08-17 18:37]
* @jdk java version "1.8.0_77"
*/
public class NettyNetMessageRPCDecoder extends ByteToMessageDecoder {
private Class<?> genericClass;
public NettyNetMessageRPCDecoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (dataLength < 0) {
// A negative length means a corrupt frame; close the channel instead of allocating the array.
ctx.close();
return;
}
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
NettyProtoBufRpcSerialize serialize = SpringServiceManager.getSpringLoadService().getNettyProtoBufRpcSerialize();
Object obj = serialize.deserialize(data, genericClass);
out.add(obj);
}
}
2.2 RPC encoder
The encoder is the counterpart of the decoder. A complete codec needs both; neither works without the other.
/**
* Created by IntelliJ IDEA.
* User: 文江 Date: 2018/8/19 Time: 9:55
* https://blog.csdn.net/baidu_23086307
*/
public class NettyNetMessageRPCEncoder extends MessageToByteEncoder {
private Class<?> genericClass;
public NettyNetMessageRPCEncoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
if (genericClass.isInstance(in)) {
NettyProtoBufRpcSerialize serialize = SpringServiceManager.getSpringLoadService().getNettyProtoBufRpcSerialize();
byte[] data = serialize.serialize(in);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}
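The decoder and encoder above share one wire format: a 4-byte length followed by the serialized body. The sketch below reproduces that framing with `java.nio.ByteBuffer` only, so the "wait for more bytes" branch can be tried without Netty. `LengthPrefixedFramingSketch` and its methods are hypothetical names used for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// JDK-only sketch of 4-byte length-prefixed framing.
class LengthPrefixedFramingSketch {

    // Encoder side: write the body length as a 4-byte int, then the body.
    static byte[] encode(byte[] body) {
        ByteBuffer out = ByteBuffer.allocate(4 + body.length);
        out.putInt(body.length);
        out.put(body);
        return out.array();
    }

    // Decoder side: return one complete body, or null if more bytes are needed.
    // When null is returned, the buffer position is left where it was.
    static byte[] tryDecode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null;                 // length header not complete yet
        }
        in.mark();
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalArgumentException("negative frame length: " + length);
        }
        if (in.remaining() < length) {
            in.reset();                  // rewind so the header is read again later
            return null;
        }
        byte[] body = new byte[length];
        in.get(body);
        return body;
    }

    public static void main(String[] args) {
        byte[] frame = encode("ping".getBytes(StandardCharsets.UTF_8));
        // Simulate a TCP read that delivered only the first 6 of 8 bytes.
        ByteBuffer partial = ByteBuffer.wrap(frame, 0, 6);
        System.out.println("partial frame decoded: " + (tryDecode(partial) != null));
        byte[] body = tryDecode(ByteBuffer.wrap(frame));
        System.out.println(new String(body, StandardCharsets.UTF_8));
    }
}
```

The mark/reset pair plays the same role as `markReaderIndex()` and `resetReaderIndex()` in the Netty decoder: a partial frame must leave the buffer untouched so the next read can retry from the header.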
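That completes the preparation; now we write the RPC server. An RPC server differs little from an ordinary Netty server: only the encoder and decoder are different.
3. RPC server
An RPC server exposes services for remote invocation. For a client to make a remote call, the server must open a port for it; otherwise the RPC service is unreachable. We therefore build the RPC server on Netty, whose networking support makes it easy to write and keeps the code concise.
The RPC server is shown below.
To improve code reuse, we extract the server into an abstract class that takes the IP, port, and other basic settings and builds a server from them.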
package com.twjitm.core.bootstrap.tcp;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.bootstrap.AbstractNettyGameBootstrapService;
import com.twjitm.core.utils.logs.LoggerUtils;
import com.twjitm.threads.thread.NettyThreadNameFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.log4j.Logger;
import java.net.InetSocketAddress;
/**
* Abstract bootstrap class for TCP services.
* It implements the common logic for starting a TCP server; the concrete
* details are left to subclasses, which must pass in the basic settings
* before the server can start successfully.
* <h3>Do not mix this class with {@link com.twjitm.core.bootstrap.udp.AbstractNettyGameBootstrapUdpService}
* or {@link com.twjitm.core.bootstrap.http.AbstractNettyGameBootstrapHttpService}.</h3>
*
* <h3>Server startup sequence</h3>
* <pre>{@code
* listenIntoGroup = new NioEventLoopGroup(1, bossNettyThreadNameFactory);
* progressGroup = new NioEventLoopGroup(0, workerNettyThreadNameFactory);
* ServerBootstrap bootstrap = new ServerBootstrap();
* bootstrap.group(listenIntoGroup, progressGroup)
* .channel(NioServerSocketChannel.class)
* .childHandler(channelInitializer)
* .option(ChannelOption.SO_BACKLOG, 128)
* .childOption(ChannelOption.SO_KEEPALIVE, true);
* ChannelFuture channelFuture;
* try {
* channelFuture = bootstrap.bind(this.serverIp, this.serverPort).sync();
* logger.info("[---------------------" + serverName + " SERVICE START IS SUCCESSFUL IP=[" + serverIp + "]LISTENER PORT NUMBER IS :[" + serverPort + "]------------]");
* channelFuture.channel().closeFuture().sync();
* } catch (InterruptedException e) {
* logger.error(serverName + "START HAVE ERROR ,WILL STOP");
* SpringServiceManager.shutdown();
* e.printStackTrace();
* logger.error(e);
* } finally {
* listenIntoGroup.shutdownGracefully();
* progressGroup.shutdownGracefully();
* logger.info(serverName + "SERVER WORLD STOP");
* }
* }</pre>
*
* @author 文江
* @date 2018/4/16
*/
public abstract class AbstractNettyGameBootstrapTcpService extends AbstractNettyGameBootstrapService {
private static Logger logger = LoggerUtils.getLogger(AbstractNettyGameBootstrapTcpService.class);
private int serverPort;
private String serverIp;
private String serverName;
private NettyThreadNameFactory bossNettyThreadNameFactory;
private NettyThreadNameFactory workerNettyThreadNameFactory;
private ChannelInitializer channelInitializer;
private EventLoopGroup listenIntoGroup;
private EventLoopGroup progressGroup;
public AbstractNettyGameBootstrapTcpService(int serverPort,
String serverIp,
String bossTreadName,
String workerTreadName,
ChannelInitializer channelInitializer,
String serverName) {
super(serverPort, new InetSocketAddress(serverIp, serverPort));
this.serverIp = serverIp;
this.serverPort = serverPort;
this.bossNettyThreadNameFactory = new NettyThreadNameFactory(bossTreadName);
this.workerNettyThreadNameFactory = new NettyThreadNameFactory(workerTreadName);
this.channelInitializer = channelInitializer;
this.serverName = serverName;
}
@Override
public void startServer() {
listenIntoGroup = new NioEventLoopGroup(1, bossNettyThreadNameFactory);
progressGroup = new NioEventLoopGroup(0, workerNettyThreadNameFactory);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(listenIntoGroup, progressGroup)
.channel(NioServerSocketChannel.class)
.childHandler(channelInitializer)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture;
try {
channelFuture = bootstrap.bind(this.serverIp, this.serverPort).sync();
logger.info("[---------------------" + serverName + " SERVICE START IS SUCCESSFUL IP=[" + serverIp + "]LISTENER PORT NUMBER IS :[" + serverPort + "]------------]");
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
// Restore the interrupt flag so callers can still observe the interruption.
Thread.currentThread().interrupt();
logger.error(serverName + " START HAVE ERROR, WILL STOP", e);
SpringServiceManager.shutdown();
} finally {
listenIntoGroup.shutdownGracefully();
progressGroup.shutdownGracefully();
logger.info(serverName + "SERVER WORLD STOP");
}
}
@Override
public void stopServer() throws Throwable {
listenIntoGroup.shutdownGracefully();
progressGroup.shutdownGracefully();
}
}
The concrete RPC server:
package com.twjitm.core.bootstrap.rpc;
import com.twjitm.core.bootstrap.tcp.AbstractNettyGameBootstrapTcpService;
import io.netty.channel.ChannelInitializer;
/**
* Created by IntelliJ IDEA.
* User: 文江 Date: 2018/8/19 Time: 10:03
* https://blog.csdn.net/baidu_23086307
*/
public class NettyGameBootstrapRpcService extends AbstractNettyGameBootstrapTcpService {
public NettyGameBootstrapRpcService(int serverPort, String serverIp, String bossTreadName, String workerTreadName, ChannelInitializer channelInitializer,String serverName) {
super(serverPort, serverIp, bossTreadName, workerTreadName, channelInitializer,serverName);
}
@Override
public void startServer() {
super.startServer();
}
@Override
public void stopServer() throws Throwable {
super.stopServer();
}
}
Each server has its own initializer for setting up its components. We likewise write one for the RPC server.
/**
* Created by IntelliJ IDEA.
* User: 文江 Date: 2018/8/19 Time: 10:19
* https://blog.csdn.net/baidu_23086307
*/
public class NettyRpcMessageServerInitializer extends ChannelInitializer<NioSocketChannel> {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
int maxLength = Integer.MAX_VALUE;
pipeline.addLast("frame", new LengthFieldBasedFrameDecoder(maxLength, 0, 4, 0, 0));
pipeline.addLast("decoder", new NettyNetMessageRPCDecoder(NettyRpcRequestMessage.class));
pipeline.addLast("encoder", new NettyNetMessageRPCEncoder(NettyRpcResponseMessage.class));
int readerIdleTimeSeconds = 0;
int writerIdleTimeSeconds = 0;
int allIdleTimeSeconds = GlobalConstants.NettyNet.SESSION_HEART_ALL_TIMEOUT;
pipeline.addLast("idleStateHandler", new IdleStateHandler(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds));
pipeline.addLast("logger", new LoggingHandler(LogLevel.DEBUG));
pipeline.addLast("handler", new NettyNetRPCServerHandler());
}
}
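The corresponding handler is NettyNetRPCServerHandler. Its main job is to dispatch each incoming RPC request to the right service, using reflection and annotations to map the request to its business logic. The result is then returned in a NettyRpcResponseMessage.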
package com.twjitm.core.common.handler.rpc;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcResponseMessage;
import com.twjitm.core.common.service.rpc.service.NettyRemoteRpcHandlerService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.utils.logs.LoggerUtils;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.log4j.Logger;
/**
* Server-side handler for RPC messages.
* It processes client requests and dispatches each one to the concrete RPC service class.
* <p>
* Created by IntelliJ IDEA.
* User: 文江 Date: 2018/8/19 Time: 10:22
* https://blog.csdn.net/baidu_23086307
* <p>
* handler rpc message
*/
public class NettyNetRPCServerHandler extends SimpleChannelInboundHandler<NettyRpcRequestMessage> {
Logger logger = LoggerUtils.getLogger(NettyNetRPCServerHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, final NettyRpcRequestMessage request) throws Exception {
NettyRemoteRpcHandlerService remoteRpcHandlerService = SpringServiceManager.getSpringLoadService().getNettyRemoteRpcHandlerService();
remoteRpcHandlerService.submit(() -> {
if (logger.isDebugEnabled()) {
logger.debug("RECEIVE REQUEST " + request.getRequestId());
}
NettyRpcResponseMessage response = new NettyRpcResponseMessage();
response.setRequestId(request.getRequestId());
try {
Object result = SpringServiceManager.getSpringLoadService().getDispatcherService().dispatcher(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t.toString());
logger.error("RPC SERVER HANDLE REQUEST ERROR", t);
}
ctx.writeAndFlush(response).addListener((ChannelFutureListener) channelFuture -> {
if (logger.isDebugEnabled()) {
logger.debug("SEND RESPONSE FOR REQUEST " + request.getRequestId());
}
});
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//if(logger.isErrorEnabled()) {
logger.error("SERVER CAUGHT EXCEPTION", cause);
//}
ctx.close();
}
}
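The handler delegates to a dispatcher service whose code is not shown here. As a rough idea of what such a dispatcher has to do with the four request fields (class name, method name, parameter types, parameters), here is a JDK-only sketch based on `java.lang.reflect.Method`. `ReflectiveDispatcherSketch`, `EchoServiceSketch` and `EchoServiceSketchImpl` are hypothetical, and the real dispatcher may well differ.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical service used only for this illustration.
interface EchoServiceSketch {
    String echo(String text, int times);
}

class EchoServiceSketchImpl implements EchoServiceSketch {
    @Override
    public String echo(String text, int times) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < times; i++) {
            sb.append(text);
        }
        return sb.toString();
    }
}

class ReflectiveDispatcherSketch {
    private final Map<String, Class<?>> interfaces = new HashMap<>();
    private final Map<String, Object> beans = new HashMap<>();

    <T> void register(Class<T> serviceInterface, T bean) {
        interfaces.put(serviceInterface.getName(), serviceInterface);
        beans.put(serviceInterface.getName(), bean);
    }

    // Resolves the target method from the request fields and invokes it on the bean.
    Object dispatch(String className, String methodName,
                    Class<?>[] parameterTypes, Object[] parameters) {
        Class<?> serviceInterface = interfaces.get(className);
        if (serviceInterface == null) {
            throw new IllegalStateException("no service registered for " + className);
        }
        try {
            Method method = serviceInterface.getMethod(methodName, parameterTypes);
            return method.invoke(beans.get(className), parameters);
        } catch (InvocationTargetException e) {
            // The service method itself threw; surface its cause.
            throw new IllegalStateException("service method failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }
}
```

Sending the parameter types along with the arguments is what lets the server pick the right overload with `getMethod`, which is presumably why the request message carries both.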
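4. RPC client
The RPC client is whichever server initiates an RPC request; any server that does so acts as the RPC client at that moment. A client generally makes calls in one of two ways:
1. Synchronous call: the requesting thread blocks until the remote result comes back, then continues.
2. Asynchronous call: the requesting thread does not block; the result is delivered through a callback interface, which hands it back to the caller.
Choose between the two according to the business scenario. When the calling thread is not latency-sensitive, synchronous calls keep the program simpler. When latency matters a lot, use asynchronous calls so that the calling thread is never blocked.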
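The difference between the two call styles can be sketched with the JDK's `CompletableFuture`. This is only an analogy for the framework's own future and callback types: `CallStylesSketch` and `remoteHello` are hypothetical, and the "remote" work simply runs on another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class CallStylesSketch {

    // Stand-in for a remote call: the result is produced on another thread.
    static CompletableFuture<String> remoteHello(int number) {
        return CompletableFuture.supplyAsync(() -> "helloworld" + number);
    }

    // Synchronous style: the caller blocks until the result arrives or the timeout elapses.
    static String callSync(int number) {
        try {
            return remoteHello(number).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("remote call failed", e);
        }
    }

    // Asynchronous style: the caller registers a callback and returns immediately.
    static CompletableFuture<Void> callAsync(int number, Consumer<String> onSuccess) {
        return remoteHello(number).thenAccept(onSuccess);
    }

    public static void main(String[] args) {
        System.out.println(callSync(1));
        // join() is used here only so the demo does not exit before the callback runs.
        callAsync(2, System.out::println).join();
    }
}
```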
For the RPC client, we first define the message formats: the request message and the response message.
4.1.1 RPC request message
package com.twjitm.core.common.netstack.entity.rpc;
/**
* Created by IntelliJ IDEA.
* User: 文江 Date: 2018/8/19 Time: 10:10
* https://blog.csdn.net/baidu_23086307
*/
public class NettyRpcRequestMessage {
/**
* Request ID
*/
private String requestId;
/**
* Class name
*/
private String className;
/**
* Method name
*/
private String methodName;
/**
* Parameter types
*/
private Class<?>[] parameterTypes;
/**
* Parameter values
*/
private Object[] parameters;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
}
4.1.2 RPC response message
package com.twjitm.core.common.netstack.entity.rpc;
/**
* Created by IntelliJ IDEA.
* User: 文江 Date: 2018/8/19 Time: 10:15
* https://blog.csdn.net/baidu_23086307
*/
public class NettyRpcResponseMessage {
/**
* Request ID
*/
private String requestId;
/**
* Error description; null when the call succeeded
*/
private String error;
/**
* Return value
*/
private Object result;
public String getRequestId() {
return requestId;
}
public String getError() {
return error;
}
public Object getResult() {
return result;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public void setError(String error) {
this.error = error;
}
public void setResult(Object result) {
this.result = result;
}
public boolean isError() {
return error != null;
}
}
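Both messages carry a `requestId`, which is what allows a client to match a response to the call that is waiting for it, even when several calls share one connection. The client class that does this is not shown in this section, so the following is only a sketch of the usual pattern under that assumption; `PendingRequestsSketch` and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Correlates responses with in-flight requests by request ID.
class PendingRequestsSketch {
    private final Map<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Called before the request is written to the channel.
    CompletableFuture<Object> register(String requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called when a response arrives. Returns false for unknown or already answered IDs.
    boolean onResponse(String requestId, Object result, String error) {
        CompletableFuture<Object> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        if (error != null) {
            future.completeExceptionally(new RuntimeException(error));
        } else {
            future.complete(result);
        }
        return true;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Removing the entry when the response arrives keeps the map from growing; a real client would also need to evict entries whose response never comes.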
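4.2 RPC client implementation
4.2.1 Overview
Because the RPC client involves somewhat more code, we start with the package layout so that the code is easier to follow.
From top to bottom:
AbstractNettyRpcConnectManager is the connection manager. It obtains remote server information through ZooKeeper-based service discovery.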
package com.twjitm.core.common.service.rpc.client;
import com.twjitm.core.common.config.global.NettyGameServiceConfig;
import com.twjitm.core.common.config.global.NettyGameServiceConfigService;
import com.twjitm.core.common.service.rpc.network.NettyRpcClient;
import com.twjitm.core.common.service.rpc.server.NettyRpcNodeInfo;
import com.twjitm.core.common.service.rpc.server.NettySdServer;
import com.twjitm.core.common.zookeeper.NettyZookeeperNodeInfo;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.utils.logs.LoggerUtils;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author twjitm - [Created on 2018-08-20 10:57]
* @jdk java version "1.8.0_77"
* Abstract RPC connection manager
*/
public abstract class AbstractNettyRpcConnectManager {
private Logger logger = LoggerUtils.getLogger(AbstractNettyRpcConnectManager.class);
private ThreadPoolExecutor threadPoolExecutor;
private ReentrantLock lock = new ReentrantLock();
private Map<Integer, NettyRpcClient> serverNodes = new HashMap<>();
private AtomicInteger roundRobin = new AtomicInteger();
public void initManager() {
NettyGameServiceConfigService config = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService();
NettyGameServiceConfig serviceConfig = config.getNettyGameServiceConfig();
threadPoolExecutor = new ThreadPoolExecutor(serviceConfig.getRpcConnectThreadSize(),
serviceConfig.getRpcConnectThreadSize(),
600L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(65536));
}
/**
* Initializes RPC clients for the servers configured locally on this node.
* @param allServerAddress
* @throws InterruptedException
*/
public void initServers(List<NettySdServer> allServerAddress) throws InterruptedException {
lock.lock();
try {
if (allServerAddress != null) {
for (NettySdServer sdServer : allServerAddress) {
if (serverNodes.containsKey(sdServer.getServerId())) {
continue;
}
NettyRpcNodeInfo rpcNodeInfo = new NettyRpcNodeInfo();
rpcNodeInfo.setServerId(String.valueOf(sdServer.getServerId()));
rpcNodeInfo.setHost(sdServer.getIp());
rpcNodeInfo.setPort(String.valueOf(sdServer.getRpcPort()));
NettyRpcClient rpcClient = new NettyRpcClient(rpcNodeInfo, threadPoolExecutor);
serverNodes.put(sdServer.getServerId(), rpcClient);
}
}
} finally {
// Always release the lock, even if creating a client throws.
lock.unlock();
}
}
/**
* Selects a remote RPC client. A serverId of 0 means "any server" (round-robin).
* @param serverId
* @return
*/
public NettyRpcClient getNettyRpcClientByServerId(int serverId) {
if (serverId != 0) {
// A specific server was requested; returns null if it is unknown.
return this.serverNodes.get(serverId);
}
List<NettyRpcClient> handlers = new ArrayList<>(this.serverNodes.values());
if (handlers.isEmpty()) {
throw new IllegalStateException("CAN'T CONNECT ANY SERVERS!");
}
// floorMod keeps the index non-negative even after the counter overflows.
int index = Math.floorMod(roundRobin.getAndIncrement(), handlers.size());
return handlers.get(index);
}
public void stop() {
for (NettyRpcClient rpcClient : serverNodes.values()) {
rpcClient.close();
}
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdown();
}
}
/**
* Initializes RPC clients for the servers discovered through ZooKeeper.
* @param nettyZookeeperNodeInfoList
*/
public void initNettyZookeeperRpcServers(List<NettyZookeeperNodeInfo> nettyZookeeperNodeInfoList) {
// Use the same lock as initServers so that both methods guard serverNodes consistently.
lock.lock();
try {
if (nettyZookeeperNodeInfoList != null) {
// Skip nodes that are already registered (this server included); only add other servers.
for (NettyZookeeperNodeInfo zooKeeperNodeInfo : nettyZookeeperNodeInfoList) {
// The map is keyed by Integer, so parse the String ID before the lookup.
int serverId = Integer.parseInt(zooKeeperNodeInfo.getServerId());
if (serverNodes.containsKey(serverId)) {
continue;
}
NettyRpcNodeInfo rpcNodeInfo = new NettyRpcNodeInfo();
rpcNodeInfo.setServerId(zooKeeperNodeInfo.getServerId());
rpcNodeInfo.setHost(zooKeeperNodeInfo.getHost());
rpcNodeInfo.setPort(zooKeeperNodeInfo.getPort());
NettyRpcClient rpcClient = new NettyRpcClient(rpcNodeInfo, threadPoolExecutor);
serverNodes.put(serverId, rpcClient);
}
}
} finally {
lock.unlock();
}
}
}
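The round-robin selection in the connection manager depends on a counter that eventually overflows. The sketch below isolates that selection step and uses `Math.floorMod` so the index stays valid when the counter wraps to a negative value. `RoundRobinSketch` is a hypothetical helper, not part of the framework.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch<T> {
    private final AtomicInteger counter;

    // The start value is configurable only so the overflow case can be demonstrated.
    RoundRobinSketch(int start) {
        this.counter = new AtomicInteger(start);
    }

    T next(List<T> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalStateException("no nodes available");
        }
        // floorMod never returns a negative index, unlike the % operator.
        int index = Math.floorMod(counter.getAndIncrement(), nodes.size());
        return nodes.get(index);
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("a", "b", "c");
        RoundRobinSketch<String> fromZero = new RoundRobinSketch<>(0);
        for (int i = 0; i < 4; i++) {
            System.out.println(fromZero.next(nodes));
        }
        // Start at Integer.MAX_VALUE so the second call sees a negative counter.
        RoundRobinSketch<String> nearOverflow = new RoundRobinSketch<>(Integer.MAX_VALUE);
        System.out.println(nearOverflow.next(nodes));
        System.out.println(nearOverflow.next(nodes));
    }
}
```

With the plain `%` operator, a negative counter produces a negative remainder and therefore an out-of-range index; the rotation order may skip once at the wrap-around point, but every call still returns a valid node.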
The asynchronous callback interface:
package com.twjitm.core.common.service.rpc.client;
/**
* Callback interface for remote RPC calls
* @author twjitm - [Created on 2018-08-20 11:46]
* @jdk java version "1.8.0_77"
*
*/
public interface NettyAsyncRPCCallback {
void success(Object result);
void fail(Exception e);
}
The RPC context holder:
package com.twjitm.core.common.service.rpc.client;
/**
* Holder for the RPC context
*
* @author twjitm - [Created on 2018-08-20 12:28]
* @jdk java version "1.8.0_77"
*/
public class NettyRpcContextHolder {
/**
* Uses a ThreadLocal so that each thread has its own, thread-safe context.
*/
private static final ThreadLocal<NettyRpcContextHolderObject> contextHolder = new
ThreadLocal<NettyRpcContextHolderObject>();
public static NettyRpcContextHolderObject getContext() {
return contextHolder.get();
}
/**
* Sets the RPC context for the current thread.
*
* @param
*/
public static void setContextHolder(NettyRpcContextHolderObject rpcContextHolderObject) {
contextHolder.set(rpcContextHolderObject);
}
}
The RPC context object:
package com.twjitm.core.common.service.rpc.client;
import com.twjitm.core.common.enums.NettyGameTypeEnum;
/**
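* RPC context object. It holds the request information: the server type and the server ID.
* Before an RPC request is made, this information must be stored in this object; the lower
* layers read it to route the request to the right remote server.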
*
* @author twjitm - [Created on 2018-08-20 12:31]
* @jdk java version "1.8.0_77"
*/
public class NettyRpcContextHolderObject {
private NettyGameTypeEnum nettyGameTypeEnum;
private int serviceId;
public NettyRpcContextHolderObject(NettyGameTypeEnum nettyGameTypeEnum, int serviceId) {
this.nettyGameTypeEnum = nettyGameTypeEnum;
this.serviceId = serviceId;
}
public NettyGameTypeEnum getNettyGameTypeEnum() {
return nettyGameTypeEnum;
}
public void setNettyGameTypeEnum(NettyGameTypeEnum nettyGameTypeEnum) {
this.nettyGameTypeEnum = nettyGameTypeEnum;
}
public int getServiceId() {
return serviceId;
}
public void setServiceId(int serviceId) {
this.serviceId = serviceId;
}
}
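Why do we need an RPC context object?
The proxy has to know which remote server a call should go to. Before making an RPC call, the caller stores the server type and server ID in the context of the current thread, and the proxy reads them from there to route the request. In an asynchronous call the caller does not wait for the RPC response; it receives the result through the callback interface once the response arrives, so the context of the current call is what ties the request to its caller.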
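The per-thread behaviour of such a holder is easy to check in isolation. The sketch below stores a target server ID in a `ThreadLocal` and shows that another thread does not see it. `RpcContextSketch` is a hypothetical, simplified stand-in for NettyRpcContextHolder that holds only an `Integer`.

```java
// Simplified stand-in for a ThreadLocal-based context holder.
class RpcContextSketch {
    private static final ThreadLocal<Integer> TARGET_SERVER = new ThreadLocal<>();

    static void set(int serverId) {
        TARGET_SERVER.set(serverId);
    }

    static Integer get() {
        return TARGET_SERVER.get();
    }

    // Clearing matters on pooled threads, where a stale value would leak into the next task.
    static void clear() {
        TARGET_SERVER.remove();
    }

    // Reads the context from a freshly started thread and returns what that thread saw.
    static Integer readFromOtherThread() {
        Integer[] seen = new Integer[1];
        Thread reader = new Thread(() -> seen[0] = get());
        reader.start();
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        set(7);
        System.out.println("caller sees: " + get());
        System.out.println("other thread sees: " + readFromOtherThread());
        clear();
    }
}
```

One consequence worth keeping in mind: because the value is bound to the thread, it has to be set on the same thread that later invokes the proxy.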
The asynchronous future object:
package com.twjitm.core.common.service.rpc.client;
import com.twjitm.core.common.config.global.NettyGameServiceConfigService;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcResponseMessage;
import com.twjitm.core.common.service.rpc.service.NettyRpcProxyService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.utils.logs.LoggerUtils;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;
/**
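* Result object for a remote call. {@link NettyRpcFuture} implements
* the {@link Future} interface, so a thread calling get() will block. Do not use
* this object on a game thread, that is, on a thread that runs an {@link io.netty.channel.EventLoop}.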
*
* @author twjitm- [Created on 2018-08-20 11:38]
* @jdk java version "1.8.0_77"
*/
public class NettyRpcFuture implements Future<Object> {
private Logger logger = LoggerUtils.getLogger(NettyRpcFuture.class);
/**
* Synchronizer
*/
private Sync sync;
/**
* RPC request message
*/
private NettyRpcRequestMessage request;
/**
* RPC response message
*/
private NettyRpcResponseMessage response;
/**
* Start time
*/
private long startTime;
/**
* Registered callbacks
*/
private List<NettyAsyncRPCCallback> pendingCallbacks = new ArrayList<NettyAsyncRPCCallback>();
/**
* Lock guarding the callback list
*/
private ReentrantLock lock = new ReentrantLock();
public NettyRpcFuture(NettyRpcRequestMessage request) {
this.sync = new Sync();
this.request = request;
this.startTime = System.currentTimeMillis();
}
/**
* Whether the call has completed. The state of the {@link AbstractQueuedSynchronizer}
* is used to detect completion.
*
* @return
*/
@Override
public boolean isDone() {
return sync.isDone();
}
/**
* Gets the result, blocking until it is available.
*
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@Override
public Object get() throws InterruptedException, ExecutionException {
// Block until the response arrives; the interruptible variant honors the declared InterruptedException.
sync.acquireInterruptibly(-1);
if (this.response != null) {
return this.response.getResult();
} else {
return null;
}
}
/**
* Gets the result, waiting at most the given timeout.
*
* @param timeout
* @param unit
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// Blocks until the acquire succeeds (true), the timeout elapses (false), or the thread is interrupted (exception).
boolean success = sync.tryAcquireNanos(-1, unit.toNanos(timeout));
if (success) {
// Acquired: either a result has arrived, or the request was never submitted and handling was aborted.
if (this.response != null) {
return this.response.getResult();
} else {
return null;
}
} else {
throw new TimeoutException("TIMEOUT EXCEPTION. REQUEST ID: " + this.request.getRequestId()
+ ". REQUEST CLASS NAME: " + this.request.getClassName()
+ ". REQUEST METHOD: " + this.request.getMethodName());
}
}
/**
* Whether the RPC request was cancelled. Cancellation is not supported.
*
* @return
*/
@Override
public boolean isCancelled() {
throw new UnsupportedOperationException();
}
/**
* Cancels an RPC request. Cancellation is not supported.
*
* @param mayInterruptIfRunning whether a task that is already running may be interrupted
* @return
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
}
/**
* Called when the RPC response is received.
* The response is stored locally first, and then the synchronizer is released. {@link Sync#release(int)}
*
* @param response
*/
public void done(NettyRpcResponseMessage response) {
this.response = response;
sync.release(1);
invokeCallbacks();
// Threshold
long responseTime = System.currentTimeMillis() - startTime;
// Maximum expected response time for a remote call
long responseTimeThreshold = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService().getNettyGameServiceConfig().getRpcTimeOut();
if (responseTime > responseTimeThreshold) {
logger.warn("SERVICE RESPONSE TIME IS TOO SLOW. REQUEST ID = " + response.getRequestId() + ". RESPONSE TIME = " + responseTime + "ms");
}
}
/**
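* Whether the request has timed out. Under poor network conditions or high load an RPC
* request may be delayed; a timeout policy can use this to decide whether to resend it.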
*
* @return
*/
public boolean isTimeout() {
long responseTime = System.currentTimeMillis() - startTime;
NettyGameServiceConfigService gameServerConfigService = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService();
int timeOut = gameServerConfigService.getNettyGameServiceConfig().getRpcFutureDeleteTimeOut();
if (responseTime >= timeOut) {
return true;
}
return false;
}
/**
* Invokes the callbacks. An RPC request may have several callbacks,
* so they are kept in a list, and a {@link ReentrantLock}
* guards the list against concurrent access.
*/
private void invokeCallbacks() {
lock.lock();
try {
for (final NettyAsyncRPCCallback callback : pendingCallbacks) {
runCallback(callback);
}
} finally {
lock.unlock();
}
}
/**
* Adds a callback, that is, a listener for the result
* of the current RPC request.
*
* @param callback
* @return
*/
public NettyRpcFuture addCallback(NettyAsyncRPCCallback callback) {
lock.lock();
try {
if (isDone()) {
logger.info("The remote result is already available; running the callback directly");
runCallback(callback);
} else {
logger.info("Waiting for the remote result; queuing the callback");
this.pendingCallbacks.add(callback);
}
// Release the lock no matter what happens
} finally {
lock.unlock();
}
return this;
}
/**
* Runs one callback. The result is read inside the task submitted via {@link NettyRpcProxyService#submit(Runnable)},
* which reports success or failure to the caller's {@link NettyAsyncRPCCallback}.
* The {@link NettyAsyncRPCCallback} implements its own business logic for both cases.
*
* @param callback
*/
private void runCallback(final NettyAsyncRPCCallback callback) {
final NettyRpcResponseMessage res = this.response;
NettyRpcProxyService nettyRpcProxyService = SpringServiceManager.getSpringLoadService().getNettyRpcProxyService();
nettyRpcProxyService.submit(() -> {
if (!res.isError()) {
callback.success(res.getResult());
} else {
callback.fail(new RuntimeException("RESPONSE ERROR", new Throwable(res.getError())));
}
});
}
/**
* The core of the asynchronous callback mechanism
*/
static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
private final int done = 1;
private final int pending = 0;
@Override
protected boolean tryAcquire(int acquires) {
return getState() == done;
}
/**
* CAS operation, which guarantees atomicity
*
* @param releases
* @return
*/
@Override
protected boolean tryRelease(int releases) {
if (getState() == pending) {
if (compareAndSetState(pending, done)) {
return true;
}
}
return false;
}
public boolean isDone() {
return getState() == done;
}
}
}
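This class is important: it is the core that waits asynchronously for the remote result and passes it to the callback interface. It is built on AQS (AbstractQueuedSynchronizer) to wait for the remote response.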
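For comparison, here is a small one-shot latch built on AQS in shared mode, the same idea `java.util.concurrent.CountDownLatch` uses. It is offered as a sketch of an alternative, not as the author's design: in shared mode a single release wakes every waiting thread, which may be worth considering if more than one thread can call `get()` on the same future. `OneShotLatchSketch` is a hypothetical name, and the sketch assumes `complete` is called at most once.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class OneShotLatchSketch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // State 1 means "completed": succeed and let other waiters through as well.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();
    private volatile Object result;

    // Assumption: called at most once per latch.
    void complete(Object value) {
        result = value;
        sync.releaseShared(1);
    }

    Object await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (!sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))) {
            throw new TimeoutException("no result within the timeout");
        }
        return result;
    }

    // Starts several waiters, completes the latch once, and counts how many got the value.
    static int demo(int waiters) {
        OneShotLatchSketch latch = new OneShotLatchSketch();
        AtomicInteger served = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    if ("pong".equals(latch.await(5, TimeUnit.SECONDS))) {
                        served.incrementAndGet();
                    }
                } catch (InterruptedException | TimeoutException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        latch.complete("pong");
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return served.get();
    }
}
```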
4.2.2 Dynamic proxies in RPC
First, we define a proxy interface:
package com.twjitm.core.common.service.rpc.client.proxy;
import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;
/**
* @author twjitm - [Created on 2018-08-20 14:45]
* @jdk java version "1.8.0_77"
*/
public interface INettyAsyncRpcProxy {
public NettyRpcFuture call(String funcName, Object... args);
}
The proxy object:
package com.twjitm.core.common.service.rpc.client.proxy;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.service.rpc.client.AbstractNettyRpcConnectManager;
import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolder;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolderObject;
import com.twjitm.core.common.service.rpc.network.NettyRpcClient;
import com.twjitm.core.common.service.rpc.service.NettyRpcClientConnectService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.utils.logs.LoggerUtils;
import org.apache.log4j.Logger;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @author twjitm- [Created on 2018-08-20 12:15]
* @jdk java version "1.8.0_77"
* Proxy object
*/
public class NettyObjectProxy<T> implements InvocationHandler {
private Logger logger=LoggerUtils.getLogger(NettyObjectProxy.class);
private Class<T> clazz;
private int timeOut;
public NettyObjectProxy(Class<T> clazz, int timeOut) {
this.clazz = clazz;
this.timeOut = timeOut;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
NettyRpcRequestMessage request = new NettyRpcRequestMessage();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
if (logger.isDebugEnabled()) {
logger.debug(method.getName());
logger.debug(method.getDeclaringClass().getName());
for (Class<?> parameterType : method.getParameterTypes()) {
logger.debug(parameterType.getName());
}
// args is null for methods without parameters, and individual arguments may be null.
if (args != null) {
for (Object arg : args) {
logger.debug(String.valueOf(arg));
}
}
}
NettyRpcContextHolderObject rpcContextHolderObject = NettyRpcContextHolder.getContext();
NettyRpcClientConnectService rpcClientConnectService = SpringServiceManager.getSpringLoadService().getNettyRpcClientConnectService();
AbstractNettyRpcConnectManager abstractRpcConnectManager = rpcClientConnectService.getNettyRpcConnectManager(rpcContextHolderObject.getNettyGameTypeEnum());
NettyRpcClient rpcClient = abstractRpcConnectManager.getNettyRpcClientByServerId(rpcContextHolderObject.getServiceId());
NettyRpcFuture rpcFuture = rpcClient.sendRequest(request);
if(timeOut > 0){
return rpcFuture.get(timeOut, TimeUnit.MILLISECONDS);
}
return rpcFuture.get();
}
}
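To see how an `InvocationHandler` turns an ordinary interface call into a request, the sketch below creates a JDK dynamic proxy whose handler records the method and arguments (the part that would go on the wire) and then calls a local target instead of a remote server. `LoopbackProxySketch` and `CalculatorSketch` are hypothetical names, and the loopback target replaces the network round trip.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical service interface for the demonstration.
interface CalculatorSketch {
    int add(int a, int b);
}

class LoopbackProxySketch {

    // Creates a proxy for iface. Each call is logged as "Interface#method[args]"
    // and then executed on localTarget, which stands in for the remote server.
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> iface, T localTarget, List<String> wireLog) {
        InvocationHandler handler = (proxy, method, args) -> {
            // args is null when the method takes no parameters.
            Object[] safeArgs = args == null ? new Object[0] : args;
            wireLog.add(method.getDeclaringClass().getName() + "#" + method.getName()
                    + Arrays.toString(safeArgs));
            return method.invoke(localTarget, safeArgs);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        List<String> wireLog = new ArrayList<>();
        CalculatorSketch calculator = create(CalculatorSketch.class, (a, b) -> a + b, wireLog);
        System.out.println(calculator.add(2, 3));
        System.out.println(wireLog);
    }
}
```

The caller only holds a `CalculatorSketch` reference and never sees the handler, which is the "call a remote service like a local one" effect described at the start of the article.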
The asynchronous proxy object:
package com.twjitm.core.common.service.rpc.client.proxy;
import com.twjitm.core.common.factory.NettyRpcRequestFactory;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.service.rpc.client.AbstractNettyRpcConnectManager;
import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolder;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolderObject;
import com.twjitm.core.common.service.rpc.network.NettyRpcClient;
import com.twjitm.core.common.service.rpc.service.NettyRpcClientConnectService;
import com.twjitm.core.common.service.rpc.service.NettyRpcProxyService;
import com.twjitm.core.spring.SpringServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* Asynchronous RPC proxy. Instances are created by {@link NettyRpcProxyService#createAsync(Class)}.
* {@link NettyAsyncRpcProxy} passes the name of the target method and its arguments to the
* remote server through {@link NettyRpcClient}. The local call returns immediately with a
* {@link NettyRpcFuture} that is not yet completed. The caller registers a callback on that
* future to be notified when its state changes, which is how the asynchronous RPC call is realized.
* </p>
*
* @author twjitm- [Created on 2018-08-20 14:46]
* @jdk java version "1.8.0_77"
*/
public class NettyAsyncRpcProxy<T> implements INettyAsyncRpcProxy {
private Logger logger=LoggerFactory.getLogger(NettyAsyncRpcProxy.class);
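/**
* <p>
* Class information of the proxied interface. This must be the interface, not the implementing class,
* because the proxy pattern used here is the dynamic proxy, also known as interface proxying. For details see
*
* @see <a href="https://my.oschina.net/u/3296367/blog/1475258 "><h2>Dynamic proxy pattern explained</h2></a>
* </p>
*/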
private Class<T> clazz;
public NettyAsyncRpcProxy(Class<T> clazz) {
this.clazz = clazz;
}
@Override
public NettyRpcFuture call(String funcName, Object... args) {
// Get the RPC context holder for the current call
NettyRpcContextHolderObject rpcContextHolderObject = NettyRpcContextHolder.getContext();
// Service that manages the RPC client connections
NettyRpcClientConnectService rpcClientConnectService =
SpringServiceManager.getSpringLoadService().getNettyRpcClientConnectService();
// Look up the RPC connection manager for the target server type
AbstractNettyRpcConnectManager abstractRpcConnectManager
= rpcClientConnectService.getNettyRpcConnectManager(
rpcContextHolderObject.getNettyGameTypeEnum());
// Look up the RPC client by service ID; this client holds the connection context for that server
NettyRpcClient rpcClient = abstractRpcConnectManager.getNettyRpcClientByServerId(
rpcContextHolderObject.getServiceId());
// Get the factory that builds RPC request messages
NettyRpcRequestFactory rpcRequestFactory =
SpringServiceManager.getSpringLoadService().getNettyRpcRequestFactory();
// Build the RPC request
NettyRpcRequestMessage request =
rpcRequestFactory.createNettyRpcRequestMessage(this.clazz.getName(), funcName, args);
// Send the request
NettyRpcFuture rpcFuture = rpcClient.sendRequest(request);
logger.info("Calling remote RPC service. Done: " + rpcFuture.isDone());
return rpcFuture;
}
}
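The asynchronous style described above (return a future at once, deliver the result through a callback) can be sketched with the JDK's `CompletableFuture`. This is only an analogy under stated assumptions: `AsyncCallDemo` and its `call` method are hypothetical, the "remote side" is simulated by a local executor, and `CompletableFuture` stands in for `NettyRpcFuture`, whose callback API is not reproduced here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncCallDemo {
    // Stand-in for the remote call: the reply is computed on another thread,
    // and the caller gets a future back without waiting for it.
    static CompletableFuture<String> call(ExecutorService remote, String funcName, Object... args) {
        return CompletableFuture.supplyAsync(() -> funcName + ":" + args.length, remote);
    }

    public static void main(String[] unused) throws Exception {
        ExecutorService remote = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> future = call(remote, "getHelloWorld", 5);
            // Register the callback; it runs once the result is available.
            CompletableFuture<Void> handled =
                    future.thenAccept(result -> System.out.println("callback got " + result));
            // Only the demo waits here, so the JVM does not exit before the callback has run.
            handled.get(5, TimeUnit.SECONDS);
        } finally {
            remote.shutdown();
        }
    }
}
```

The calling thread is free to do other work between `call` and the callback, which is the main difference from the synchronous proxy that blocks in `rpcFuture.get()`.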
Proxy service:
package com.twjitm.core.common.service.rpc.service;
import com.twjitm.core.common.config.global.GlobalConstants;
import com.twjitm.core.common.config.global.NettyGameServiceConfigService;
import com.twjitm.core.common.service.IService;
import com.twjitm.core.common.service.rpc.client.NettyAsyncRPCCallback;
import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;
import com.twjitm.core.common.service.rpc.client.proxy.INettyAsyncRpcProxy;
import com.twjitm.core.common.service.rpc.client.proxy.NettyAsyncRpcProxy;
import com.twjitm.core.common.service.rpc.client.proxy.NettyObjectProxy;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.threads.thread.NettyThreadNameFactory;
import org.springframework.stereotype.Service;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @jdk java version "1.8.0_77"
*/
@Service
public class NettyRpcProxyService implements IService {
/**
* Thread pool that executes asynchronous RPC proxy tasks
*/
private static ThreadPoolExecutor threadPoolExecutor;
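/**
* Dynamic proxying uses the JDK API
* {@link Proxy#newProxyInstance(ClassLoader, Class[], InvocationHandler)}
* to build the proxy object in memory at runtime and so proxy the target object.
* It is also called JDK proxying or interface proxying.
* <p>
* The main differences between static and dynamic proxies:
* <p>
* A static proxy already exists at compile time; after compilation the proxy class is a real class file.
* A dynamic proxy is generated at runtime: no class file exists after compilation, the class bytecode is generated at runtime and loaded into the JVM.
* Note: the proxy object itself does not need to implement the interface by hand, but the target must be exposed through an interface, otherwise a JDK dynamic proxy cannot be used.
*
* @param interfaceClass the service interface to proxy
* @param <T> the type of the service interface
* @return a proxy whose method calls are sent as synchronous RPC requests
*/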
@SuppressWarnings("unchecked")
public <T> T createProxy(Class<T> interfaceClass) {
NettyGameServiceConfigService gameServerConfigService = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService();
int timeOut = gameServerConfigService.getNettyGameServiceConfig().getRpcTimeOut();
// Create the dynamic proxy object
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new NettyObjectProxy<>(interfaceClass, timeOut)
);
}
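/**
* Creates an asynchronous proxy object
*
* @param interfaceClass the service interface to proxy
* @param <T> the type of the service interface
* @return an asynchronous proxy for the given interface
*/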
public <T> INettyAsyncRpcProxy createAsync(Class<T> interfaceClass) {
return new NettyAsyncRpcProxy<>(interfaceClass);
}
@Override
public String getId() {
return NettyRpcProxyService.class.getSimpleName();
}
@Override
public void startup() throws Exception {
NettyGameServiceConfigService gameServerConfigService = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService();
int threadSize = gameServerConfigService.getNettyGameServiceConfig().getRpcSendProxyThreadSize();
NettyThreadNameFactory factory=new NettyThreadNameFactory(
GlobalConstants.Thread.RPC_PROXY_MESSAGE_EXECUTOR,false);
threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 600L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(65536),factory);
}
@Override
public void shutdown() throws Exception {
threadPoolExecutor.shutdown();
}
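/**
* Submits a task. Asynchronous RPC requests are handled on worker threads: the task is placed
* on the request queue and a worker thread takes it from the queue and runs it. A callback is
* registered on the returned future via {@link NettyRpcFuture#addCallback(NettyAsyncRPCCallback)},
* and the calling side receives the result through that callback.
*
* @param task the task to run
*/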
public void submit(Runnable task) {
threadPoolExecutor.submit(task);
}
}
4.3. RPC client entity
package com.twjitm.core.common.service.rpc.network;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcResponseMessage;
import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;
import com.twjitm.core.common.service.rpc.server.NettyRpcNodeInfo;
import com.twjitm.core.common.service.rpc.service.NettyRPCFutureService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.core.utils.logs.LoggerUtils;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.log4j.Logger;
import java.util.concurrent.ExecutorService;
/**
* @author twjitm- [Created on 2018-08-20 11:01]
* @jdk java version "1.8.0_77"
* netty rpc client entity
*/
public class NettyRpcClient {
private Logger logger = LoggerUtils.getLogger(NettyRpcClient.class);
private NettyRpcClientConnection rpcClientConnection;
public NettyRpcClient(NettyRpcNodeInfo rpcNodeInfo, ExecutorService threadPool) {
rpcClientConnection = new NettyRpcClientConnection(this, rpcNodeInfo, threadPool);
}
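/**
* Sends a request. This ends up calling writeRequest on rpcClientConnection, which
* internally uses the Netty channel to send the data.
*
* @param request the RPC request to send
* @return a future that is completed when the matching response arrives
*/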
public NettyRpcFuture sendRequest(NettyRpcRequestMessage request) {
NettyRpcFuture rpcFuture = new NettyRpcFuture(request);
NettyRPCFutureService rpcFutureService = SpringServiceManager.getSpringLoadService().getNettyRPCFutureService();
rpcFutureService.addNettyRPCFuture(request.getRequestId(), rpcFuture);
// Send the data to the remote server
rpcClientConnection.writeRequest(request);
return rpcFuture;
}
public NioSocketChannel getChannel() {
return rpcClientConnection.getChannel();
}
public void close() {
logger.info("RPC CLIENT CLOSE");
if (rpcClientConnection != null) {
rpcClientConnection.close();
}
}
/**
* Handles an RPC response on the calling side
*
* @param rpcResponse the response received from the remote server
*/
public void handleRpcResponse(NettyRpcResponseMessage rpcResponse) {
String requestId = rpcResponse.getRequestId();
NettyRPCFutureService rpcFutureService = SpringServiceManager.getSpringLoadService().
getNettyRPCFutureService();
NettyRpcFuture rpcFuture = rpcFutureService.getNettyRPCFuture(requestId);
if (rpcFuture != null) {
boolean removeFlag = rpcFutureService.removeNettyRPCFuture(requestId, rpcFuture);
if (removeFlag) {
rpcFuture.done(rpcResponse);
} else {
// The future has already been handled elsewhere, possibly because it timed out
logger.error("RPCFUTURE IS REMOVE " + requestId);
}
}
}
public NettyRpcClientConnection getRpcClientConnection() {
return rpcClientConnection;
}
}
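The key idea in `sendRequest` and `handleRpcResponse` is request/response correlation: each request gets an ID, a future is parked under that ID, and the response handler looks the future up and completes it. The sketch below shows that idea in isolation. `PendingRequests` is a hypothetical class, and `CompletableFuture<String>` is used here in place of `NettyRpcFuture` and the response message.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class PendingRequests {
    private final ConcurrentMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Parks a new future under a fresh request ID and returns that ID.
    String register() {
        String id = UUID.randomUUID().toString();
        pending.put(id, new CompletableFuture<>());
        return id;
    }

    // Returns the future parked under the ID, or null if it is no longer pending.
    CompletableFuture<String> future(String id) {
        return pending.get(id);
    }

    // Called when a response arrives. remove(key, value) succeeds for exactly one
    // caller, so a response and a timeout sweep cannot both claim the same future.
    boolean onResponse(String id, String payload) {
        CompletableFuture<String> future = pending.get(id);
        if (future != null && pending.remove(id, future)) {
            return future.complete(payload);
        }
        return false;
    }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        String id = requests.register();
        CompletableFuture<String> future = requests.future(id);
        System.out.println(requests.onResponse(id, "pong"));      // true: first response wins
        System.out.println(future.join());                        // pong
        System.out.println(requests.onResponse(id, "duplicate")); // false: already handled
    }
}
```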
RPC connection object:
package com.twjitm.core.common.service.rpc.network;
import com.twjitm.core.common.netstack.entity.rpc.NettyRpcRequestMessage;
import com.twjitm.core.common.service.rpc.server.NettyRpcNodeInfo;
import com.twjitm.core.utils.logs.LoggerUtils;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;
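/**
* RPC connection entity. It starts a local Netty client, uses it to connect to the
* remote server, and sends the packaged request messages over that connection.
*
* @author twjtim- [Created on 2018-08-20 11:01]
* @jdk java version "1.8.0_77"
* RPC connection entity object
*/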
public class NettyRpcClientConnection {
private Logger logger = LoggerUtils.getLogger(NettyRpcClientConnection.class);
private NioSocketChannel channel;
private ReentrantLock statusLock;
/**
* Thread pool used for reconnecting
*/
private ExecutorService threadPool;
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
/**
* Whether automatic reconnect is enabled
*/
private volatile boolean reConnectOn = true;
private NettyRpcClient nettyRpcClient;
private NettyRpcNodeInfo nettyRpcNodeInfo;
public NettyRpcClientConnection(NettyRpcClient nettyRpcClient, NettyRpcNodeInfo nettyRpcNodeInfo, ExecutorService threadPool) {
if (threadPool == null) {
throw new IllegalArgumentException("ALL PARAMETERS MUST ACCURATE.");
}
this.nettyRpcClient = nettyRpcClient;
this.nettyRpcNodeInfo = nettyRpcNodeInfo;
this.threadPool = threadPool;
this.statusLock = new ReentrantLock();
}
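/**
* Opens the connection. This method is important.
* Opening the connection amounts to starting a Netty client.
* The startup logic is wrapped in NettyRpcServerConnectTask. The task is submitted
* to the thread pool, which returns a java.util.concurrent.Future. submit itself
* returns immediately; it is the following future.get() that blocks until the task has finished.
*
* @return true if the channel is active after the connect attempt, false otherwise
*/
public boolean open() {
// Refuse to open a second connection
if (isConnected()) {
throw new IllegalStateException("ALREADY CONNECTED. DISCONNECT FIRST.");
}
// Create the socket connection
try {
InetSocketAddress remotePeer = new InetSocketAddress(nettyRpcNodeInfo.getHost(), nettyRpcNodeInfo.getIntPort());
logger.info("CONNECT TO REMOTE SERVER. REMOTE PEER = " + remotePeer);
Future<?> future = threadPool.submit(new NettyRpcServerConnectTask(nettyRpcNodeInfo, eventLoopGroup, nettyRpcClient));
// Block until the connect task has finished
future.get();
// The connect task sets the channel on success; no active channel means the connect failed
if (!isConnected()) {
return false;
}
if (logger.isInfoEnabled()) {
logger.info("CONNECT SUCCESS.");
}
return true;
} catch (Exception e) {
logger.error("CONNECT TO REMOTE SERVER FAILED.", e);
return false;
}
}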
public boolean isConnected() {
if (channel == null) {
return false;
}
return channel.isActive();
}
/**
* Sends one message
*
* @return true if the message was written to the channel, false if there is no usable connection
*/
public boolean writeRequest(NettyRpcRequestMessage rpcRequestMessage) {
if (!isConnected() && reConnectOn) {
tryReConnect();
if (!isConnected()) {
return false;
}
}
// Send the message
if (channel != null) {
if (logger.isDebugEnabled()) {
logger.debug("【SEND】" + rpcRequestMessage);
}
channel.writeAndFlush(rpcRequestMessage);
return true;
}
return false;
}
public void tryReConnect() {
statusLock.lock();
try {
if (!isConnected()) {
try {
Future<?> future = threadPool.submit(new ReConnect());
future.get();
} catch (Exception e) {
logger.error("NETTY RPC CLIENT CONNECTION TRY RECONNECT IS ERROR");
logger.error(e);
}
}
} catch (Exception e) {
} finally {
statusLock.unlock();
}
}
/**
* Inner class that performs the reconnect
*
* @author Fancy
*/
private class ReConnect implements Runnable {
@Override
public void run() {
try {
open();
} catch (Exception e) {
logger.error("RESTART CONNECTION ERROR.", e);
} finally {
// Set to allow reconnecting
// reConnect = false;
}
}
}
/**
* Enables automatic reconnect
*/
public void setReconnectOn() {
this.reConnectOn = true;
}
/**
* Disables automatic reconnect
*/
public void setReconnectOff() {
this.reConnectOn = false;
}
public NioSocketChannel getChannel() {
return channel;
}
public void setChannel(NioSocketChannel channel) {
this.channel = channel;
}
public void close() {
if (channel != null) {
channel.close();
}
eventLoopGroup.shutdownGracefully();
}
}
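The reconnect-before-write logic in `writeRequest` and `tryReConnect` follows a common pattern: check the connection state, take a lock, check again, and only then attempt to connect. Here is a small sketch of that pattern without Netty. `ReconnectingWriter` is a hypothetical class; the `connector` callback stands in for one connect attempt, and an in-memory list stands in for the channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

final class ReconnectingWriter {
    private final ReentrantLock statusLock = new ReentrantLock();
    private final BooleanSupplier connector;             // hypothetical: one connect attempt, true on success
    private final List<String> sent = new ArrayList<>(); // stands in for the network channel
    private volatile boolean connected;

    ReconnectingWriter(BooleanSupplier connector) {
        this.connector = connector;
    }

    // Writes a message, trying to reconnect once first if the connection is down.
    boolean write(String message) {
        if (!connected) {
            statusLock.lock();
            try {
                // Re-check under the lock: another thread may have reconnected already
                if (!connected) {
                    connected = connector.getAsBoolean();
                }
            } finally {
                statusLock.unlock();
            }
        }
        if (!connected) {
            return false;
        }
        synchronized (sent) {
            sent.add(message);
        }
        return true;
    }

    int sentCount() {
        synchronized (sent) {
            return sent.size();
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // The first connect attempt fails, later attempts succeed
        ReconnectingWriter writer = new ReconnectingWriter(() -> ++attempts[0] > 1);
        System.out.println(writer.write("a")); // false: connect failed, message dropped
        System.out.println(writer.write("b")); // true: second attempt connected
        System.out.println(attempts[0]);       // 2
    }
}
```

The second check inside the lock is what keeps several writer threads from each starting their own connect attempt when the connection drops.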
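RPC connect task: every RPC request is in effect a network request, so we use the network-layer operations that Netty provides to carry out the RPC network request.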
package com.twjitm.core.common.service.rpc.network;
import com.twjitm.core.common.handler.rpc.NettyRpcClientServerHandler;
import com.twjitm.core.common.service.rpc.server.NettyRpcNodeInfo;
import com.twjitm.core.initalizer.NettyRpcClientMessageServerInitializer;
import com.twjitm.core.utils.logs.LoggerUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.apache.log4j.Logger;
import java.net.InetSocketAddress;
/**
* @author twjtim- [Created on 2018-08-20 11:11]
* @jdk java version "1.8.0_77"
* Connects to the server
*/
public class NettyRpcServerConnectTask implements Runnable {
private Logger logger=LoggerUtils.getLogger(NettyRpcServerConnectTask.class);
/**
* Remote address to connect to
*/
private InetSocketAddress remotePeer;
private EventLoopGroup eventLoopGroup;
private NettyRpcClient nettyRpcClient;
public NettyRpcServerConnectTask(
NettyRpcNodeInfo nettyRpcNodeInfo,
EventLoopGroup eventLoopGroup,
NettyRpcClient nettyRpcClient) {
this.eventLoopGroup = eventLoopGroup;
this.nettyRpcClient = nettyRpcClient;
this.remotePeer = new InetSocketAddress(nettyRpcNodeInfo.getHost(), nettyRpcNodeInfo.getIntPort());
}
@Override
public void run() {
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
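// Bootstrap keeps a single handler, so a second handler(...) call replaces the first.
// To log traffic, add a LoggingHandler to the pipeline inside the initializer instead.
.handler(new NettyRpcClientMessageServerInitializer());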
ChannelFuture channelFuture = b.connect(remotePeer);
channelFuture.addListener((ChannelFutureListener) channelFuture1 -> {
if (channelFuture1.isSuccess()) {
logger.info("CONNECT TO REMOTE SERVER. REMOTE PEER = " + remotePeer + " SUCCESS");
NettyRpcClientServerHandler handler = channelFuture1.channel().pipeline().get(NettyRpcClientServerHandler.class);
handler.setNettyRpcClient(nettyRpcClient);
nettyRpcClient.getRpcClientConnection().setChannel((NioSocketChannel) channelFuture1.channel());
}else{
logger.debug("CONNECT TO REMOTE SERVER. REMOTE PEER = " + remotePeer + " FAIL");
}
});
try {
channelFuture.await();
} catch (InterruptedException e) {
logger.error(e.toString(), e);
}
// Connect attempt finished
logger.debug("CONNECT TO REMOTE SERVER. REMOTE PEER = " + remotePeer);
}
}
4.5. RPC routing and service discovery:
First we need to start the RPC routing server:
package com.twjitm.core.common.service.rpc.service;
import com.twjitm.core.common.config.global.NettyGameServiceConfig;
import com.twjitm.core.common.config.global.NettyGameServiceConfigService;
import com.twjitm.core.common.factory.thread.NettyRpcHandlerThreadPoolFactory;
import com.twjitm.core.common.service.IService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.threads.utils.ExecutorUtil;
import org.springframework.stereotype.Service;
/**
* Created by IntelliJ IDEA.
* User: 文江 Date: 2018/8/19 Time: 10:29
* https://blog.csdn.net/baidu_23086307
*/
@Service
public class NettyRemoteRpcHandlerService implements IService {
private NettyRpcHandlerThreadPoolFactory rpcHandlerThreadPool;
@Override
public String getId() {
return "NettyRemoteRpcHandlerService";
}
@Override
public void startup() throws Exception {
NettyGameServiceConfigService config = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService();
NettyGameServiceConfig gameConfig = config.getNettyGameServiceConfig();
if (gameConfig.isRpcOpen()) {
// Start the service
rpcHandlerThreadPool= SpringServiceManager.getSpringLoadService().
getNettyRpcHandlerThreadPoolFactory();
rpcHandlerThreadPool.createExecutor(
gameConfig.getRpcConnectThreadSize(),
gameConfig.getRpcSendProxyThreadSize());
}
}
@Override
public void shutdown() throws Exception {
ExecutorUtil.shutdownAndAwaitTermination(SpringServiceManager.getSpringLoadService().getNettyRpcHandlerThreadPoolFactory().getExecutor());
}
public void submit(Runnable runnable) {
if(rpcHandlerThreadPool!=null){
rpcHandlerThreadPool.getExecutor().submit(runnable);
}
}
}
Task scheduling and routing service:
package com.twjitm.core.common.service.rpc.service;
import com.twjitm.core.common.config.global.GlobalConstants;
import com.twjitm.core.common.config.global.NettyGameRpcConfig;
import com.twjitm.core.common.config.global.NettyGameServiceConfig;
import com.twjitm.core.common.config.rpc.RpcServerConfig;
import com.twjitm.core.common.service.IService;
import com.twjitm.core.common.service.rpc.client.NettyRpcFuture;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.threads.common.executor.NettyOrderThreadPoolExecutor;
import com.twjitm.threads.thread.NettyThreadNameFactory;
import com.twjitm.threads.utils.ExecutorUtil;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
/**
* @author EGLS0807 - [Created on 2018-08-20 11:51]
* @company http://www.g2us.com/
* @jdk java version "1.8.0_77"
*/
@Service
public class NettyRPCFutureService implements IService {
/**
* Scheduler for periodic tasks
*/
private ScheduledExecutorService executorService;
private ConcurrentHashMap<String, NettyRpcFuture> pendingRPC = new ConcurrentHashMap<>();
@Override
public String getId() {
return NettyRPCFutureService.class.getSimpleName();
}
@Override
public void startup() throws Exception {
NettyThreadNameFactory nettyThreadNameFactory = new
NettyThreadNameFactory(GlobalConstants.Thread.DETECT_RPC_PEND_ING);
//TODO optimize
NettyGameServiceConfig gameServiceConfig = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService().getNettyGameServiceConfig();
executorService = Executors.newScheduledThreadPool(gameServiceConfig.getRpcConnectThreadSize(), nettyThreadNameFactory);
executorService.scheduleAtFixedRate(() -> {
ConcurrentHashMap<String, NettyRpcFuture> pendingRPC = getPendingRPC();
Set<Map.Entry<String, NettyRpcFuture>> entrySet = pendingRPC.entrySet();
for (Map.Entry<String, NettyRpcFuture> entry : entrySet) {
NettyRpcFuture rpcFuture = entry.getValue();
if(rpcFuture.isTimeout()){
String requestId = entry.getKey();
boolean removeFlag = removeNettyRPCFuture(requestId, rpcFuture);
if(removeFlag) {
// rpcFuture.done(rpcResponse);
}
}
}
}, 1, 1,TimeUnit.MINUTES);
}
private ConcurrentHashMap<String,NettyRpcFuture> getPendingRPC() {
return pendingRPC;
}
@Override
public void shutdown() throws Exception {
ExecutorUtil.shutdownAndAwaitTermination(executorService, 60L, TimeUnit.MILLISECONDS);
}
public void addNettyRPCFuture(String requestId, NettyRpcFuture rpcFuture) {
pendingRPC.put(requestId, rpcFuture);
}
public boolean removeNettyRPCFuture(String requestId, NettyRpcFuture rpcFuture) {
return pendingRPC.remove(requestId, rpcFuture);
}
public NettyRpcFuture getNettyRPCFuture(String requestId) {
return pendingRPC.get(requestId);
}
}
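In the service above, the scheduled task removes timed-out futures from the pending map, but the line that would complete them is commented out, so a caller blocked on such a future learns about the timeout only through its own `get(timeout, ...)`. As one possible variation (an assumption, not the author's implementation), the sweep could fail each expired future explicitly. `TimeoutSweeper` and `Pending` below are hypothetical names, and `CompletableFuture` again stands in for `NettyRpcFuture`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

final class TimeoutSweeper {
    // A pending call together with the time after which it counts as timed out.
    static final class Pending {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final long deadlineMillis;

        Pending(long deadlineMillis) {
            this.deadlineMillis = deadlineMillis;
        }
    }

    final ConcurrentHashMap<String, Pending> pending = new ConcurrentHashMap<>();

    // Removes every entry whose deadline has passed, fails its future,
    // and returns how many entries were expired.
    int sweep(long nowMillis) {
        int expired = 0;
        for (Map.Entry<String, Pending> entry : pending.entrySet()) {
            Pending p = entry.getValue();
            // remove(key, value) makes sure a late response cannot claim the same entry
            if (nowMillis >= p.deadlineMillis && pending.remove(entry.getKey(), p)) {
                p.future.completeExceptionally(new TimeoutException("RPC timed out: " + entry.getKey()));
                expired++;
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        TimeoutSweeper sweeper = new TimeoutSweeper();
        sweeper.pending.put("a", new Pending(100));
        sweeper.pending.put("b", new Pending(1_000));
        System.out.println(sweeper.sweep(500));                  // 1: only "a" is past its deadline
        System.out.println(sweeper.pending.containsKey("b"));    // true: "b" is still pending
    }
}
```

In a running service, `sweep(System.currentTimeMillis())` would be scheduled with `ScheduledExecutorService.scheduleAtFixedRate`, in the same way the original task is.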
5. Testing
package com.twjitm.rpc;
import com.twjitm.TestSpring;
import com.twjitm.core.common.enums.NettyGameTypeEnum;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolder;
import com.twjitm.core.common.service.rpc.client.NettyRpcContextHolderObject;
import com.twjitm.core.common.service.rpc.service.NettyRpcProxyService;
import com.twjitm.core.service.rpc.service.IHelloWorld;
import com.twjitm.core.spring.SpringServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author EGLS0807 - [Created on 2018-08-20 15:16]
* @company http://www.g2us.com/
* @jdk java version "1.8.0_77"
*
* Synchronous blocking call
*/
public class HelloWorldServiceTest {
Logger logger=LoggerFactory.getLogger(HelloWorldServiceTest.class);
private NettyRpcProxyService nettyRpcProxyService;
public static void main(String[] args) {
TestSpring.initSpring();
HelloWorldServiceTest helloServiceTest = new HelloWorldServiceTest();
helloServiceTest.init();
helloServiceTest.helloTest1();
helloServiceTest.setTear();
}
private void init() {
nettyRpcProxyService=SpringServiceManager.getSpringLoadService().getNettyRpcProxyService();
}
private void helloTest1() {
IHelloWorld helloWorld = nettyRpcProxyService.createProxy(IHelloWorld.class);
int serverId=9001;
NettyRpcContextHolderObject rpcContextHolderObject =
new NettyRpcContextHolderObject(NettyGameTypeEnum.WORLD, serverId);
NettyRpcContextHolder.setContextHolder(rpcContextHolderObject);
String result = helloWorld.getHelloWorld(5);
logger.info(result);
}
public void setTear(){
if (nettyRpcProxyService != null) {
try {
nettyRpcProxyService.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Because there is a lot of code and space here is limited, the full source code has been open-sourced on GitHub,