02-RPCServer: 一个简单的RPC demo

本章是一个初学的简单demo
以后的目标就是逐步把这个demo完善

RPCServer-一个简单的RPC demo


本demo包含的内容:

  • RPCClient + RPCServer
  • RPCClient + RPCServer均用netty框架
  • RPCServer 端使用Spring扫描服务
  • RPCClient 通过proxy调用服务
  • 使用protostuff序列化与反序列化请求和响应
  • -

1.protostuff
在网络传输中离不开序列化与反序列化. 在java中, 序列化就是指将java对象转化为二进制数据, 反序列化就是将二进制数据恢复成java对象. 序列化对象的方式有很多种, 比如protobuf, 将对象转化为json, xml 以及protostuff等等方法, 而protostuff因为性能好, 序列化后的体积小, 使用方便等等有点而广受采用. 使用方法:
*添加maven依赖

    <dependency>
        <groupId>io.protostuff</groupId>
        <artifactId>protostuff-core</artifactId>
        <version>1.5.3</version>
    </dependency>

    <dependency>
        <groupId>io.protostuff</groupId>
        <artifactId>protostuff-runtime</artifactId>
        <version>1.5.3</version>
    </dependency>

*序列化和反序列化方法

    @SuppressWarnings("unchecked")
    public static <T> byte[] serilize(T msg) {
        Class<T> tClass = (Class<T>) msg.getClass();
        Schema<T> schema = RuntimeSchema.getSchema(tClass);

        LinkedBuffer buffer = LinkedBuffer.allocate();
        final byte[] bytes;
        try {
            bytes = ProtostuffIOUtil.toByteArray(msg, schema, buffer);
        } finally {
            buffer.clear();
        }
        return bytes;
    }


    public static <T> T deserialize(byte[] bytes, Class<T> clazz) {
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        T msg = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, msg, schema);
        return msg;
    }

2.Spring扫描Service
*RPCService注解

// 该注解用于标注一个RPCService类, Spring通过扫描该注解来生成service bean
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RPCService {
    Class<?> value();
}

*service_context.xml配置文件
Spring的配置文件, component-scan标识Spring要扫描的的包, 默认情况下, Spring 只扫描@Component @Controller @Service这些Spring定义的注解. include-filter标签可以添加扫描的注解类型.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- -->
    <context:component-scan base-package="top.banyaoqiang.rpcserver.service" >
        <context:include-filter type="annotation" expression="top.banyaoqiang.rpcserver.annotation.RPCService" />
    </context:component-scan>
</beans>

*RPCService定义

// 服务接口
public interface UserAuthorityService {
    ...
}
package top.banyaoqiang.rpcserver.service;

// service实现
@RPCService(UserAuthorityService.class)
public class UserAuthorityServiceImpl implements UserAuthorityService {

    ...
}

*Spring扫描注解方法
ApplicationContext 可以用来扫描装配java bean, Spring提供了几种获取context的方法, 比如FileSystemXmlApplicationContext, ClassPathXmlApplicationContext等, 日后会整理Spring相关的笔记, 这里就不多说啦.

public void discoverService() {
    ApplicationContext context = new ClassPathXmlApplicationContext(CONTEXT_XML_PATH);

        // 获取带有RPCService注解的service, 装配成bean
        Map<String, Object> s = context.getBeansWithAnnotation(RPCService.class);

        // 将扫描出的service bean以其接口名称为key, 放入serivces Map保存.
        for (Object o : s.values()) {
            services.put(
                    o.getClass().getAnnotation(RPCService.class).value().getName(),
                    o
            );
        }
}

3.传输协议
*请求和响应
之后还需要更多扩展

public class RPCRequest {
    // 调用的接口名称
    private String interfaceName;

    // 调用的方法名称
    private String methodName;

    // 方法的参数类型
    private Class<?>[] parameterTypes;

    // 传递的参数
    private Object[] parameters;

    // setters 和 getters
    ...
}
public class RPCResponse {

    // 响应代码
    private int status;

    // 方法执行结果
    private Object result;

    // setters 和 getters
    ...
}

*Decoder和Encoder

public class RPCEncoder extends MessageToByteEncoder {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {

        // 将java对象序列化成为byte数组
        byte[] bytes = SerializationUtil.serilize(o);

