首先回过头来看一下FailoverTransport的工厂类FailoverTransportFactory生成FailoverTransport后,
FailoverTransport transport = new FailoverTransport();
添加了两份装饰,
transport = new MutexTransport(transport); transport = new ResponseCorrelator(transport);
PooledTaskRunner—带线程池的任务执行器
我们来看一下FailoverTransport的构造函数,
public FailoverTransport() throws InterruptedIOException { brokerSslContext = SslContext.getCurrentSslContext(); stateTracker.setTrackTransactions(true); // Setup a task that is used to reconnect the a connection async. reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { public boolean iterate() { boolean result = false; boolean buildBackup = true; boolean doReconnect = !disposed; synchronized (backupMutex) { if ((connectedTransport.get() == null || doRebalance) && !disposed) { result = doReconnect(); buildBackup = false; } } if (buildBackup) { buildBackups(); } else { // build backups on the next iteration buildBackup = true; try { reconnectTask.wakeup(); } catch (InterruptedException e) { LOG.debug("Reconnect task has been interrupted.", e); } } return result; } }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); }
前两行对SSL的配置不去管它。
后面那行reconnectTask = …… 才是FailoverTransport失败重连的关键,从属性命名中也可以看出来它是为重连而生的,也是整个Transport的关键实现。
先来分析一下reconnectTask,这里DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner创建了PooledTaskRunner对象并传入一个实现Task接口的匿名内部类对象。
public PooledTaskRunner(Executor executor, Task task, int maxIterationsPerRun) {
this.executor = executor; this.maxIterationsPerRun = maxIterationsPerRun; this.task = task; final Map context = MDCHelper.getCopyOfContextMap(); runable = new Runnable() { public void run() { MDCHelper.setContextMap(context); runningThread = Thread.currentThread(); try { runTask(); } finally { runningThread = null; } } }; }
PooledTaskRunner构造函数中创建了一个Runnable对象,具体在runTask();中完成工作。
final void runTask() {
synchronized (runable) { queued = false; if (shutdown) { iterating = false; runable.notifyAll(); return; } iterating = true; } // Don't synchronize while we are iterating so that // multiple wakeup() calls can be executed concurrently. boolean done = false; try { for (int i = 0; i < maxIterationsPerRun; i++) { if (!task.iterate()) { done = true; break; } } } finally { synchronized( runable ) { iterating = false; runable.notifyAll(); if (shutdown) { queued = false; runable.notifyAll(); return; } // If we could not iterate all the items // then we need to re-queue. if (!done) { queued = true; } if (queued) { executor.execute(runable); } } } }
灰色打底的部分是真正执行任务的部分,这里也可以解释为什么Task接口中iterate方法为什么需要返回一个布尔值了,它用来指示是否可以还需要迭代。
完成工作任务的地方找到了,那么开启工作任务的地方在哪里呢?TaskRunner接口中只有wakeup最像是我们要找的。
public void wakeup() throws InterruptedException { synchronized (runable) { if (queued || shutdown) { return; } queued = true; // The runTask() method will do this for me once we are done // iterating. if (!iterating) { executor.execute(runable); } } }
其中queued和interating这两个变量是为了防止多次重复调用的。