开篇
这篇文章主要的目的是想分析下dubbo优雅停机的过程,整个文章参考网上很多现成的文章,本着尊重原创的精神会在文章中备注参考信息。
针对阅读dubbo源码,我的感觉是当你一开始钻到细节当中就很容易一叶障目了,所以建议一开始着重梳理整个框架的逻辑而不要陷入细节当中。
优雅停机的原理
说明:
- dubbo的优雅停机是建立在JVM的addShutdownHook回调的机制上的,通过注册回调调用停机的逻辑ProtocolConfig.destroyAll()
- ProtocolConfig.destroyAll()执行逻辑是:1、关闭注册中心;2、关闭发布协议服务。
- 关闭注册中心:AbstractRegistryFactory.destroyAll()。
- 关闭发布的协议服务:protocol.destroy()。
public abstract class AbstractConfig implements Serializable {
static {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
if (logger.isInfoEnabled()) {
logger.info("Run shutdown hook now.");
}
ProtocolConfig.destroyAll();
}
}, "DubboShutdownHook"));
}
}
public class ProtocolConfig extends AbstractConfig {
public static void destroyAll() {
if (!destroyed.compareAndSet(false, true)) {
return;
}
// 关闭注册中心
AbstractRegistryFactory.destroyAll();
// 关闭所有已发布的协议如dubbo服务
ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
for (String protocolName : loader.getLoadedExtensions()) {
try {
Protocol protocol = loader.getLoadedExtension(protocolName);
if (protocol != null) {
protocol.destroy();
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
}
说明:
- 图片来自 Dubbo优雅停机
- B服务作为Provider需要进行优雅停机。
- B服务首先断开和注册中心的连接。
- B服务关闭提供服务的Server端的监听,保证不接受请求。
- B服务关闭引用的C和D服务,保证不再调用下游服务。
优雅停机过程-注册中心关闭
说明:
- 注册中心关闭通过LOCK来保证不重入,此例中以ZookeeperRegistry为例。
- ZookeeperRegistry的关闭顺序:1、关闭注册中心;2、断开和zookeeper的连接。
- 关闭注册中心按照调用链路走到FailbackRegistry,关闭注册中心并停掉重试操作。
- 关闭注册中心按照调用链路走到AbstractRegistry,按照先移除作为provider的URL,再移除作为consumer的订阅的consumer信息。
- 具体的信息看下面的源码,已经按照继承关系组织好了。
public abstract class AbstractRegistryFactory implements RegistryFactory {
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
// Lock up the registry shutdown process
LOCK.lock();
try {
for (Registry registry : getRegistries()) {
try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
REGISTRIES.clear();
} finally {
// Release the lock
LOCK.unlock();
}
}
}
public class ZookeeperRegistry extends FailbackRegistry {
public void destroy() {
// 调用父类FailbackRegistry关闭注册中心
super.destroy();
try {
// 关闭zkClient客户端保证临时provider节点下线
zkClient.close();
} catch (Exception e) {
logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
public abstract class FailbackRegistry extends AbstractRegistry {
public void destroy() {
if (!canDestroy()){
return;
}
super.destroy();
try {
// 首先要明白FailbackRegistry的核心就在于失败重试,所以这一层的关闭只要关闭retryFuture就可以
retryFuture.cancel(true);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
public abstract class AbstractRegistry implements Registry {
public void destroy() {
if (!destroyed.compareAndSet(false, true)) {
return;
}
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
// 作为provider,取消所有的服务注册
Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<URL>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
// 从已注册的列表中移除该URL
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
// 作为consumer,取消所有的订阅关系
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
}
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " + url);
}
registered.remove(url);
}
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " + url);
}
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
listeners.remove(listener);
}
}
}
优雅停机过程-协议关闭
说明:
- 协议关闭按照以下顺序进行:1、关闭provider端的监听;2、关闭作为consumer的reference的服务;3、调用父类针对exporter对象进行清理。
- 关闭provider端的监听:关闭provider端的监听(server.close)。
- 关闭consumer的服务:关闭dubbo服务引用的服务(client.close)。
- 调用父类清理exporter:清理exporter服务(super.destroy)。
public class DubboProtocol extends AbstractProtocol {
public void destroy() {
// 关停所有的Server,作为provider将不再接收新的请求
for (String key : new ArrayList<String>(serverMap.keySet())) {
ExchangeServer server = serverMap.remove(key);
if (server != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo server: " + server.getLocalAddress());
}
server.close(getServerShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
// 关停所有的Client,作为consumer将不再发送新的请求
for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
ExchangeClient client = referenceClientMap.remove(key);
if (client != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
}
client.close(getServerShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
// 幽灵客户端的处理逻辑,不清楚幽灵客户端是啥?
for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
ExchangeClient client = ghostClientMap.remove(key);
if (client != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
}
client.close(getServerShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
stubServiceMethodsMap.clear();
// 调用父类继续进行清理,针对exporter对象进行清理
super.destroy();
}
}
provider监听的close过程
说明:
- provider监听的close过程:关闭心跳检测操作,关闭底层netty服务的监听channel管道。
- 关闭心跳检测操作:doClose()。
- 关闭底层netty监听:server.close(timeout)。
public class HeaderExchangeServer implements ExchangeServer {
public void close(final int timeout) {
startClose();
if (timeout > 0) {
final long max = (long) timeout;
final long start = System.currentTimeMillis();
if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
sendChannelReadOnlyEvent();
}
// 如果还有进行中的任务并且没有到达等待时间的上限,则继续等待
while (HeaderExchangeServer.this.isRunning()
&& System.currentTimeMillis() - start < max) {
try {
// 休息10毫秒再检查
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
}
}
}
// 关闭心跳,停止应答
doClose();
// 关闭通信通道
server.close(timeout);
}
private void doClose() {
// 修改标记位,该标记为设置为true后,provider不再对上游请求做应答
if (!closed.compareAndSet(false, true)) {
return;
}
// 取消心跳的Futrue
stopHeartbeatTimer();
try {
// 关闭心跳的线程池
scheduled.shutdown();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
public abstract class AbstractServer extends AbstractEndpoint implements Server {
public void close() {
if (logger.isInfoEnabled()) {
logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
ExecutorUtil.shutdownNow(executor, 100);
try {
// 设置关闭的标记位
super.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
// 执行真正的关闭动作
doClose();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
protected abstract void doClose() throws Throwable;
}
public class NettyServer extends AbstractServer implements Server {
protected void doClose() throws Throwable {
try {
if (channel != null) {
// unbind.
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
if (channels != null && channels.size() > 0) {
for (com.alibaba.dubbo.remoting.Channel channel : channels) {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (bootstrap != null) {
// release external resource.
bootstrap.releaseExternalResources();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (channels != null) {
channels.clear();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
client的清理过程
说明:
- client的关闭过程本质上是关闭引用服务的channel对象。
- client的关闭顺序按照:设置关闭标记位,关闭心跳检测,关闭通道。
public class HeaderExchangeClient implements ExchangeClient {
public void close(int timeout) {
startClose();
doClose();
channel.close(timeout);
}
public void startClose() {
channel.startClose();
}
private void doClose() {
stopHeartbeatTimer();
}
}
exporter清理过程
说明:
- exporter的清理主要包括invoker和exporter两个对象的清理。
- invoker和exporter两个对象的具体作用暂时还未理清楚,待定。
- exporter的清理最终还是走到了invoker的清理过程当中。
public abstract class AbstractProtocol implements Protocol {
public void destroy() {
for (Invoker<?> invoker : invokers) {
if (invoker != null) {
// 移除invokers
invokers.remove(invoker);
try {
if (logger.isInfoEnabled()) {
logger.info("Destroy reference: " + invoker.getUrl());
}
// 销毁invokers
invoker.destroy();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
for (String key : new ArrayList<String>(exporterMap.keySet())) {
// 移除exporter
Exporter<?> exporter = exporterMap.remove(key);
if (exporter != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Unexport service: " + exporter.getInvoker().getUrl());
}
// 销毁exporter
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
}
}
public class DubboInvoker<T> extends AbstractInvoker<T> {
public void destroy() {
if (super.isDestroyed()) {
return;
} else {
destroyLock.lock();
try {
if (super.isDestroyed()) {
return;
}
super.destroy();
if (invokers != null) {
invokers.remove(this);
}
for (ExchangeClient client : clients) {
try {
client.close(getShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
} finally {
destroyLock.unlock();
}
}
}
}
public abstract class AbstractExporter<T> implements Exporter<T> {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final Invoker<T> invoker;
private volatile boolean unexported = false;
public AbstractExporter(Invoker<T> invoker) {
if (invoker == null)
throw new IllegalStateException("service invoker == null");
if (invoker.getInterface() == null)
throw new IllegalStateException("service type == null");
if (invoker.getUrl() == null)
throw new IllegalStateException("service url == null");
this.invoker = invoker;
}
public Invoker<T> getInvoker() {
return invoker;
}
public void unexport() {
if (unexported) {
return;
}
unexported = true;
getInvoker().destroy();
}
public String toString() {
return getInvoker().toString();
}
}