netty是基于NIO(同步非阻塞)开发的网络通信框架;对比传统BIO(阻塞IO),其并发性能有很大提升。而dubbo的底层就是使用netty作为网络框架,本文就手写简单的基于netty的RPC框架。
1 设计步骤
定义一个通用接口,作为服务提供者(provider)和消费者(consumer)之间的操作纽带
创建一个服务提供者,实现通用接口,并返回处理结果;网络方面监听消费者请求
创建一个服务消费者,通过代理模式调用远程服务接口
1.1 程序目录
1.2 定义一个通用接口
public interface TestService {
String hello(String msg);
}
2 服务提供者模块
2.1 接口实现
public class TestServiceImpl implements TestService {
@Override
public String hello(String msg) {
System.out.println("TestServiceImpl中hello被调用,参数:" + msg);
return "你好客户端,我已经收到你的消息:" + msg;
}
}
2.2 定义一个服务启动类
public class ServerBootStrap {
public static void main(String[] args) {
NettyServer nettyServer = new NettyServer(40004);
nettyServer.init();
}
}
2.3 创建netty服务端
**此步骤是netty常规服务端创建方式**
public class NettyServer {
private int port;
public NettyServer(int port) {
this.port = port;
}
public void init() {
//创建一个用于接收连接的线程组,参数代表线程个数
EventLoopGroup boss = new NioEventLoopGroup(1);
//创建处理操作时间的线程组,没有参数netty会默认线程为内核数*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture ch = serverBootstrap.bind(port).sync();
ch.channel().closeFuture().sync();
} catch (Exception ex) {
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
2.4 服务端业务处理Handler
channelRead0方法用于接收客户端传来的信息,同时对数据进行校验
校验成功后,截取有效参数调用服务接口
public class NettyServerHandler extends SimpleChannelInboundHandler {
private static String head = "dubbo#TestServie#";
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到客户端数据:" + msg);
if (msg.toString().startsWith(head)) {
TestService testService = new TestServiceImpl();
String result = testService.hello(msg.toString().substring(head.length()));
ctx.writeAndFlush(result);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("接收到连接请,channelActive被调用:" + ctx.channel().remoteAddress());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("读取完成");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("断开连接");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("抛出异常");
ctx.channel().close();
}
}
3 消费者模块
3.1 创建消费者启动程序
public class ClientBootStrap {
public static void main(String[] args) {
NettyClient nettyClient = new NettyClient("127.0.0.1", 40004);
String head = "dubbo#TestServie#";
// nettyClient.init();
TestService service = (TestService) nettyClient.getBean(TestService.class, head);
String result = service.hello("你好,我是服务消费者");
System.out.println("调用返回了结果:" + result);
}
}
3.2 创建消费者网络通信模块
通过代理模式调用
public class NettyClient {
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler nettyClientHandler;
private String host;
private int port;
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}
//编写一个代理 请求服务提供者接口
public Object getBean(final Class<?> serviceClass, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{
serviceClass}, ((proxy, method, args) -> {
System.out.println("开始执行代理");
if (nettyClientHandler == null)
init();
System.out.println("设置代理参数");
nettyClientHandler.setPara(providerName + args[0].toString());
return executorService.submit(nettyClientHandler).get();
}));
}
private static void init() {
System.out.println("开始执行init方法");
nettyClientHandler = new NettyClientHandler();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(nettyClientHandler);
}
});
bootstrap.connect("127.0.0.1", 40004).sync();
// future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// worker.shutdownGracefully();
// System.out.println("执行结束");
}
}
}
3.3 创建消费者业务处理handler
成员变量para: 为调用远程接口服务的参数
成员变量result::为调用远程服务器接口返回结果
需要注意的是该handller实现了Callable接口中call()方法;
执行步骤为:
1、连接建立成功后执行channelActive方法
2、执行call方法发送数据到服务端,同时阻塞线程
3、服务端返回结果后执行channelRead0方法,唤醒线程,
4、执行call方法中wait()后面的步骤,返回结果
public class NettyClientHandler extends SimpleChannelInboundHandler implements Callable {
private ChannelHandlerContext context;
private String para;
private String result;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context = ctx;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("断开连接");
}
@Override
public synchronized Object call() throws Exception {
System.out.println("发送call消息:" + para);
context.writeAndFlush(para);
wait();
return result;
}
@Override
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
result = o.toString();
System.out.println("收到服务端的返回消息:" + o);
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("客户端发生异常");
}
void setPara(String str) {
this.para = str;
}
}