rpc系列-rpc02

版权声明:本文为博主(李孟lm)原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_19968255/article/details/82896520

承接rpc系列-rpc01:https://blog.csdn.net/qq_19968255/article/details/82894381

示例

1.结构

2.代码

客户端:

rpc-client

/**
 * 框架的RPC 客户端(用于发送 RPC 请求)
 */
public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {

	private static final Logger logger = LoggerFactory
			.getLogger(RpcClient.class);

	private String host;
	private int port;

	private RpcResponse response;

	private final Object obj = new Object();

	public RpcClient(String host, int port) {
		this.host = host;
		this.port = port;
	}
	/**
	 * 链接服务端,发送消息
	 */
	public RpcResponse send(RpcRequest request) throws Exception {
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(group).channel(NioSocketChannel.class)
					.handler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel channel)
								throws Exception {
							// 向pipeline中添加编码、解码、业务处理的handler
							channel.pipeline()
									.addLast(new RpcEncoder(RpcRequest.class))  //out-1
									.addLast(new RpcDecoder(RpcResponse.class)) //in-1
									.addLast(RpcClient.this);                   //in-2
						}
					}).option(ChannelOption.SO_KEEPALIVE, true);
			// 链接服务器
			ChannelFuture future = bootstrap.connect(host, port).sync();
			//将request对象写入outbundle处理后发出(即RpcEncoder编码器)
			future.channel().writeAndFlush(request).sync();

			// 用线程等待的方式决定是否关闭连接
			// 其意义是:先在此阻塞,等待获取到服务端的返回后,被唤醒,从而关闭网络连接
			synchronized (obj) {
				obj.wait();
			}
			if (response != null) {
				future.channel().closeFuture().sync();
			}
			return response;
		} finally {
			group.shutdownGracefully();
		}
	}

	/**
	 * 读取服务端的返回结果
	 */
	@Override
	public void channelRead0(ChannelHandlerContext ctx, RpcResponse response)
			throws Exception {
		this.response = response;

		synchronized (obj) {
			obj.notifyAll();
		}
	}

	/**
	 * 异常处理
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		logger.error("client caught exception", cause);
		ctx.close();
	}
}
/**
 * RPC 代理(用于创建 RPC 服务代理)
 */
public class RpcProxy {
	//服务地址
	private String serverAddress;
	//自动加载,查找服务
	private ServiceDiscovery serviceDiscovery;

	public RpcProxy(String serverAddress) {
		this.serverAddress = serverAddress;
	}

	public RpcProxy(ServiceDiscovery serviceDiscovery) {
		this.serviceDiscovery = serviceDiscovery;
	}

	/**
	 * 创建代理
	 * 
	 * @param interfaceClass
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <T> T create(Class<?> interfaceClass) {
		return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
				new Class<?>[] { interfaceClass }, new InvocationHandler() {
					public Object invoke(Object proxy, Method method,
							Object[] args) throws Throwable {
						//创建RpcRequest,封装被代理类的属性
						RpcRequest request = new RpcRequest();
						request.setRequestId(UUID.randomUUID().toString());
						//拿到声明这个方法的业务接口名称
						request.setClassName(method.getDeclaringClass()
								.getName());
						request.setMethodName(method.getName());
						request.setParameterTypes(method.getParameterTypes());
						request.setParameters(args);
						//查找服务
						if (serviceDiscovery != null) {
							serverAddress = serviceDiscovery.discover();
						}
						//随机获取服务的地址
						String[] array = serverAddress.split(":");
						String host = array[0];
						int port = Integer.parseInt(array[1]);
						//创建Netty实现的RpcClient,链接服务端
						RpcClient client = new RpcClient(host, port);
						//通过netty向服务端发送请求
						RpcResponse response = client.send(request);
						//返回信息
						if (response.isError()) {
							throw response.getError();
						} else {
							return response.getResult();
						}
					}
				});
	}
}

服务端:

rpc-server

/**
 * 处理具体的业务调用
 * 通过构造时传入的“业务接口及实现”handlerMap,来调用客户端所请求的业务方法
 * 并将业务方法返回值封装成response对象写入下一个handler(即编码handler——RpcEncoder)
 */
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {

	private static final Logger logger = LoggerFactory
			.getLogger(RpcHandler.class);

	private final Map<String, Object> handlerMap;

	RpcHandler(Map<String, Object> handlerMap) {
		this.handlerMap = handlerMap;
	}

	/**
	 * 接收消息,处理消息,返回结果,
	 */
	@Override
	public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request)
			throws Exception {
		RpcResponse response = new RpcResponse();
		response.setRequestId(request.getRequestId());
		try {
			//根据request来处理具体的业务调用
			Object result = handle(request);
			response.setResult(result);
		} catch (Throwable t) {
			response.setError(t);
		}
		//写入 outbundle(即RpcEncoder)进行下一步处理(即编码)后发送到channel中给客户端
		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
	}

