在Reference初始化时提到,在RegistryProtocol中创建customers节点,会订阅对应接口的providers,那么这时会从zookeeper中获取到providers下的所有子节点(提供者url)。这时在订阅时以及节点发生改变时,都会进行通知:
- 保存本地缓存文件(提供者url)
- 刷新本地缓存的invoker列表
- 根据url创建invoker,并保存在本地缓存中。protocol.refer(serviceType, url)
在前面提到protocol是使用ExtensionLoader.getExtensionLoader,dubbo协议产生的最终获取到是DubboProtocol。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) {
optimizeSerialization(url);
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url,
getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
DubboInvoker
先看基类,主要是针对一些属性的设置
public abstract class AbstractInvoker<T> implements Invoker<T> {
public Result invoke(Invocation inv) throws RpcException {
if(destroyed) {
throw new RpcException("");
}
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
invocation.addAttachmentsIfAbsent(context);
}
//设置是否异步属性
if (getUrl().getMethodParameter(invocation.getMethodName(), "async",
false)){
invocation.setAttachment("async", "true");
}
//幂等操作:异步操作默认添加invocation id
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation);
}
}
public class DubboInvoker<T> extends AbstractInvoker<T> {
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment("path", getUrl().getPath());
inv.setAttachment("version", version);
//取出客户端连接
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
//是否是异步
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, "timeout",1000);
//立即返回
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, "send", false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
//异步
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
//超时时间
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
}
//获取客户端状态是否是连接中
public boolean isAvailable() {
if (!super.isAvailable())
return false;
for (ExchangeClient client : clients){
if (client.isConnected() &&
!client.hasAttribute("channel.readonly")){
return true ;
}
}
return false;
}
}
创建客户端连接
private ExchangeClient[] getClients(URL url){
//是否共享连接
boolean service_share_connect = false;
int connections = url.getParameter("connections", 0);
//如果connections不配置,则共享连接,否则每服务每连接
if (connections == 0){
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
//创建共享连接
if (service_share_connect){
clients[i] = getSharedClient(url);
} else {
//每个服务都创建连接
clients[i] = initClient(url);
}
}
return clients;
}
/**
*获取共享连接
*/
private ExchangeClient getSharedClient(URL url){
String key = url.getAddress();
//如果缓存中已经有创建过这个连接,并且连接没有关闭,那么就直接返回
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if ( client != null ){
if ( !client.isClosed()){
//记录连接数
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}
//初始化连接
ExchangeClient exchagneclient = initClient(url);
//主要是记录这个连接的连接数
client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
/**
* 创建新连接.
*/
private ExchangeClient initClient(URL url) {
String str = url.getParameter("client",
url.getParameter("server", "netty"));
String version = url.getParameter("dubbo");
boolean compatible = (version != null && version.startsWith("1.0."));
url = url.addParameter("codec", Version.isCompatibleVersion() &&
compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
//默认开启heartbeat
url = url.addParameterIfAbsent("heartbeat","60000");
// BIO存在严重性能问题,暂时不允许使用
if (str != null && str.length() > 0 && ! ExtensionLoader.
getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException(""));
}
ExchangeClient client ;
//设置连接应该是lazy的
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
client = new LazyConnectExchangeClient(url ,requestHandler);
} else {
//创建连接
client = Exchangers.connect(url ,requestHandler);
}
return client;
}
创建连接
client = Exchangers.connect(url ,requestHandler);
dubbo://10.118.22.29:20710/com.test.ITestService?anyhost=true&application=testservice&check=false&codec=dubbo
public static ExchangeServer connect(URL url, ExchangeHandler handler)
throws RemotingException {
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
public static Exchanger getExchanger(URL url) {
String type = url.getParameter("exchanger", "header");
return ExtensionLoader.getExtensionLoader(Exchanger.class).
getExtension(type);
}
HeaderExchanger
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
public ExchangeClient connect(URL url, ExchangeHandler handler) {
return new HeaderExchangeClient(Transporters.connect(url, new
DecodeHandler(new HeaderExchangeHandler(handler))));
}
HeaderExchangeClient
主要就是提供了心跳机制.
public class HeaderExchangeClient implements ExchangeClient {
public HeaderExchangeClient(Client client){
//....
startHeatbeatTimer();
}
//启动定时心跳
private void startHeatbeatTimer() {
stopHeartbeatTimer();
if ( heartbeat > 0 ) {
heatbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(
HeaderExchangeClient.this );
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS );
}
}
private void stopHeartbeatTimer() {
if (heatbeatTimer != null && ! heatbeatTimer.isCancelled()) {
try {
heatbeatTimer.cancel(true);
scheduled.purge();
} catch ( Throwable e ) {
if (logger.isWarnEnabled()) {
logger.warn(e.getMessage(), e);
}
}
}
heatbeatTimer =null;
}
}
具体见dubbo源码分析9– netty创建服务端、客户端以及handler