一 概述
上个章节已经描述了dubbo如何发布一个服务,但对具体的发布过程只是粗略地描述了一下。这里将深入描述服务发布时是怎样开启socket监听的,即如何启动netty服务。
二 开启netty服务
上一节发布服务的重点入口代码如下:
- //通过proxyFactory对象生成接口实现类代理对象Invoker
- Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
- // Hand the Invoker to the protocol object for export; per this walkthrough the export also opens the socket listener, with Netty handling the socket communication.
- Exporter<?> exporter = protocol.export(invoker);
- public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
- return new AbstractProxyInvoker<T>(proxy, type, url) {
- @Override
- protected Object doInvoke(T proxy, String methodName,
- Class<?>[] parameterTypes,
- Object[] arguments) throws Throwable {
- Method method = proxy.getClass().getMethod(methodName, parameterTypes);
- return method.invoke(proxy, arguments);
- }
- };
- }
再细看 Exporter<?> exporter = protocol.export(invoker); 这行代码。这里的protocol默认当作DubboProtocol类。
- /**
- *
- * @param invoker 服务的执行体
- * @param <T>
- * @return
- * @throws RpcException
- */
- public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
- URL url = invoker.getUrl();
- // export service.
- String key = serviceKey(url);//根据URL生成一个唯一key值
- DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
- exporterMap.put(key, exporter);//通过map保存当前发布的invoker对象,key为拼装的唯一字符串
- //export an stub service for dispaching event
- Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
- Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
- if (isStubSupportEvent && !isCallbackservice){
- String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
- if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
- if (logger.isWarnEnabled()){
- logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
- "], has set stubproxy support event ,but no stub methods founded."));
- }
- } else {
- stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
- }
- }
- openServer(url);//开启netty服务
- // modified by lishen
- optimizeSerialization(url);
- return exporter;
- }
- /**
-  * Opens the server for the URL's address and keeps it in serverMap;
-  * an already-registered server is reset with the new URL instead.
-  *
-  * @param url the provider URL
-  */
- private void openServer(URL url) {
-     // Servers are looked up by address.
-     String address = url.getAddress();
-     // A client can also expose a service that only the server may call; such URLs skip server creation.
-     if (!url.getParameter(Constants.IS_SERVER_KEY, true)) {
-         return;
-     }
-     ExchangeServer existing = serverMap.get(address);
-     if (existing != null) {
-         // Servers support reset; used together with the override feature.
-         existing.reset(url);
-         return;
-     }
-     serverMap.put(address, createServer(url));
- }
- private ExchangeServer createServer(URL url) {
- //默认开启server关闭时发送readonly事件
- url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
- //默认开启heartbeat,最近心跳机制,根据代码应该是设置心跳时间吧
- url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
- String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
- if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
- throw new RpcException("Unsupported server type: " + str + ", url: " + url);
- url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
- ExchangeServer server;
- try {
- server = Exchangers.bind(url, requestHandler);
- } catch (RemotingException e) {
- throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
- }
- str = url.getParameter(Constants.CLIENT_KEY);
- if (str != null && str.length() > 0) {
- Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
- if (!supportedTypes.contains(str)) {
- throw new RpcException("Unsupported client type: " + str);
- }
- }
- return server;
- }
- public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
- if (url == null) {
- throw new IllegalArgumentException("url == null");
- }
- if (handler == null) {
- throw new IllegalArgumentException("handler == null");
- }
- url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
- return getExchanger(url).bind(url, handler);
- }
- /**
-  * Resolves the Exchanger named by the URL's exchanger parameter,
-  * falling back to Constants.DEFAULT_EXCHANGER when the parameter is absent.
-  *
-  * @param url URL carrying the optional exchanger parameter
-  * @return the Exchanger extension for that name
-  */
- public static Exchanger getExchanger(URL url) {
-     return getExchanger(url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER));
- }
- /**
-  * Looks up the Exchanger extension registered under the given name.
-  *
-  * @param type extension name
-  * @return the extension returned by the Exchanger ExtensionLoader
-  */
- public static Exchanger getExchanger(String type) {
-     ExtensionLoader<Exchanger> loader = ExtensionLoader.getExtensionLoader(Exchanger.class);
-     return loader.getExtension(type);
- }
- public class HeaderExchanger implements Exchanger {
- public static final String NAME = "header";
- public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
- return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
- }
- public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
- return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));//装饰器模式处理
- }
- }
- /**
-  * Binds a server for the URL through the adaptive Transporter.
-  *
-  * @param url      server URL
-  * @param handlers one or more handlers; several are combined in a ChannelHandlerDispatcher
-  * @return the server returned by the Transporter
-  * @throws IllegalArgumentException if url is null, or handlers is null or empty
-  * @throws RemotingException declared by the signature; propagated from the Transporter
-  */
- public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
-     if (url == null) {
-         throw new IllegalArgumentException("url == null");
-     }
-     if (handlers == null || handlers.length == 0) {
-         throw new IllegalArgumentException("handlers == null");
-     }
-     // A single handler is used directly; multiple handlers go through a dispatcher.
-     ChannelHandler handler = handlers.length == 1
-             ? handlers[0]
-             : new ChannelHandlerDispatcher(handlers);
-     return getTransporter().bind(url, handler);
- }
- /**
-  * Returns the adaptive Transporter extension.
-  *
-  * @return the result of getAdaptiveExtension() on the Transporter ExtensionLoader
-  */
- public static Transporter getTransporter() {
-     ExtensionLoader<Transporter> loader = ExtensionLoader.getExtensionLoader(Transporter.class);
-     return loader.getAdaptiveExtension();
- }
- public class NettyTransporter implements Transporter {
- public static final String NAME = "netty";
- public Server bind(URL url, ChannelHandler listener) throws RemotingException {
- return new NettyServer(url, listener);
- }
- public Client connect(URL url, ChannelHandler listener) throws RemotingException {
- return new NettyClient(url, listener);
- }
- }
- // Passes the superclass the URL plus the handler wrapped by ChannelHandlers.wrap, which receives the URL returned by ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME).
- public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
- super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
- }
- /**
-  * Creates the Netty ServerBootstrap, installs the decoder/encoder/handler pipeline
-  * and binds it to the address returned by getBindAddress().
-  */
- @Override
- protected void doOpen() throws Throwable {
-     NettyHelper.setNettyLoggerFactory();
-     // Netty server bootstrap: cached thread pools for the boss and worker threads.
-     ExecutorService bossPool = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
-     ExecutorService workerPool = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
-     int ioThreads = getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS);
-     bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(bossPool, workerPool, ioThreads));
-     // The NettyHandler receives this NettyServer as its handler. The constructor already passed the
-     // wrapped Dubbo handler up the super chain, so the server itself ends up as the Netty-side processor.
-     final NettyHandler sharedHandler = new NettyHandler(getUrl(), this);
-     channels = sharedHandler.getChannels();
-     // https://issues.jboss.org/browse/NETTY-365
-     // https://issues.jboss.org/browse/NETTY-379
-     // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
-     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-         public ChannelPipeline getPipeline() {
-             NettyCodecAdapter codecAdapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
-             ChannelPipeline result = Channels.pipeline();
-             /*int idleTimeout = getIdleTimeout();
-             if (idleTimeout > 10000) {
-                 pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
-             }*/
-             result.addLast("decoder", codecAdapter.getDecoder());
-             result.addLast("encoder", codecAdapter.getEncoder());
-             // The handler that processes client requests once Netty has received them.
-             result.addLast("handler", sharedHandler);
-             return result;
-         }
-     });
-     // bind
-     channel = bootstrap.bind(getBindAddress());
- }
- /**
-  * Computes the local and bind addresses, reads the accepts and idle-timeout settings,
-  * opens the server through doOpen() and then fetches the executor from the DataStore.
-  *
-  * @param url     server URL
-  * @param handler handler passed to the superclass
-  * @throws RemotingException if doOpen() throws anything
-  */
- public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
-     super(url, handler);
-     localAddress = getUrl().toInetSocketAddress();
-     // Use ANYHOST when anyhost is requested or NetUtils reports the configured host as invalid.
-     String bindHost;
-     if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(getUrl().getHost())) {
-         bindHost = NetUtils.ANYHOST;
-     } else {
-         bindHost = getUrl().getHost();
-     }
-     bindAddress = new InetSocketAddress(bindHost, getUrl().getPort());
-     this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
-     this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
-     try {
-         doOpen();
-         if (logger.isInfoEnabled()) {
-             logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
-         }
-     } catch (Throwable cause) {
-         String reason = "Failed to bind " + getClass().getSimpleName()
-                 + " on " + getLocalAddress() + ", cause: " + cause.getMessage();
-         throw new RemotingException(url.toInetSocketAddress(), null, reason, cause);
-     }
-     // The executor is stored in the DataStore under the port number.
-     DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
-     executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
- }
三 响应请求
前面段落已经讲解过,netty响应客户端请求的处理器是在NettyServer类的doOpen方法中设置的,代码如下:
- // The NettyHandler is constructed with the server URL and the enclosing server instance (this).
- final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
- /**
-  * Forwards a received Netty message to the handler, using the NettyChannel
-  * associated with the underlying Netty channel.
-  */
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-     NettyChannel nettyChannel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
-     try {
-         Object payload = e.getMessage();
-         handler.received(nettyChannel, payload);
-     } finally {
-         // Always drop the cached channel once the connection is gone.
-         NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
-     }
- }
因为在启动netty服务的时候,就将requestHandler对象经过层层包装传递给了NettyServer,再通过NettyServer类的构造函数将它保存到了NettyServer类的终极父类AbstractPeer的handler属性上,AbstractPeer类又实现了ChannelHandler接口,重写了received方法。
所以当netty框架接收到请求时,会执行messageReceived方法里面的handler.received(channel, e.getMessage());,其实执行的是AbstractPeer类的received方法,然后received方法里面又执行了handler.received(ch, msg);,这里的handler就是DubboProtocol类中创建的requestHandler对象。
感觉特别绕对不对?我刚开始也是整理了好几遍才搞清楚里面的流程。
接着继续看dubbo协议类DubboProtocol中的requestHandler是怎么处理请求的。我们重点看它重写的received方法这个入口:它调用了reply方法,首先将message参数转换成Invocation对象,Invocation封装了客户端的请求信息。
1.根据客户端的请求信息找到响应客户端的Invoker对象
2.检测客户端请求的方法,服务端是否存在
3.执行invoker对象的invoke方法,执行完成返回数据给客户端
- //netty响应客户端请求处理对象
- private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
- /**
-  * Handles a client request: finds the Invoker for the invocation and invokes it.
-  * <p>
-  * For callback invocations, the requested method name is first checked against the
-  * "methods" parameter of the invoker URL (a newer caller may invoke an older service);
-  * an unknown method is logged and ignored by returning null.
-  *
-  * @param channel channel the request arrived on
-  * @param message the request; must be an Invocation
-  * @return the result of invoker.invoke, or null when a callback method is not found
-  * @throws RemotingException if message is not an Invocation (including null)
-  */
- public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
-     if (!(message instanceof Invocation)) {
-         // The null check must be parenthesized: string concatenation binds tighter than ==,
-         // so without the parentheses the check never applies and a null message raises NPE.
-         throw new RemotingException(channel, "Unsupported request: "
-                 + (message == null ? null : (message.getClass().getName() + ": " + message))
-                 + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
-     }
-     Invocation inv = (Invocation) message;
-     // Look up the invoker from the request information.
-     Invoker<?> invoker = getInvoker(channel, inv);
-     // For callbacks, handle a newer version invoking an older one.
-     if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
-         String methodsStr = invoker.getUrl().getParameters().get("methods");
-         boolean hasMethod = false;
-         if (methodsStr == null || methodsStr.indexOf(",") == -1) {
-             hasMethod = inv.getMethodName().equals(methodsStr);
-         } else {
-             // Check the requested method against every method listed by the service.
-             for (String method : methodsStr.split(",")) {
-                 if (inv.getMethodName().equals(method)) {
-                     hasMethod = true;
-                     break;
-                 }
-             }
-         }
-         // No such method on the service side: log and ignore the invocation.
-         if (!hasMethod) {
-             logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
-             return null;
-         }
-     }
-     RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
-     return invoker.invoke(inv);
- }
- /**
-  * Entry point for messages received from the client.
-  * Invocation messages are handled by reply(); everything else goes to the superclass.
-  *
-  * @param channel channel the message arrived on
-  * @param message the received message
-  * @throws RemotingException declared by the signature; propagated from reply() or the superclass
-  */
- @Override
- public void received(Channel channel, Object message) throws RemotingException {
-     if (!(message instanceof Invocation)) {
-         super.received(channel, message);
-         return;
-     }
-     // The value returned by reply() is not used here.
-     reply((ExchangeChannel) channel, message);
- }
- // On connect, run the event method (if any) named by the channel URL's Constants.ON_CONNECT_KEY parameter.
- @Override
- public void connected(Channel channel) throws RemotingException {
- invoke(channel, Constants.ON_CONNECT_KEY);
- }
- @Override
- public void disconnected(Channel channel) throws RemotingException {
- if(logger.isInfoEnabled()){
- logger.info("disconected from "+ channel.getRemoteAddress() + ",url:" + channel.getUrl());
- }
- invoke(channel, Constants.ON_DISCONNECT_KEY);
- }
- /**
-  * Builds an invocation for the event method configured under {@code methodKey} and
-  * feeds it to received(); failures are logged as warnings instead of propagating.
-  */
- private void invoke(Channel channel, String methodKey) {
-     Invocation eventInvocation = createInvocation(channel, channel.getUrl(), methodKey);
-     if (eventInvocation == null) {
-         // No event method configured for this key.
-         return;
-     }
-     try {
-         received(channel, eventInvocation);
-     } catch (Throwable failure) {
-         logger.warn("Failed to invoke event method " + eventInvocation.getMethodName() + "(), cause: " + failure.getMessage(), failure);
-     }
- }
- /**
-  * Creates a no-argument RpcInvocation for the method named by the URL parameter
-  * {@code methodKey}, copying path, group, interface and version from the URL.
-  *
-  * @return the invocation, or null when the URL has no value for methodKey
-  */
- private Invocation createInvocation(Channel channel, URL url, String methodKey) {
-     String eventMethod = url.getParameter(methodKey);
-     boolean configured = eventMethod != null && eventMethod.length() != 0;
-     if (!configured) {
-         return null;
-     }
-     RpcInvocation result = new RpcInvocation(eventMethod, new Class<?>[0], new Object[0]);
-     result.setAttachment(Constants.PATH_KEY, url.getPath());
-     result.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
-     result.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
-     result.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
-     // Carry the stub-event flag along only when the URL enables it.
-     boolean stubEvent = url.getParameter(Constants.STUB_EVENT_KEY, false);
-     if (stubEvent) {
-         result.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
-     }
-     return result;
- }
- };
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
前面说了,proxyFactory我们默认就当作是JdkProxyFactory类。getInvoker方法的实现如下:它重写了doInvoke方法,通过反射机制执行接口实现类的方法;getInvoker方法返回的是AbstractProxyInvoker对象。
- public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
- return new AbstractProxyInvoker<T>(proxy, type, url) {
- //重写了AbstractProxyInvoker类的抽象doInvoke方法
- @Override
- protected Object doInvoke(T proxy, String methodName,
- Class<?>[] parameterTypes,
- Object[] arguments) throws Throwable {
- Method method = proxy.getClass().getMethod(methodName, parameterTypes);
- return method.invoke(proxy, arguments);
- }
- };
- }
- /**
-  * Base Invoker that delegates each invocation to a target object through the
-  * abstract {@link #doInvoke} hook, wrapping the outcome in an RpcResult.
-  *
-  * @param <T> the service interface type
-  */
- public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
-     private final T proxy;
-     private final Class<T> type;
-     private final URL url;
-     /**
-      * @param proxy target object; must be an instance of {@code type}
-      * @param type  service interface
-      * @param url   URL reported by getUrl(); not validated here
-      * @throws IllegalArgumentException if proxy or type is null, or proxy is not an instance of type
-      */
-     public AbstractProxyInvoker(T proxy, Class<T> type, URL url){
-         if (proxy == null) {
-             throw new IllegalArgumentException("proxy == null");
-         }
-         if (type == null) {
-             throw new IllegalArgumentException("interface == null");
-         }
-         if (! type.isInstance(proxy)) {
-             throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
-         }
-         this.proxy = proxy;
-         this.type = type;
-         this.url = url;
-     }
-     public Class<T> getInterface() {
-         return type;
-     }
-     public URL getUrl() {
-         return url;
-     }
-     public boolean isAvailable() {
-         return true;
-     }
-     public void destroy() {
-     }
-     /**
-      * Runs the invocation through doInvoke.
-      *
-      * @return an RpcResult holding the return value, or holding the target exception when
-      *         doInvoke throws InvocationTargetException
-      * @throws RpcException for any other Throwable raised by doInvoke
-      */
-     public Result invoke(Invocation invocation) throws RpcException {
-         try {
-             // doInvoke is abstract here, so this runs the override supplied by the subclass.
-             return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
-         } catch (InvocationTargetException e) {
-             return new RpcResult(e.getTargetException());
-         } catch (Throwable e) {
-             throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
-         }
-     }
-     /** Performs the actual call of {@code methodName} on {@code proxy}. */
-     protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
-     @Override
-     public String toString() {
-         // Parenthesize the null check: '+' binds tighter than '==', so the unparenthesized form
-         // compared the whole concatenation to null, dropped the interface and threw NPE on a null url.
-         return getInterface() + " -> " + (getUrl() == null ? " " : getUrl().toString());
-     }
- }
四 总结
如果跟着文章分析代码,您已经知道dubbo框架是怎么发布一个服务,又是怎么启动底层的netty框架,然后怎么根据客户端请求传过来的参数定位到对应的服务实现类并执行对应的方法。看似很简单,但是涉及的知识面还是比较广的,比如装饰模式、工厂模式、抽象模版方法模式、对netty框架的了解等。下个章节来看看客户端是怎么发起请求的。
转载自:dubbo服务端之netty