	/**
	 * 根据request来处理具体的业务调用
	 * 调用是通过反射的方式来完成
	 */
	private Object handle(RpcRequest request) throws Throwable {
		String className = request.getClassName();

		//拿到实现类对象
		Object serviceBean = handlerMap.get(className);
		
		//拿到要调用的方法名、参数类型、参数值
		String methodName = request.getMethodName();
		Class<?>[] parameterTypes = request.getParameterTypes();
		Object[] parameters = request.getParameters();
		
		//拿到接口类
		Class<?> forName = Class.forName(className);

		System.out.println(serviceBean.toString()+"  "+ Arrays.toString(parameters));
		//调用实现类对象的指定方法并返回结果
		Method method = forName.getMethod(methodName, parameterTypes);
		return method.invoke(serviceBean, parameters);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		logger.error(cause.getMessage());
		ctx.close();
	}
}
/**
 * 框架的RPC 服务器(用于将用户系统的业务类发布为 RPC 服务)
 * 使用时可由用户通过spring-bean的方式注入到用户的业务系统中
 * 由于本类实现了ApplicationContextAware InitializingBean
 * spring构造本对象时会调用setApplicationContext()方法,从而可以在方法中通过自定义注解获得用户的业务接口和实现
 * 还会调用afterPropertiesSet()方法,在方法中启动netty服务器
 * 顺序
 * 1.setApplicationContext 报错会中间截断
 * 2.afterPropertiesSet
 */
public class RpcServer implements ApplicationContextAware, InitializingBean {

	private static final Logger logger = LoggerFactory
			.getLogger(RpcServer.class);

	private String serverAddress;
	private ServiceRegistry serviceRegistry;

	//用于存储业务接口和实现类的实例对象(由spring所构造)
	private Map<String, Object> handlerMap = new HashMap<String, Object>();

	public RpcServer(String serverAddress) {
		this.serverAddress = serverAddress;
	}

	//服务器绑定的地址和端口由spring在构造本类时从配置文件中传入
	public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {
		this.serverAddress = serverAddress;
		//用于向zookeeper注册名称服务的工具类
		this.serviceRegistry = serviceRegistry;
	}

	/**
	 * 通过注解,获取标注了rpc服务注解的业务类的----接口及impl对象,将它放到handlerMap中
	 */
	public void setApplicationContext(ApplicationContext ctx)
			throws BeansException {
		Map<String, Object> serviceBeanMap = ctx
				.getBeansWithAnnotation(RpcService.class);
		if (MapUtils.isNotEmpty(serviceBeanMap)) {
			for (Object serviceBean : serviceBeanMap.values()) {
				//从业务实现类上的自定义注解中获取到value,从来获取到业务接口的全名
				String interfaceName = serviceBean.getClass()
						.getAnnotation(RpcService.class).value().getName();
				handlerMap.put(interfaceName, serviceBean);
			}
		}
	}

	/**
	 * 在此启动netty服务,绑定handle流水线:
	 * 1、接收请求数据进行反序列化得到request对象
	 * 2、根据request中的参数,让RpcHandler从handlerMap中找到对应的业务imple,调用指定方法,获取返回结果
	 * 3、将业务调用结果封装到response并序列化后发往客户端
	 */
	public void afterPropertiesSet() throws Exception {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap
					.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel channel)
								throws Exception {
							channel.pipeline()
									.addLast(new RpcDecoder(RpcRequest.class))// in-1
									.addLast(new RpcEncoder(RpcResponse.class))// out-1
									.addLast(new RpcHandler(handlerMap));// in-2
						}
					}).option(ChannelOption.SO_BACKLOG, 128)
					.childOption(ChannelOption.SO_KEEPALIVE, true);

			
			String[] array = serverAddress.split(":");
			String host = array[0];
			int port = Integer.parseInt(array[1]);

			ChannelFuture future = bootstrap.bind(host, port).sync();
			logger.debug("server started on port {}", port);

			if (serviceRegistry != null) {
				serviceRegistry.register(serverAddress);
			}

			future.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}
}
/**
 * RPC 请求注解(标注在服务实现类上)
 */
@Target({ ElementType.TYPE })//注解用在接口上
@Retention(RetentionPolicy.RUNTIME)//VM将在运行期也保留注释,因此可以通过反射机制读取注解的信息
@Component
public @interface RpcService {

	Class<?> value();
}

运行结果:

zkCli数据:

服务端

sample—server调用服务端接口实现:
Hello! World

客户端

服务端返回结果:
Hello! World

代码运行路径:

 

代码下载

地址:https://download.csdn.net/download/qq_19968255/10696211

 

猜你喜欢

转载自blog.csdn.net/qq_19968255/article/details/82896520
RPC
今日推荐