Endpoint
Channel、Client、Server 都继承自 Endpoint:
com.alibaba.dubbo.remoting.Channel
com.alibaba.dubbo.remoting.Client
com.alibaba.dubbo.remoting.Server
/**
 * Base contract for the remoting endpoints described in these notes.
 * Within this file, {@code Server} and {@code Client} both declare
 * {@code extends Endpoint}.
 */
public interface Endpoint {
// Returns the URL of this endpoint.
URL getUrl();
// Returns the channel handler of this endpoint.
ChannelHandler getChannelHandler();
// Returns the local address of this endpoint.
InetSocketAddress getLocalAddress();
// Sends a message.
void send(Object message) throws RemotingException;
// Sends a message; per the original note, `sent` concerns whether the send has completed.
// NOTE(review): the exact semantics (e.g. block until sent?) are not visible here -- confirm against an implementation.
void send(Object message, boolean sent) throws RemotingException;
// Closes the channel.
void close();
// Gracefully closes the channel.
// NOTE(review): the unit of `timeout` is not stated here -- presumably milliseconds; confirm.
void close(int timeout);
// NOTE(review): undocumented in the original; presumably signals that closing has begun -- confirm.
void startClose();
// Reports whether this endpoint is closed.
boolean isClosed();
}
Channel
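相关类(Client 继承了 Channel;Server 通过 getChannels()/getChannel() 返回 Channel。注意:下面贴出的是 Server 接口,而不是 Channel 接口本身)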
com.alibaba.dubbo.remoting.Client
com.alibaba.dubbo.remoting.Server#getChannels()
com.alibaba.dubbo.remoting.Server#getChannel(InetSocketAddress)
/**
 * Server-side remoting endpoint. Extends {@code Endpoint} and {@code Resetable}
 * and exposes the channels it holds.
 */
public interface Server extends Endpoint, Resetable {
// Reports whether the server is bound.
boolean isBound();
// Returns the channels held by this server.
Collection<Channel> getChannels();
// Returns the channel associated with the given remote address.
// NOTE(review): the result when no channel matches is not shown here -- presumably null; confirm.
Channel getChannel(InetSocketAddress remoteAddress);
// Deprecated reset variant that takes a Parameters object.
@Deprecated
void reset(com.alibaba.dubbo.common.Parameters parameters);
}
ChannelHandler
相关类
com.alibaba.dubbo.remoting.Transporter#bind(com.alibaba.dubbo.common.URL, ChannelHandler)
com.alibaba.dubbo.remoting.Transporter#connect(com.alibaba.dubbo.common.URL, ChannelHandler)
/**
 * Callback interface for channel events: connection, disconnection,
 * message sent, message received and exception caught.
 * Every callback receives the {@code Channel} the event relates to.
 */
public interface ChannelHandler {
/**
* Called when a channel is connected.
*
* @param channel the channel that was connected.
* @throws RemotingException declared by the callback; conditions are implementation-specific.
*/
void connected(Channel channel) throws RemotingException;
/**
* Called when a channel is disconnected.
*
* @param channel the channel that was disconnected.
* @throws RemotingException declared by the callback; conditions are implementation-specific.
*/
void disconnected(Channel channel) throws RemotingException;
/**
* Called when a message has been sent.
*
* @param channel the channel the message was sent on.
* @param message the message that was sent.
* @throws RemotingException declared by the callback; conditions are implementation-specific.
*/
void sent(Channel channel, Object message) throws RemotingException;
/**
* Called when a message has been received.
*
* @param channel the channel the message arrived on.
* @param message the message that was received.
* @throws RemotingException declared by the callback; conditions are implementation-specific.
*/
void received(Channel channel, Object message) throws RemotingException;
/**
* Called when an exception has been caught.
*
* @param channel the channel the exception relates to.
* @param exception the exception that was caught.
* @throws RemotingException declared by the callback; conditions are implementation-specific.
*/
void caught(Channel channel, Throwable exception) throws RemotingException;
}
Server
相关类
com.alibaba.dubbo.remoting.Transporter#bind(com.alibaba.dubbo.common.URL, ChannelHandler)
/**
 * Server-side remoting endpoint. Extends {@code Endpoint} and {@code Resetable}
 * and exposes the channels it holds.
 */
public interface Server extends Endpoint, Resetable {
// Reports whether the server is bound.
boolean isBound();
// Returns the channels held by this server.
Collection<Channel> getChannels();
// Returns the channel associated with the given remote address.
// NOTE(review): the result when no channel matches is not shown here -- presumably null; confirm.
Channel getChannel(InetSocketAddress remoteAddress);
// Deprecated reset variant that takes a Parameters object.
@Deprecated
void reset(com.alibaba.dubbo.common.Parameters parameters);
}
Client
com.alibaba.dubbo.remoting.Transporter#connect(com.alibaba.dubbo.common.URL, ChannelHandler)
/**
 * Client-side remoting endpoint. Extends {@code Endpoint}, {@code Channel}
 * and {@code Resetable}, and adds the ability to reconnect.
 */
public interface Client extends Endpoint, Channel, Resetable {
/**
* Reconnects this client.
*
* @throws RemotingException declared by the method; conditions are implementation-specific.
*/
void reconnect() throws RemotingException;
// Deprecated reset variant that takes a Parameters object.
@Deprecated
void reset(com.alibaba.dubbo.common.Parameters parameters);
}
Dispatcher
默认是:AllDispatcher
/**
 * SPI extension point that wraps a {@code ChannelHandler} for dispatching.
 * The {@code @SPI} annotation names {@code AllDispatcher.NAME} as its value.
 */
@SPI(AllDispatcher.NAME)
public interface Dispatcher {
// Dispatches the message to a thread pool: takes a handler and a URL and returns a handler.
// NOTE(review): presumably the returned handler wraps the given one -- confirm against an implementation.
@Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
// The last two keys are kept for compatibility with old configuration.
ChannelHandler dispatch(ChannelHandler handler, URL url);
}
Codec2
编解码(同时声明了 encode 与 decode)
/**
 * SPI extension point for encoding messages into, and decoding messages
 * from, a {@code ChannelBuffer}. Both methods are marked {@code @Adaptive}
 * on {@code Constants.CODEC_KEY}.
 */
@SPI
public interface Codec2 {
// Encodes `message` into `buffer` for the given channel.
@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
// Decodes an object from `buffer` for the given channel.
// NOTE(review): presumably a DecodeResult constant may be returned when a full message is not available -- confirm.
@Adaptive({Constants.CODEC_KEY})
Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
// Decode status markers; going by the names: more input is needed / some input should be skipped.
enum DecodeResult {
NEED_MORE_INPUT, SKIP_SOME_INPUT
}
}
结构图
AbstractCodec
/**
 * Verifies that a payload of {@code size} bytes does not exceed the configured limit.
 * <p>
 * The limit starts as {@code Constants.DEFAULT_PAYLOAD} (8 MB according to the
 * original note; the constant's value is not shown here) and is read from the
 * channel URL's {@code Constants.PAYLOAD_KEY} parameter when both the channel and
 * its URL are non-null. A limit that is zero or negative disables the check.
 *
 * @param channel channel whose URL may carry the payload limit; may be {@code null}
 * @param size    size of the data being checked
 * @throws IOException an {@code ExceedPayloadLimitException} (logged before being
 *                     thrown) when {@code size} is larger than a positive limit
 */
protected static void checkPayload(Channel channel, long size) throws IOException {
    // Fall back to the default unless the channel's URL is available.
    final int limit = (channel == null || channel.getUrl() == null)
            ? Constants.DEFAULT_PAYLOAD
            : channel.getUrl().getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD);

    // Nothing to report when the check is disabled or the size is within the limit.
    if (limit <= 0 || size <= limit) {
        return;
    }

    final ExceedPayloadLimitException tooLarge = new ExceedPayloadLimitException(
            "Data length too large: " + size + ", max payload: " + limit + ", channel: " + channel);
    logger.error(tooLarge);
    throw tooLarge;
}
/**
 * Looks up the {@code Serialization} for a channel by delegating to
 * {@code CodecSupport.getSerialization} with the channel's URL.
 * The original note says the default is hessian2 (resolved through SPI);
 * that default is not visible in this snippet.
 *
 * @param channel channel whose URL selects the serialization
 * @return the serialization returned by {@code CodecSupport}
 */
protected Serialization getSerialization(Channel channel) {
    final Serialization resolved = CodecSupport.getSerialization(channel.getUrl());
    return resolved;
}
/**
 * Determines whether the given channel belongs to the client side.
 * <p>
 * The answer is read from the channel attribute {@code Constants.SIDE_KEY}
 * when it is {@code "client"} or {@code "server"}. Otherwise the channel is
 * treated as client side when the URL's port equals the remote address's port
 * and both hosts match after {@code NetUtils.filterLocalHost}; the outcome is
 * then stored back into the attribute.
 *
 * @param channel channel to inspect
 * @return {@code true} for the client side, {@code false} for the server side
 */
protected boolean isClientSide(Channel channel) {
    final String recordedSide = (String) channel.getAttribute(Constants.SIDE_KEY);
    if ("client".equals(recordedSide)) {
        return true;
    }
    if ("server".equals(recordedSide)) {
        return false;
    }

    // No recorded side yet: compare the URL endpoint with the remote endpoint.
    final InetSocketAddress remote = channel.getRemoteAddress();
    final URL channelUrl = channel.getUrl();
    boolean clientSide = false;
    if (channelUrl.getPort() == remote.getPort()) {
        final String urlHost = NetUtils.filterLocalHost(channelUrl.getIp());
        final String remoteHost = NetUtils.filterLocalHost(remote.getAddress().getHostAddress());
        clientSide = urlHost.equals(remoteHost);
    }

    // Remember the result on the channel for later calls.
    if (clientSide) {
        channel.setAttribute(Constants.SIDE_KEY, "client");
    } else {
        channel.setAttribute(Constants.SIDE_KEY, "server");
    }
    return clientSide;
}
/**
 * Determines whether the given channel belongs to the server side,
 * defined as the negation of {@code isClientSide(channel)}.
 *
 * @param channel channel to inspect
 * @return {@code true} when the channel is not on the client side
 */
protected boolean isServerSide(Channel channel) {
    final boolean clientSide = isClientSide(channel);
    return !clientSide;
}
}
Decodeable
解码
/**
 * Implemented by objects that can decode themselves on demand.
 */
public interface Decodeable {

    /**
     * Performs the decoding.
     *
     * @throws Exception declared broadly; failure conditions depend on the implementation
     */
    void decode() throws Exception;
}
Transporter
默认是netty
相关类
com.alibaba.dubbo.remoting.Transporters
/**
 * SPI extension point that creates servers and clients for the remoting layer.
 * The {@code @SPI} annotation names {@code "netty"} as its value.
 */
@SPI("netty")
public interface Transporter {
/**
* Bind a server.
* The {@code @Adaptive} annotation lists {@code Constants.SERVER_KEY} and
* {@code Constants.TRANSPORTER_KEY} as the keys for this method.
*
* @param url server url
* @param handler channel handler passed to the server being bound
* @return server
* @throws RemotingException declared by the method; conditions are implementation-specific
* @see com.alibaba.dubbo.remoting.Transporters#bind(URL, Receiver, ChannelHandler)
* NOTE(review): the referenced signature differs from the (URL, ChannelHandler) method declared here -- may be stale; confirm.
*/
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
Server bind(URL url, ChannelHandler handler) throws RemotingException;
/**
* Connect to a server.
* The {@code @Adaptive} annotation lists {@code Constants.CLIENT_KEY} and
* {@code Constants.TRANSPORTER_KEY} as the keys for this method.
*
* @param url server url
* @param handler channel handler passed to the client being created
* @return client
* @throws RemotingException declared by the method; conditions are implementation-specific
* @see com.alibaba.dubbo.remoting.Transporters#connect(URL, Receiver, ChannelListener)
* NOTE(review): the referenced signature differs from the (URL, ChannelHandler) method declared here -- may be stale; confirm.
*/
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}