这个模式可以说在java的源码里面应用很广泛了,各种addListener,future这些,最终都是观察者模式的体现。
1、观察者和被观察者
2、观察者订阅消息,被观察者发布消息
3、订阅则能收到,取消订阅收不到
首先我们看看观察者模式的一个生活中的例子。
/**
* 被观察者
*/
public interface Observerable {
void registerObserver(Observer o);
void removeObserver(Observer o);
void notifyObserver();
}
/**
* 观察者
*/
public interface Observer {
void notify(String message);
}
public static class Girl implements Observerable {
private String message;
List<Observer> observerList;
public Girl() {
observerList = new ArrayList<>();
}
@Override
public void registerObserver(Observer o) {
observerList.add(o);
}
@Override
public void removeObserver(Observer o) {
observerList.remove(o);
}
@Override
public void notifyObserver() {
for (Observer observer : observerList) {
observer.notify(message);
}
}
public void hasBoyFriend() {
message = "女神有男朋友了";
notifyObserver();
}
public void getMarried() {
message = "女神结婚了,你们都死心吧!";
notifyObserver();
}
public void getSingled() {
message = "女神单身了,你们有机会了!";
notifyObserver();
}
}
/**
* 男孩
*/
public static class Boy implements Observer {
public void notify(String message) {
System.out.println("男孩收到消息:" + message);
}
}
/**
* 男人
*/
public static class Man implements Observer {
public void notify(String message) {
System.out.println("男人收到消息:" + message);
}
}
/**
* 老男人
*/
public static class OldMan implements Observer {
public void notify(String message) {
System.out.println("老男人收到消息:" + message);
}
}
public static void main(String[] args) {
Girl girl = new Girl();
Boy boy = new Boy();
Man man = new Man();
OldMan oldMan = new OldMan();
// 通知男孩、男人、老男人,女神有男朋友了
girl.registerObserver(boy);
girl.registerObserver(man);
girl.registerObserver(oldMan);
girl.hasBoyFriend();
System.out.println("====================");
// 通知男孩,男人,女神结婚了
girl.removeObserver(oldMan);
girl.getMarried();
System.out.println("====================");
girl.registerObserver(oldMan);
girl.getSingled();
}
流程就是,观察者注册到被观察者(也就是被观察者的列表里面添加观察者),然后发生了事情,被观察者就调用列表里面观察者的方法。
对于netty,我们首先查看一下我们使用的例子
public void write(NioSocketChannel channel, Object object) {
ChannelFuture channelFuture = channel.writeAndFlush(object);
channelFuture.addListener(future -> {
if (future.isSuccess()) {
} else {
}
});
channelFuture.addListener(future -> {
if (future.isSuccess()) {
} else {
}
});
}
首先创建一个channelFuture,这个就是被观察者,addListener把观察者都加进去。
首先查看被观察者创建的过程,writeAndFlush一直跟进去,可以看到这样一段代码:
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
newPromise()这个函数其实就是创建一个被观察者。newPromise()跟进去就是这样一行代码:
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}
ChannelPromise就是被观察者。
创建被观察者之后,就要把观察者添加进去。跟进去ChannelPromise的实现者DefaultChannelPromise的addListener方法,一直跟进去,会看到这样一串有趣的代码:
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}
添加listener就添加listener,为什么还要分三段?netty在这里做了一个优化。首先我们要知道的是,listeners定义为一个Object,意味着它可以装换成任意对象。
/**
* One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
* If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
*
* Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
*/
private Object listeners;
那么,如果第一个listener进来,直接定义listeners为listener;第二个进来,就把它定义为DefaultFutureListeners,把之前的listener和现在进来的包装进去,其实这个时候listeners就变成了一个列表,点进去DefaultFutureListeners就能看到如下代码:
@SuppressWarnings("unchecked")
DefaultFutureListeners(
GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
第三次就把新加进来的添加到DefaultFutureListeners里面就行了。有兴趣自己看源码。
把观察者添加进去之后,什么时候通知呢。netty的NioSocketChannel的WriteAndFlush执行完,成功或者失败,都会通知到listener。继续看writeAndFlush方法,跟进去会看到:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
首先看invokeWrite0,这个代码中可能会有失败,会告知listener;最终如果成功在invokeFlush0里面体现,也会通知给listener;
先看invokeWrite0,跟进去看到 write方法,用HeadContext的实现,再跟进去,调用DefaultPromiser的tryFailure方法就能看到notifyListener()了。notifyListeners();
notifyListeners();
这个才是是通知linstener的方法。
再一直跟进去会看到这段代码:
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
l.operationComplete(future)这个方法就会通知到观察者。
同理,invokeFlush0进去,使用HeadContext的flush方法,一直进去,使用NioSocketChannel的doWrite方法,一直进去,看到这样的代码:
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = region.transferred() >= region.count();
if (!done) {
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i--) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (region.transferred() >= region.count()) {
done = true;
break;
}
}
in.progress(flushedAmount);
}
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else {
// Should not reach here.
throw new Error();
}
}
incompleteWrite(setOpWrite);
}
这里有个in.remove()的代码,当读完了之后,就可以删除Buffer,然后通知listener成功了。
in.remove()进去,有个safeSuccess()方法,一直trySuccess(),使用DefaultPromise的trySuccess(),最后又看到这个熟悉的方法了:
@Override
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return true;
}
return false;
}
最后都同 invokeWrite0一样。
netty就通过Promise模式或者说ChannelFuture模式实现了观察者模式。netty就通过实现观察者模式达到异步通知,每次写成功写失败,都会回调给listener,也就是回调到future.isSuccess()等方法。其实就是变种的观察者模式。