(1)动态代理就不用说啦,本质就是通过字节码增强技术来代理对象的行为。
(2)通过在客户端对每个请求都保存一个计数为1的CountDownLatch对象实现同步模型。
2.细节:
客户端的每个请求1个SessionID,每个SessionID持有一个WindowData对象,每个WindowData持有一个AutoResetEvent对象,每个AutoResetEvent持有一个记数为1的CountDownLatch对象。
3.其他思考:
在动态代理的时候 ,作者设计时抽象啦一层:MethodCaller层,这层的各个环节大概有:
1.第一步当然是init: 初始化ServiceProxy,本质就是初始化SocketChannel的一些参数。
2.SocketChannel.open()。。。并保存进对象池,注册CountDownLatch到会话中, 启动接收数据工作线程。
3. 发送数据。。
思考:参考mybatis的mapper接口的同步模型。
4.对比s-hsf同步模型:
相对来说,HSF实现比较简单,hsf是每个请求持有一个InvokeFuture(封装一个信号量),通过invokeFuture.getResult(timeout, TimeUnit.MILLISECONDS);等待超时实现,既可以异步也可以同步,灵活。
5. GAEA-58客户端同步模型核心代码:
Server类:
关键代码: socket.registerRec(p.getSessionID());//注册一个windowData对象到当前会话中。
public Protocol request(Protocol p) throws Exception, Throwable { if (state == ServerState.Dead) { logger.warn("This proxy server is unavailable.state:" + state + "+host:" + address); throw new Exception("This proxy server is unavailable.state:" + state + "+host:" + address); } increaseCU(); CSocket socket = null; try { try { socket = this.scoketpool.getSocket(); byte[] data = p.toBytes(socket.isRights(),socket.getDESKey()); socket.registerRec(p.getSessionID()); socket.send(data); } catch (Throwable ex) { logger.error("Server get socket Exception", ex); throw ex; }finally { if(socket != null){ socket.dispose(); } } byte[] buffer = socket.receive(p.getSessionID(), currUserCount); Protocol result = Protocol.fromBytes(buffer,socket.isRights(),socket.getDESKey()); if (this.state == ServerState.Testing) { relive(); } return result; } catch (IOException ex) { logger.error("io exception", ex); if (socket == null || !socket.connecting()) { if (!test()) { markAsDead(); } } throw ex; } catch (Throwable ex) { logger.error("request other Exception", ex); throw ex; } finally { if (state == state.Testing) { markAsDead(); } if (socket != null) { socket.unregisterRec(p.getSessionID()); } decreaseCU(); } }
CSocket类接收到数据时释放计数器:
关键代码:
WindowData wd = WaitWindows.get(pSessionId);
if (wd != null) {
wd.setData(pak);
wd.getEvent().set();
}
protected void frameHandle() throws Exception { if (handling) { return; } synchronized (receiveLockHelper) { handling = true; try { if (waitDestroy && isIdle()) { logger.info("Shrinking the connection:" + this.toString()); dispose(true); return; } receiveBuffer.clear(); try { int re = channel.read(receiveBuffer); if (re < 0) { this.closeAndDisponse(); logger.error("server is close.this socket will close."); return; } } catch (IOException ex) { _connecting = false; throw ex; } catch (NotYetConnectedException e) { _connecting = false; throw e; } receiveBuffer.flip(); if (receiveBuffer.remaining() == 0) { return; } while (receiveBuffer.remaining() > 0) { byte b = receiveBuffer.get(); receiveData.write(b); if (b == ProtocolConst.P_END_TAG[index]) { index++; if (index == ProtocolConst.P_END_TAG.length) { byte[] pak = receiveData.toByteArray(0, receiveData.size() - ProtocolConst.P_END_TAG.length); int pSessionId = ByteConverter.bytesToIntLittleEndian(pak, SFPStruct.Version + SFPStruct.TotalLen); WindowData wd = WaitWindows.get(pSessionId); if (wd != null) { wd.setData(pak); wd.getEvent().set(); } index = 0; receiveData.reset(); continue; } } else if (index != 0) { if(b == ProtocolConst.P_END_TAG[0]) { index = 1; } else { index = 0; } } } } catch(Exception ex){ index = 0; throw ex; }finally { handling = false; } } }