代码已经上传GIT源代码
1 整体结构图
如图
2 注解
因为基于注解的框架,我们理想的情况是只要我们定义了两个注解
@SService和@SReference
只要是被@SService注解的类就可以被IOC容器加载并且将方法发布到注册中心。
只要是被@SReference注解的属性就会被注入代理类。
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
@Inherited
public @interface SService {
}
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.FIELD, ElementType.METHOD })
public @interface SReference {
}
3 消费者代理工厂
对于每个接口都可生成代理,因为我们选的是JDK的代理。所以必须传入接口,这也跟我们面向接口的RPC框架设计相吻合。
public class ConsumerProxyFactory implements InvocationHandler {
private ConsumerHandler consumerHandler;// spring注入 consumerHandler
/* create()创建工厂bean speakInterface Class<?> interfaceClass = Class.forName(clazz); */
public Object create(Class<?> interfaceClass) {
return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[] { interfaceClass }, this);
}
/**
* 实现InvocationHandler的接口<br>
* TODO: 需要增加失败重试机制
* 遇到的问题,Spring容器可能访问被代理类的的实例的toString()方法
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> clazz = proxy.getClass().getInterfaces()[0];
LogCore.BASE.info("proxy invoke interfaceClass ={},method={},args={}",clazz, method, Arrays.toString(args));
Class<?>[] parameterTypes = method.getParameterTypes();
Request req = new Request();
req.setClazz(clazz);
req.setMethodName(method.getName());
String[] parameterTypeNames = Arrays.stream(parameterTypes).map(Class::getName).toArray(String[]::new);
req.setParameterTypeNames(parameterTypeNames);
req.setArguments(args);
String reqStr = RPCSerializer.INSTANCE.requestFormat(req);
String resb = HttpUtil.sendPost(consumerHandler.getUrl(clazz), reqStr);//调用远程接口
Result result = RPCSerializer.INSTANCE.rsponseParse(resb);
return result.data;
}
4 提供者工厂
将各个服务的实例加载进来,并通过反射可以调用服务实例的方法
ublic class ProviderProxyFactory {
private ProviderHandler providerHandler;
public Map<Class<?>, Object> providers = new ConcurrentHashMap<>();
public ProviderProxyFactory(ProviderHandler providerHandler) {
this.providerHandler = providerHandler;
if (Container.container == null) {
new Thread(() -> {
new HttpContainer(this.providerHandler.getPort(), this::handleHttpContent).start();
}).start();// netty需要另起线程,如果是jetty就不需要
}
}
public void register(Object obj) {
Class<?> interFaceClazz = obj.getClass().getInterfaces()[0];
providers.put(interFaceClazz, obj);
providerHandler.register(interFaceClazz);
LogCore.BASE.info("{} 已经发布,conf={}", interFaceClazz.getSimpleName(), providerHandler);
}
/***
* 主要RPC 逻辑接收请求信息,解析后调用相关方法并返回<br>
* 需要将异常返回给调用者
*/
public String handleHttpContent(String reqStr) {
LogCore.BASE.info("get the reqStr is {}", reqStr);
try {
if (Util.isEmpty(reqStr)) {
return RpcConstants.EMPTY_RETURN;
}
// 将请求参数解析
Request req = RPCSerializer.INSTANCE.requestParse(reqStr);
// 反射请求
// Object result = rpcRequest.invoke(ProviderProxyFactory.getInstance().getBeanByClass(rpcRequest.getClazz()));
Class<?> clazz = req.getClazz();
String methodName = req.getMethodName();
Object[] args = req.getArguments();
String[] parameterTypeNames = req.getParameterTypeNames();
Class<?>[] parameterTypes = Arrays.stream(parameterTypeNames).map(this::classForName)
.toArray(Class[]::new);
Method method = clazz.getMethod(methodName, parameterTypes);
Object bean = getBeanByClass(clazz);
Object result = method.invoke(bean, args);
Result rst = new Result().setData(result);
return RPCSerializer.INSTANCE.responseFormat(rst);
} catch (Exception e) {
LogCore.RPC.error("providerProxyFactory handle error", e);
return e.getMessage();// TODO
}
}
public Object getBeanByClass(Class<?> clazz) throws RpcException {
Object bean = providers.get(clazz);
if (bean != null) {
return bean;
}
throw new RpcException(RpcExceptionCodeEnum.NO_BEAN_FOUND.getCode(), clazz);
}
public Class<?> classForName(String className) {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
LogCore.BASE.error("classForName err", className);
return null;
}
}
}
5 netty http通信
可换为tomcat jetty或者改为TCP通信。不过http也可以采用长连接多路复用。因此简化开发选了netty http.
public class NettyHttpServerInboundHandler extends ChannelInboundHandlerAdapter {
private HttpRequest request;
private Function<String, String> httpContentfunc;
public NettyHttpServerInboundHandler(Function<String, String> httpCall) {
this.httpContentfunc = httpCall;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (HttpRequest.class.isInstance(msg)) {
request = (HttpRequest) msg;
String uri = request.uri();
System.out.println("Uri:" + uri);
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
ByteBuf buf = content.content();
LogCore.BASE.debug("buffer={}," + "{}", buf, buf.readableBytes());
String res = httpContentfunc.apply(buf.toString(CharsetUtil.UTF_8));
buf.release();
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.wrappedBuffer(res.getBytes(CharsetUtil.UTF_8)));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
if (HttpUtil.isKeepAlive(request)) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.write(response);
ctx.flush();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LogCore.BASE.error("err", cause);
ctx.close();
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
public class NettyHttpServer {
public void start(int port, Function<String, String> httpCall) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// server端发送的是httpResponse,所以要使用HttpResponseEncoder进行编码
ch.pipeline().addLast(new HttpResponseEncoder());
// server端接收到的是httpRequest,所以要使用HttpRequestDecoder进行解码
ch.pipeline().addLast(new HttpRequestDecoder());
ch.pipeline().addLast(new NettyHttpServerInboundHandler(httpCall));
}
}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
6 zookeeper 服务的注册和发现
public class ConsumerHandler {
public String url;
private ZookeeperClient client;
/** 调用的接口,调用接口的次数 */
private final ConcurrentHashMap<Class<?>, AtomicPositiveInteger> INVOKE_COUNT_MAP = new ConcurrentHashMap<>();
public ConsumerHandler(String url) {
this.url = url;
LogCore.BASE.info("consumerconf invoke zkclient url={}", url);
}
public ConsumerHandler start() {
this.client = new ZookeeperClient(url);
return this;
}
public String getUrl(Class<?> clazz) throws RpcException {
List<String> urlList = getRpcUrls(clazz);
return getCurrentUrl(clazz, urlList);
}
private List<String> getRpcUrls(Class<?> clazz) throws RpcException {
String rootPath = RpcUtil.getZkRootPath(clazz);
List<String> childrenList = client.getChildren(rootPath);
if (Util.isEmpty(childrenList)) {
return new ArrayList<String>(0);
}
return childrenList.stream().filter(Util::notEmpty).map(ph -> client.getData(rootPath + "/" + ph))
.filter(Util::notEmpty).collect(Collectors.toList());
}
private String getCurrentUrl(Class<?> clazz, List<String> urlList) throws RpcException {
final int _count = INVOKE_COUNT_MAP.computeIfAbsent(clazz, k -> new AtomicPositiveInteger())
.getAndIncrement();
return urlList.get(_count % urlList.size());
}
}
public class ProviderHandler {
private String target;
private Integer port;
private ZookeeperClient client;
public ProviderHandler(String target, Integer port) {
this.target = target;
this.port = port;
}
public ProviderHandler start() {
client = new ZookeeperClient(this.target);
return this;
}
public void register(Class<?> clazz) {
String path = RpcUtil.getZkRootPath(clazz);
String childrenPath = path + "/node";
client.createPersistent(path);
client.createEphemeral(childrenPath, getNodeInfo());
}
public String getNodeInfo() {
try {
String info = "http://" + Inet4Address.getLocalHost().getHostAddress() + ":" + port;
LogCore.BASE.info("info={}", info);
return info;
} catch (UnknownHostException e) {
LogCore.RPC.error("getNodeInfo", e);
return null;
}
}
public String getTarget() {
return target;
}
public Integer getPort() {
return port;
}
}
7 序列化
zookeeper使用String。RPC的调用和返回使用JSON
public class ZStringSerializer implements ZkSerializer {
private final Charset charset = Charset.forName("UTF8");
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
return (data == null ? null : ((String) data).getBytes(charset));
}
@Override
public String deserialize(byte[] bytes) throws ZkMarshallingError {
return (bytes == null ? null : new String(bytes, charset));
}
}
public class RPCSerializer implements Formater, Parser {
public static final RPCSerializer INSTANCE = new RPCSerializer();
@Override
public String requestFormat(Request request) {
return JSONUtil.serialize(request);
}
@Override
public String responseFormat(Result response) {
return JSONUtil.serialize(response);
}
@Override
public Request requestParse(String param) throws RpcException {
return JSONUtil.deserialize(param);
}
@Override
public Result rsponseParse(String result) {
return JSONUtil.deserialize(result);
}
}
8 IOC容器 Spring boot
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
使用起来非常简单
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
最后
将一个RPC框架拆分为上述几个组件来实现后。还剩下最重要的问题。如果将他们组成一个框架?
下面的两节,将重点介绍如何将上面的分散的模和IOC容器一起组成易用的RPC框架。