1. 场景
Web版的文件浏览器,通过它用户可以在浏览器里查看服务器上的目录和文件。这个项目依赖运维部门提供的文件浏览服务,而这个文件浏览服务只支持消息队列(MQ)方式接入。消息队列在互联网大厂中用的非常多,主要用作流量削峰和系统解耦。在这种接入方式中,发送消息和消费结果这两个操作之间是异步的。
class Message {
String id;
String content;
}
// 该方法可以发送消息
void send(Message msg) {
// 省略相关代码
}
// MQ消息返回后会调用该方法
// 该方法的执行线程不同于
// 发送消息的线程
void onMessage(Message msg) {
// 省略相关代码
}
// 处理浏览器发来的请求
Respond handleWebReq(){
//创建一消息
Message msg1 = new Message("1","{...}");
//发送消息
send(msg1);
//如何等待MQ返回的消息呢?
String result = ...;
}
问题:给MQ发送消息的线程是处理Web请求的线程T1,但消费MQ结果的线程并不是线程T1,那线程T1如何等待MQ的返回结果?
2. Guarded Suspension模式
Guarded Suspension,保护性地暂停。
class GuardedObject<T> {
// 受保护的对象
T obj;
final Lock lock = new ReentrantLock();
final Condition done = lock.newCondition();
final int timeout = 1;
// 获取受保护对象
T get(Predicate<T> p) {
lock.lock();
try {
// MESA管程推荐写法
while (!p.test(obj)) {
done.await(timeout, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// 返回非空的受保护对象
return obj;
}
// 事件通知方法
void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}
3. 扩展Guarded Suspension模式
class GuardedObject<T> {
// 受保护的对象
T obj;
final Lock lock = new ReentrantLock();
final Condition done = lock.newCondition();
final int timeout = 2;
// 保存所有GuardedObject
final static Map<Object, GuardedObject> gos = new ConcurrentHashMap<>();
// 静态方法创建GuardedObject
static <K> GuardedObject create(K key) {
GuardedObject go = new GuardedObject();
gos.put(key, go);
return go;
}
static <K, T> void fireEvent(K key, T obj) {
GuardedObject go = gos.remove(key);
if (go != null) {
go.onChanged(obj);
}
}
// 获取受保护对象
T get(Predicate<T> p) {
lock.lock();
try {
// MESA管程推荐写法
while (!p.test(obj)) {
done.await(timeout, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// 返回非空的受保护对象
return obj;
}
// 事件通知方法
void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}
利用扩展后的GuardedObject来解决小灰同学的问题。
// 处理浏览器发来的请求
Respond handleWebReq() {
int id = 序号生成器.get();
// 创建一消息
Message msg1 = new Message(id, "{...}");
// 创建GuardedObject实例
GuardedObject<Message> go = GuardedObject.create(id);
// 发送消息
send(msg1);
// 等待MQ消息
Message r = go.get(t -> t != null);
}
void onMessage(Message msg) {
// 唤醒等待的线程
GuardedObject.fireEvent(msg.id, msg);
}
// TODO 不是很懂