想不到什么好的标题,就直接以日期命名好了。
最近主要在做一些web服务相关的功能,和以前做的分布式开发有些不同,主要是为了简单,前后端的驱动方式只有http方式的一种,而web服务器和control服务器的通信本来是双向的,但如果后端的服务器群组主动向web服务发送数据,无法向前端主动发送。只能改成前端主动驱动查询。团队其他成员由于经验匮乏,对于分布式开发所知甚少,所以如果强行向他们推行后端开发的惯常做法,那他们 就需要学习更多的东西了。这些都将制约整体的架构功能。在这方面,复杂性和可靠性成正比。
暂时抛开这个扫兴的话题,比较令人振奋的是中间用到的两个有趣的技术。zeroMQ和node.js。
zeroMQ的接口相当易用,对使用者暴露的接口模仿了socket函数族,所以学习成本很低。这段时间专门抽出时间去理解这个消息队列。其实现代码非常的精湛。比较值得注意的就是他们对于线程池的处理。初学者对于线程安全用的最多的就是锁,无论是mutex还是读写锁,原子锁,还是条件变量,都会不同条件的造成系统的延迟,zmq特意规避了这个问题。方法其实也非常的容易理解,利用自己设计的数据结构以及reactor模式来应对网络通信以及线程间的通信。对于zmq内部而言,尽可能的避免使用全局变量,对于环境信息的ctx_t类被设计成线程安全的,也就是说可以重入的。但是对于内部的线程处理遵循了自己的设计规则。从接受到用户的调用之后,便从线程池里调用了用户线程专门去用于发送用户的数据,内部处理被设计成了io线程,接受到数据之后的处理属于线程间的通信。内部封装了commonl类专门用来包装命令。zmq的整个对象模型一起来完成对于用户命令的处理。最终真正发送数据,被封装成了engine类,该类专门用于调用最底层的系统函数。
在理解zmq架构的时候,需要有全局的观念而不要具体在某一个类或函数内部,因为当还没有建立起整个系统的处理流程之前,独立的去解析某个类会非常困惑。目前依然在解析之中,对于zmq相关的文档很多;,可以进一步参考。值得注意的是在使用的时候,需要时刻记住zmq是个队列,不然很轻易的就会将其当作普通的socket进行使用,尤其是在网络发生错误的时候,按照处理tcp/ip协议栈的socket函数去理解zmq将出现一些的问题。
用gdb追踪一下send调用:
(gdb) bt
#0 push (this=0x604928) at src/yqueue.hpp:106
#1 zmq::ypipe_t<zmq::msg_t, 256>::write (this=0x604920, value_=..., incomplete_=<optimized out>) at src/ypipe.hpp:87
#2 0x00007ffff7b9db16 in zmq::pipe_t::write (this=0x604990, msg_=msg_@entry=0x7fffffffe100) at src/pipe.cpp:213
#3 0x00007ffff7b92751 in zmq::lb_t::sendpipe (this=0x604220, msg_=0x7fffffffe100, pipe_=0x0) at src/lb.cpp:103
#4 0x00007ffff7ba9328 in zmq::socket_base_t::send (this=this@entry=0x603cf0, msg_=msg_@entry=0x7fffffffe100, flags_=flags_@entry=0) at src/socket_base.cpp:843
#5 0x00007ffff7bc02ec in s_sendmsg (s_=s_@entry=0x603cf0, msg_=msg_@entry=0x7fffffffe100, flags_=flags_@entry=0) at src/zmq.cpp:346
#6 0x00007ffff7bc0395 in zmq_send (s_=0x603cf0, buf_=0x7fffffffe180, len_=1024, flags_=0) at src/zmq.cpp:371
#7 0x0000000000400a8c in main (argc=1, argv=0x7fffffffe688) at zmq_client.c:46
相关函数:
// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are never
// flushed down the stream.
inline void write (const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back () = value_;
queue.push ();
// Move the "flush up to here" poiter.
if (!incomplete_)
f = &queue.back ();
}
queue.back是个左值:
// Returns reference to the back element of the queue.
// If the queue is empty, behaviour is undefined.
inline T &back ()
{
return back_chunk->values [back_pos];
}
可以看出,是在向双向队列里的末尾添加元素:
// Adds an element to the back end of the queue.
inline void push ()
{
back_chunk = end_chunk;
back_pos = end_pos;
if (++end_pos != N)
return;
chunk_t *sc = spare_chunk.xchg (NULL);
if (sc) {
end_chunk->next = sc;
sc->prev = end_chunk;
} else {
end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));
alloc_assert (end_chunk->next);
end_chunk->next->prev = end_chunk;
}
end_chunk = end_chunk->next;
end_pos = 0;
}
可以看出,当调用send函数时,是将数据push 到队列里,而非通过网络发送出去。