以下重点讲一下Transport transport = createTransport();创建Transport的过程。
protected Transport createTransport() throws JMSException {
try {
return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);
} catch (Exception e) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
}
}
public static Transport connect(URI location, Executor ex) throws Exception {
TransportFactory tf = findTransportFactory(location);
return tf.doConnect(location, ex);
}
这里使用了一个典型的设计模式:工厂方法。
TransportFactory tf = findTransportFactory(location);通过FactoryFinder的内部类StandaloneObjectFactory,找到"META-INF/services/org/apache/activemq/transport/"目录下的具体location.getScheme()对应的配置文件,并实例化对应的工厂类。
然后tf.doConnect(location, ex);通过实例化的具体工厂类获取transport。
不同的底层通讯需要不同的Transport,那就需要不同TransportFactory。在这里我们只需要写一个工厂类继承自TransportFactory,然后在"META-INF/services/org/apache/activemq/transport/"目录下放个配置文件,指向该工厂类就OK了。这种方式还是很值得我们学习的,使用配置的方式新增,而不是在TansportFactory代码中去添加,更好的遵守了OCP原则。
回头再过来看一下类图结构,就会发现另外三种设计模式:Decorator(装饰)、Compositor(合成)和Observer(观察者)模式。
这里的Decorator模式和java.io包中InputStream和OutputStream的Decorator如出一辙,连命名都差不多。
InputStream -- Transport
FilterInputStream -- FilterTransport
BufferedInputStream – TcpTransport
只是这里在Decorator模式的基础上增加了Composite和Observer模式的运用,使系统的扩展性更好。
回到例子中,继续挖掘Transport的创建过程。
根据scheme:failover,找到了FailoverTransportFactory工厂类,调用doConnect方法:
public Transport doConnect(URI location) throws IOException {
try {
Transport transport = createTransport(URISupport.parseComposite(location));
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
} catch (URISyntaxException e) {
throw new IOException("Invalid location: " + location);
}
}
public Transport createTransport(CompositeData compositData) throws IOException {
Map options = compositData.getParameters();
FailoverTransport transport = createTransport(options);
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}
transport.add(false,compositData.getComponents());
return transport;
}
public FailoverTransport createTransport(Map parameters) throws IOException {
FailoverTransport transport = new FailoverTransport();
IntrospectionSupport.setProperties(transport, parameters);
return transport;
}
为了配合合成模式的使用,url也使用了合成的方式,如例子中的failover://tcp://localhost:61616就是failover和tcp的合成。可以看到这里的CompositTransport中合成的不是Transport而是url,与经典的合成模式还是有稍许差别。但这并影响它是合成模式的事实,因为合成的url的最终目的还是用于生成对应的Transport 。那么,为什么接口CompositeTransport中add和remove都是数组形式uris?
public interface CompositeTransport extends Transport {
void add(boolean rebalance,URI[] uris);
void remove(boolean rebalance,URI[] uris);
}
另外可以发现TransportFilter继承了TransportListener并且合成了一个TransportListener。为什么装饰对象既是Transport又带上了TransportListener的功能,而且装饰对象中又包含了一个TransportListener?
public class TransportFilter implements TransportListener, Transport {
protected final Transport next;
protected TransportListener transportListener;
public TransportFilter(Transport next) {
this.next = next;
}
public TransportListener getTransportListener() {
return transportListener;
}
public void setTransportListener(TransportListener channelListener) {
this.transportListener = channelListener;
if (channelListener == null) {
next.setTransportListener(null);
} else {
next.setTransportListener(this);
}
}
……
}
带着这些问题,下面具体分析FailoverTransport。