0 准备
- 版本号:0.4.2 be版本 openflowplugin
1、连接入口
//01-连接发起源于初始化 OpenFlowPluginProviderImpl
org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java
initialize() --> startSwitchConnections() --> switchConnectionPrv.startup()
---------------------------------------------------
//02-初始化服务器
org/opendaylight/openflowjava/protocol/impl/core/SwitchConnectionProviderImpl.java
public ListenableFuture<Boolean> startup(); ---》
private ServerFacade createAndConfigureServer();
//03-用于消息解析和通道处理的的TcpChannelInitializer
1.1 核心channelHandler注册代码
此注册位置,包含了序列化器注册,链接成功配置、消息解析处理的多重注册:
-
ConnectionFacade和ConnectionConductor类创建和互相注册引用;
-
注册tls处理器
-
注册编码器;
-
注册解码器;
-
注册消息处理逻辑器;
@Override
protected void initChannel(final SocketChannel ch) {
if (ch.remoteAddress() != null) {
final InetAddress switchAddress = ch.remoteAddress().getAddress();
final int port = ch.localAddress().getPort();
final int remotePort = ch.remoteAddress().getPort();
LOGGER.debug("Incoming connection from (remote address): {}:{} --> :{}",
switchAddress.toString(), remotePort, port);
if (!getSwitchConnectionHandler().accept(switchAddress)) {
ch.disconnect();
LOGGER.debug("Incoming connection rejected");
return;
}
}
LOGGER.debug("Incoming connection accepted - building pipeline");
allChannels.add(ch);
ConnectionFacade connectionFacade = null;
//---------------------------------------------
//1-ConnectionFacade创建
connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null, useBarrier());
try {
LOGGER.debug("calling plugin: {}", getSwitchConnectionHandler());
//2-ConnectionConductor初始化
getSwitchConnectionHandler().onSwitchConnected(connectionFacade);
connectionFacade.checkListeners();
ch.pipeline().addLast(PipelineHandlers.IDLE_HANDLER.name(), new IdleHandler(getSwitchIdleTimeout(), TimeUnit.MILLISECONDS));
boolean tlsPresent = false;
// If this channel is configured to support SSL it will only support SSL
if (getTlsConfiguration() != null) {
tlsPresent = true;
final SslContextFactory sslFactory = new SslContextFactory(getTlsConfiguration());
final SSLEngine engine = sslFactory.getServerContext().createSSLEngine();
engine.setNeedClientAuth(true);
engine.setUseClientMode(false);
List<String> suitesList = getTlsConfiguration().getCipherSuites();
if (suitesList != null && !suitesList.isEmpty()) {
LOGGER.debug("Requested Cipher Suites are: {}", suitesList);
String[] suites = suitesList.toArray(new String[suitesList.size()]);
engine.setEnabledCipherSuites(suites);
LOGGER.debug("Cipher suites enabled in SSLEngine are: {}", engine.getEnabledCipherSuites().toString());
}
final SslHandler ssl = new SslHandler(engine);
final Future<Channel> handshakeFuture = ssl.handshakeFuture();
final ConnectionFacade finalConnectionFacade = connectionFacade;
handshakeFuture.addListener(new GenericFutureListener<Future<? super Channel>>() {
@Override
public void operationComplete(final Future<? super Channel> future) throws Exception {
finalConnectionFacade.fireConnectionReadyNotification();
}
});
ch.pipeline().addLast(PipelineHandlers.SSL_HANDLER.name(), ssl);
}
ch.pipeline().addLast(PipelineHandlers.OF_FRAME_DECODER.name(),
new OFFrameDecoder(connectionFacade, tlsPresent));
ch.pipeline().addLast(PipelineHandlers.OF_VERSION_DETECTOR.name(), new OFVersionDetector());
final OFDecoder ofDecoder = new OFDecoder();
ofDecoder.setDeserializationFactory(getDeserializationFactory());
//注册编码器
ch.pipeline().addLast(PipelineHandlers.OF_DECODER.name(), ofDecoder);
final OFEncoder ofEncoder = new OFEncoder();
ofEncoder.setSerializationFactory(getSerializationFactory());
ch.pipeline().addLast(PipelineHandlers.OF_ENCODER.name(), ofEncoder);
//注册消息处理器
ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionFacade));
if (!tlsPresent) {
connectionFacade.fireConnectionReadyNotification();
}
} catch (final Exception e) {
LOGGER.warn("Failed to initialize channel", e);
ch.close();
}
}
2 分析ConnectionFacade和ConnectionConductor
根据上面所示,当有设备连接时候,通过netty框架调用initChannel会根据通道生成一个ConnectionFacade和ConnectionConductor,be2版本的代码中没有借助专门的map存储这两个根据设备的类,目测是由netty的channelHandler注册时候引用存储;
-
ConnectionFacade为ConnectionAdapter接口的继承接口,实现类为ConnectionAdapterImpl,含有连接通道的基本信息,包括Channel、InetSocketAddress等,同时又新增了如下监听器成员变量:
- ConnectionReadyListener
- OpenflowProtocolListener
- SystemNotificationsListener
-
ConnectionConductor则为删除提到的监听的实现类,此类实现了如下接口:
- OpenflowProtocolListener--接收消息处理类,包含如下工作:
- multipart消息回复;
- 错误消息处理;
- 端口状态消息处理;
- 心跳消息回复;
- Experimenter消息处理;
- 流表移除消息处理;
- 握手消息处理;
- PacketIn消息处理;
- SystemNotificationsListener--处理设备事件,包括
- 设备断连处理;
- 设备闲置处理;
- ConnectionConductor--连接引导器,提供基本连接方法;
- 初始化
- 获取版本
- 获取连接引导器状态;
- 断连设备;
- 设置回话上下文;
- 获取ConnectionAdapter
- 设置处理队列器;
- 设置错误处理器;
- ConnectionReadyListener--连接成功监听器
- 开启心跳进程;
- HandshakeListener--握手消息监听器
- 处理握手成功;--注册回话
- 处理握手失败;
- 设置握手上下文;
- NotificationEnqueuer--订阅消息排队;
如上阐述,ConnectionConductor类为实际的连接回话处理、消息回复的关键类,后续3章分析以下其消息接收的相关处理流程;
- OpenflowProtocolListener--接收消息处理类,包含如下工作:
3 消息接受处理分析
3.1 注册位置
//01-注册处理handler
ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionFacade));
3.2 调用流程
org/opendaylight/openflowjava/protocol/impl/core/DelegatingInboundHandler.java
public void channelRead(final ChannelHandlerContext ctx, final Object msg);
-----------------------------------
public void consume(final DataObject message);
-----------------------------------
protected abstract void consumeDeviceMessage(DataObject message);
----------------------------------
实现类:
org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java
根据消息类型,进入各个消息处理和事件处理的监听器;
4 openflow连接连接建立流程
整个连接建立流程,可以分析如下所示:
如上流程图所示,虽然整个连接会话通道的建立过程为异步,但是仍然符合openflow通道创建的一般规则,按照下面顺序完成:
1. 接收hello消息;
2. 发送反向hello消息;
3. 协商版本;
4. 发送心跳消息;
5. 建立通道session;
6. 产生node;
几个关键处理如下所示:
握手信息处理
org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java
监听session生成node
org/opendaylight/openflowplugin/openflow/md/core/sal/SalRegistrationManager.java
5 连接通道接口
SessionManagerOFImpl-->ConjunctSessionManager
获取类方式:
OFSessionUtil.getSessionManager();