        // 写入byte数组长度, 方便反序列化时判断是否接收完整
        byteBuf.writeInt(bytes.length);
        byteBuf.writeBytes(bytes);
    }
}
public class RPCDecoder extends ByteToMessageDecoder {
    private Class<?> clazz;

    public RPCDecoder(Class<?> clazz) {
        this.clazz = clazz;
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

        // 数据未接收完毕, 直接返回
        if (byteBuf.readableBytes() < 4) return;
        byteBuf.markReaderIndex();
        int len = byteBuf.readInt();
        if (byteBuf.readableBytes() < len) {
            byteBuf.resetReaderIndex();
            return;
        }

        // 数据接收完整, 反序列化成为java对象
        byte[] bytes = new byte[len];
        byteBuf.readBytes(bytes);
        Object msg = SerializationUtil.deserialize(bytes, clazz);
        list.add(msg);
    }
}

4.RPCServer
*RPCServerHandler

public class RPCServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            if (msg instanceof RPCRequest) {
                RPCRequest request = (RPCRequest) msg;
                Object service = ServiceRegister.get(request.getInterfaceName());
                if (service == null) throw new ClassNotFoundException(
                        "Service " + request.getInterfaceName() + " not found"
                );
                // 获取请求执行的Method
                Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes());
                Object result = method.invoke(service, request.getParameters());

                // 生成响应并发送给客户端
                RPCResponse response = new RPCResponse(200, result);
                ctx.writeAndFlush().sync();
            } else throw new Exception("Not a RPCRequest!");
        } finally {
            // 这个demo为了简单, 在每次收到请求之后就会直接把链接关掉
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

*RPCServer
代码非常简单, 在pipeline上添加前面定义好的handler, 然后启动server就可以啦.

public class NettyServerCenter {
    private int port;

    public NettyServerCenter(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        ServiceRegister.register();

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(
                                    new RPCEncoder(),
                                    new RPCDecoder(RPCRequest.class),
                                    new RPCServerHandler()
                            );
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyServerCenter(54321).run();
    }
}

5.RPCClient
*Client handler, 创建Service代理

public class RPCClient extends ChannelInboundHandlerAdapter {
    private static final String SERVER_ADDRESS = "localhost";
    private static final int PORT = 54321;

    private static RPCResponse response;

    private RPCClient() {}

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof RPCResponse) response = (RPCResponse) msg;
    }

    // 创建代理
    @SuppressWarnings("unchecked")
    public static <T> T create(final Class<?> serviceInterface, int a) {
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
            new Class<?>[] {serviceInterface}, (proxy, method, args) -> {

                // 在代理上调用方法时, 将会被定向到该invoke方法
                EventLoopGroup worker = new NioEventLoopGroup();
                try {
                    Bootstrap b = new Bootstrap();
                    b.group(worker);
                    b.option(ChannelOption.SO_KEEPALIVE, true);
                    b.channel(NioSocketChannel.class);
                    b.handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(
                                new RPCEncoder(),
                                new RPCDecoder(RPCResponse.class),
                                new RPCClient()
                            );
                        }
                    });

                    ChannelFuture f = b.connect(SERVER_ADDRESS, PORT);
                    f.sync();
                    Channel channel = f.channel();

                    RPCRequest request = new RPCRequest();
                    request.setInterfaceName(serviceInterface.getName());
                    request.setMethodName(method.getName());
                    request.setParameterTypes(method.getParameterTypes());
                    request.setParameters(args);

                    channel.writeAndFlush(request).sync();
                    channel.closeFuture().sync();
                } catch (Exception e) {
                    MyLog.log(e.getMessage());
                } finally {
                    worker.shutdownGracefully();
                }
            return response.getResult();
        });
    }
}

*调用service

public static void main(String[] args) {
    // 在client本地只有UserAuthorityService接口, 并没有具体的实现类UserAuthorityServiceImpl
    // 通过代理将调用请求发送到RPCServer, 在client看起来就像在调用本地的service一样
    UserAuthorityService service = RPCClient.create(UserAuthorityService.class);
    UserInfo info = service.getUserInfo(1, "admin");
    System.out.println(info);
}

猜你喜欢

转载自blog.csdn.net/cfmy_ban/article/details/81743028